Apache Spark

Introduction

Apache Spark is a general purpose cluster computing system which can process big data. Spark is different from its previous tools available in market because it is fast due to the in memory processing. In Spark memory is used for storage, whereas in map reduce memory is just used for computation. Spark has different libraries which makes the usage of it even better. For example, it provides Spark SQL, ML, Streaming data, Graph processing.
Lets take a look at our hdfs structure to see where files exist

[munjesh1@gw03 ~]$ hadoop fs -ls /public/retail_db

- Found 6 items
- drwxr-xr-x   - hdfs hdfs          0 2016-12-19 03:52 /public/retail_db/categories
- drwxr-xr-x   - hdfs hdfs          0 2016-12-19 03:52 /public/retail_db/customers
- drwxr-xr-x   - hdfs hdfs          0 2016-12-19 03:52 /public/retail_db/departments
- drwxr-xr-x   - hdfs hdfs          0 2016-12-19 03:52 /public/retail_db/order_items
- drwxr-xr-x   - hdfs hdfs          0 2016-12-19 03:52 /public/retail_db/orders
- drwxr-xr-x   - hdfs hdfs          0 2016-12-19 03:52 /public/retail_db/products

[munjesh1@gw03 ~]$ pyspark –master yarn –conf spark.ui.port=12888

Welcome to

    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /__ / .__/\_,_/_/ /_/\_\   version 1.6.3
    /_/


-Using Python version 2.7.5 (default, Jun 20 2019 20:27:34) -SparkContext available as sc, HiveContext available as sqlContext.

To create RDD from textfile in HDFS, we use the textFile function using sc context.

In [ ]:
help(sc)
orderItems = sc.textFile("/public/retail_db/order_items")
type(orderItems)
#<class 'pyspark.rdd.RDD'>
help(orderItems)
In [ ]:
#To get first record
orderItems.first()
In [ ]:
#To get first 10 records
for i in orderItems.take(10): print(i)

