Colab zaiskrzył podczas instalacji Apache Sparka

wpis w: Apache, Big Data, chmura, migawka, Python, Spark | 2

Jednym z podstawowych problemów młodego adepta sztuki torturowania danych i  analizy dużych zbiorów jest spory próg wejścia.  Gdzie się tego można nauczyć ?  Jednym z bardziej znanych narzędzi jest Apache Spark. Okazuje się, że  uruchomienie tego oprogramowania  w usłudze Google Colab jest banalne. Systemem operacyjnych z jakiego korzystamy jest Ubuntu 18.04. Oczywiście taka konfiguracja nie umożliwi nam przetworzenia ogromnych ilości danych, ale pozwoli na rozpoznanie narzędzi, podstawowych  pojęć i zbudowania nowych kompetencji.

 

Po uruchomieniu notatnika  w języku Python 3.

W pierwszej komórce wklejamy

!wget https://raw.githubusercontent.com/djkormo/colab-examples/master/spark/install.bash -O install.bash
--2019-10-31 22:32:44--  https://raw.githubusercontent.com/djkormo/colab-examples/master/spark/install.bash
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 382 [text/plain]
Saving to: ‘install.bash’

install.bash        100%[===================>]     382  --.-KB/s    in 0s      

2019-10-31 22:32:44 (59.8 MB/s) - ‘install.bash’ saved [382/382]

Ściągnięty w ten sposób plik instalacyjny zawiera zaledwie kilka linii kodu

!cat install.bash
#!/bin/bash

cd /

mkdir -p content

cd content/ 

apt-get install openjdk-8-jdk-headless

wget http://apache.crihan.fr/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz -O spark-2.4.4-bin-hadoop2.7.tgz

tar xf spark-2.4.4-bin-hadoop2.7.tgz

pip install -q findspark
pip install spark-nlp

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
SPARK_HOME=/content/spark-2.4.4-bin-hadoop2.7

 

Instalujemy JDK dla Javy 1.8, następnie ściągamy z repozytorium Apache Sparka w postaci pliku .tgz, rozpakowujemy zawartość i ustawiamy dwie zmienne środowiskowe.

Uruchamiany skrypt instalacyjny

!bash install.bash

Nadeszła pora na uruchomienie przykładowego kawałka kodu

Najpierw uruchamiamy sesję Sparka

import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_ME'] = '/content/spark-2.4.4-bin-hadoop2.7'
import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("First App") \
    .master("local[*]")\
    .getOrCreate()

print(spark)

Na wyjściu:

pyspark.sql.session.SparkSession object at 0x7fe1c00a5128

Potem uruchamiamy kontekst Sparka.

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
print(sc)

Na wyjściu:

SparkContext master=local[*] appName=First App

Musimy mieć świadomość, że każda zmiana kodu, instalacja danych, zastosowane algorytmy są uruchamiane na klastrze obliczeniowym. Dostęp do infrastruktury jest za pomocą kontekstu Sparka. Patrząc z wysokiego poziomu, każda aplikacja Sparka, składa się z programu sterownika, który uruchamia na klastrze różne równoległe procesy.Program sterownika ma  dostęp do Sparka przez obiektu kontekstowy (SparkContext). Mająć dostęp do kontekstu mamy już możliwość budowy obiektów zawierających dane, które początkowo mogą się nam wydawać jako zwykłe i znane z Pythona struktury danych. Nic bardziej mylnego, obiekty znane jako  Resilent Distributed Datasets (RDD) lub DataFrame (DF) są umieszczane równolegle na wielu węzłach zwanych  wykonawcami (worker node).

Na RDD będziemy wykonywać różne metody, które dzielimy na transformacje i akcje. Transformacje przekształcają nam RDD w inne RDD, zaś akcje generują z RDD wynik który może być zwrócony do sterownika bądź zapisany w HDFS’ie lub innym obsługiwanym źródle danych. Wszystkie transformacje są leniwe (lazy), co oznacza , będą wykonane wyłącznie wtedy, gdy będzie to konieczne. Zamiast manipulacji danymi mamy tu jedynie rejestrację metadanych, które pozwalają na zbudowanie RDD.

from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)
print(sqlContext)

Na wyjściu:

pyspark.sql.context.SQLContext object at 0x7fe1cf605438

Wczytajmy nasz plik jaka ramkę danych (dataframe)

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)	

Sprawdźmy jaka jest struktura tych danych.

df.printSchema()

Na wyjściu:

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)

Zróbmy kilka przykładów z zastosowanie RDD.

