Spark

From air
Jump to: navigation, search

http://spark.apache.org/

Apache Spark™ is a fast and general engine for large-scale data processing. Write applications quickly in Java, Scala, Python, R. Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it interactively from the Scala, Python and R shells. Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.

Liens

Livres

code https://github.com/sryza/aas

code https://github.com/databricks/learning-spark

code https://github.com/mahmoudparsian/data-algorithms-book

Cours

Installation

depuis un Mac

wget http://wwwftp.ciril.fr/pub/apache/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
tar xvf spark-1.6.0-bin-hadoop2.6.tgz
cd spark-1.6.0-bin-hadoop2.6
export SPARK_HOME=$(pwd)
more README.md


Remarque: $SPARK_HOME/bin/spark-shell et $SPARK_HOME/bin/pyspark démarrent une console web Spark UI http://localhost:4040/jobs/

Avec Docker

Suivre https://hub.docker.com/r/sequenceiq/spark/

Programmation interactive en Scala

$SPARK_HOME/bin/spark-shell

Browse the Spark UI http://localhost:4040/jobs/

Spark UI
sc.parallelize(1 to 10000000).count()
val NUM_SAMPLES = 10000
val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
  val x = Math.random()
  val y = Math.random()
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)

Programmation interactive en Python

$SPARK_HOME/bin/pyspark
>>> sc.parallelize(range(1000)).count()

>>> exit()

Lancement d'exemples

$SPARK_HOME/bin/run-example SparkPi

$SPARK_HOME/bin/run-example mllib.LinearRegression --numIterations 1000 data/mllib/sample_linear_regression_data.txt
MASTER=spark://localhost:7077 $SPARK_HOME/bin/run-example SparkPi
$SPARK_HOME/dev/run-tests

Spark Streaming

Changer le niveau de logging (WARN) dans $SPARK_HOME/conf/log4j.properties

A partir des exemples

Dans un terminal 1, lancer l'exemple

$SPARK_HOME/bin/run-example streaming.MQTTWordCount tcp://test.mosquitto.org:1883 "test/spark/wordcount/#"

Dans un terminal 2, lancer les commandes Bash

while true
do
  while read line           
  do           
    mosquitto_pub -h test.mosquitto.org -t test/spark/wordcount/readme -m "$line"
    echo "$line"
  done <$SPARK_HOME/README.md    
done   

Remarque: il faut préalablement installer mosquitto_pub via brew install mosquitto (pour MacOS X) ou sudo apt-get install mosquitto-clients (sur Debian)

En mode interactif

Télécharger les dépendances

wget https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar
wget http://central.maven.org/maven2/org/apache/spark/spark-streaming-mqtt_2.11/1.6.0/spark-streaming-mqtt_2.11-1.6.0.jar

Lancer le shell de Spark

$SPARK_HOME/bin/spark-shell --jars org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar

Entrer dans la console le script suivant:

import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt._
import org.apache.spark.SparkConf