When we launch spark shell, you can check the console (http://rm01.itversity.com:18080/history/application_1565300265360_61685/jobs/) for the jobs running. You will see that till the time we run an action, not job is triggered. This shows how the Lazy evalualtion works. No job is launched till the time we run an action. This would have only created a DAG by that time. We can check the DAG details by using RDDname.toDebugString()

Collect

To convert a RDD into a collection we can use RDDname.collect()

Parallelize

And to convert a collection into RDD we use collectionName.parallelize()

In [ ]:
l = range(1,10000)
lRDD = sc.paralellize(l)
type(lRDD)
#<class 'pyspark.rdd.RDD'>
collec = lRDD.collect()
type(collec)
<type 'list'> 

To read data from a local file system and convert it into a RDD, we will use normal python api’s to open the file. So, we converted a file into collection, then that collection into RDD and used the spark API’s to preview.

In [ ]:
productsRaw = open("/data/retail_db/products/part-00000").read().splitlines()
type(productsRaw)
productsRDD = sc.parallelize(productsRaw)
type(productsRDD)
productsRDD.first()
for i in productsRDD.take(10): print(i)
productsRDD.count()

SQLContext-

SQLContext is a class and is used for initializing the functionalities of Spark SQL. SparkContext class object (sc) is required for initializing SQLContext class object.
SQLContext have 2 APIs to read data of different file formats.
load – typically takes 2 arguments, path and format
read – have an interface for each of the file formats (e.g.: read.json)
Following are the file formats supported
text
orc
parquet
json (example showed)
csv (3rd party plugin)
avro (3rd party plugin, but Cloudera clusters get by default)
In [ ]:
#Both the read and load api's will create data frames from the json files.
sqlContext.load("/public/retail_db_json/order_items", "json").show()
sqlContext.read.json("/public/retail_db_json/order_items").show()
In [ ]:
#String Manipulation
orders = sc.textFile("/public/retail_db/orders")
s = orders.first()

#first character from a string
s[0]

#first 10 characters from a string
s[:10]

#get length of string
len(s)

#One way to get the date, but it will not work if the order id before first
#comma is more than one character or digit
s[2:12]

#split and extract date
s.split(",")
type(s.split(","))
#Get Date
s.split(",")[1]
#Get customer id
s.split(",")[2]

#type casting to integer
int(s.split(",")[0])

#type casting integer to string
print("printing " + str(1))

int(s.split(",")[1].split(" ")[0].replace("-", ""))

Map, Flatmap and Filter

Map is used to –
Perform row level transformations where one record transforms into another record
Number of records in input RDD and output RDD will be equal
map is typically followed by other APIs used for
    joining the data
    performing aggregations
    sorting etc
flatMap is used to –
Perform row level transformations where one record will be transformed into the array of records
Number of records in output RDD will be typically more than number of records in input RDD
filter is used to –
Perform filtering of rows using a lambda function which returns a boolean and it is applied on each row.
Create a new data set for which the function return true. False records will be discarded.
In [ ]:
#Map

orders = sc.textFile("/public/retail_db/orders")
help(orders.map)

#Get status
orders.map(lambda o: o.split(",")[3]).first()
#Get count
orders.map(lambda o: o.split(",")[1]).first()

#Convert date format from YYYY-MM-DD HH24:MI:SS -> YYYYMM
#Type cast date to integer
orders.map(lambda o: int(o.split(",")[1].split(" ")[0].replace("-", ""))).first()
orders.map(lambda o: int(o.split(",")[1].split(" ")[0].replace("-", ""))).take(10)
orders.map(lambda o: int(o.split(",")[1].split(" ")[0].replace("-", ""))).count()

#Create tuples
orders.map(lambda o: (o.split(",")[3], 1))

orderItems = sc.textFile("/public/retail_db/order_items")
orderItems.first()
for i in orderItems.take(10): print(i)
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
orderItemsMap.first()
for i in orderItemsMap.take(10): print(i)
In [ ]:
#FlatMap
linesList = ["How are you", "let us perform", "word count using flatMap", "to understand flatMap in detail"]
lines = sc.parallelize(linesList)
words = lines.flatMap(lambda l: l.split(" "))
tuples = words.map(lambda word: (word, 1))
for i in tuples.countByKey(): print(i)
In [ ]:
#Filter
orders = sc.textFile("/public/retail_db/orders")
ordersComplete = orders.filter(lambda o: o.split(",")[3] in ["COMPLETE", "CLOSED"] and o.split(",")[1][:7] == "2014-01")

Joining Data sets

Join is one of transformation in Spark.
We can have join, leftouterjoin, rightouterjoin and fullouterjoin.
To make those joins we should have RDD in (key, value) format.
When we want to make the join, both the paired RDDs should have the same key.
The output of that join will be a paired RDD, with same key and (K, (V,W))
In [ ]:
#joins
orders = sc.textFile("/public/retail_db/orders")
orderItems = sc.textFile("/public/retail_db/order_items")

ordersMap = orders.map(lambda o:(int(o.split(",")[0]), o.split(",")[1]))
for i in ordersMap.take(10): print(i)

# (1, u'2013-07-25 00:00:00.0')
# (2, u'2013-07-25 00:00:00.0')
# (3, u'2013-07-25 00:00:00.0')
# (4, u'2013-07-25 00:00:00.0')
# (5, u'2013-07-25 00:00:00.0')
# (6, u'2013-07-25 00:00:00.0')
# (7, u'2013-07-25 00:00:00.0')
# (8, u'2013-07-25 00:00:00.0')
# (9, u'2013-07-25 00:00:00.0')
# (10, u'2013-07-25 00:00:00.0')    

orderItemsMap = orderItems.map(lambda oi:(int(oi.split(",")[1]), float(oi.split(",")[4])))
for i in orderItemsMap.take(10): print(i)

# (1, 299.98)
# (2, 199.99)
# (2, 250.0)
# (2, 129.99)
# (4, 49.98)
# (4, 299.95)
# (4, 150.0)
# (4, 199.92)
# (5, 299.98)
# (5, 299.95)

ordersJoin = ordersMap.join(orderItemsMap)

for i in ordersJoin.take(10): print(i)

# (32768, (u'2014-02-12 00:00:00.0', 199.99))                                     
# (32768, (u'2014-02-12 00:00:00.0', 129.99))
# (32768, (u'2014-02-12 00:00:00.0', 299.98))
# (32768, (u'2014-02-12 00:00:00.0', 399.98))
# (49152, (u'2014-05-27 00:00:00.0', 299.98))
# (4, (u'2013-07-25 00:00:00.0', 49.98))
# (4, (u'2013-07-25 00:00:00.0', 299.95))
# (4, (u'2013-07-25 00:00:00.0', 150.0))
# (4, (u'2013-07-25 00:00:00.0', 199.92))
# (50192, (u'2014-06-04 00:00:00.0', 129.99))
In [ ]:
#outer join
orders = sc.textFile("/public/retail_db/orders")
orderItems = sc.textFile("/public/retail_db/order_items")

ordersMap = orders.map(lambda o:(int(o.split(",")[0]), o.split(",")[3]))
for i in ordersMap.take(10): print(i)

# (1, u'CLOSED')
# (2, u'PENDING_PAYMENT')
# (3, u'COMPLETE')
# (4, u'CLOSED')
# (5, u'COMPLETE')
# (6, u'COMPLETE')
# (7, u'COMPLETE')
# (8, u'PROCESSING')
# (9, u'PENDING_PAYMENT')
# (10, u'PENDING_PAYMENT')

orderItemsMap = orderItems.map(lambda oi:(int(oi.split(",")[1]), float(oi.split(",")[4])))
for i in orderItemsMap.take(10): print(i)

# (1, 299.98)
# (2, 199.99)
# (2, 250.0)
# (2, 129.99)
# (4, 49.98)
# (4, 299.95)
# (4, 150.0)
# (4, 199.92)
# (5, 299.98)
# (5, 299.95)

ordersLeftOuterJoin = ordersMap.leftOuterJoin(orderItemsMap)

ordersLeftOuterJoinFilter = ordersLeftOuterJoin.filter(lambda o: o[1][1] == None)

for i in ordersLeftOuterJoinFilter.take(10): print(i)

# (43692, (u'PENDING_PAYMENT', None))
# (32, (u'COMPLETE', None))
# (40, (u'PENDING_PAYMENT', None))
# (32776, (u'CLOSED', None))
# (65904, (u'PENDING_PAYMENT', None))
# (60, (u'PENDING_PAYMENT', None))
# (38240, (u'COMPLETE', None))
# (57440, (u'COMPLETE', None))
# (76, (u'COMPLETE', None))
# (80, (u'COMPLETE', None))


ordersRightOuterJoin = orderItemsMap.rightOuterJoin(ordersMap)
ordersRightOuterJoinFilter = ordersRightOuterJoin.filter(lambda o: o[1][0] == None)

for i in ordersRightOuterJoinFilter.take(10): print(i)

Aggregations

Total aggregations – count, reduce
    count – give the number of records in RDD
    reduce – used to perform aggregations such as sum, min, max etc on RDDs which contain numeric elements

By Key aggregations – countByKey, reduceByKey and aggregateByKey    
    countByKey
    reduceByKey
    aggregateByKey

groupByKey can be used for aggregations but should be given low priority as it does not use the combiner
In [ ]:
#Aggregations - total
orderItems = sc.textFile("/public/retail_db/order_items")
orderItems.count()

#Aggregations - total - Get revenue for given order_id
orderItems = sc.textFile("/public/retail_db/order_items")
orderItemsFiltered = orderItems.filter(lambda oi: int(oi.split(",")[1]) == 2)
orderItemsSubtotals = orderItemsFiltered.map(lambda oi: float(oi.split(",")[4]))

from operator import add
orderItemsSubtotals.reduce(add)
#Both add or lambda fnuction will produce the same result
orderItemsSubtotals.reduce(lambda x, y: x + y)
In [ ]:
# Get order item details which have minimum order_item_subtotal for given order_id
orderItems = sc.textFile("/public/retail_db/order_items")
orderItems.reduce(lambda x, y: x if(float(x.split(",")[4]) < float(y.split(",")[4])) else y)
In [ ]:
#Get count by status - countByKey
orders = sc.textFile("/public/retail_db/orders")

ordersStatus = orders.map(lambda o: (o.split(",")[3], 1))

countByStatus = ordersStatus.countByKey()
for i,j in countByStatus.items(): print(i, j)
# (u'COMPLETE', 22899)
# (u'PAYMENT_REVIEW', 729)
# (u'PROCESSING', 8275)
# (u'CANCELED', 1428)
# (u'PENDING', 7610)
# (u'CLOSED', 7556)
# (u'PENDING_PAYMENT', 15030)
# (u'SUSPECTED_FRAUD', 1558)
# (u'ON_HOLD', 3798)    
GroupByKey
groupByKey can be used for any aggregation
It is least preferred as combiner will not be used
groupByKey is a generic API which group values into an array for a given key
On top of aggregations, we can perform many other transformations using groupByKey
In [ ]:
#Get revenue for each order_id - groupByKey
orderItems = sc.textFile("/public/retail_db/order_items")
for i in orderItems.take(10) : print(i)
# 1,1,957,1,299.98,299.98
# 2,2,1073,1,199.99,199.99
# 3,2,502,5,250.0,50.0
# 4,2,403,1,129.99,129.99
# 5,4,897,2,49.98,24.99
# 6,4,365,5,299.95,59.99
# 7,4,502,3,150.0,50.0
# 8,4,1014,4,199.92,49.98
# 9,5,957,1,299.98,299.98
# 10,5,365,5,299.95,59.99

# We need to create a key value(k,v) pair to apply group by key
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
# (1, 299.98)
# (2, 199.99)
# (2, 250.0)
# (2, 129.99)
# (4, 49.98)
# (4, 299.95)
# (4, 150.0)
# (4, 199.92)
# (5, 299.98)
# (5, 299.95)
#After converting it into k,v pairs, we can apply the group by key and 
orderItemsGroupByOrderId = orderItemsMap.groupByKey()
for i in orderItemsGroupByOrderId.take(10): print(i)
# (2, <pyspark.resultiterable.ResultIterable object at 0x7fc24de60ed0>)           
# (4, <pyspark.resultiterable.ResultIterable object at 0x7fc24de670d0>)
# (8, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67110>)
# (10, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67150>)
# (12, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67190>)
# (14, <pyspark.resultiterable.ResultIterable object at 0x7fc24de671d0>)
# (16, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67210>)
# (18, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67250>)
# (20, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67290>)
# (24, <pyspark.resultiterable.ResultIterable object at 0x7fc24de672d0>)

#Here each value is a list of values for that key grouped together
#Now we need to sum up the values of that list.
revenuePerOrderId = orderItemsGroupByOrderId.map(lambda oi: (oi[0], round(sum(oi[1]), 2)))

for i in revenuePerOrderId.take(10): print(i)
In [ ]:
#Sorting with GroupByKey

#Get order item details in descending order by revenue - groupByKey
orderItems = sc.textFile("/public/retail_db/order_items")
for i in orderItems.take(10): print(i)
#1,1,957,1,299.98,299.98
# 2,2,1073,1,199.99,199.99
# 3,2,502,5,250.0,50.0
# 4,2,403,1,129.99,129.99
# 5,4,897,2,49.98,24.99
# 6,4,365,5,299.95,59.99
# 7,4,502,3,150.0,50.0
# 8,4,1014,4,199.92,49.98
# 9,5,957,1,299.98,299.98
# 10,5,365,5,299.95,59.99
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), oi))
for i in orderItemsMap.take(10): print(i)
# (1, u'1,1,957,1,299.98,299.98')
# (2, u'2,2,1073,1,199.99,199.99')
# (2, u'3,2,502,5,250.0,50.0')
# (2, u'4,2,403,1,129.99,129.99')
# (4, u'5,4,897,2,49.98,24.99')
# (4, u'6,4,365,5,299.95,59.99')
# (4, u'7,4,502,3,150.0,50.0')
# (4, u'8,4,1014,4,199.92,49.98')
# (5, u'9,5,957,1,299.98,299.98')
# (5, u'10,5,365,5,299.95,59.99')
orderItemsGroupByOrderId = orderItemsMap.groupByKey()
for i in orderItemsGroupByOrderId.take(10): print(i)
# (2, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67910>)           
# (4, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67750>)
# (8, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67450>)
# (10, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67650>)
# (12, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67590>)
# (14, <pyspark.resultiterable.ResultIterable object at 0x7fc24de676d0>)
# (16, <pyspark.resultiterable.ResultIterable object at 0x7fc24de675d0>)
# (18, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67510>)
# (20, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67690>)
# (24, <pyspark.resultiterable.ResultIterable object at 0x7fc24de67550>)

