VT2018 ApacheBeam

From air
Revision as of 22:15, 16 December 2018 by Benjamin.Besnier (talk | contribs) (Demonstration)
Jump to: navigation, search

Auteur

  • Nom : Benjamin BESNIER
  • Mail : besnierbenjamin73@gmail.com
  • Sujet : Apache Beam

Résumé

Apache Beam permet de créer des pipelines pouvant être exécuté par différents runners. De ce fait, on peut garder le même code pour réaliser différentes chose suivant le runner choisit. Un pipeline reçoit des données en entrée, puis effectue différents traitement sur celle-ci pour ensuite rendre le résultat en sortie. Le pipeline créer peut recevoir deux types de données : Des données dites bornée (i.e Une table de base de données, une fichier texte) ou des données non bornées (i.e un flux de données arrivant constamment). Apache Beam est Open-source, et le code est trouvable sur GitHub.

Abstract

Apache Beam allows the users to create pipeline which can be executed by several runners. According to that, we can keep the same code to do differents things. A pipeline get data as input, then exectue several steps on the data and give the result as output. The pipeline created can have two diffents data types as input : the first is bounded data (i.e database table, text file) and the second is unbounded file (i.e stream of data). Apache Beam is Open-source and the code is available on GitHub.

Synthèse

La création de pipeline permet d'automatiser le traitement de données récurrente. Comme lors du projet Ecom ou on a dû mettre en place un pipeline pour réaliser l'intégration continue

Utilisation

Apache Beam peut être utiliser dans les langages suivants :

  • Java
  • Python
  • Go
  • Scala

Les runners disponibles sont les suivants :

  • Google cloud dataflow
  • Apache samza
  • Apache gearpump
  • Apache flink
  • Apache apex
  • Apache spark

Demonstration

Dans cette démonstration, on va lancer un pipeline permettant de récuperer certains texte de shakespeare commen données d'entrée pour ensuite avec le nombre d'apparition de chaque mot en sortie.

Prérequi pour Java :

  • Java
  • Maven

On lance la commande suivante dans un terminal pour récupérer les codes Java de pipelines d'exemple :

mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.9.0 \
     -DgroupId=org.example \
     -DartifactId=word-count-beam \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

Il y a alors une classe java appelée MinimalWordCount.java

Celle-ci peut être représenté par le pipeline suivant :
PipelineBeam.png

Elle peut être exécuté de la façon suivante :

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount


Si l'on regarde plus attentivement le code de la classe :

  1 /*
  2  * Licensed to the Apache Software Foundation (ASF) under one
  3  * or more contributor license agreements.  See the NOTICE file
  4  * distributed with this work for additional information
  5  * regarding copyright ownership.  The ASF licenses this file
  6  * to you under the Apache License, Version 2.0 (the
  7  * "License"); you may not use this file except in compliance
  8  * with the License.  You may obtain a copy of the License at
  9  *
 10  *     http://www.apache.org/licenses/LICENSE-2.0
 11  *
 12  * Unless required by applicable law or agreed to in writing, software
 13  * distributed under the License is distributed on an "AS IS" BASIS,
 14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  * See the License for the specific language governing permissions and
 16  * limitations under the License.
 17  */
 18 package org.apache.beam.examples;
 19 
 20 import java.util.Arrays;
 21 import org.apache.beam.sdk.Pipeline;
 22 import org.apache.beam.sdk.io.TextIO;
 23 import org.apache.beam.sdk.options.PipelineOptions;
 24 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 25 import org.apache.beam.sdk.transforms.Count;
 26 import org.apache.beam.sdk.transforms.Filter;
 27 import org.apache.beam.sdk.transforms.FlatMapElements;
 28 import org.apache.beam.sdk.transforms.MapElements;
 29 import org.apache.beam.sdk.values.KV;
 30 import org.apache.beam.sdk.values.TypeDescriptors;
 31 
 32 /**
 33  * An example that counts words in Shakespeare.
 34  *
 35  * <p>This class, {@link MinimalWordCount}, is the first in a series of four successively more
 36  * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or
 37  * argument processing, and focus on construction of the pipeline, which chains together the
 38  * application of core transforms.
 39  *
 40  * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the
 41  * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
 42  * concepts.
 43  *
 44  * <p>Concepts:
 45  *
 46  * <pre>
 47  *   1. Reading data from text files
 48  *   2. Specifying 'inline' transforms
 49  *   3. Counting items in a PCollection
 50  *   4. Writing data to text files
 51  * </pre>
 52  *
 53  * <p>No arguments are required to run this pipeline. It will be executed with the DirectRunner. You
 54  * can see the results in the output files in your current working directory, with names like
 55  * "wordcounts-00001-of-00005. When running on a distributed service, you would use an appropriate
 56  * file service.
 57  */
 58 public class MinimalWordCount {
 59 
 60   public static void main(String[] args) {
 61 
 62     // Create a PipelineOptions object. This object lets us set various execution
 63     // options for our pipeline, such as the runner you wish to use. This example
 64     // will run with the DirectRunner by default, based on the class path configured
 65     // in its dependencies.
 66     PipelineOptions options = PipelineOptionsFactory.create();
 67 
 68     // In order to run your pipeline, you need to make following runner specific changes:
 69     //
 70     // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
 71     // or FlinkRunner.
 72     // CHANGE 2/3: Specify runner-required options.
 73     // For BlockingDataflowRunner, set project and temp location as follows:
 74     //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
 75     //   dataflowOptions.setRunner(BlockingDataflowRunner.class);
 76     //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
 77     //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
 78     // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
 79     // for more details.
 80     //   options.as(FlinkPipelineOptions.class)
 81     //      .setRunner(FlinkRunner.class);
 82 
 83     // Create the Pipeline object with the options we defined above
 84     Pipeline p = Pipeline.create(options);
 85 
 86     // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
 87     // of input text files. TextIO.Read returns a PCollection where each element is one line from
 88     // the input text (a set of Shakespeare's texts).
 89 
 90     // This example reads a public data set consisting of the complete works of Shakespeare.
 91     p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
 92 
 93         // Concept #2: Apply a FlatMapElements transform the PCollection of text lines.
 94         // This transform splits the lines in PCollection<String>, where each element is an
 95         // individual word in Shakespeare's collected texts.
 96         .apply(
 97             FlatMapElements.into(TypeDescriptors.strings())
 98                 .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
 99         // We use a Filter transform to avoid empty word
100         .apply(Filter.by((String word) -> !word.isEmpty()))
101         // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
102         // transform returns a new PCollection of key/value pairs, where each key represents a
103         // unique word in the text. The associated value is the occurrence count for that word.
104         .apply(Count.perElement())
105         // Apply a MapElements transform that formats our PCollection of word counts into a
106         // printable string, suitable for writing to an output file.
107         .apply(
108             MapElements.into(TypeDescriptors.strings())
109                 .via(
110                     (KV<String, Long> wordCount) ->
111                         wordCount.getKey() + ": " + wordCount.getValue()))
112         // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
113         // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
114         // formatted strings) to a series of text files.
115         //
116         // By default, it will write to a set of files with names like wordcounts-00001-of-00005
117         .apply(TextIO.write().to("wordcounts"));
118 
119     p.run().waitUntilFinish();
120   }
121 }

Références

https://beam.apache.org/documentation/programming-guide/
https://beam.apache.org/documentation/pipelines/design-your-pipeline/
https://meritis.fr/bigdata/apache-beam-projet-dunification-prometteur/
https://www.slideshare.net/JeanBaptisteOnofr/introduction-to-apache-beam
https://github.com/apache/beam
https://beam.apache.org/get-started/wordcount-example/#windowedwordcount-example