4.2. PySpark#

This section is a Jupyter Book implementation of this tutorial.

Spark Cluster (figure)

4.2.1. Installation#

To install PySpark on your system, I recommend following this guide.
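
Once PySpark is installed (for example via pip), a quick sanity check from Python is to import it and print its version:

import pyspark  # if this import succeeds, the installation is visible to Python
print(pyspark.__version__)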

4.2.2. RDDs#

Resilient Distributed Datasets (RDDs) can be seen as sequences of elements. However, unlike a plain Python list, transformations on RDDs are evaluated lazily.
The official RDDs documentation covers their usage extensively.
Cheat Sheet: PySpark RDDs.

We initialize a SparkContext to interact with our local Spark Cluster.

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/15 01:29:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

We can create an RDD from the contents of a file or by parallelizing a sequence of objects.

lines = sc.textFile("trees.csv")
lines
trees.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
rdd = sc.parallelize([i for i in range (10)])
rdd
ParallelCollectionRDD[2] at readRDDFromFile at PythonRDD.scala:274

4.2.2.1. Actions#

Spark actions evaluate the RDD and usually return data.
Some useful actions:

  • collect() : Collect all elements of the RDD into a Python list.

  • take(k) : Return a list containing the first k elements of the RDD.

  • count() : Return the number of elements in the RDD.

Refer to the pyspark documentation for a complete list of actions.

rdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
lines.take(3)
['Tree,Park Name,x,y', '1,Canada Park,2,3', '2,Otter Park,63,21']
lines.count()
4

4.2.2.2. Transformations#

Spark transformations lazily apply a function to an RDD.
Refer to the pyspark documentation for a complete list of transformations.
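
Because transformations are lazy, calling one only records the operation to perform; nothing runs until an action is invoked. A minimal sketch, reusing the rdd defined above:

doubled = rdd.map(lambda x: 2 * x)  # returns a new RDD immediately, no computation yet
doubled.collect()                   # collect() is an action, so evaluation happens here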

map(func) : Return a new distributed dataset formed by passing each element of the source through a function func.

rdd.map(lambda x: x*x).collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

flatMap(func) : Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

rdd.flatMap(lambda x: [x*x, x*x*x] if x % 2 == 1 else []).collect()
[1, 1, 9, 27, 25, 125, 49, 343, 81, 729]

filter(func) : Return a new dataset formed by selecting those elements of the source on which func returns true.

rdd.filter(lambda x: x % 3 == 1).collect()
[1, 4, 7]

reduceByKey(func) : When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V.

data = [(i%3, i) for i in range(10)]
rdd = sc.parallelize(data)
rdd.reduceByKey(lambda x, y: x+y).collect()
[(0, 18), (2, 15), (1, 12)]

intersection(dataset) : Return a new RDD that contains the intersection of elements in the source dataset and the argument.

data1 = [i for i in range(10)]
data2 = [i for i in range(5, 15)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
rdd1.intersection(rdd2).collect()
[8, 5, 9, 6, 7]

union(dataset) : Return a new dataset that contains the union of the elements in the source dataset and the argument.

data1 = [i for i in range(10)]
data2 = [i for i in range(5, 15)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
rdd1.union(rdd2).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

distinct() : Return a new dataset that contains the distinct elements of the source dataset.

data = [i%3 for i in range(10)]
rdd = sc.parallelize(data)
rdd.distinct().collect()
[0, 2, 1]

zipWithIndex() : Pair each element of the RDD with its index, returning (element, index) tuples.

data = [i for i in range(10)]
rdd = sc.parallelize(data)
rdd.zipWithIndex().collect()
[(0, 0),
 (1, 1),
 (2, 2),
 (3, 3),
 (4, 4),
 (5, 5),
 (6, 6),
 (7, 7),
 (8, 8),
 (9, 9)]

groupByKey() : When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.

data = [(i%3, i) for i in range(10)]
rdd = sc.parallelize(data)
rdd.groupByKey().collect()
[(0, <pyspark.resultiterable.ResultIterable at 0x7f7c489138e0>),
 (2, <pyspark.resultiterable.ResultIterable at 0x7f7c489131f0>),
 (1, <pyspark.resultiterable.ResultIterable at 0x7f7c48913d60>)]
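
The values come back as lazy ResultIterable objects. To actually display the grouped elements, one option is to convert each group to a list with mapValues:

rdd.groupByKey().mapValues(list).collect()  # e.g. [(0, [0, 3, 6, 9]), (1, [1, 4, 7]), (2, [2, 5, 8])]; key order may vary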

4.2.3. Dataframes#

Spark dataframes hold tabular data, similar to tables in SQL or spreadsheets.

| Tree | Park Name   | x  | y  |
|------|-------------|----|----|
| 1    | Canada Park | 2  | 3  |
| 2    | Otter Park  | 63 | 21 |
| 3    | Canada Park | 2  | 25 |

We instantiate a Spark session to interact with our local cluster.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

To load a csv file into a dataframe:

filename = "trees.csv"
df = spark.read.csv(filename, header=True, mode="DROPMALFORMED")
df = df .withColumn("x", df.x.cast("int")) \
        .withColumn("y", df.y.cast("int"))
df
DataFrame[Tree: string, Park Name: string, x: int, y: int]
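
The repr above already hints at the column types; printSchema() gives a fuller view, including nullability:

df.printSchema()  # x and y should now be listed as integer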

show() : Display the first rows of the dataframe (up to 20 by default).

df.show()
+----+-----------+---+---+
|Tree|  Park Name|  x|  y|
+----+-----------+---+---+
|   1|Canada Park|  2|  3|
|   2| Otter Park| 63| 21|
|   3|Canada Park|  2| 25|
+----+-----------+---+---+

select(column name, another column name) : Return a dataframe with only the selected columns.

df.select("Park Name", "Tree").take(5)
[Row(Park Name='Canada Park', Tree='1'),
 Row(Park Name='Otter Park', Tree='2'),
 Row(Park Name='Canada Park', Tree='3')]

where("condition") : Return a dataframe containing only the rows that satisfy the condition.

df.where("x = 2").count()
2
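
where() also accepts column expressions instead of a SQL string; an equivalent form of the condition above:

df.where(df.x == 2).count()  # same predicate, written as a column expression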

groupBy(): Group rows together so that aggregate functions such as min, max or avg can be applied to each group.

df.groupBy("x").count().collect()
[Row(x=63, count=1), Row(x=2, count=2)]
df.groupBy("x").sum().show()
+---+------+------+
|  x|sum(x)|sum(y)|
+---+------+------+
| 63|    63|    21|
|  2|     4|    28|
+---+------+------+
df.groupBy("x").mean("y").show()
+---+------+
|  x|avg(y)|
+---+------+
| 63|  21.0|
|  2|  14.0|
+---+------+
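
Several aggregates can also be computed in one pass with agg() and the helpers from pyspark.sql.functions; a small sketch:

from pyspark.sql import functions as F
df.groupBy("Park Name").agg(F.min("y"), F.max("y"), F.avg("y")).show()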

orderBy("column1", "column2", ...): Order the dataframe by one or more columns.

from pyspark.sql.functions import desc
df.orderBy(desc("Park Name")).show()
+----+-----------+---+---+
|Tree|  Park Name|  x|  y|
+----+-----------+---+---+
|   2| Otter Park| 63| 21|
|   1|Canada Park|  2|  3|
|   3|Canada Park|  2| 25|
+----+-----------+---+---+

limit(number): Keep only the first number rows of the dataframe.

df.limit(2).show()
+----+-----------+---+---+
|Tree|  Park Name|  x|  y|
+----+-----------+---+---+
|   1|Canada Park|  2|  3|
|   2| Otter Park| 63| 21|
+----+-----------+---+---+

createDataFrame(rdd): Create a dataframe from an RDD.

from pyspark.sql import Row
lines = sc.textFile("trees.csv")
header = lines.first()
lines = lines.filter(lambda l: l != header)  # Remove header
lines = lines.map(lambda x: x.split(","))
trees_rdd = lines.map(lambda p: Row(Tree=p[0], Park_Name=p[1], x=p[2], y=p[3]))
trees_df = spark.createDataFrame(trees_rdd)
trees_df.show()
+----+-----------+---+---+
|Tree|  Park_Name|  x|  y|
+----+-----------+---+---+
|   1|Canada Park|  2|  3|
|   2| Otter Park| 63| 21|
|   3|Canada Park|  2| 25|
+----+-----------+---+---+

toDF(name1, name2, name3): Rename the columns of the dataframe.

trees_df = trees_df.toDF("ID", "Park Name", "lat", "long")
trees_df.show()
+---+-----------+---+----+
| ID|  Park Name|lat|long|
+---+-----------+---+----+
|  1|Canada Park|  2|   3|
|  2| Otter Park| 63|  21|
|  3|Canada Park|  2|  25|
+---+-----------+---+----+

join(): Combine dataframes based on a common column.

df.join(trees_df, df["Tree"] == trees_df.ID).drop("ID").show()
+----+-----------+---+---+-----------+---+----+
|Tree|  Park Name|  x|  y|  Park Name|lat|long|
+----+-----------+---+---+-----------+---+----+
|   1|Canada Park|  2|  3|Canada Park|  2|   3|
|   2| Otter Park| 63| 21| Otter Park| 63|  21|
|   3|Canada Park|  2| 25|Canada Park|  2|  25|
+----+-----------+---+---+-----------+---+----+
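
Since both dataframes carry a Park Name column, the joined result above contains it twice; one way to keep a single copy is to drop the duplicate by referencing the dataframe it comes from:

df.join(trees_df, df["Tree"] == trees_df.ID) \
  .drop(trees_df["Park Name"]) \
  .drop("ID") \
  .show()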