help(sorted)
#sorted(iterable, cmp=None, key=None, reverse=False) --> new sorted list
orderItemsSortedBySubtotalPerOrder = orderItemsGroupByOrderId. \
flatMap(lambda oi: 
  sorted(oi[1], key=lambda k: float(k.split(",")[4]), reverse=True)
  )

for i in orderItemsSortedBySubtotalPerOrder.take(10): print(i)
reduceByKey
reduceByKey uses combiner
It is used when logic to compute intermediate values and logic to compute the final value using intermediate values are same
It is very straightforward to implement
It takes one anonymous or lambda function with 2 arguments
In [ ]:
#Get revenue for each order_id - reduceByKey
orderItems = sc.textFile("/public/retail_db/order_items")
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
for i in orderItemsMap.take(10): print(i)
# (1, 299.98)
# (2, 199.99)
# (2, 250.0)
# (2, 129.99)
# (4, 49.98)
# (4, 299.95)
# (4, 150.0)
# (4, 199.92)
# (5, 299.98)
# (5, 299.95)
from operator import add
revenuePerOrderId = orderItemsMap.reduceByKey(add)
for i in revenuePerOrderId.take(10): print(i)
# (2, 579.98)                                                                     
# (4, 699.85)
# (8, 729.8399999999999)
# (10, 651.9200000000001)
# (12, 1299.8700000000001)
# (14, 549.94)
# (16, 419.93)
# (18, 449.96000000000004)
# (20, 879.8599999999999)
# (24, 829.97)
#Alternative way of adding for each key using reduceByKey
revenuePerOrderId = orderItemsMap.reduceByKey(lambda x, y: x + y)
for i in revenuePerOrderId.take(10): print(i)
# (2, 579.98)                                                                     
# (4, 699.85)
# (8, 729.8399999999999)
# (10, 651.9200000000001)
# (12, 1299.8700000000001)
# (14, 549.94)
# (16, 419.93)
# (18, 449.96000000000004)
# (20, 879.8599999999999)
# (24, 829.97)

