Apache Spark™ is a fast and general engine for large-scale data processing. Write applications quickly in Java, Scala, Python, R. Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it interactively from the Scala, Python and R shells. Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.
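As a quick illustration of these high-level operators, here is a minimal word-count sketch for the Scala shell. It is a sketch only: it assumes a README.md file in the current directory and the sc SparkContext that spark-shell provides.
<source lang="scala">
// word count with a handful of high-level operators, runnable in spark-shell
val lines = sc.textFile("README.md")                               // assumes README.md exists locally
val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)         // split lines into non-empty words
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)       // sum occurrences per word in parallel
counts.take(10).foreach(println)                                   // print a small sample of the results
</source>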
Links
Books
- Advanced Analytics with Spark: Patterns for Learning from Data at Scale, http://shop.oreilly.com/product/0636920035091.do
  code: https://github.com/sryza/aas
- Learning Spark: Lightning-Fast Big Data Analysis, http://shop.oreilly.com/product/0636920028512.do
  code: https://github.com/databricks/learning-spark
- Data Algorithms: Recipes for Scaling Up with Hadoop and Spark, http://shop.oreilly.com/product/0636920033950.do
  code: https://github.com/mahmoudparsian/data-algorithms-book
Courses
- RCP216 course at CNAM: http://cedric.cnam.fr/vertigo/Cours/RCP216/coursIntroduction.html
Installation
On a Mac
<pre>
wget http://wwwftp.ciril.fr/pub/apache/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
tar xvf spark-1.6.0-bin-hadoop2.6.tgz
cd spark-1.6.0-bin-hadoop2.6
export SPARK_HOME=$(pwd)
more README.md
</pre>
Note: $SPARK_HOME/bin/spark-shell and $SPARK_HOME/bin/pyspark both start the Spark UI web console at http://localhost:4040/jobs/
With Docker
Follow https://hub.docker.com/r/sequenceiq/spark/
Interactive programming in Scala
<pre>
$SPARK_HOME/bin/spark-shell
</pre>
Browse the Spark UI at http://localhost:4040/jobs/
<source lang="scala">
sc.parallelize(1 to 10000000).count()
</source>
<source lang="scala">
// Monte Carlo estimation of Pi: draw random points in the unit square
// and count how many fall inside the quarter circle of radius 1
val NUM_SAMPLES = 10000
val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
  val x = Math.random()
  val y = Math.random()
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
</source>
Interactive programming in Python
<pre>
$SPARK_HOME/bin/pyspark
>>> sc.parallelize(range(1000)).count()
>>> exit()
</pre>
Running the examples
<pre>
$SPARK_HOME/bin/run-example SparkPi
$SPARK_HOME/bin/run-example mllib.LinearRegression --numIterations 1000 data/mllib/sample_linear_regression_data.txt
</pre>
<pre>
MASTER=spark://localhost:7077 $SPARK_HOME/bin/run-example SparkPi
</pre>
<pre>
$SPARK_HOME/dev/run-tests
</pre>
Spark Streaming
Set the logging level to WARN in $SPARK_HOME/conf/log4j.properties to keep the streaming output readable.
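A minimal sketch of this step, assuming the stock conf/log4j.properties.template shipped with Spark 1.6 (whose root category defaults to INFO):
<pre>
cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
# then edit log4j.properties and change the root category from INFO to WARN:
# log4j.rootCategory=WARN, console
</pre>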
From the bundled examples
In a first terminal, run the example
<pre>
$SPARK_HOME/bin/run-example streaming.MQTTWordCount tcp://test.mosquitto.org:1883 "test/spark/wordcount/#"
</pre>
In a second terminal, run the following Bash commands
<pre>
while true
do
  while read line
  do
    mosquitto_pub -h test.mosquitto.org -t test/spark/wordcount/readme -m "$line"
    echo "$line"
  done <$SPARK_HOME/README.md
done
</pre>
Note: mosquitto_pub must be installed first, via brew install mosquitto (on Mac OS X) or sudo apt-get install mosquitto-clients (on Debian)
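As an optional check that messages are actually reaching the broker, one can subscribe to the same topic from another terminal (mosquitto_sub ships with the same mosquitto client packages mentioned above):
<pre>
mosquitto_sub -h test.mosquitto.org -t "test/spark/wordcount/#" -v
</pre>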
In interactive mode
Download the dependencies
<pre>
wget https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar
wget http://central.maven.org/maven2/org/apache/spark/spark-streaming-mqtt_2.11/1.6.0/spark-streaming-mqtt_2.11-1.6.0.jar
</pre>
Start the Spark shell
<pre>
$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar
</pre>
Enter the following script in the console:
<source lang="scala">
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt._
import org.apache.spark.SparkConf

val brokerUrl = "tcp://test.mosquitto.org:1883"
val topic = "test/spark/wordcount/#"

// 10-second micro-batches on top of the SparkContext sc provided by the shell
val ssc = new StreamingContext(sc, Seconds(10))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
val words = lines.flatMap(x => x.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
</source>
Save the above script as mqttcount.scala and run the following command
<pre>
$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript -i mqttcount.scala
</pre>
To go further: a sensor-oriented example combining Apache Kafka, Akka, Apache Cassandra and Spark: https://github.com/killrweather/killrweather
Deploying a Spark cluster on Amazon EC2
See http://spark.apache.org/docs/latest/ec2-scripts.html
<pre>
$SPARK_HOME/ec2/spark-ec2 --help
</pre>
Retrieve and set your AWS credentials
<pre>
export AWS_ACCESS_KEY_ID=XXXX
export AWS_SECRET_ACCESS_KEY=XXXX
export IDENTITY_FILE=/.ssh/awskey.pem
export KEY_PAIR=awskey
export REGION=eu-west-1
export ZONE=eu-west-1a
export SLAVES=4
export INSTANCE_TYPE=t2.micro
</pre>
Creating and starting the cluster
<pre>
# launch the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
  --region=$REGION --zone=$ZONE \
  --slaves=$SLAVES --instance-type=$INSTANCE_TYPE \
  launch my-spark-cluster

# get the master
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
  --region=$REGION --zone=$ZONE \
  get-master my-spark-cluster
export MASTER=$($SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE --region=$REGION --zone=$ZONE get-master my-spark-cluster | tail -n 1)

# browse the Spark UI
open http://$MASTER:8080
python -mwebbrowser http://$MASTER:8080

# browse the Ganglia UI
open http://$MASTER:5080/ganglia
python -mwebbrowser http://$MASTER:5080/ganglia
</pre>
Running an application on the cluster
From the master
Log in to the master
<pre>
# login on the master
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
  --region=$REGION --zone=$ZONE \
  login my-spark-cluster
</pre>
Run the script (previously copied to the master along with its dependencies)
<pre>
$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript -i mqttcount.scala
</pre>
From a remote host
TO BE COMPLETED
<pre>
$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript --master spark://$MASTER:7077
</pre>
Stopping the cluster
<pre>
# stop the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
  --region=$REGION --zone=$ZONE \
  stop my-spark-cluster
</pre>
Restarting the cluster
<pre>
# (re)start the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
  --region=$REGION --zone=$ZONE \
  start my-spark-cluster
</pre>
Rebooting the workers
<pre>
# reboot the slaves
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
  --region=$REGION --zone=$ZONE \
  reboot-slaves my-spark-cluster
</pre>
Terminating and destroying the cluster
<pre>
# destroy (terminate) the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
  --region=$REGION --zone=$ZONE \
  --delete-groups \
  destroy my-spark-cluster
</pre>
Command options
<pre>
-s SLAVES, --slaves=SLAVES
        Number of slaves to launch (default: 1)
-w WAIT, --wait=WAIT
        DEPRECATED (no longer necessary) - Seconds to wait for nodes to start
-k KEY_PAIR, --key-pair=KEY_PAIR
        Key pair to use on instances
-i IDENTITY_FILE, --identity-file=IDENTITY_FILE
        SSH private key file to use for logging into instances
-p PROFILE, --profile=PROFILE
        If you have multiple profiles (AWS or boto config), you can configure
        additional, named profiles by using this option (default: none)
-t INSTANCE_TYPE, --instance-type=INSTANCE_TYPE
        Type of instance to launch (default: m1.large).
        WARNING: must be 64-bit; small instances won't work
-m MASTER_INSTANCE_TYPE, --master-instance-type=MASTER_INSTANCE_TYPE
        Master instance type (leave empty for same as instance-type)
-r REGION, --region=REGION
        EC2 region used to launch instances in, or to find them in (default: us-east-1)
-z ZONE, --zone=ZONE
        Availability zone to launch instances in, or 'all' to spread slaves across
        multiple (an additional $0.01/Gb for bandwidth between zones applies)
        (default: a single zone chosen at random)
-a AMI, --ami=AMI
        Amazon Machine Image ID to use
-v SPARK_VERSION, --spark-version=SPARK_VERSION
        Version of Spark to use: 'X.Y.Z' or a specific git hash (default: 1.6.0)
--spark-git-repo=SPARK_GIT_REPO
        Github repo from which to checkout supplied commit hash
        (default: https://github.com/apache/spark)
--spark-ec2-git-repo=SPARK_EC2_GIT_REPO
        Github repo from which to checkout spark-ec2
        (default: https://github.com/amplab/spark-ec2)
--spark-ec2-git-branch=SPARK_EC2_GIT_BRANCH
        Github repo branch of spark-ec2 to use (default: branch-1.5)
--deploy-root-dir=DEPLOY_ROOT_DIR
        A directory to copy into / on the first master. Must be absolute. Note that
        a trailing slash is handled as per rsync: If you omit it, the last directory
        of the --deploy-root-dir path will be created in / before copying its
        contents. If you append the trailing slash, the directory is not created and
        its contents are copied directly into /. (default: none).
--hadoop-major-version=HADOOP_MAJOR_VERSION
        Major version of Hadoop. Valid options are 1 (Hadoop 1.0.4), 2 (CDH 4.2.0),
        yarn (Hadoop 2.4.0) (default: 1)
-D [ADDRESS:]PORT
        Use SSH dynamic port forwarding to create a SOCKS proxy at the given local
        address (for use with login)
--resume
        Resume installation on a previously launched cluster (for debugging)
--ebs-vol-size=SIZE
        Size (in GB) of each EBS volume.
--ebs-vol-type=EBS_VOL_TYPE
        EBS volume type (e.g. 'gp2', 'standard').
--ebs-vol-num=EBS_VOL_NUM
        Number of EBS volumes to attach to each node as /vol[x]. The volumes will be
        deleted when the instances terminate. Only possible on EBS-backed AMIs.
        EBS volumes are only attached if --ebs-vol-size > 0. Only support up to
        8 EBS volumes.
--placement-group=PLACEMENT_GROUP
        Which placement group to try and launch instances into. Assumes placement
        group is already created.
--swap=SWAP
        Swap space to set up per node, in MB (default: 1024)
--spot-price=PRICE
        If specified, launch slaves as spot instances with the given maximum price
        (in dollars)
--ganglia
        Setup Ganglia monitoring on cluster (default: True).
        NOTE: the Ganglia page will be publicly accessible
--no-ganglia
        Disable Ganglia monitoring for the cluster
-u USER, --user=USER
        The SSH user you want to connect as (default: root)
--delete-groups
        When destroying a cluster, delete the security groups that were created
--use-existing-master
        Launch fresh slaves, but use an existing stopped master if possible
--worker-instances=WORKER_INSTANCES
        Number of instances per worker: variable SPARK_WORKER_INSTANCES. Not used
        if YARN is used as Hadoop major version (default: 1)
--master-opts=MASTER_OPTS
        Extra options to give to master through SPARK_MASTER_OPTS variable
        (e.g -Dspark.worker.timeout=180)
--user-data=USER_DATA
        Path to a user-data file (most AMIs interpret this as an initialization script)
--authorized-address=AUTHORIZED_ADDRESS
        Address to authorize on created security groups (default: 0.0.0.0/0)
--additional-security-group=ADDITIONAL_SECURITY_GROUP
        Additional security group to place the machines in
--additional-tags=ADDITIONAL_TAGS
        Additional tags to set on the machines; tags are comma-separated, while name
        and value are colon separated; ex: "Task:MySparkProject,Env:production"
--copy-aws-credentials
        Add AWS credentials to hadoop configuration to allow Spark to access S3
--subnet-id=SUBNET_ID
        VPC subnet to launch instances in
--vpc-id=VPC_ID
        VPC to launch instances in
--private-ips
        Use private IPs for instances rather than public if VPC/subnet requires that.
--instance-initiated-shutdown-behavior=INSTANCE_INITIATED_SHUTDOWN_BEHAVIOR
        Whether instances should terminate when shut down or just stop
--instance-profile-name=INSTANCE_PROFILE_NAME
        IAM profile name to launch instances under
</pre>
Programming
- DataFrame (see the sketch below)
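A minimal DataFrame sketch for the Spark 1.6 shell, combining the RDD, DataFrame and SQL APIs; the column names and sample data are illustrative only:
<source lang="scala">
// build a DataFrame from an RDD (Spark 1.6 SQLContext; sc is provided by spark-shell)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// illustrative sample data: (word, count) pairs
val df = sc.parallelize(Seq(("spark", 3), ("mqtt", 1), ("kafka", 2))).toDF("word", "n")

// DataFrame API
df.filter($"n" > 1).show()

// the same query through SQL
df.registerTempTable("words")
sqlContext.sql("SELECT word, n FROM words WHERE n > 1").show()
</source>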
Packages
spark-avro Integration utilities for using Spark with Apache Avro data
kafka-spark-consumer Receiver Based Low Level Kafka-Spark Consumer with builtin Back-Pressure Controller
spark-perf Performance tests for Spark
deep-spark Connecting Apache Spark with different data stores
spark-mongodb MongoDB data source for Spark SQL
spark-es ElasticSearch integration for Apache Spark
elasticsearch-hadoop Official integration between Apache Spark and Elasticsearch real-time search and analytics
magellan Geo Spatial Data Analytics on Spark
SparkTwitterAnalysis An Apache Spark standalone application using the Spark API in Scala. The application uses the Simple Build Tool (SBT) for building the project.
spark-druid-olap Spark Druid Package
SpatialSpark Big Spatial Data Processing using Spark
killrweather KillrWeather is a reference application (in progress) showing how to easily leverage and integrate Apache Spark, Apache Cassandra, and Apache Kafka for fast, streaming computations on time series data in asynchronous Akka event-driven environments.
spark-kafka Low level integration of Spark and Kafka
docker-spark Docker container for spark standalone cluster.
spark-streamingsql Manipulate Apache Spark Streaming by SQL
twitter-stream-ml Machine Learning over Twitter's stream. Using Apache Spark, Web Server and Lightning Graph server.