apache beam

Apache Beam to project rozpoczęty w 2016 roku, bardzo mocno wspierany przez Google. Jego zadaniem jest stworzenie uniwersalnego modelu przetwarzania Big Data. Raz napisany kod, który przetwarza dane, powinien być możliwy to uruchomienia w technologiach, takich jak Apache Spark czy DataFlow. W poniższym tutorialu, zapoznamy się z jego podstawami oraz napiszemy nasz pierwszy kawałek kodu, który uruchomimy na własnym komputerze. Jest to bardzo dobry krok, aby rozpocząć przygodę z Big Data.

Na początek odpowiedzmy sobie na pytanie do czego służy Apache Beam i jak się ma do Apache Spark czy też Pandas. Otóż Apache Beam jest w swym zastosowaniu bardzo podobny do Apache Spark. Za jego pomocą możemy przetwarzać zbiory danych, zarówno z plików jak i ze strumieni danych, w sposób rozproszony. To znaczy na wielu maszynach na raz.

  • W przeciwieństwie do dobrze znanego analitykom danych Pandas, nie jesteśmy ograniczeni do mocy naszego komputera oraz danych w pliku. Z łatwością nawiążemy połączenie z Kafka oraz googlowyh Pub/Sub, oraz będziemy wstanie przetwarzać dane w środowisku rozproszonym.
  • W przeciwieństwie do dobrze znanego inżynierom danych Apache Spark, nie musimy wykorzystywać API Sparka, piszemy nasz kod z wykorzystaniem Apache Beam. Natomiast następnie będziemy mogli go wykonać na swoim komputerze, serwerze Apache Spark, Flirk czy też googlowych DataFlow.

apache beam

Nie wykonamy w nim analizy danych porównywalnej do Pandas, tak jak nie zjemy zupy widelcem, oraz nie napiszemy tak efektywnego kodu jak w Apache Spark, tak jak nie porąbiemy drzewa nożem do masła. Ale nie to jest celem Apache Beam. Celem jest elastyczność i uniezależnienie od technologii. Czyli mamy do czynienia ze swoistym scyzorykiem szwajcarskim przetwarzania big data.

Możemy napisać nasz kod w Python, Java, Go oraz Scala

Możemy go uruchamiać na Flink, Spark, DataFlow, Sanza oraz Twister2.

apache beam python

Poniżej  zainstalujemy Apache Beam jako bibliotekę na naszym komputerze oraz napiszemy proste przykładowe programy z jego użyciem. Na zakończenie, będziemy wiedzieć na czym polega idea Apache Beam, znać jego podstawy oraz wiedzieć jak poszerzać swoje umiejętności z wiązane z tą technologią.

Zaczynajmy!

Instalacja Apache Beam Python

Potrzebujemy dwóch pakietów – Apache Beam oraz wheel

pip install wheel apache-beam

Po ukończonej instalacji, możemy zaimportować bibliotekę apache beam oraz z niej skorzystać:

import apache_beam as beam

var1 = [1,3,5,7]
var2 = var1 | beam.Map(lambda x: x * 2)
print(var2)

To co widzimy w powyższym kodzie, to zastosowanie transformacji dla zbioru var1. W tym przypadku, dla każdej wartości z listy var1, wykonujemy funkcję Map, która jest podobna do funkcji apply znanej z Pandas, mnożymy wartości razy 2 oraz zapisujemy wynik do listy var2.

Apache Beam – podstawowe pojęcia

Zanim przejdziemy do pisania bardziej złożonych programó, musimy zaznajomić się z podstawowymi pojęciami. Nie ma ich dużo:

  • Batch processing – skończony zbiór danych, w postaci pliku
  • Streaming – nieskończony zbiór danych, które napływają cały czas, na w systemach Kafka czy Pub/Sub
  • Pipeline – definicja naszego przetwarzania danych, wliczając ich odczytanie, transformowanie oraz zapisywanie.
  • PCollection – odczytane w pipeline dane. Coś na wzór DataFrame z Pandas. Może zawierać jeden lub więcej rekordów
  • PTransform – operacja którą wykonujemy na PCollection. Może zwracać PCollection które jest takie same, zmienione, zmniejszone lub nie zwracać nic.