#To get the min of subtotal
minRevenuePerOrderId = orderItemsMap.reduceByKey(lambda x, y: x if(x<y) else y)
for i in minRevenuePerOrderId.take(10): print(i)
# (2, 129.99)                                                                     
# (4, 49.98)
# (8, 50.0)
# (10, 21.99)
# (12, 100.0)
# (14, 50.0)
# (16, 119.98)
# (18, 119.98)
# (20, 129.99)
# (24, 50.0)
aggregateByKey
aggregateByKey uses combiner
It is used when logic to compute intermediate values and logic to compute the final value using intermediate values are not the same
It is a bit tricky to implement
It takes 3 arguments
    Initialize value – driven by output value type
    Combine function or seqOp – 2 arguments
        first argument – driven by output value type
        the second argument – driven by input value type
    Reduce function or combineOp – 2 arguments – driven by output value type
In [ ]:
#Get revenue and count of items for each order id - aggregateByKey
orderItems = sc.textFile("/public/retail_db/order_items")
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
for i in orderItemsMap.take(10): print(i)
# (1, 299.98)
# (2, 199.99)
# (2, 250.0)
# (2, 129.99)
# (4, 49.98)
# (4, 299.95)
# (4, 150.0)
# (4, 199.92)
# (5, 299.98)
# (5, 299.95)
revenuePerOrder = orderItemsMap.aggregateByKey((0.0, 0), 
  lambda x, y: (x[0] + y, x[1] + 1), 
  lambda x, y: (x[0] + y[0], x[1] + y[1]))
for i in revenuePerOrder.take(10): print(i)
# (2, (579.98, 3))                                                                
# (4, (699.85, 4))
# (8, (729.8399999999999, 4))
# (10, (651.9200000000001, 5))
# (12, (1299.8700000000001, 5))
# (14, (549.94, 3))
# (16, (419.93, 2))
# (18, (449.96000000000004, 3))
# (20, (879.8599999999999, 4))
# (24, (829.97, 5))
Sorting
Data can be sorted using sortByKey
Input RDD have to be paired RDD
Data can be sorted in ascending or descending order
By default, data will be sorted in natural ascending order of the key
We can pass False to sortByKey function to sort in descending order

Steps to follow for sorting
Understand data and identify the key and it’s data type using which data need to be sorted
Perform all the necessary transformations required before sorting the data
As part of last transformation before sorting the data, make sure the output have the sort key and value in the form of Paired RDD
Perform sortByKey to sort the data based on the key
Typically sortByKey will be followed by map to discard the key, if we need only value for saving or for further processing of data.
In [ ]:
#Sort data by product price - sortByKey
products = sc.textFile("/public/retail_db/products")
for i in products.take(10): print(i)

# 1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
# 2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
# 3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
# 4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
# 5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet
# 6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat
# 7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014
# 8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.99,http://images.acmesports.sports/Nike+Men%27s+Vapor+Carbon+Elite+TD+Football+Cleat
# 9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vapor+Jet+3.0+Receiver+Gloves
# 10,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
productsMap = products.filter(lambda p: p.split(",")[4] != ""). \
map(lambda p: (float(p.split(",")[4]), p))
# (59.98, u'1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy')
# (129.99, u"2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat")
# (89.99, u"3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat")
# (89.99, u"4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat")
# (199.99, u'5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet')
# (134.99, u"6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat")
# (99.99, u'7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014')
# (129.99, u"8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.99,http://images.acmesports.sports/Nike+Men%27s+Vapor+Carbon+Elite+TD+Football+Cleat")
# (50.0, u'9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vapor+Jet+3.0+Receiver+Gloves')
# (129.99, u"10,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat")
productsSortedByPrice = productsMap.sortByKey()
for i in productsSortedByPrice.take(10): print(i)
# (0.0, u"38,3,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat")
# (0.0, u"388,18,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat")
# (0.0, u"414,19,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat")
# (0.0, u"517,24,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat")
# (0.0, u"547,25,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat")
# (0.0, u'934,42,Callaway X Hot Driver,,0.0,http://images.acmesports.sports/Callaway+X+Hot+Driver')
# (0.0, u"1284,57,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat")
# (4.99, u'624,29,adidas Batting Helmet Hardware Kit,,4.99,http://images.acmesports.sports/adidas+Batting+Helmet+Hardware+Kit')
# (4.99, u'815,37,Zero Friction Practice Golf Balls - 12 Pack,,4.99,http://images.acmesports.sports/Zero+Friction+Practice+Golf+Balls+-+12+Pack')
# (5.0, u'336,15,"Nike Swoosh Headband - 2""",,5.0,http://images.acmesports.sports/Nike+Swoosh+Headband+-+2%22')
productsSortedMap = productsSortedByPrice.map(lambda p: p[1])

