Jednym z podstawowych problemów młodego adepta sztuki torturowania danych i analizy dużych zbiorów jest spory próg wejścia. Gdzie się tego można nauczyć ? Jednym z bardziej znanych narzędzi jest Apache Spark. Okazuje się, że uruchomienie tego oprogramowania w usłudze Google Colab jest banalne. Systemem operacyjnych z jakiego korzystamy jest Ubuntu 18.04. Oczywiście taka konfiguracja nie umożliwi nam przetworzenia ogromnych ilości danych, ale pozwoli na rozpoznanie narzędzi, podstawowych pojęć i zbudowania nowych kompetencji.
Po uruchomieniu notatnika w języku Python 3.
W pierwszej komórce wklejamy
!wget https://raw.githubusercontent.com/djkormo/colab-examples/master/spark/install.bash -O install.bash
--2019-10-31 22:32:44-- https://raw.githubusercontent.com/djkormo/colab-examples/master/spark/install.bash Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ... Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 382 [text/plain] Saving to: ‘install.bash’ install.bash 100%[===================>] 382 --.-KB/s in 0s 2019-10-31 22:32:44 (59.8 MB/s) - ‘install.bash’ saved [382/382]
Ściągnięty w ten sposób plik instalacyjny zawiera zaledwie kilka linii kodu
!cat install.bash
#!/bin/bash cd / mkdir -p content cd content/ apt-get install openjdk-8-jdk-headless wget http://apache.crihan.fr/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz -O spark-2.4.4-bin-hadoop2.7.tgz tar xf spark-2.4.4-bin-hadoop2.7.tgz pip install -q findspark pip install spark-nlp export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 SPARK_HOME=/content/spark-2.4.4-bin-hadoop2.7
Instalujemy JDK dla Javy 1.8, następnie ściągamy z repozytorium Apache Sparka w postaci pliku .tgz, rozpakowujemy zawartość i ustawiamy dwie zmienne środowiskowe.
Uruchamiany skrypt instalacyjny
!bash install.bash
Nadeszła pora na uruchomienie przykładowego kawałka kodu
Najpierw uruchamiamy sesję Sparka
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_ME'] = '/content/spark-2.4.4-bin-hadoop2.7'
import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("First App") \
.master("local[*]")\
.getOrCreate()
print(spark)
Na wyjściu:
pyspark.sql.session.SparkSession object at 0x7fe1c00a5128
Potem uruchamiamy kontekst Sparka.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
print(sc)
Na wyjściu:
SparkContext master=local[*] appName=First App
Musimy mieć świadomość, że każda zmiana kodu, instalacja danych, zastosowane algorytmy są uruchamiane na klastrze obliczeniowym. Dostęp do infrastruktury jest za pomocą kontekstu Sparka. Patrząc z wysokiego poziomu, każda aplikacja Sparka, składa się z programu sterownika, który uruchamia na klastrze różne równoległe procesy.Program sterownika ma dostęp do Sparka przez obiektu kontekstowy (SparkContext). Mająć dostęp do kontekstu mamy już możliwość budowy obiektów zawierających dane, które początkowo mogą się nam wydawać jako zwykłe i znane z Pythona struktury danych. Nic bardziej mylnego, obiekty znane jako Resilent Distributed Datasets (RDD) lub DataFrame (DF) są umieszczane równolegle na wielu węzłach zwanych wykonawcami (worker node).
Na RDD będziemy wykonywać różne metody, które dzielimy na transformacje i akcje. Transformacje przekształcają nam RDD w inne RDD, zaś akcje generują z RDD wynik który może być zwrócony do sterownika bądź zapisany w HDFS’ie lub innym obsługiwanym źródle danych. Wszystkie transformacje są leniwe (lazy), co oznacza , będą wykonane wyłącznie wtedy, gdy będzie to konieczne. Zamiast manipulacji danymi mamy tu jedynie rejestrację metadanych, które pozwalają na zbudowanie RDD.
from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)
print(sqlContext)
Na wyjściu:
pyspark.sql.context.SQLContext object at 0x7fe1cf605438
Wczytajmy nasz plik jaka ramkę danych (dataframe)
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Sprawdźmy jaka jest struktura tych danych.
df.printSchema()
Na wyjściu:
root |-- x: integer (nullable = true) |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- educational-num: integer (nullable = true) |-- marital-status: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- gender: string (nullable = true) |-- capital-gain: integer (nullable = true) |-- capital-loss: integer (nullable = true) |-- hours-per-week: integer (nullable = true) |-- native-country: string (nullable = true) |-- income: string (nullable = true)
Zróbmy kilka przykładów z zastosowanie RDD.
Utwórzmy pierwszy obiekt RDD o nazwie rdd-1, do którego załadowaliśmy listę składającą się z 10 cyfr. To prosty obiekt, ale na jego podstawie pokażę podstawowe transformacje i akcje. Utworzenie obiektu z danych wewnętrznych (nie z plików) umożliwia nam funkcja parallelize.
rdd_1=sc.parallelize([0,1,2,3,4,5,6,7,8,9])
print(rdd_1)
Na wyjściu:
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
Pierwszą transformację, którą można zrobić jest map.
Map bierze każdy element zbioru RDD jako element wejściowy przekształca go za pomocą funkcji w element wyjściowy. W poniższym przykładzie przemnożyliśmy elementy przez liczbę dwa. Dla osób, które nie znają składni lambda, mamy tu do czynienia z budową funkcji która przekształca element wejściowy na element wejściowy. Nie wiemy jaki jest typ elementu wejściowego, nie musi to być typ skalarny. Nie ma potrzeby budowania tego w postaci def + ciało funkcji. Składnia wygląda w uproszczeniu tak lambda wejscie: wyjscie, gdzie wyjście zawiera algorytm. Myślę, ze po kilku poniższych przykładach stanie się to bardziej czytelne.
rdd_2=rdd_1.map(lambda x: x*2)
print(rdd_2)
Na wyjściu:
PythonRDD[1] at RDD at PythonRDD.scala:53
Wykonajmy akcję collect na obu zbiorach, dzięki czemu będzie można zobaczyć ich zawartość. Uwaga, nie należy stosować tej akcji na bardzo dużych zbiorach danych, gdyż mogą się one nie zmieścić w lokalnej pamięci. Akcje powinny być wykonywane tylko kiedy są konieczne (!).
rdd_1.collect()
rdd_2.collect()
Na wyjściu:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Następną transformację, z której możemy skorzystać jest flatMap.
Funkcja dla każdego elementu na wejściu zwraca kolekcję elementów, w tym przypadku rozkładamy każdy z napisów na wyrazy przyjmująć spację jako separator.
rdd_lines = sc.parallelize(["Pan Tadeusz","Pan Samochodzik i Templariusze", "Poznajemy Sparka", "Dwadzieścia tysięcy mil podmorskiej żeglugi"])
rdd_words = rdd_lines.flatMap(lambda line: line.split(" "))
print(rdd_words.collect())
Na wyjściu:
['Pan', 'Tadeusz', 'Pan', 'Samochodzik', 'i', 'Templariusze', 'Poznajemy', 'Sparka', 'Dwadzieścia', 'tysięcy', 'mil', 'podmorskiej', 'żeglugi']
Kolejną transformacją jest operacja filtrowania filter, która tworzy podzbiór spełniający warunki funkcji lambda zwracącej tylko wartości logiczne. W naszym przypadku wymagamy, by element był mniejszy od sześciu
rdd_numbers=sc.parallelize([0,1,2,3,4,5,6,7,8,9])
rdd_numbers_filtered=rdd_numbers.filter(lambda x:<6)
print(rdd_numbers_filtered.collect())
Na wyjściu:
[0, 1, 2, 3, 4, 5]
Kolejna transformacja o nazwie distinct umożliwia usunięcie zduplikowanych wartości w zbiorze.
rdd_nonunique = sc.parallelize([1,2,2,3,4,4,5,6,7,8,8,9,9,9])
rdd_unique = rdd_nonunique.distinct()
print(rdd_unique.collect())
Na wyjściu:
[2, 4, 6, 8, 1, 3, 5, 7, 9]
Jak wydać podwójne liczby dwa, cztery, osiem i potrójna dziewięć zostały zredukowane do jednego wystąpienia..
Istnieje możliwość utworzenia mniejszego zbioru danych za pomocą transformacji sample, która przyjmuje trzy argumenty
(czy zastępować, ułamek liczby elementów w zbiorze wyjściowym/wejściowym, ziarno dla generatora liczb pseudolosowych)
rdd_numbers = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
rdd_samples = rdd_numbers.sample(False, 0.5,123)
print(rdd_samples.collect())
Na wyjściu.
[1, 2, 3, 4, 6, 7]
Ustawienie ziarna na 123 oznacza, że z tego samego zbioru wejściowego zostanie wylosowany za każdym razem taki sam zbiór wyjściowy. Należy o tym pamiętać.
Wszystkie powyższe przykłady dotyczyły jednego zbioru RDD.
Utwórzmy dwa zbiory rdd_1 i rdd_2. Elementy w nich mogą się pokrywać. Transformacja union umożliwia utworzenie nowego obiektu zawierającego ich sumę.
Składnia wygląda następująco rdd_wy_wyjsciu=rdd_pierwsze.union(rdd_drugie)
rdd_1 = sc.parallelize([1,2,3,4,5]) rdd_2 = sc.parallelize([4,5,6,7,9]) union = rdd_1.union(rdd_2) print(union.collect())
W posobny sposób można skorzystać z operacji intersetion (część wspólna), substract (różnica, czyli elementy, które są w pierwszym obiekcie, a nie ma ich w drugim), cartesian (iloczyn kartezjański). Składnia poleceń jest analogiczna jak dla union.
intersection = rdd_1.intersection(rdd_2) print("intersection:",intersection.collect()) subtract = rdd_1.subtract(rdd_2) print("subtract:",subtract.collect()) cartesian = rdd_1.cartesian(rdd_2) print("cartesian:",cartesian.collect())
Na wyjściu:
intersection: [4, 5] subtract: [1, 2, 3] cartesian: [(1, 4), (1, 5), (2, 4), (2, 5), (1, 6), (1, 7), (2, 6), (2, 7), (1, 9), (2, 9), (3, 4), (3, 5), (4, 4), (4, 5), (5, 4), (5, 5), (3, 6), (3, 7), (4, 6), (4, 7), (3, 9), (4, 9), (5, 6), (5, 7), (5, 9)
Nadeszła pora na prezentację kilku akcji.
Akcja reduce umożliwia zamianę wielu elementów zbioru na jeden. W naszym przykładzie jest to mnożenie
rdd_numbers = sc.parallelize([1,2,3,4,5,6,7,8,9])
rdd_multis = rdd_numbers.reduce(lambda x, y: x * y)
print(rdd_multis)
print(type(rdd_multis))
Na wyjściu:
362880 <class 'int'>
Istnieje też możliwość prostego przemnożenia przez stałą wartość. Umożliwia to operacja fold, której pierwszym elementem jest wartość przez którą będzie przemnożony wynik.
rdd_numbers = sc.parallelize([1,2,3,4,5])
rdd_zero = rdd_numbers.fold(0, lambda x, y: x * y)
print(rdd_zero)
print(type(rdd_zero))
Na wyjściu:
0 <class 'int'>
Ten sam kod, ale zamiast zera zastosowaliśmy wartość minus jeden.
rdd_numbers = sc.parallelize([1,2,3,4,5])
rdd_minus = rdd_numbers.fold(-1, lambda x, y: x * y)
print(rdd_minus)
print(type(rdd_minus))
Na wyjściu:
-120 <class 'int'>
Na koniec zostawię dwie akcje count oraz take.
Akcja count zwraca liczbę elementów danego obiektu RDD.
rdd_numbers = sc.parallelize([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20])
rdd_numbers.count()
Na wyjściu:
20
Akcja take zwraca pierwszych x elementów z obiektu RDD. W naszym przypadku jest to liczba dziesięć.
rdd_numbers = sc.parallelize([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20])
rdd_numbers.take(10)
Na wyjściu:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Pokazałem jedynie kilka przykładów wykorzystujących stos technologiczny Apache Sparka. Środowisko Colab można wykorzystać jednak jeszcze efektywniej korzystając z SSH i VNC. Wtedy istnieje możliwość uruchamiania gotowych skryptów i kontroli przy wykorzystaniu Spark GUI i prosty podgląd obiektów
O tym będzie kolejny wpis, gdzie spróbujemy przeanalizować pracę nad dużym zbiorem.
Przykłady umieściłem w repozytorium w postaci pliku .ipynb
https://github.com/djkormo/colab-examples/blob/master/spark/Colab_apache_spark_intro_primer.ipynb
Literatura:
https://towardsdatascience.com/how-to-use-pyspark-on-your-computer-9c7180075617
https://www.tutorialspoint.com/pyspark/pyspark_sparkcontext.htm
https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf
Tomasz Cieplak
Kolejny świetny artykuł. Uruchomienie Apache Spark na maszynie lokalnej wymaga sporo przygotowań. A tutaj mamy bardzo pomocny przepis na to jak zacząć, pracować i uczyć się Sparka bez walki z całym otoczeniem. A do tego jeszcze Jupyter i Pyhon3. A co będzie dalej? Spark i GPU na platformie Google Colab? Już nie mogę się doczekać!
djkormo
Apache Spark i GPU razem w połączeniu uczenia głębokiego i big data ? Nie planowałem, ale wezmę to sobie do kolejki realizacji. Znalazłem taką prezentację.
https://fr.slideshare.net/continuumio/gpu-computing-with-apache-spark-and-python. Strona 34.