Apache Beam to project rozpoczęty w 2016 roku, bardzo mocno wspierany przez Google. Jego zadaniem jest stworzenie uniwersalnego modelu przetwarzania Big Data. Raz napisany kod, który przetwarza dane, powinien być możliwy to uruchomienia w technologiach, takich jak Apache Spark czy DataFlow. W poniższym tutorialu, zapoznamy się z jego podstawami oraz napiszemy nasz pierwszy kawałek kodu, który uruchomimy na własnym komputerze. Jest to bardzo dobry krok, aby rozpocząć przygodę z Big Data.
Na początek odpowiedzmy sobie na pytanie do czego służy Apache Beam i jak się ma do Apache Spark czy też Pandas. Otóż Apache Beam jest w swym zastosowaniu bardzo podobny do Apache Spark. Za jego pomocą możemy przetwarzać zbiory danych, zarówno z plików jak i ze strumieni danych, w sposób rozproszony. To znaczy na wielu maszynach na raz.
- W przeciwieństwie do dobrze znanego analitykom danych Pandas, nie jesteśmy ograniczeni do mocy naszego komputera oraz danych w pliku. Z łatwością nawiążemy połączenie z Kafka oraz googlowyh Pub/Sub, oraz będziemy wstanie przetwarzać dane w środowisku rozproszonym.
- W przeciwieństwie do dobrze znanego inżynierom danych Apache Spark, nie musimy wykorzystywać API Sparka, piszemy nasz kod z wykorzystaniem Apache Beam. Natomiast następnie będziemy mogli go wykonać na swoim komputerze, serwerze Apache Spark, Flirk czy też googlowych DataFlow.
Nie wykonamy w nim analizy danych porównywalnej do Pandas, tak jak nie zjemy zupy widelcem, oraz nie napiszemy tak efektywnego kodu jak w Apache Spark, tak jak nie porąbiemy drzewa nożem do masła. Ale nie to jest celem Apache Beam. Celem jest elastyczność i uniezależnienie od technologii. Czyli mamy do czynienia ze swoistym scyzorykiem szwajcarskim przetwarzania big data.
Możemy napisać nasz kod w Python, Java, Go oraz Scala
Możemy go uruchamiać na Flink, Spark, DataFlow, Sanza oraz Twister2.
Poniżej zainstalujemy Apache Beam jako bibliotekę na naszym komputerze oraz napiszemy proste przykładowe programy z jego użyciem. Na zakończenie, będziemy wiedzieć na czym polega idea Apache Beam, znać jego podstawy oraz wiedzieć jak poszerzać swoje umiejętności z wiązane z tą technologią.
Zaczynajmy!
Instalacja Apache Beam Python
Potrzebujemy dwóch pakietów – Apache Beam oraz wheel
pip install wheel apache-beam
Po ukończonej instalacji, możemy zaimportować bibliotekę apache beam oraz z niej skorzystać:
import apache_beam as beam var1 = [1,3,5,7] var2 = var1 | beam.Map(lambda x: x * 2) print(var2)
To co widzimy w powyższym kodzie, to zastosowanie transformacji dla zbioru var1. W tym przypadku, dla każdej wartości z listy var1, wykonujemy funkcję Map, która jest podobna do funkcji apply znanej z Pandas, mnożymy wartości razy 2 oraz zapisujemy wynik do listy var2.
Apache Beam – podstawowe pojęcia
Zanim przejdziemy do pisania bardziej złożonych programó, musimy zaznajomić się z podstawowymi pojęciami. Nie ma ich dużo:
- Batch processing – skończony zbiór danych, w postaci pliku
- Streaming – nieskończony zbiór danych, które napływają cały czas, na w systemach Kafka czy Pub/Sub
- Pipeline – definicja naszego przetwarzania danych, wliczając ich odczytanie, transformowanie oraz zapisywanie.
- PCollection – odczytane w pipeline dane. Coś na wzór DataFrame z Pandas. Może zawierać jeden lub więcej rekordów
- PTransform – operacja którą wykonujemy na PCollection. Może zwracać PCollection które jest takie same, zmienione, zmniejszone lub nie zwracać nic.
Standardowa struktura programu z użyciem Apache Beam jest następująca:
- Utworzenie pipeline
- Odczyt danych z plików, Kafka, PubSub lub innych źródeł
- Zapis danych do PCollection
- Wykonanie szeregu PTransform na naszym PCollection
- Zapis danych w miejsce docelowe, jak bazy danych, pliki czy też Kafka lub Pub/Sub
Ma to sens.
Apache beam – pierwszy program
W celu uproszczenia naszego pierwszego programu, skupimy się na batch processing. Czyli przetwarzaniu danych z plików. W tym celu skorzystamy z prostego zbioru danych Halloween. Zawiera regiony w US oraz informację o najbardziej popularnych kostiumach na Halloween.
Zbiór musimy poprać na nasz komputer, możemy go znaleźć tutaj. Wygląda tak:
Mało biznesowy, ale może być. Taki akurat miałem pod ręką. W sumie na 52 rekordy. Kolumna region to stan w US, kolumna '1′ to najbardziej popularny kostium. Plik należy pobrać i zapisąć na swoim komputerze.
- Wczytanie oraz wyświetlenie zawartości pliku
Aby wczytać i wyświetlić nasz pliku, musimy – otworzyć pipeline, wczytać plik, zastosować funkcję Map, do której przekażemy funkcję która ma być wykonana dla każdego rekordu:
import apache_beam as beam with beam.Pipeline() as p: lines = p | beam.io.ReadFromText('/home/analityk/projects/beam_new/Halloween.csv') lines | beam.Map(print)
Prościej się już chyba nie da. Funkcja Map jest naszą funkcją PTransform, która jest wykonywana dla 'lines’, czyli naszego PCollection.
Zwróćmy uwagę na znak '|’. Powoduje on, że podana po prawej stronie PTransformation jest stosowana na PCollection po lewej stronie. W wyniku tej operacji, dostajemy nowe PCollection.
Apache Beam wspiera dość wiele różnych PTransform które pozwalają nam na manipulacje zbiorem danych w elastyczny sposób. Poniżej znajduje się ich lista:
ParDo
GroupByKey
CoGroupByKey
Combine
Flatten
Partition
Nie będziemy omawiać wszystkich. Zobaczymy tylko podstawowe, aby przekonać się że ich użycie jest dość proste. Najczęściej stosowana z nich to ParDo.
- Zastosowanie uniwersalnej funkcji PTransform
ParDo jest bardzo podobne do Map. Pozwala nam na zastosowanie dowolnej funkcji na każdym rekordzie z naszego Pcollection. Różnica jest taka, że funkcja Map dla każdego rekordu, musi zwracać dokładnie jeden rekord. Natomiast transformacja ParDo, dla każdego rekordu zwraca 0, 1 lub więcej rekordów, przez co jest o wiele bardziej elastyczna i nadaje się do filtrowania, czyszczenia danych, modyfikacji itd.
Poniżej napiszemy naszą własna funkcję którą następnie za pomocą ParDo, przekażemy do wykonania dla każdego rekordu. Będzie ona robić 2 rzeczy – (1) Filtrować regiony w których najpopularniejszym kostiumem był kostium królika, (2) Dla wszystkich pozostałym regionów zwracać krotkę z nazwą kostiumu oraz nazwą regionu, czyli (kostium, region). Zobaczmy:
import apache_beam as beam class ToKeyValue(beam.DoFn): def process(self, element): x = element.split(',') if x[1] == 'Rabbit': return kv = (x[1],x[0]) return [kv] with beam.Pipeline() as p: lines = p | ( beam.io.ReadFromText('/home/analityk/projects/beam_new/Halloween.csv') | beam.ParDo(ToKeyValue()) | beam.Map(print) )
W wyniku otrzymaliśmy:
Czyli dokładnie tak jak chcieliśmy. Mamy tutaj po kolei- kostium oraz region. Strój rabbit został usunięty.
Zwróćmy uwagę że do naszej funkcji ParDo(), przekazaliśmy klasę, która musi dziedziczyć po beam.DoFn. Wewnątrz klasy jest funkcja process. Jest to nazwa którą narzuca nam Beam. Natomiast kod jest funkcji, jest naszym kodem, który chcemy wykonać dla każdego elementu w PCollection.
Spójrzmy jeszcze na inną PTransform
- Grupowanie danych
niewątpliwie bardzo często wykonywaną operacją jest grupowanie danych. w Apache beam możemy to zrobić za pomocą PTransform, takich jak GroupByKey, czy też Combine.
GroupByKey wymaga aby elementy w naszym PCollect były postaci (klucz, wartość). Ale tak już mamy, gdyż zadbaliśmy o to wcześniej. Teraz zgrupujmy wszystkie regiony dla każdego kostiumu:
import apache_beam as beam class ToKeyValue(beam.DoFn): def process(self, element): x = element.split(',') if x[1] == 'Rabbit': return kv = (x[1],x[0]) return [kv] with beam.Pipeline() as p: lines = p | ( beam.io.ReadFromText('/home/analityk/projects/beam_new/Halloween.csv') | beam.ParDo(ToKeyValue()) | beam.GroupByKey() | beam.Map(print) )
W rezultacie otrzymaliśmy kostium oraz listę regionów w którym jest najpopularniejszy. Proste.
Tak naprawdę jest to ten sam kod, w którym dodaliśmy jedną PTransform. GroupByKey(). I na tym polega pisanie kodów z użyciem Apache Beam. Zobaczmy jeszcze na jeden przykład:
Klasyka big data, czyli zliczanie słów
Bardzo często, pierwszym programem który piszemy w świecie Big data jest zliczanie, które słowo, wystąpiło ile razy, w danym pliku txt. Jest to coś w rodzaju Hello World kiedy uczymy się nowego języka programowania.
Jako plik txt weźmiemy tekst z książki Romeo i Julia, który możemy poprać tutaj.
Najprostsza wersja programy z udziałem Apache Beam, który zlicza słowa wygląda tak:
import apache_beam as beam import re with beam.Pipeline() as p: lines = p | "Wczytujemy plik" >> beam.io.ReadFromText('/home/dwc/datasets/romeo.txt')\ | "Każde słowo w nowej linii >>" beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))\ | "Zliczamy wystąpienie każdego słowa" >> beam.combiners.Count.PerElement()\ | "Wynik wypisujemy na ekran" >> beam.Map(print)\ | "Wynik zapisujemy do pliku" >> beam.io.WriteToText('/home/dwc/datasets/wordcount.txt')
Bardziej rozbudowane rozwiązanie, możemy obejrzeć na stronie apache beam, pod tym adresem.
Podsumowanie oraz dalsze kroki
Tak jak widzimy, przetwarzanie danych z użyciem Apache Beam jest relatywnie proste oraz przyjemne. Nie wiem czy google przyćmi Apache Spark czy też nie, jednak na pewno projekt będzie się rozwijać dalej, google będzie w niego mocno inwestować i jeżeli chcemy być inżynierem danych w google cloud platform, jest to technologia którą musimy znać.
Jako dalsze kroki polecam:
- Manual pisania kodu z użyciem Python oraz Apache beam- tutaj
- Absolutnie rewelacyjny artykuł na temat problemów z wiązanych z przetwarzaniem danych – tutaj