for i in productsSortedMap.take(10): print(i)
# 38,3,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
# 388,18,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
# 414,19,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
# 517,24,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
# 547,25,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
# 934,42,Callaway X Hot Driver,,0.0,http://images.acmesports.sports/Callaway+X+Hot+Driver
# 1284,57,Nike Men's Hypervenom Phantom Premium FG Socc,,0.0,http://images.acmesports.sports/Nike+Men%27s+Hypervenom+Phantom+Premium+FG+Soccer+Cleat
# 624,29,adidas Batting Helmet Hardware Kit,,4.99,http://images.acmesports.sports/adidas+Batting+Helmet+Hardware+Kit
# 815,37,Zero Friction Practice Golf Balls - 12 Pack,,4.99,http://images.acmesports.sports/Zero+Friction+Practice+Golf+Balls+-+12+Pack
# 336,15,"Nike Swoosh Headband - 2""",,5.0,http://images.acmesports.sports/Nike+Swoosh+Headband+-+2%22
Composite Sorting
We typically get key in the form of tuple having all the elements on which data need to be sorted
We can also accomplish sorting in ascending order of one element of composite key and sorting in descending order on another element (e.g.:ascending order by category and then descending order by product price) in some cases
In [ ]:
#Sort data by product category and then product price descending - sortByKey
products = sc.textFile("/public/retail_db/products")
for i in products.take(10): print(i)
# 1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
# 2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
# 3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
# 4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
# 5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet
# 6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat
# 7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014
# 8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.99,http://images.acmesports.sports/Nike+Men%27s+Vapor+Carbon+Elite+TD+Football+Cleat
# 9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vapor+Jet+3.0+Receiver+Gloves
# 10,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat

productsMap = products. \
filter(lambda p: p.split(",")[4] != "").map(lambda p: ((int(p.split(",")[1]), -float(p.split(",")[4])), p))
for i in productsMap.take(10): print(i)
# ((2, -59.98), u'1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy')
# ((2, -129.99), u"2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat")
# ((2, -89.99), u"3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat")
# ((2, -89.99), u"4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat")
# ((2, -199.99), u'5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet')
# ((2, -134.99), u"6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat")
# ((2, -99.99), u'7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014')
# ((2, -129.99), u"8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.99,http://images.acmesports.sports/Nike+Men%27s+Vapor+Carbon+Elite+TD+Football+Cleat")
# ((2, -50.0), u'9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vapor+Jet+3.0+Receiver+Gloves')
# ((2, -129.99), u"10,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat")
for i in productsMap.sortByKey().map(lambda p: p[1]).take(10): print(i)

# 16,2,Riddell Youth 360 Custom Football Helmet,,299.99,http://images.acmesports.sports/Riddell+Youth+360+Custom+Football+Helmet
# 11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set
# 5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet
# 14,2,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy
# 12,2,Under Armour Men's Highlight MC Alter Ego Fla,,139.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Alter+Ego+Flash+Football...
# 23,2,Under Armour Men's Highlight MC Alter Ego Hul,,139.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Alter+Ego+Hulk+Football...
# 6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat
# 2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
# 8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.99,http://images.acmesports.sports/Nike+Men%27s+Vapor+Carbon+Elite+TD+Football+Cleat
# 10,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
Global Ranking – sortByKey and take
sortByKey is used to sort the data
We can use take to get first n number of records after sorting the data
However typical sortByKey and take does not work if there are more than n records that falls under top N
In [ ]:
#Get top N products by price - Global Ranking - sortByKey and take
products = sc.textFile("/public/retail_db/products")
for i in products.take(10): print(i)
# 1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
# 2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
# 3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
# 4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
# 5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet
# 6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat
# 7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014
# 8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.99,http://images.acmesports.sports/Nike+Men%27s+Vapor+Carbon+Elite+TD+Football+Cleat
# 9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vapor+Jet+3.0+Receiver+Gloves
# 10,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
productsMap = products.filter(lambda p: p.split(",")[4] != ""). \
map(lambda p: (float(p.split(",")[4]), p))
for i in productsMap.take(10): print(i)
# (59.98, u'1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy')
# (129.99, u"2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat")
# (89.99, u"3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat")
# (89.99, u"4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat")
# (199.99, u'5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet')
# (134.99, u"6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat")
# (99.99, u'7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014')
# (129.99, u"8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.99,http://images.acmesports.sports/Nike+Men%27s+Vapor+Carbon+Elite+TD+Football+Cleat")
# (50.0, u'9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vapor+Jet+3.0+Receiver+Gloves')
# (129.99, u"10,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat")

