
Apache Spark™ is a fast and general engine for large-scale data processing. Write applications quickly in Java, Scala, Python, R. Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it interactively from the Scala, Python and R shells. Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.








From a Mac
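The prebuilt archive can be fetched first; the Apache archive URL below is inferred from the tarball name used in the next step, so check it against the Spark downloads page:

```shell
# download the prebuilt Spark 1.6.0 / Hadoop 2.6 archive (URL inferred from the tarball name)
wget https://archive.apache.org/dist/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
```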

tar xvf spark-1.6.0-bin-hadoop2.6.tgz
cd spark-1.6.0-bin-hadoop2.6
export SPARK_HOME=$(pwd)

Note: $SPARK_HOME/bin/spark-shell and $SPARK_HOME/bin/pyspark both start the Spark web UI at http://localhost:4040/jobs/

With Docker


Interactive programming in Scala


Browse the Spark UI http://localhost:4040/jobs/

sc.parallelize(1 to 10000000).count()
val NUM_SAMPLES = 10000
val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
  val x = Math.random()
  val y = Math.random()
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
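The closure above is a Monte Carlo estimate: it samples points in the unit square and counts those that fall inside the quarter circle. The same logic as a plain-Python sketch (no Spark, fixed seed for reproducibility):

```python
import random

def estimate_pi(num_samples: int, seed: int = 42) -> float:
    """Monte Carlo Pi: 4 x the fraction of random points inside the unit quarter circle."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            inside += 1
    return 4.0 * inside / num_samples

print("Pi is roughly", estimate_pi(10000))
```

Spark parallelizes exactly this loop across partitions; `reduce(_ + _)` plays the role of the `inside` accumulator.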

Interactive programming in Python

>>> sc.parallelize(range(1000)).count()

>>> exit()

Running the examples

$SPARK_HOME/bin/run-example SparkPi

$SPARK_HOME/bin/run-example mllib.LinearRegression --numIterations 1000 data/mllib/sample_linear_regression_data.txt
MASTER=spark://localhost:7077 $SPARK_HOME/bin/run-example SparkPi

Spark Streaming

Change the logging level to WARN in $SPARK_HOME/conf/ (copy log4j.properties.template to log4j.properties and set the root logger to WARN)

From the bundled examples

In a first terminal, run the example

$SPARK_HOME/bin/run-example streaming.MQTTWordCount tcp:// "test/spark/wordcount/#"

In a second terminal, run the following Bash commands

while true; do
  while read line; do
    mosquitto_pub -h -t test/spark/wordcount/readme -m "$line"
    echo "$line"
  done <$SPARK_HOME/
done

Note: mosquitto_pub must be installed first, via brew install mosquitto (on macOS) or sudo apt-get install mosquitto-clients (on Debian)

In interactive mode

Download the dependencies
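The two jars passed to --jars below are standard Maven Central artifacts; assuming the usual repository layout, they can be fetched directly (URLs inferred from the artifact names, so verify them on search.maven.org):

```shell
# Maven Central URLs inferred from the jar names used with --jars below
wget https://repo1.maven.org/maven2/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-mqtt_2.11/1.6.0/spark-streaming-mqtt_2.11-1.6.0.jar
```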


Start the Spark shell

$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar

Enter the following script in the console:

import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt._
import org.apache.spark.SparkConf

val brokerUrl = "tcp://"
val topic = "test/spark/wordcount/#"
// 10-second micro-batches on top of the shell's SparkContext
val ssc = new StreamingContext(sc, Seconds(10))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
val words = lines.flatMap(x => x.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()

Save the script above as mqttcount.scala and run the following command

$SPARK_HOME/bin/spark-shell --jars  org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript -i mqttcount.scala
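The flatMap / map / reduceByKey pipeline in the script is the classic word count; a plain-Python sketch (no Spark) of what each stage does to one micro-batch:

```python
from collections import Counter

def word_count(lines):
    """flatMap(split) -> map((word, 1)) -> reduceByKey(_ + _), in plain Python."""
    words = (w for line in lines for w in line.split(" "))  # flatMap: one stream of words
    counts = Counter()                                      # reduceByKey: sum the 1s per word
    for w in words:
        counts[w] += 1
    return dict(counts)

print(word_count(["hello spark", "hello mqtt"]))
```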

To go further, see KillrWeather below: a sensor-oriented example combining Apache Kafka, Akka, Apache Cassandra, and Spark.

Deploying a Spark cluster on Amazon EC2


$SPARK_HOME/ec2/spark-ec2 --help

Retrieve and set the AWS credentials
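spark-ec2 reads the AWS credentials from the standard environment variables; a sketch with placeholder values:

```shell
# spark-ec2 expects these standard AWS environment variables (placeholder values)
export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...
```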


export IDENTITY_FILE=~/.ssh/awskey.pem
export KEY_PAIR=awskey
export REGION=eu-west-1
export ZONE=eu-west-1a
export SLAVES=4
export INSTANCE_TYPE=t2.micro

Creating and starting the cluster

# launch the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
   --slaves=$SLAVES --instance-type=$INSTANCE_TYPE \
     launch my-spark-cluster

# get the master
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
    get-master my-spark-cluster 

export MASTER=$($SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE    --region=$REGION --zone=$ZONE     get-master my-spark-cluster | tail -n 1)

# browse the Spark UI
open http://$MASTER:8080
python -mwebbrowser http://$MASTER:8080

# browse the Ganglia UI
open http://$MASTER:5080/ganglia
python -mwebbrowser http://$MASTER:5080/ganglia

Launching an application on the cluster

From the master

Log in to the master

# login on the master
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
    login my-spark-cluster 

Run the script (previously copied over with its dependencies)

$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript -i mqttcount.scala

From a remote host


$SPARK_HOME/bin/spark-shell --jars  org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript  --master spark://$MASTER:7077

Stopping the cluster

# stop the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
     stop my-spark-cluster

Restarting the cluster

# (re)start the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
     start my-spark-cluster

Rebooting the workers

# reboot the slaves
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
     reboot-slaves my-spark-cluster

Terminating and destroying the cluster

# destroy (terminate) the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
   --delete-groups \
     destroy my-spark-cluster

Command options

  -s SLAVES, --slaves=SLAVES
                        Number of slaves to launch (default: 1)
  -w WAIT, --wait=WAIT  DEPRECATED (no longer necessary) - Seconds to wait for
                        nodes to start
  -k KEY_PAIR, --key-pair=KEY_PAIR
                        Key pair to use on instances
  -i IDENTITY_FILE, --identity-file=IDENTITY_FILE
                        SSH private key file to use for logging into instances
  -p PROFILE, --profile=PROFILE
                        If you have multiple profiles (AWS or boto config),
                        you can configure additional, named profiles by using
                        this option (default: none)
  -t INSTANCE_TYPE, --instance-type=INSTANCE_TYPE
                        Type of instance to launch (default: m1.large).
                        WARNING: must be 64-bit; small instances won't work
  -m MASTER_INSTANCE_TYPE, --master-instance-type=MASTER_INSTANCE_TYPE
                        Master instance type (leave empty for same as
                        instance-type)
  -r REGION, --region=REGION
                        EC2 region used to launch instances in, or to find
                        them in (default: us-east-1)
  -z ZONE, --zone=ZONE  Availability zone to launch instances in, or 'all' to
                        spread slaves across multiple (an additional $0.01/Gb
                        for bandwidth between zones applies) (default: a single
                        zone chosen at random)
  -a AMI, --ami=AMI     Amazon Machine Image ID to use
  -v SPARK_VERSION, --spark-version=SPARK_VERSION
                        Version of Spark to use: 'X.Y.Z' or a specific git
                        hash (default: 1.6.0)
  --spark-git-repo=SPARK_GIT_REPO
                        Github repo from which to checkout supplied commit
                        hash (default: https://github.com/apache/spark)
  --spark-ec2-git-repo=SPARK_EC2_GIT_REPO
                        Github repo from which to checkout spark-ec2 (default:
                        https://github.com/amplab/spark-ec2)
  --spark-ec2-git-branch=SPARK_EC2_GIT_BRANCH
                        Github repo branch of spark-ec2 to use
  --deploy-root-dir=DEPLOY_ROOT_DIR
                        A directory to copy into / on the first master. Must
                        be absolute. Note that a trailing slash is handled as
                        per rsync: If you omit it, the last directory of the
                        --deploy-root-dir path will be created in / before
                        copying its contents. If you append the trailing
                        slash, the directory is not created and its contents
                        are copied directly into /. (default: none).
  --hadoop-major-version=HADOOP_MAJOR_VERSION
                        Major version of Hadoop. Valid options are 1 (Hadoop
                        1.0.4), 2 (CDH 4.2.0), yarn (Hadoop 2.4.0) (default: 1)
  -D [ADDRESS:]PORT     Use SSH dynamic port forwarding to create a SOCKS
                        proxy at the given local address (for use with login)
  --resume              Resume installation on a previously launched cluster
                        (for debugging)
  --ebs-vol-size=SIZE   Size (in GB) of each EBS volume.
  --ebs-vol-type=EBS_VOL_TYPE
                        EBS volume type (e.g. 'gp2', 'standard').
  --ebs-vol-num=EBS_VOL_NUM
                        Number of EBS volumes to attach to each node as
                        /vol[x]. The volumes will be deleted when the
                        instances terminate. Only possible on EBS-backed AMIs.
                        EBS volumes are only attached if --ebs-vol-size > 0.
                        Only support up to 8 EBS volumes.
  --placement-group=PLACEMENT_GROUP
                        Which placement group to try and launch instances
                        into. Assumes placement group is already created.
  --swap=SWAP           Swap space to set up per node, in MB (default: 1024)
  --spot-price=PRICE    If specified, launch slaves as spot instances with the
                        given maximum price (in dollars)
  --ganglia             Setup Ganglia monitoring on cluster (default: True).
                        NOTE: the Ganglia page will be publicly accessible
  --no-ganglia          Disable Ganglia monitoring for the cluster
  -u USER, --user=USER  The SSH user you want to connect as (default: root)
  --delete-groups       When destroying a cluster, delete the security groups
                        that were created
  --use-existing-master
                        Launch fresh slaves, but use an existing stopped
                        master if possible
  --worker-instances=WORKER_INSTANCES
                        Number of instances per worker: variable
                        SPARK_WORKER_INSTANCES. Not used if YARN is used as
                        Hadoop major version (default: 1)
  --master-opts=MASTER_OPTS
                        Extra options to give to master through
                        SPARK_MASTER_OPTS variable (e.g.
                        -Dspark.worker.timeout=180)
  --user-data=USER_DATA
                        Path to a user-data file (most AMIs interpret this as
                        an initialization script)
  --authorized-address=AUTHORIZED_ADDRESS
                        Address to authorize on created security groups
  --additional-security-group=ADDITIONAL_SECURITY_GROUP
                        Additional security group to place the machines in
  --additional-tags=ADDITIONAL_TAGS
                        Additional tags to set on the machines; tags are
                        comma-separated, while name and value are colon
                        separated; ex: "Task:MySparkProject,Env:production"
  --copy-aws-credentials
                        Add AWS credentials to hadoop configuration to allow
                        Spark to access S3
  --subnet-id=SUBNET_ID
                        VPC subnet to launch instances in
  --vpc-id=VPC_ID       VPC to launch instances in
  --private-ips         Use private IPs for instances rather than public if
                        VPC/subnet requires that.
  --instance-initiated-shutdown-behavior=INSTANCE_INITIATED_SHUTDOWN_BEHAVIOR
                        Whether instances should terminate when shut down or
                        just stop
  --instance-profile-name=INSTANCE_PROFILE_NAME
                        IAM profile name to launch instances under



Third-party packages

spark-avro Integration utilities for using Spark with Apache Avro data

kafka-spark-consumer Receiver Based Low Level Kafka-Spark Consumer with builtin Back-Pressure Controller

spark-perf Performance tests for Spark

deep-spark Connecting Apache Spark with different data stores

spark-mongodb MongoDB data source for Spark SQL

spark-es ElasticSearch integration for Apache Spark

elasticsearch-hadoop Official integration between Apache Spark and Elasticsearch real-time search and analytics

magellan Geo Spatial Data Analytics on Spark

SparkTwitterAnalysis An Apache Spark standalone application using the Spark API in Scala. The project is built with the Simple Build Tool (SBT).

spark-druid-olap Spark Druid Package

SpatialSpark Big Spatial Data Processing using Spark

killrweather KillrWeather is a reference application (in progress) showing how to easily leverage and integrate Apache Spark, Apache Cassandra, and Apache Kafka for fast, streaming computations on time series data in asynchronous Akka event-driven environments.

spark-kafka Low level integration of Spark and Kafka

docker-spark Docker container for spark standalone cluster.

spark-streamingsql Manipulate Apache Spark Streaming by SQL

twitter-stream-ml Machine Learning over Twitter's stream. Using Apache Spark, Web Server and Lightning Graph server.