val brokerUrl = "tcp://test.mosquitto.org:1883"
val topic = "test/spark/wordcount/#"
val ssc = new StreamingContext(sc, Seconds(10))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
val words = lines.flatMap(x => x.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

Sauvergarder le script ci-dessus dans mqttcount.scala et lancer la commande suivante

$SPARK_HOME/bin/spark-shell --jars  org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript -i mqttcount.scala

Pour continuer: un exemple orienté capteurs avec Apache Kafka, Akka, Apache Cassandra, Spark : https://github.com/killrweather/killrweather

Déploiement d'un cluster Spark sur Amazon EC2

Voir http://spark.apache.org/docs/latest/ec2-scripts.html

$SPARK_HOME/ec2/spark-ec2 --help

Récupérer et positionner les crédentials AWS

export AWS_ACCESS_KEY_ID=XXXX
export AWS_SECRET_ACCESS_KEY=XXXX

export IDENTITY_FILE=/.ssh/awskey.pem
export KEY_PAIR=awskey
export REGION=eu-west-1
export ZONE=eu-west-1a
export SLAVES=4
export INSTANCE_TYPE=t2.micro

Création du cluster et démarrage

# launch the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
   --slaves=$SLAVES --instance-type=$INSTANCE_TYPE \
     launch my-spark-cluster

# get the master
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
    get-master my-spark-cluster 

export MASTER=$($SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE    --region=$REGION --zone=$ZONE     get-master my-spark-cluster | tail -n 1)

# browse the Spark UI
open http://$MASTER:8080
python -mwebbrowser http://$MASTER:8080

# browse the Ganglia UI
open http://$MASTER:5080/ganglia
python -mwebbrowser http://$MASTER:5080/ganglia

Lancement d'une application sur le cluster

Depuis le master

Se logger sur le master

# login on the master
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
    login my-spark-cluster 

Lancer le script (préalablement copié avec ses dépendances)

$SPARK_HOME/bin/spark-shell --jars  org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript --files mqttcount.txt

Depuis un hôte distant

A COMPLETER

$SPARK_HOME/bin/spark-shell --jars  org.eclipse.paho.client.mqttv3-1.0.2.jar,spark-streaming-mqtt_2.11-1.6.0.jar --name MQTTCountScript  --master spark://$MASTER:7077

Arrêt du cluster

# stop the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
     stop my-spark-cluster

Redémarrage du cluster

# (re)start the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
     start my-spark-cluster

Redémarrage des workers

# reboot the slaves
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
     reboot-slaves my-spark-cluster

Terminaison et destruction du cluster

# destroy (terminate) the cluster
$SPARK_HOME/ec2/spark-ec2 --key-pair=$KEY_PAIR --identity-file=$IDENTITY_FILE \
   --region=$REGION --zone=$ZONE \
   --delete-groups \
     destroy my-spark-cluster

Options de la commande

  -s SLAVES, --slaves=SLAVES
                        Number of slaves to launch (default: 1)
  -w WAIT, --wait=WAIT  DEPRECATED (no longer necessary) - Seconds to wait for
                        nodes to start
  -k KEY_PAIR, --key-pair=KEY_PAIR
                        Key pair to use on instances
  -i IDENTITY_FILE, --identity-file=IDENTITY_FILE
                        SSH private key file to use for logging into instances
  -p PROFILE, --profile=PROFILE
                        If you have multiple profiles (AWS or boto config),
                        you can configure additional, named profiles by using
                        this option (default: none)
  -t INSTANCE_TYPE, --instance-type=INSTANCE_TYPE
                        Type of instance to launch (default: m1.large).
                        WARNING: must be 64-bit; small instances won't work
  -m MASTER_INSTANCE_TYPE, --master-instance-type=MASTER_INSTANCE_TYPE
                        Master instance type (leave empty for same as
                        instance-type)
  -r REGION, --region=REGION
                        EC2 region used to launch instances in, or to find
                        them in (default: us-east-1)
  -z ZONE, --zone=ZONE  Availability zone to launch instances in, or 'all' to
                        spread slaves across multiple (an additional $0.01/Gb
                        for bandwidthbetween zones applies) (default: a single
                        zone chosen at random)
  -a AMI, --ami=AMI     Amazon Machine Image ID to use
  -v SPARK_VERSION, --spark-version=SPARK_VERSION
                        Version of Spark to use: 'X.Y.Z' or a specific git
                        hash (default: 1.6.0)
  --spark-git-repo=SPARK_GIT_REPO
                        Github repo from which to checkout supplied commit
                        hash (default: https://github.com/apache/spark)
  --spark-ec2-git-repo=SPARK_EC2_GIT_REPO
                        Github repo from which to checkout spark-ec2 (default:
                        https://github.com/amplab/spark-ec2)
  --spark-ec2-git-branch=SPARK_EC2_GIT_BRANCH
                        Github repo branch of spark-ec2 to use (default:
                        branch-1.5)
  --deploy-root-dir=DEPLOY_ROOT_DIR
                        A directory to copy into / on the first master. Must
                        be absolute. Note that a trailing slash is handled as
                        per rsync: If you omit it, the last directory of the
                        --deploy-root-dir path will be created in / before
                        copying its contents. If you append the trailing
                        slash, the directory is not created and its contents
                        are copied directly into /. (default: none).
  --hadoop-major-version=HADOOP_MAJOR_VERSION
                        Major version of Hadoop. Valid options are 1 (Hadoop
                        1.0.4), 2 (CDH 4.2.0), yarn (Hadoop 2.4.0) (default:
                        1)
  -D [ADDRESS:]PORT     Use SSH dynamic port forwarding to create a SOCKS
                        proxy at the given local address (for use with login)
  --resume              Resume installation on a previously launched cluster
                        (for debugging)
  --ebs-vol-size=SIZE   Size (in GB) of each EBS volume.
  --ebs-vol-type=EBS_VOL_TYPE
                        EBS volume type (e.g. 'gp2', 'standard').
  --ebs-vol-num=EBS_VOL_NUM
                        Number of EBS volumes to attach to each node as
                        /vol[x]. The volumes will be deleted when the
                        instances terminate. Only possible on EBS-backed AMIs.
                        EBS volumes are only attached if --ebs-vol-size > 0.
                        Only support up to 8 EBS volumes.
  --placement-group=PLACEMENT_GROUP
                        Which placement group to try and launch instances
                        into. Assumes placement group is already created.
  --swap=SWAP           Swap space to set up per node, in MB (default: 1024)
  --spot-price=PRICE    If specified, launch slaves as spot instances with the
                        given maximum price (in dollars)
  --ganglia             Setup Ganglia monitoring on cluster (default: True).
                        NOTE: the Ganglia page will be publicly accessible
  --no-ganglia          Disable Ganglia monitoring for the cluster
  -u USER, --user=USER  The SSH user you want to connect as (default: root)
  --delete-groups       When destroying a cluster, delete the security groups
                        that were created
  --use-existing-master
                        Launch fresh slaves, but use an existing stopped
                        master if possible
  --worker-instances=WORKER_INSTANCES
                        Number of instances per worker: variable
                        SPARK_WORKER_INSTANCES. Not used if YARN is used as
                        Hadoop major version (default: 1)
  --master-opts=MASTER_OPTS
                        Extra options to give to master through
                        SPARK_MASTER_OPTS variable (e.g
                        -Dspark.worker.timeout=180)
  --user-data=USER_DATA
                        Path to a user-data file (most AMIs interpret this as
                        an initialization script)
  --authorized-address=AUTHORIZED_ADDRESS
                        Address to authorize on created security groups
                        (default: 0.0.0.0/0)
  --additional-security-group=ADDITIONAL_SECURITY_GROUP
                        Additional security group to place the machines in
  --additional-tags=ADDITIONAL_TAGS
                        Additional tags to set on the machines; tags are
                        comma-separated, while name and value are colon
                        separated; ex: "Task:MySparkProject,Env:production"
  --copy-aws-credentials
                        Add AWS credentials to hadoop configuration to allow
                        Spark to access S3
  --subnet-id=SUBNET_ID
                        VPC subnet to launch instances in
  --vpc-id=VPC_ID       VPC to launch instances in
  --private-ips         Use private IPs for instances rather than public if
                        VPC/subnet requires that.
  --instance-initiated-shutdown-behavior=INSTANCE_INITIATED_SHUTDOWN_BEHAVIOR
                        Whether instances should terminate when shut down or
                        just stop
  --instance-profile-name=INSTANCE_PROFILE_NAME
                        IAM profile name to launch instances under

Programmation

Packages

http://spark-packages.org/


spark-avro Integration utilities for using Spark with Apache Avro data

kafka-spark-consumer Receiver Based Low Level Kafka-Spark Consumer with builtin Back-Pressure Controller

spark-perf Performance tests for Spark

deep-spark Connecting Apache Spark with different data stores

spark-mongodb MongoDB data source for Spark SQL

spark-es ElasticSearch integration for Apache Spark

elasticsearch-hadoop Official integration between Apache Spark and Elasticsearch real-time search and analytics

magellan Geo Spatial Data Analytics on Spark

SparkTwitterAnalysis An Apache Spark standalone application using the Spark API in Scala. The application uses Simple Build(SBT) for building the project.

spark-druid-olap Spark Druid Package

SpatialSpark Big Spatial Data Processing using Spark

killrweather KillrWeather is a reference application (in progress) showing how to easily leverage and integrate Apache Spark, Apache Cassandra, and Apache Kafka for fast, streaming computations on time series data in asynchronous Akka event-driven environments.

spark-kafka Low level integration of Spark and Kafka

docker-spark Docker container for spark standalone cluster.

spark-streamingsql Manipulate Apache Spark Streaming by SQL

twitter-stream-ml Machine Learning over Twitter's stream. Using Apache Spark, Web Server and Lightning Graph server.