#False for descending order
productsSortedByPrice = productsMap.sortByKey(False)
for i in productsSortedByPrice.map(lambda p: p[1]).take(5): print(i)
# 208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical
# 66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
# 199,10,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
# 496,22,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
# 1048,47,"Spalding Beast 60"" Glass Portable Basketball ",,1099.99,http://images.acmesports.sports/Spalding+Beast+60%22+Glass+Portable+Basketball+Hoop
Global Ranking – takeOrdered or top
takeOrdered will sort the the data based on the key as per logic and then get first N records
top will sort the data in descending order based on the key as per logic and then get first N records
We can use either of them to get top N records
In [ ]:
#Get top N products by price - Global Ranking - top or takeOrdered
products = sc.textFile("/public/retail_db/products")
productsFiltered = products.filter(lambda p: p.split(",")[4] != "")
for i in productsFiltered.take(10): print(i)
# 1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
# 2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
# 3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
# 4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
# 5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet
# 6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat
# 7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014
# 8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.99,http://images.acmesports.sports/Nike+Men%27s+Vapor+Carbon+Elite+TD+Football+Cleat
# 9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vapor+Jet+3.0+Receiver+Gloves
# 10,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
topNProducts = productsFiltered.top(5, key=lambda k: float(k.split(",")[4]))
topNProducts = productsFiltered.takeOrdered(5, key=lambda k: -float(k.split(",")[4]))
for i in topNProducts: print(i)
# 208,10,SOLE E35 Elliptical,,1999.99,http://images.acmesports.sports/SOLE+E35+Elliptical
# 66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
# 199,10,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
# 496,22,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
# 1048,47,"Spalding Beast 60"" Glass Portable Basketball ",,1099.99,http://images.acmesports.sports/Spalding+Beast+60%22+Glass+Portable+Basketball+Hoop    
Ranking by category
Good knowledge of programming languages such as Python, especially manipulating collections
groupByKey with flatMap
After groupByKey, we need to process the values as a collection using APIs of underlying programming language
The logic needs to be invoked using flatMap
In [ ]:
#Get top N products by price in each category
products = sc.textFile("/public/retail_db/products")
productsFiltered = products.filter(lambda p: p.split(",")[4] != "")
for i in productsFiltered.take(10): print(i)
# 1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy
# 2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat
# 3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
# 4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat
# 5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet
# 6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat
# 7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014
# 8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.99,http://images.acmesports.sports/Nike+Men%27s+Vapor+Carbon+Elite+TD+Football+Cleat
# 9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vapor+Jet+3.0+Receiver+Gloves
# 10,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat

productsMap = productsFiltered.map(lambda p: (int(p.split(",")[1]),p))
for i in productsMap.take(10): print(i)
# (2, u'1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy')
# (2, u"2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat")
# (2, u"3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat")
# (2, u"4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat")
# (2, u'5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet')
# (2, u"6,2,Jordan Men's VI Retro TD Football Cleat,,134.99,http://images.acmesports.sports/Jordan+Men%27s+VI+Retro+TD+Football+Cleat")
# (2, u'7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.99,http://images.acmesports.sports/Schutt+Youth+Recruit+Hybrid+Custom+Football+Helmet+2014')
# (2, u"8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.99,http://images.acmesports.sports/Nike+Men%27s+Vapor+Carbon+Elite+TD+Football+Cleat")
# (2, u'9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vapor+Jet+3.0+Receiver+Gloves')
# (2, u"10,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat")

productsGroupByCategoryId = productsMap.groupByKey()
for i in productsGroupByCategoryId.take(10): print(i)

# (2, <pyspark.resultiterable.ResultIterable object at 0x7f0195b7a310>)           
# (4, <pyspark.resultiterable.ResultIterable object at 0x7f0195b7a4d0>)
# (6, <pyspark.resultiterable.ResultIterable object at 0x7f0195b7a510>)
# (8, <pyspark.resultiterable.ResultIterable object at 0x7f0195b7a550>)
# (10, <pyspark.resultiterable.ResultIterable object at 0x7f0195b7a590>)
# (12, <pyspark.resultiterable.ResultIterable object at 0x7f0195b7a5d0>)
# (16, <pyspark.resultiterable.ResultIterable object at 0x7f0195b7a610>)
# (18, <pyspark.resultiterable.ResultIterable object at 0x7f0195b7a650>)
# (20, <pyspark.resultiterable.ResultIterable object at 0x7f0195b7a690>)
# (22, <pyspark.resultiterable.ResultIterable object at 0x7f0195b7a6d0>)

topNProductsByCategory = productsGroupByCategoryId. \
flatMap(lambda p: sorted(p[1], key = lambda k: float(k.split(",")[4]), reverse = True)[:3])
for i in topNProductsByCategory.take(10): print(i)
    
# 16,2,Riddell Youth 360 Custom Football Helmet,,299.99,http://images.acmesports.sports/Riddell+Youth+360+Custom+Football+Helmet
# 11,2,Fitness Gear 300 lb Olympic Weight Set,,209.99,http://images.acmesports.sports/Fitness+Gear+300+lb+Olympic+Weight+Set
# 5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet
# 66,4,SOLE F85 Treadmill,,1799.99,http://images.acmesports.sports/SOLE+F85+Treadmill
# 60,4,SOLE E25 Elliptical,,999.99,http://images.acmesports.sports/SOLE+E25+Elliptical
# 71,4,Diamondback Adult Response XE Mountain Bike 2,,349.98,http://images.acmesports.sports/Diamondback+Adult+Response+XE+Mountain+Bike+2014
# 117,6,YETI Tundra 65 Chest Cooler,,399.99,http://images.acmesports.sports/YETI+Tundra+65+Chest+Cooler
# 106,6,Teeter Hang Ups NXT-S Inversion Table,,299.99,http://images.acmesports.sports/Teeter+Hang+Ups+NXT-S+Inversion+Table
# 100,6,Quik Shade Summit SX170 10 FT. x 10 FT. Canop,,199.99,http://images.acmesports.sports/Quik+Shade+Summit+SX170+10+FT.+x+10+FT.+Canopy
# 162,8,YETI Tundra 65 Chest Cooler,,399.99,http://images.acmesports.sports/YETI+Tundra+65+Chest+Cooler
Set Operations
In [ ]:
#Set operations - 
#Prepare data - subsets of products for 2013-12 and 2014-01
orders = sc.textFile("/public/retail_db/orders")
orderItems = sc.textFile("/public/retail_db/order_items")
for i in orders.take(10): print(i)
# 1,2013-07-25 00:00:00.0,11599,CLOSED
# 2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
# 3,2013-07-25 00:00:00.0,12111,COMPLETE
# 4,2013-07-25 00:00:00.0,8827,CLOSED
# 5,2013-07-25 00:00:00.0,11318,COMPLETE
# 6,2013-07-25 00:00:00.0,7130,COMPLETE
# 7,2013-07-25 00:00:00.0,4530,COMPLETE
# 8,2013-07-25 00:00:00.0,2911,PROCESSING
# 9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
# 10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT
for i in orderItems.take(10): print(i)    