Utwórzmy pierwszy obiekt RDD o nazwie rdd-1, do którego załadowaliśmy listę składającą się z 10 cyfr. To prosty obiekt, ale na jego podstawie pokażę podstawowe transformacje i akcje.  Utworzenie obiektu z danych wewnętrznych (nie z plików) umożliwia nam funkcja parallelize.

rdd_1=sc.parallelize([0,1,2,3,4,5,6,7,8,9])
print(rdd_1)

Na wyjściu:

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

Pierwszą transformację, którą można zrobić jest map.

Map bierze każdy element zbioru RDD jako element wejściowy przekształca go za pomocą funkcji w element wyjściowy. W poniższym przykładzie przemnożyliśmy elementy przez liczbę dwa. Dla osób, które nie znają składni lambda, mamy tu do czynienia z budową funkcji która przekształca element wejściowy na element wejściowy. Nie wiemy jaki jest typ elementu wejściowego, nie musi to być  typ skalarny. Nie ma potrzeby budowania tego w postaci def + ciało funkcji. Składnia wygląda w uproszczeniu tak lambda wejscie: wyjscie, gdzie wyjście zawiera algorytm.  Myślę, ze po kilku poniższych przykładach stanie się to bardziej czytelne.

rdd_2=rdd_1.map(lambda x: x*2)
print(rdd_2)

Na wyjściu:

PythonRDD[1] at RDD at PythonRDD.scala:53

Wykonajmy akcję collect na obu zbiorach, dzięki czemu będzie można zobaczyć ich zawartość. Uwaga, nie należy stosować tej akcji na bardzo dużych zbiorach danych, gdyż mogą się one nie zmieścić w lokalnej pamięci. Akcje powinny być wykonywane tylko kiedy są konieczne (!).

rdd_1.collect()
rdd_2.collect()

Na wyjściu:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Następną transformację, z której możemy skorzystać jest flatMap.

Funkcja  dla każdego elementu na wejściu zwraca kolekcję elementów, w tym przypadku rozkładamy każdy z napisów na wyrazy przyjmująć spację jako separator.

rdd_lines = sc.parallelize(["Pan Tadeusz","Pan Samochodzik i Templariusze", "Poznajemy Sparka", "Dwadzieścia tysięcy mil podmorskiej żeglugi"])
rdd_words = rdd_lines.flatMap(lambda line: line.split(" "))
print(rdd_words.collect())

Na wyjściu:

['Pan', 'Tadeusz', 'Pan', 'Samochodzik', 'i', 'Templariusze', 'Poznajemy', 'Sparka', 'Dwadzieścia', 'tysięcy', 'mil', 'podmorskiej', 'żeglugi']

Kolejną transformacją jest operacja filtrowania filter, która tworzy podzbiór spełniający warunki funkcji lambda zwracącej tylko wartości logiczne. W naszym przypadku wymagamy, by element był mniejszy od sześciu

rdd_numbers=sc.parallelize([0,1,2,3,4,5,6,7,8,9])
rdd_numbers_filtered=rdd_numbers.filter(lambda x:<6)
print(rdd_numbers_filtered.collect())

Na wyjściu:

[0, 1, 2, 3, 4, 5]

Kolejna transformacja o nazwie distinct umożliwia usunięcie zduplikowanych wartości w zbiorze.

rdd_nonunique = sc.parallelize([1,2,2,3,4,4,5,6,7,8,8,9,9,9])
rdd_unique = rdd_nonunique.distinct()
print(rdd_unique.collect())

Na wyjściu:

[2, 4, 6, 8, 1, 3, 5, 7, 9]

Jak wydać podwójne liczby dwa, cztery, osiem i potrójna dziewięć zostały zredukowane do jednego wystąpienia..

Istnieje możliwość utworzenia mniejszego zbioru danych za pomocą transformacji sample, która przyjmuje trzy argumenty

(czy zastępować, ułamek liczby elementów w zbiorze wyjściowym/wejściowym, ziarno dla generatora liczb pseudolosowych)

rdd_numbers = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
rdd_samples = rdd_numbers.sample(False, 0.5,123)
print(rdd_samples.collect())

Na wyjściu.

[1, 2, 3, 4, 6, 7]

Ustawienie ziarna na 123 oznacza, że z tego samego zbioru wejściowego zostanie wylosowany za każdym razem taki sam zbiór wyjściowy. Należy o tym pamiętać.

 

Wszystkie powyższe przykłady dotyczyły jednego zbioru RDD.

Utwórzmy dwa zbiory rdd_1 i  rdd_2. Elementy w nich mogą się pokrywać. Transformacja union umożliwia utworzenie nowego obiektu zawierającego ich sumę.