Standardowa struktura programu z użyciem Apache Beam jest następująca:

  1. Utworzenie pipeline
  2. Odczyt danych z plików, Kafka, PubSub lub innych źródeł
  3. Zapis danych do PCollection
  4. Wykonanie szeregu PTransform na naszym PCollection
  5. Zapis danych w miejsce docelowe, jak bazy danych, pliki czy też Kafka lub Pub/Sub

Ma to sens.

Apache beam – pierwszy program

W celu uproszczenia naszego pierwszego programu, skupimy się na batch processing. Czyli przetwarzaniu danych z plików. W tym celu skorzystamy z prostego zbioru danych Halloween. Zawiera regiony w US oraz informację o najbardziej popularnych kostiumach na Halloween.

Zbiór musimy poprać na nasz komputer, możemy go znaleźć tutaj. Wygląda tak:

apache beam batch processing

Mało biznesowy, ale może być. Taki akurat miałem pod ręką. W sumie na 52 rekordy. Kolumna region to stan w US, kolumna '1′ to najbardziej popularny kostium. Plik należy pobrać i zapisąć na swoim komputerze.

  • Wczytanie oraz wyświetlenie zawartości pliku

Aby wczytać i wyświetlić nasz pliku, musimy – otworzyć pipeline, wczytać plik, zastosować funkcję Map, do której przekażemy funkcję która ma być wykonana dla każdego rekordu:

import apache_beam as beam

with beam.Pipeline() as p:
    lines = p | beam.io.ReadFromText('/home/analityk/projects/beam_new/Halloween.csv') 
    lines | beam.Map(print)

Prościej się już chyba nie da. Funkcja Map jest naszą funkcją PTransform, która jest wykonywana dla 'lines', czyli naszego PCollection.

Zwróćmy uwagę na znak '|'. Powoduje on, że podana po prawej stronie PTransformation jest stosowana na PCollection po lewej stronie. W wyniku tej operacji, dostajemy nowe PCollection.

Apache Beam wspiera dość wiele różnych PTransform które pozwalają nam na manipulacje zbiorem danych w elastyczny sposób. Poniżej znajduje się ich lista:

ParDo
GroupByKey
CoGroupByKey
Combine
Flatten
Partition

Nie będziemy omawiać wszystkich. Zobaczymy tylko podstawowe, aby przekonać się że ich użycie jest dość proste. Najczęściej stosowana z nich to ParDo.

  • Zastosowanie uniwersalnej funkcji PTransform

ParDo jest bardzo podobne do Map. Pozwala nam na zastosowanie dowolnej funkcji na każdym rekordzie z naszego Pcollection. Różnica jest taka, że funkcja Map dla każdego rekordu, musi zwracać dokładnie jeden rekord. Natomiast transformacja ParDo, dla każdego rekordu zwraca 0, 1 lub więcej rekordów, przez co jest o wiele bardziej elastyczna i nadaje się do filtrowania, czyszczenia danych, modyfikacji itd.

Poniżej napiszemy naszą własna funkcję którą następnie za pomocą ParDo, przekażemy do wykonania dla każdego rekordu. Będzie ona robić 2 rzeczy – (1) Filtrować regiony w których najpopularniejszym kostiumem był kostium królika, (2) Dla wszystkich pozostałym regionów zwracać krotkę z nazwą kostiumu oraz nazwą regionu, czyli (kostium, region). Zobaczmy:

import apache_beam as beam

class ToKeyValue(beam.DoFn):
    def process(self, element):
        x = element.split(',')
        if x[1] == 'Rabbit': return
        kv = (x[1],x[0])
        return [kv]