# 1,1,957,1,299.98,299.98
# 2,2,1073,1,199.99,199.99
# 3,2,502,5,250.0,50.0
# 4,2,403,1,129.99,129.99
# 5,4,897,2,49.98,24.99
# 6,4,365,5,299.95,59.99
# 7,4,502,3,150.0,50.0
# 8,4,1014,4,199.92,49.98
# 9,5,957,1,299.98,299.98
# 10,5,365,5,299.95,59.99
orders201312 = orders. \
filter(lambda o: o.split(",")[1][:7] == "2013-12"). \
map(lambda o: (int(o.split(",")[0]), o))
for i in orders201312.take(10): print(i)
# (20916, u'20916,2013-12-01 00:00:00.0,11503,CLOSED')
# (20917, u'20917,2013-12-01 00:00:00.0,10441,PENDING_PAYMENT')
# (20918, u'20918,2013-12-01 00:00:00.0,1664,PENDING')
# (20919, u'20919,2013-12-01 00:00:00.0,383,COMPLETE')
# (20920, u'20920,2013-12-01 00:00:00.0,4799,PROCESSING')
# (20921, u'20921,2013-12-01 00:00:00.0,4712,PROCESSING')
# (20922, u'20922,2013-12-01 00:00:00.0,9720,COMPLETE')
# (20923, u'20923,2013-12-01 00:00:00.0,10118,COMPLETE')
# (20924, u'20924,2013-12-01 00:00:00.0,417,PENDING')
# (20925, u'20925,2013-12-01 00:00:00.0,6416,PAYMENT_REVIEW')

orders201401 = orders. \
filter(lambda o: o.split(",")[1][:7] == "2014-01"). \
map(lambda o: (int(o.split(",")[0]), o))
for i in orders201401.take(10): print(i)
        
# (25876, u'25876,2014-01-01 00:00:00.0,3414,PENDING_PAYMENT')
# (25877, u'25877,2014-01-01 00:00:00.0,5549,PENDING_PAYMENT')
# (25878, u'25878,2014-01-01 00:00:00.0,9084,PENDING')
# (25879, u'25879,2014-01-01 00:00:00.0,5118,PENDING')
# (25880, u'25880,2014-01-01 00:00:00.0,10146,CANCELED')
# (25881, u'25881,2014-01-01 00:00:00.0,3205,PENDING_PAYMENT')
# (25882, u'25882,2014-01-01 00:00:00.0,4598,COMPLETE')
# (25883, u'25883,2014-01-01 00:00:00.0,11764,PENDING')
# (25884, u'25884,2014-01-01 00:00:00.0,7904,PENDING_PAYMENT')
# (25885, u'25885,2014-01-01 00:00:00.0,7253,PENDING')
orderItemsMap = orderItems. \
map(lambda oi: (int(oi.split(",")[1]), oi))
for i in orderItemsMap.take(10): print(i)
    
# (1, u'1,1,957,1,299.98,299.98')
# (2, u'2,2,1073,1,199.99,199.99')
# (2, u'3,2,502,5,250.0,50.0')
# (2, u'4,2,403,1,129.99,129.99')
# (4, u'5,4,897,2,49.98,24.99')
# (4, u'6,4,365,5,299.95,59.99')
# (4, u'7,4,502,3,150.0,50.0')
# (4, u'8,4,1014,4,199.92,49.98')
# (5, u'9,5,957,1,299.98,299.98')
# (5, u'10,5,365,5,299.95,59.99')
orderItems201312 = orders201312. \
join(orderItemsMap). \
map(lambda oi: oi[1][1])
orderItems201401 = orders201401. \
join(orderItemsMap). \
map(lambda oi: oi[1][1])

for i in orderItems201312.take(10): print(i)
# 62881,25096,502,5,250.0,50.0                                                    
# 62882,25096,365,1,59.99,59.99
# 62883,25096,365,4,239.96,59.99
# 62884,25096,365,2,119.98,59.99
# 62885,25096,191,3,299.97,99.99
# 57915,23132,1004,1,399.98,399.98
# 57916,23132,1014,1,49.98,49.98
# 54657,21856,1004,1,399.98,399.98
# 54658,21856,403,1,129.99,129.99
# 54659,21856,627,4,159.96,39.99
for i in orderItems201401.take(10): print(i)
# 68397,27308,365,1,59.99,59.99                                                   
# 68398,27308,365,4,239.96,59.99
# 68399,27308,1004,1,399.98,399.98
# 68400,27308,365,4,239.96,59.99
# 68401,27308,1073,1,199.99,199.99
# 68417,27316,365,5,299.95,59.99
# 68424,27320,627,2,79.98,39.99
# 68425,27320,818,5,239.95,47.99
# 68426,27320,778,1,24.99,24.99
# 68429,27324,365,5,299.95,59.99
Set operations can let us get the common elements betweeb two data sets or all elements from two data sets.
A union will get all the elements from both the data sets
Intersect will get all the elements common in both the datasets
Distinct will get all the distinct elements in a data set
In the case of a union, it will not get distinct elements. Apply distinct, if you only want to get distinct elements after union.
When we use set operations such as union and intersect, data should have a similar structure
Diff and complement are not available on top of RDDs
In [ ]:
#Set operations - Union - Get product ids sold in 2013-12 and 2014-01
products201312 = orderItems201312.map(lambda p: int(p.split(",")[2]))
products201401 = orderItems201401.map(lambda p: int(p.split(",")[2]))
for i in products201312.take(10): print(i)
# 502
# 365
# 365
# 365
# 191
# 1004
# 1014
# 1004
# 403
# 627