Składnia wygląda następująco  rdd_wy_wyjsciu=rdd_pierwsze.union(rdd_drugie)

rdd_1 = sc.parallelize([1,2,3,4,5])
rdd_2 = sc.parallelize([4,5,6,7,9])
union = rdd_1.union(rdd_2)
print(union.collect())

W posobny sposób można skorzystać z operacji intersetion (część wspólna), substract (różnica, czyli elementy, które są w pierwszym obiekcie, a nie ma ich w drugim), cartesian (iloczyn kartezjański). Składnia poleceń jest analogiczna jak dla union.

intersection = rdd_1.intersection(rdd_2)
print("intersection:",intersection.collect())
subtract = rdd_1.subtract(rdd_2)
print("subtract:",subtract.collect())
cartesian = rdd_1.cartesian(rdd_2)
print("cartesian:",cartesian.collect())

Na wyjściu:

intersection: [4, 5]
subtract: [1, 2, 3]
cartesian: [(1, 4), (1, 5), (2, 4), (2, 5), (1, 6), (1, 7), (2, 6), (2, 7), (1, 9), (2, 9), (3, 4), (3, 5), (4, 4), (4, 5), (5, 4), (5, 5), (3, 6), (3, 7), (4, 6), (4, 7), (3, 9), (4, 9), (5, 6), (5, 7), (5, 9)

Nadeszła pora na prezentację kilku akcji.

Akcja reduce umożliwia zamianę wielu elementów zbioru na jeden. W naszym przykładzie  jest to mnożenie

rdd_numbers = sc.parallelize([1,2,3,4,5,6,7,8,9])
rdd_multis = rdd_numbers.reduce(lambda x, y: x * y)
print(rdd_multis)
print(type(rdd_multis))

Na wyjściu:

362880
<class 'int'>

Istnieje też możliwość prostego przemnożenia przez stałą wartość. Umożliwia to operacja fold, której pierwszym elementem jest wartość przez którą będzie przemnożony wynik.

rdd_numbers = sc.parallelize([1,2,3,4,5])
rdd_zero = rdd_numbers.fold(0, lambda x, y: x * y)
print(rdd_zero)
print(type(rdd_zero))

Na wyjściu:

0
<class 'int'>

Ten sam kod, ale zamiast zera zastosowaliśmy wartość minus jeden.

rdd_numbers = sc.parallelize([1,2,3,4,5])
rdd_minus = rdd_numbers.fold(-1, lambda x, y: x * y)
print(rdd_minus)
print(type(rdd_minus))

Na wyjściu:

-120
<class 'int'>

Na koniec zostawię dwie akcje count oraz take.

 

Akcja count zwraca liczbę elementów danego obiektu RDD.

rdd_numbers = sc.parallelize([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20])
rdd_numbers.count()

Na wyjściu:

20

Akcja take zwraca pierwszych x elementów z obiektu RDD. W naszym przypadku  jest to liczba dziesięć.

rdd_numbers = sc.parallelize([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20])
rdd_numbers.take(10)

Na wyjściu:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

 

Pokazałem jedynie kilka przykładów wykorzystujących stos technologiczny Apache Sparka. Środowisko Colab można wykorzystać jednak jeszcze efektywniej korzystając z SSH i VNC. Wtedy istnieje możliwość uruchamiania gotowych skryptów i kontroli przy wykorzystaniu Spark GUI i  prosty podgląd obiektów

 

O tym będzie kolejny wpis, gdzie spróbujemy przeanalizować pracę nad dużym zbiorem.

 

Przykłady umieściłem w repozytorium w postaci pliku .ipynb

https://github.com/djkormo/colab-examples/blob/master/spark/Colab_apache_spark_intro_primer.ipynb

 

Literatura:

 

Install Apache Spark on Ubuntu 19.04/18.04 & Debian 10/9/8

https://towardsdatascience.com/how-to-use-pyspark-on-your-computer-9c7180075617

https://www.tutorialspoint.com/pyspark/pyspark_sparkcontext.htm

https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf

2 Responses

  1. Tomasz Cieplak

    Kolejny świetny artykuł. Uruchomienie Apache Spark na maszynie lokalnej wymaga sporo przygotowań. A tutaj mamy bardzo pomocny przepis na to jak zacząć, pracować i uczyć się Sparka bez walki z całym otoczeniem. A do tego jeszcze Jupyter i Pyhon3. A co będzie dalej? Spark i GPU na platformie Google Colab? Już nie mogę się doczekać!

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *

Witryna wykorzystuje Akismet, aby ograniczyć spam. Dowiedz się więcej jak przetwarzane są dane komentarzy.