with beam.Pipeline() as p:
    lines = p | ( beam.io.ReadFromText('/home/analityk/projects/beam_new/Halloween.csv') 
        | beam.ParDo(ToKeyValue()) 
        | beam.Map(print) )

W wyniku otrzymaliśmy:

apache beam i big data

Czyli dokładnie tak jak chcieliśmy. Mamy tutaj po kolei- kostium oraz region. Strój rabbit został usunięty.

Zwróćmy uwagę że do naszej funkcji ParDo(), przekazaliśmy klasę, która musi dziedziczyć po beam.DoFn. Wewnątrz klasy jest funkcja process. Jest to nazwa którą narzuca nam Beam. Natomiast kod jest funkcji, jest naszym kodem, który chcemy wykonać dla każdego elementu w PCollection.

Spójrzmy jeszcze na inną PTransform

  • Grupowanie danych

niewątpliwie bardzo często wykonywaną operacją jest grupowanie danych. w Apache beam możemy to zrobić za pomocą PTransform, takich jak GroupByKey, czy też Combine.

GroupByKey wymaga aby elementy w naszym PCollect były postaci (klucz, wartość). Ale tak już mamy, gdyż zadbaliśmy o to wcześniej. Teraz zgrupujmy wszystkie regiony dla każdego kostiumu:

import apache_beam as beam

class ToKeyValue(beam.DoFn):
    def process(self, element):
        x = element.split(',')
        if x[1] == 'Rabbit': return
        kv = (x[1],x[0])
        return [kv]


with beam.Pipeline() as p:
    lines = p | ( beam.io.ReadFromText('/home/analityk/projects/beam_new/Halloween.csv') 
        | beam.ParDo(ToKeyValue()) 
        | beam.GroupByKey()
        | beam.Map(print) )

W rezultacie otrzymaliśmy kostium oraz listę regionów w którym jest najpopularniejszy. Proste.

apache beam i big data

Tak naprawdę jest to ten sam kod, w którym dodaliśmy jedną PTransform. GroupByKey(). I na tym polega pisanie kodów z użyciem Apache Beam. Zobaczmy jeszcze na jeden przykład:

Klasyka big data, czyli zliczanie słów

Bardzo często, pierwszym programem który piszemy w świecie Big data jest zliczanie, które słowo, wystąpiło ile razy, w danym pliku txt. Jest to coś w rodzaju Hello World kiedy uczymy się nowego języka programowania.

Jako plik txt weźmiemy tekst z książki Romeo i Julia, który możemy poprać tutaj.

Najprostsza wersja programy z udziałem Apache Beam, który zlicza słowa wygląda tak:

import apache_beam as beam
import re

with beam.Pipeline() as p: 
        lines = p | "Wczytujemy plik" >> beam.io.ReadFromText('/home/dwc/datasets/romeo.txt')\
                  | "Każde słowo w nowej linii >>" beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))\
                  | "Zliczamy wystąpienie każdego słowa" >> beam.combiners.Count.PerElement()\
                  | "Wynik wypisujemy na ekran" >> beam.Map(print)\
                  | "Wynik zapisujemy do pliku" >> beam.io.WriteToText('/home/dwc/datasets/wordcount.txt')

Bardziej rozbudowane rozwiązanie, możemy obejrzeć na stronie apache beam, pod tym adresem.

Podsumowanie oraz dalsze kroki

Tak jak widzimy, przetwarzanie danych z użyciem Apache Beam jest relatywnie proste oraz przyjemne. Nie wiem czy google przyćmi Apache Spark czy też nie, jednak na pewno projekt będzie się rozwijać dalej, google będzie w niego mocno inwestować i jeżeli chcemy być inżynierem danych w google cloud platform, jest to technologia którą musimy znać.

Jako dalsze kroki polecam:

  • Manual pisania kodu z użyciem Python oraz Apache beam- tutaj
  • Absolutnie rewelacyjny artykuł na temat problemów z wiązanych z przetwarzaniem danych – tutaj

Facebook Comments