for i in products201401.take(10): print(i)
# 191
# 365
# 364
# 365
# 365
# 1073
# 365
# 724
# 403
# 773
allproducts = products201312.union(products201401).distinct()

for i in allproducts.take(10) : print(i)
# 768                                                                             
# 792
# 208
# 728
# 24
# 216
# 897
# 705
# 977
# 273    

#Set operations - Intersection - Get product ids sold in both 2013-12 and 2014-01
products201312 = orderItems201312.map(lambda p: int(p.split(",")[2]))
products201401 = orderItems201401.map(lambda p: int(p.split(",")[2]))

for i in products201312.take(10) : print(i)
    
# 1014
# 1073
# 1004
# 1004
# 1014
# 1073
# 1014
# 61
# 1004
# 1004        
    
for i in products201401.take(10) : print(i)    

# 191
# 365
# 364
# 365
# 365
# 1073
# 365
# 724
# 403
# 773    
commonproducts = products201312.intersection(products201401)
for i in commonproducts.take(10) : print(i)
# 768                                                                             
# 208
# 728
# 24
# 792
# 216
# 897
# 705
# 825
# 273
#Set operations - minus - Get product ids sold in 2013-12 but not in 2014-01

products201312only = products201312.subtract(products201401).distinct()
for i in products201312only.take(10) : print(i)
#127

products201401only = products201401.subtract(products201312).distinct()
for i in products201401only.take(10) : print(i)
#58
    
#Set operations - (a-b)u(b-a)

productsSoldOnlyInOneMonth = products201312only.union(products201401only)
for i in productsSoldOnlyInOneMonth.take(10) : print(i)
# 127
# 58
Saving RDD to HDFS
In [ ]:
#Saving as text files with delimiters - revenue per order id
orderItems = sc.textFile("/public/retail_db/order_items")
for i in orderItems.take(10) : print(i)
# 1,1,957,1,299.98,299.98
# 2,2,1073,1,199.99,199.99
# 3,2,502,5,250.0,50.0
# 4,2,403,1,129.99,129.99
# 5,4,897,2,49.98,24.99
# 6,4,365,5,299.95,59.99
# 7,4,502,3,150.0,50.0
# 8,4,1014,4,199.92,49.98
# 9,5,957,1,299.98,299.98
# 10,5,365,5,299.95,59.99
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
for i in orderItemsMap.take(10) : print(i)
# (1, 299.98)
# (2, 199.99)
# (2, 250.0)
# (2, 129.99)
# (4, 49.98)
# (4, 299.95)
# (4, 150.0)
# (4, 199.92)
# (5, 299.98)
# (5, 299.95)
    
from operator import add
revenuePerOrderId = orderItemsMap.reduceByKey(add).map(lambda r: str(r[0]) + "\t" + str(r[1]))
for i in revenuePerOrderId.take(10) : print(i)

# 2       579.98                                                                  
# 4       699.85
# 8       729.84
# 10      651.92
# 12      1299.87
# 14      549.94
# 16      419.93
# 18      449.96
# 20      879.86
# 24      829.97
    
revenuePerOrderId.saveAsTextFile("/user/munjesh1/revenue_per_order_id")
#For compression 

revenuePerOrderId. \
saveAsTextFile("/user/dgadiraju/revenue_per_order_compressed",
  compressionCodecClass="org.apache.hadoop.io.compress.SnappyCodec")

# hadoop fs -ls /user/munjesh1/revenue_per_order_id
# hadoop fs -tail /user/munjesh1/revenue_per_order_id/part-00000

for i in sc.textFile("/user/munjesh1/revenue_per_order_id").take(10): print(i)
    
# 2       579.98
# 4       699.85
# 8       729.84
# 10      651.92
# 12      1299.87
# 14      549.94
# 16      419.93
# 18      449.96
# 20      879.86
# 24      829.97
Saving data in different file formats
Supported file formats
    orc
    json
    parquet
    avro (with databricks plugin)
Steps to save into different file formats
    Make sure data is represented as Data Frame
    Use write or save API to save Data Frame into different file formats
    Use compression algorithm if required
In [ ]:
orderItems = sc.textFile("/public/retail_db/order_items")
for i in orderItems.take(10) : print(i)
# 1,1,957,1,299.98,299.98
# 2,2,1073,1,199.99,199.99
# 3,2,502,5,250.0,50.0
# 4,2,403,1,129.99,129.99
# 5,4,897,2,49.98,24.99
# 6,4,365,5,299.95,59.99
# 7,4,502,3,150.0,50.0
# 8,4,1014,4,199.92,49.98
# 9,5,957,1,299.98,299.98
# 10,5,365,5,299.95,59.99
orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
for i in orderItemsMap.take(10) : print(i)
# (1, 299.98)
# (2, 199.99)
# (2, 250.0)
# (2, 129.99)
# (4, 49.98)
# (4, 299.95)
# (4, 150.0)
# (4, 199.92)
# (5, 299.98)
# (5, 299.95)

for i in revenuePerOrderId.take(10) : print(i)
# (2, 579.98)                                                                     
# (4, 699.85)
# (8, 729.8399999999999)
# (10, 651.9200000000001)
# (12, 1299.8700000000001)
# (14, 549.94)
# (16, 419.93)
# (18, 449.96000000000004)
# (20, 879.8599999999999)
# (24, 829.97)

from operator import add
revenuePerOrderId = orderItemsMap.reduceByKey(add)
revenuePerOrderIdDF = revenuePerOrderId. \
toDF(schema=["order_id", "order_revenue"])

revenuePerOrderIdDF.save("/user/munjesh1/revenue_per_order_json", "json")
revenuePerOrderIdDF.write.json("/user/munjesh1/revenue_per_order_json")

sqlContext.read.json("/user/munjesh1/revenue_per_order_json").show()