Spark: Difference between revisions
(→Livres) |
|||
| Line 19: | Line 19: | ||
* Cours RCP216 du CNAM http://cedric.cnam.fr/vertigo/Cours/RCP216/coursIntroduction.html |
* Cours RCP216 du CNAM http://cedric.cnam.fr/vertigo/Cours/RCP216/coursIntroduction.html |
||
=Installation |
=Installation= |
||
==depuis un Mac== |
|||
<pre> |
<pre> |
||
wget http://wwwftp.ciril.fr/pub/apache/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz |
wget http://wwwftp.ciril.fr/pub/apache/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz |
||
| Line 30: | Line 31: | ||
Remarque: $SPARK_HOME/bin/spark-shell et $SPARK_HOME/bin/pyspark démarrent une console web Spark UI http://localhost:4040/jobs/ |
Remarque: $SPARK_HOME/bin/spark-shell et $SPARK_HOME/bin/pyspark démarrent une console web Spark UI http://localhost:4040/jobs/ |
||
==Avec [[Docker]]== |
|||
Suivre https://hub.docker.com/r/sequenceiq/spark/ |
|||
=Programmation interactive en [[Scala]]= |
=Programmation interactive en [[Scala]]= |
||
Revision as of 12:07, 26 February 2016
Apache Spark™ is a fast and general engine for large-scale data processing. Write applications quickly in Java, Scala, Python, R. Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it interactively from the Scala, Python and R shells. Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.
Liens
Livres
- Advanced Analytics with Spark : Patterns for Learning from Data at Scale, http://shop.oreilly.com/product/0636920035091.do
code https://github.com/sryza/aas
- Learning Spark : Lightning-Fast Big Data Analysis, http://shop.oreilly.com/product/0636920028512.do
code https://github.com/databricks/learning-spark
- Data Algorithms : Recipes for Scaling Up with Hadoop and Spark, http://shop.oreilly.com/product/0636920033950.do
code https://github.com/mahmoudparsian/data-algorithms-book
Cours
- Cours RCP216 du CNAM http://cedric.cnam.fr/vertigo/Cours/RCP216/coursIntroduction.html
Installation
depuis un Mac
wget http://wwwftp.ciril.fr/pub/apache/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz tar xvf spark-1.6.0-bin-hadoop2.6.tgz cd spark-1.6.0-bin-hadoop2.6 export SPARK_HOME=$(pwd) more README.md
Remarque: $SPARK_HOME/bin/spark-shell et $SPARK_HOME/bin/pyspark démarrent une console web Spark UI http://localhost:4040/jobs/
Avec Docker
Suivre https://hub.docker.com/r/sequenceiq/spark/
Programmation interactive en Scala
$SPARK_HOME/bin/spark-shell
Browse the Spark UI http://localhost:4040/jobs/
sc.parallelize(1 to 10000000).count()
val NUM_SAMPLES = 10000
val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
val x = Math.random()
val y = Math.random()
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
Programmation interactive en Python
$SPARK_HOME/bin/pyspark >>> sc.parallelize(range(1000)).count() >>> exit()
Lancement d'exemples
$SPARK_HOME/bin/run-example SparkPi $SPARK_HOME/bin/run-example mllib.LinearRegression --numIterations 1000 data/mllib/sample_linear_regression_data.txt
MASTER=spark://localhost:7077 $SPARK_HOME/bin/run-example SparkPi
$SPARK_HOME/dev/run-tests
Spark Streaming
Changer le niveau de logging (WARN) dans $SPARK_HOME/conf/log4j.properties
A partir des exemples
Dans un terminal 1, lancer l'exemple
$SPARK_HOME/bin/run-example streaming.MQTTWordCount tcp://test.mosquitto.org:1883 "test/spark/wordcount/#"
Dans un terminal 2, lancer les commandes Bash
while true
do
while read line
do
mosquitto_pub -h test.mosquitto.org -t test/spark/wordcount/readme -m "$line"
echo "$line"
done <$SPARK_HOME/README.md
done
Remarque: il faut préalablement installer mosquitto_pub via brew install mosquitto (pour MacOS X) ou sudo apt-get install mosquitto-clients (sur Debian)
En mode interactif
Télécharger les dépendances
wget https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar wget http://central.maven.org/maven2/org/apache/spark/spark-streaming-mqtt_2.11/1.6.0/spark-streaming-mqtt_2.11-1.6.0.jar
Lancer le shell de Spark
$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar
Entrer dans la console le script suivant:
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt._
import org.apache.spark.SparkConf
val brokerUrl = "tcp://test.mosquitto.org:1883"
val topic = "test/spark/wordcount/#"
val ssc = new StreamingContext(sc, Seconds(10))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
val words = lines.flatMap(x => x.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
Sauvergarder le script ci-dessus dans mqttcount.scala et lancer la commande suivante
$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript -i mqttcount.scala
Déploiement d'un cluster Spark sur Amazon EC2
Voir http://spark.apache.org/docs/latest/ec2-scripts.html
$SPARK_HOME/ec2/spark-ec2 --help
Récupérer et positionner les crédentials AWS
export AWS_ACCESS_KEY_ID=XXXX export AWS_SECRET_ACCESS_KEY=XXXX export IDENTITY_FILE=/.ssh/awskey.pem export KEY_PAIR=awskey export REGION=eu-west-1 export ZONE=eu-west-1a export SLAVES=4 export INSTANCE_TYPE=t2.micro
Création du cluster et démarrage
# launch the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
--region=$REGION --zone=$ZONE \
--slaves=$SLAVES --instance-type=$INSTANCE_TYPE \
launch my-spark-cluster
# get the master
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
--region=$REGION --zone=$ZONE \
get-master my-spark-cluster
export MASTER=$($SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE --region=$REGION --zone=$ZONE get-master my-spark-cluster | tail -n 1)
# browse the Spark UI
open http://$MASTER:8080
python -mwebbrowser http://$MASTER:8080
# browse the Ganglia UI
open http://$MASTER:5080/ganglia
python -mwebbrowser http://$MASTER:5080/ganglia
Lancement d'une application sur le cluster
Depuis le master
Se logger sur le master
# login on the master
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
--region=$REGION --zone=$ZONE \
login my-spark-cluster
Lancer le script (préalablement copié avec ses dépendances)
$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript --files mqttcount.txt
Depuis un hôte distant
A COMPLETER
$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript --master spark://$MASTER:7077
Arrêt du cluster
# stop the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
--region=$REGION --zone=$ZONE \
stop my-spark-cluster
Redémarrage du cluster
# (re)start the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
--region=$REGION --zone=$ZONE \
start my-spark-cluster
Redémarrage des workers
# reboot the slaves
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
--region=$REGION --zone=$ZONE \
reboot-slaves my-spark-cluster
Terminaison et destruction du cluster
# destroy (terminate) the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
--region=$REGION --zone=$ZONE \
--delete-groups \
destroy my-spark-cluster
Options de la commande
-s SLAVES, --slaves=SLAVES
Number of slaves to launch (default: 1)
-w WAIT, --wait=WAIT DEPRECATED (no longer necessary) - Seconds to wait for
nodes to start
-k KEY_PAIR, --key-pair=KEY_PAIR
Key pair to use on instances
-i IDENTITY_FILE, --identity-file=IDENTITY_FILE
SSH private key file to use for logging into instances
-p PROFILE, --profile=PROFILE
If you have multiple profiles (AWS or boto config),
you can configure additional, named profiles by using
this option (default: none)
-t INSTANCE_TYPE, --instance-type=INSTANCE_TYPE
Type of instance to launch (default: m1.large).
WARNING: must be 64-bit; small instances won't work
-m MASTER_INSTANCE_TYPE, --master-instance-type=MASTER_INSTANCE_TYPE
Master instance type (leave empty for same as
instance-type)
-r REGION, --region=REGION
EC2 region used to launch instances in, or to find
them in (default: us-east-1)
-z ZONE, --zone=ZONE Availability zone to launch instances in, or 'all' to
spread slaves across multiple (an additional $0.01/Gb
for bandwidthbetween zones applies) (default: a single
zone chosen at random)
-a AMI, --ami=AMI Amazon Machine Image ID to use
-v SPARK_VERSION, --spark-version=SPARK_VERSION
Version of Spark to use: 'X.Y.Z' or a specific git
hash (default: 1.6.0)
--spark-git-repo=SPARK_GIT_REPO
Github repo from which to checkout supplied commit
hash (default: https://github.com/apache/spark)
--spark-ec2-git-repo=SPARK_EC2_GIT_REPO
Github repo from which to checkout spark-ec2 (default:
https://github.com/amplab/spark-ec2)
--spark-ec2-git-branch=SPARK_EC2_GIT_BRANCH
Github repo branch of spark-ec2 to use (default:
branch-1.5)
--deploy-root-dir=DEPLOY_ROOT_DIR
A directory to copy into / on the first master. Must
be absolute. Note that a trailing slash is handled as
per rsync: If you omit it, the last directory of the
--deploy-root-dir path will be created in / before
copying its contents. If you append the trailing
slash, the directory is not created and its contents
are copied directly into /. (default: none).
--hadoop-major-version=HADOOP_MAJOR_VERSION
Major version of Hadoop. Valid options are 1 (Hadoop
1.0.4), 2 (CDH 4.2.0), yarn (Hadoop 2.4.0) (default:
1)
-D [ADDRESS:]PORT Use SSH dynamic port forwarding to create a SOCKS
proxy at the given local address (for use with login)
--resume Resume installation on a previously launched cluster
(for debugging)
--ebs-vol-size=SIZE Size (in GB) of each EBS volume.
--ebs-vol-type=EBS_VOL_TYPE
EBS volume type (e.g. 'gp2', 'standard').
--ebs-vol-num=EBS_VOL_NUM
Number of EBS volumes to attach to each node as
/vol[x]. The volumes will be deleted when the
instances terminate. Only possible on EBS-backed AMIs.
EBS volumes are only attached if --ebs-vol-size > 0.
Only support up to 8 EBS volumes.
--placement-group=PLACEMENT_GROUP
Which placement group to try and launch instances
into. Assumes placement group is already created.
--swap=SWAP Swap space to set up per node, in MB (default: 1024)
--spot-price=PRICE If specified, launch slaves as spot instances with the
given maximum price (in dollars)
--ganglia Setup Ganglia monitoring on cluster (default: True).
NOTE: the Ganglia page will be publicly accessible
--no-ganglia Disable Ganglia monitoring for the cluster
-u USER, --user=USER The SSH user you want to connect as (default: root)
--delete-groups When destroying a cluster, delete the security groups
that were created
--use-existing-master
Launch fresh slaves, but use an existing stopped
master if possible
--worker-instances=WORKER_INSTANCES
Number of instances per worker: variable
SPARK_WORKER_INSTANCES. Not used if YARN is used as
Hadoop major version (default: 1)
--master-opts=MASTER_OPTS
Extra options to give to master through
SPARK_MASTER_OPTS variable (e.g
-Dspark.worker.timeout=180)
--user-data=USER_DATA
Path to a user-data file (most AMIs interpret this as
an initialization script)
--authorized-address=AUTHORIZED_ADDRESS
Address to authorize on created security groups
(default: 0.0.0.0/0)
--additional-security-group=ADDITIONAL_SECURITY_GROUP
Additional security group to place the machines in
--additional-tags=ADDITIONAL_TAGS
Additional tags to set on the machines; tags are
comma-separated, while name and value are colon
separated; ex: "Task:MySparkProject,Env:production"
--copy-aws-credentials
Add AWS credentials to hadoop configuration to allow
Spark to access S3
--subnet-id=SUBNET_ID
VPC subnet to launch instances in
--vpc-id=VPC_ID VPC to launch instances in
--private-ips Use private IPs for instances rather than public if
VPC/subnet requires that.
--instance-initiated-shutdown-behavior=INSTANCE_INITIATED_SHUTDOWN_BEHAVIOR
Whether instances should terminate when shut down or
just stop
--instance-profile-name=INSTANCE_PROFILE_NAME
IAM profile name to launch instances under
Packages
spark-avro
Integration utilities for using Spark with Apache Avro data
kafka-spark-consumer Receiver Based Low Level Kafka-Spark Consumer with builtin Back-Pressure Controller
spark-perf Performance tests for Spark
deep-spark Connecting Apache Spark with different data stores
spark-mongodb MongoDB data source for Spark SQL
spark-es ElasticSearch integration for Apache Spark
elasticsearch-hadoop Official integration between Apache Spark and Elasticsearch real-time search and analytics
magellan Geo Spatial Data Analytics on Spark
SparkTwitterAnalysis An Apache Spark standalone application using the Spark API in Scala. The application uses Simple Build(SBT) for building the project.
spark-druid-olap Spark Druid Package
SpatialSpark Big Spatial Data Processing using Spark
killrweather KillrWeather is a reference application (in progress) showing how to easily leverage and integrate Apache Spark, Apache Cassandra, and Apache Kafka for fast, streaming computations on time series data in asynchronous Akka event-driven environments.
spark-kafka Low level integration of Spark and Kafka
docker-spark Docker container for spark standalone cluster.
spark-streamingsql Manipulate Apache Spark Streaming by SQL
twitter-stream-ml Machine Learning over Twitter's stream. Using Apache Spark, Web Server and Lightning Graph server.