Aggregations can be broadly categorized into totals and by key. Spark brings the power of in-memory processing as well as rich APIs to aggregate data sets. In this blog I will discuss how to implement aggregations using different actions and transformations.
- Pre-requisites
- Spark Introduction
- Aggregations – totals
- Aggregations – by key
- Differences between groupByKey, reduceByKey, aggregateByKey, etc.
- Computing by key averages
- Computing by key min/max
Pre-requisites
- Make sure you have access to a cluster or set up the Cloudera QuickStart VM
- Use sqoop import-all-tables to load data into HDFS
sqoop import-all-tables -m 4 --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username=retail_dba --password=cloudera --warehouse-dir=/user/cloudera/sqoop_import
- Once you have an environment with the data set, we can start exploring different strategies to aggregate data using Spark.
Spark Introduction
Apache Spark is an open source cluster computing framework. This lesson covers the relevant Spark topics using Python.
- It works with any file system (S3, HDFS, etc.)
- Processing is done in memory
- It is effective in processing streaming data loads
- It is primarily distributed by Databricks
There are many components in the Spark ecosystem, such as Core Spark (the core Transformations and Actions), Streaming, MLlib, GraphX, Data Frames (from 1.3.x), etc.
For the CCA Spark and Hadoop Developer certification, we just need to focus on Core Spark, which means the core set of Transformations and Actions. While other components change with versions, Core Spark syntax has remained almost the same.
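As a quick illustration of the distinction (a minimal sketch, assuming the orders data set imported above and that it contains the CLOSED order status): transformations such as filter are evaluated lazily, while actions such as count trigger the actual computation.

ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
# filter is a transformation - nothing is computed yet
closedOrdersRDD = ordersRDD.filter(lambda rec: rec.split(",")[3] == "CLOSED")
# count is an action - it triggers execution and returns a value to the driver
closedOrdersRDD.count()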
Here is the documentation that will be provided at the time of taking the certification exam. One needs to use the Spark programming guide.
Here is the video about the introduction of Spark.
Total Aggregations – e.g., sum
- sum, average
- min, max
Spark provides actions such as count and reduce to compute counts and totals. To compute an average, we need to combine these building blocks by leveraging Spark transformations and actions.
This video covers computing totals using actions such as count and reduce.
- Get total number of records in a data set (e.g., orders)
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersRDD.count()
- Get total revenue from order_items
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
orderItemsMap = orderItemsRDD.map(lambda rec: float(rec.split(",")[4]))
for i in orderItemsMap.take(5):
    print(i)
orderItemsReduce = orderItemsMap.reduce(lambda rev1, rev2: rev1 + rev2)
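As an aside, the same total can also be computed with the sum action, reusing orderItemsMap from above:

# sum is an action that adds up all the values of an RDD of numbers
orderItemsTotal = orderItemsMap.sum()
print(orderItemsTotal)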
- Get the max priced product from the products table. First clean up that one bad record with product_id 685. We will look into filtering later.
hadoop fs -get /user/cloudera/sqoop_import/products
# Delete the record with product_id 685 from the local copy
hadoop fs -put -f products/part* /user/cloudera/sqoop_import/products
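Alternatively, that record can be dropped in Spark itself using the filter transformation instead of editing the files on HDFS (a minimal sketch; filtering is covered in more detail later):

productsRDD = sc.textFile("/user/cloudera/sqoop_import/products")
# keep every record except the one with product_id 685
productsFiltered = productsRDD.filter(lambda rec: rec.split(",")[0] != "685")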
- Get the max priced product using reduce; see below for the comparison logic implemented in the lambda function
productsRDD = sc.textFile("/user/cloudera/sqoop_import/products")
productsMap = productsRDD.map(lambda rec: rec)
productsMap.reduce(lambda rec1, rec2:
    rec1 if (rec1.split(",")[4] != "" and rec2.split(",")[4] != "" and
             float(rec1.split(",")[4]) >= float(rec2.split(",")[4])) else rec2)
Total Aggregations – e.g., Average
- Computing average revenue (total revenue/total distinct orders from order_items)
revenue = sc.textFile("/user/cloudera/sqoop_import/order_items"). \
    map(lambda rec: float(rec.split(",")[4])). \
    reduce(lambda rev1, rev2: rev1 + rev2)
totalOrders = sc.textFile("/user/cloudera/sqoop_import/order_items"). \
    map(lambda rec: int(rec.split(",")[1])). \
    distinct(). \
    count()
revenue / totalOrders
Aggregations – By Key
By key aggregations are extensively used to group data by a key and derive insights. The key could be time, category, department, etc.
Differences between by key transformations
Here is the introduction video about by key operations. In it, the by key transformations such as groupByKey, reduceByKey, and aggregateByKey are covered extensively.
- Number of orders by status using different by key operations
- Using countByKey
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMap = ordersRDD.map(lambda rec: (rec.split(",")[3], 1))
for i in ordersMap.countByKey().items():
    print(i)
- Using groupByKey
- groupByKey is not efficient in this case, as the size of the output is very small compared to the size of the input data.
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMap = ordersRDD.map(lambda rec: (rec.split(",")[3], 1))
ordersByStatus = ordersMap. \
    groupByKey(). \
    map(lambda t: (t[0], sum(t[1])))
for recs in ordersByStatus.collect():
    print(recs)
- Using reduceByKey. It uses combiner internally.
- Input data and output data for reduceByKey need to be of the same type.
- Combiner is implicit and uses the reduce logic.
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMap = ordersRDD.map(lambda rec: (rec.split(",")[3], 1))
ordersByStatus = ordersMap.reduceByKey(lambda acc, value: acc + value)
for recs in ordersByStatus.collect():
    print(recs)
- Using aggregateByKey. It also uses combiner internally.
- Input data and output data for aggregateByKey can be of different types.
- Also combiner logic can be different from reduce logic.
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMap = ordersRDD. \
    map(lambda rec: (rec.split(",")[3], rec))
ordersByStatus = ordersMap.aggregateByKey(0,
    lambda acc, value: acc + 1,      # seqOp: count each record within a partition
    lambda acc, value: acc + value)  # combOp: add up the partial counts
for recs in ordersByStatus.collect():
    print(recs)
- Using combineByKey. It also uses combiner internally.
- Input data and output data for combineByKey can be of different types.
- It is almost the same as aggregateByKey and is less frequently used.
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMap = ordersRDD.map(lambda rec: (rec.split(",")[3], rec))
ordersByStatus = ordersMap.combineByKey(
    lambda value: 1,                 # createCombiner: initialize the count for a key
    lambda acc, value: acc + 1,      # mergeValue: count each additional record
    lambda acc, value: acc + value)  # mergeCombiners: add up the partial counts
for recs in ordersByStatus.collect():
    print(recs)
By key aggregation – combiner theory
Here is the video which talks about the combiner. The combiner is the main difference between groupByKey and other by key transformations such as reduceByKey, aggregateByKey, etc.
- For by key aggregations where the input data volume is significantly higher than the output data volume, we should not use groupByKey
- groupByKey is the only option when we need to extend the functionality to other operations on the grouped values, such as sorting (see the sketch below)
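For example, here is a minimal sketch (using the order_items data set imported earlier) where groupByKey is the natural fit: sorting all the revenues within each order needs the complete group of values, which reduceByKey never sees since it only combines two values at a time.

orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
# (order_id, order_item_subtotal)
orderItemsMap = orderItemsRDD.map(lambda rec: (int(rec.split(",")[1]), float(rec.split(",")[4])))
# groupByKey hands us all the values for a key, which we can then sort
sortedRevenuesPerOrder = orderItemsMap.groupByKey(). \
    map(lambda t: (t[0], sorted(t[1], reverse=True)))
for rec in sortedRevenuesPerOrder.take(5):
    print(rec)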
By key aggregation – reduceByKey and aggregateByKey
Here is the video which talks about the combiner in the context of reduceByKey, aggregateByKey, etc.
- Once we understand the differences between the by key transformations, we need to apply them to new requirements
- Requirement: Number of orders by order date and order status
- For this requirement, key is order_date and order_status
- We are doing aggregation, hence groupByKey is eliminated
- Combiner logic and reducer logic can be the same, hence aggregateByKey and combineByKey are eliminated
- We are left with countByKey and reduceByKey; we can use either of them (a countByKey version is sketched after the reduceByKey code below)
- Here is the logic implemented using reduceByKey
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMapRDD = ordersRDD. \
    map(lambda rec: ((rec.split(",")[1], rec.split(",")[3]), 1))
ordersByStatusPerDay = ordersMapRDD. \
    reduceByKey(lambda v1, v2: v1 + v2)
for recs in ordersByStatusPerDay.collect():
    print(recs)
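For completeness, a minimal sketch of the same requirement using countByKey. Note that countByKey is an action and returns the counts to the driver as a dict, so it is only appropriate when the number of distinct keys is small.

ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
# key on (order_date, order_status); the value is irrelevant for countByKey
ordersKeyedRDD = ordersRDD.map(lambda rec: ((rec.split(",")[1], rec.split(",")[3]), 1))
for rec in ordersKeyedRDD.countByKey().items():
    print(rec)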
Computing By Key averages
- Requirement: Generate average revenue per day
- Parse Orders (key order_id)
- Parse Order items (key order_item_order_id)
- Join Orders and Order item on the key
- Parse the joined data and get (order_date, order_id) as the key and order_item_subtotal as the value. Here the number of input records and output records will be the same.
- Apply an aggregate function (reduceByKey) to get revenue per order. Here the number of output records will be the number of distinct orders in the order_items table.
- Parse the revenue per order and remove order_id from the key. The number of output records will still be the number of distinct orders in the order_items table.
- Apply an aggregate function (combineByKey or aggregateByKey) to get total revenue and total number of orders per day. Here the input type and output type of the aggregation are different, and the combiner logic differs from the reduce logic, hence reduceByKey cannot be used.
- Now the data will have order_date as the key and (revenue_per_day, total_number_of_orders) as the value. Apply a map function to divide the revenue by the total number of orders per day.
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
# Key orders by order_id and order_items by order_item_order_id, then join
ordersParsedRDD = ordersRDD.map(lambda rec: (rec.split(",")[0], rec))
orderItemsParsedRDD = orderItemsRDD.map(lambda rec: (rec.split(",")[1], rec))
ordersJoinOrderItems = orderItemsParsedRDD.join(ordersParsedRDD)
# ((order_date, order_id), order_item_subtotal)
ordersJoinOrderItemsMap = ordersJoinOrderItems. \
    map(lambda t: ((t[1][1].split(",")[1], t[0]), float(t[1][0].split(",")[4])))
# Revenue per order per day
revenuePerDayPerOrder = ordersJoinOrderItemsMap. \
    reduceByKey(lambda acc, value: acc + value)
# Drop order_id from the key: (order_date, revenue_per_order)
revenuePerDayPerOrderMap = revenuePerDayPerOrder. \
    map(lambda rec: (rec[0][0], rec[1]))
# (order_date, (total_revenue, total_number_of_orders)) using combineByKey
revenuePerDay = revenuePerDayPerOrderMap.combineByKey(
    lambda x: (x, 1),
    lambda acc, revenue: (acc[0] + revenue, acc[1] + 1),
    lambda total1, total2: (round(total1[0] + total2[0], 2), total1[1] + total2[1]))
# The same result using aggregateByKey
revenuePerDay = revenuePerDayPerOrderMap.aggregateByKey((0, 0),
    lambda acc, revenue: (acc[0] + revenue, acc[1] + 1),
    lambda total1, total2: (round(total1[0] + total2[0], 2), total1[1] + total2[1]))
for data in revenuePerDay.collect():
    print(data)
avgRevenuePerDay = revenuePerDay.map(lambda x: (x[0], x[1][0] / x[1][1]))
for data in avgRevenuePerDay.collect():
    print(data)
Computing by key min/max
Here is the video which explains the implementation of max by key. It also includes SQL (though SQL is not relevant for the CCA certification).
- Requirement: Get customer_id with max revenue for each day
- customer_id is in the orders table and revenue needs to be derived from the order_items table
- Apply join between orders and order_items with order_id as key
- Apply map function to get (order_date and customer_id) as key and order_item_subtotal as value
- Apply aggregate function to generate revenue per customer per day
- Apply another aggregation using reduceByKey to get the customer with the maximum revenue per day. The logic is implemented both inline and with a named function invoked from the lambda function passed to reduceByKey
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
# Key orders by order_id and order_items by order_item_order_id, then join
ordersParsedRDD = ordersRDD.map(lambda rec: (rec.split(",")[0], rec))
orderItemsParsedRDD = orderItemsRDD.map(lambda rec: (rec.split(",")[1], rec))
ordersJoinOrderItems = orderItemsParsedRDD.join(ordersParsedRDD)
# ((order_date, customer_id), order_item_subtotal)
ordersPerDayPerCustomer = ordersJoinOrderItems. \
    map(lambda rec: ((rec[1][1].split(",")[1], rec[1][1].split(",")[2]), float(rec[1][0].split(",")[4])))
# Revenue per customer per day
revenuePerDayPerCustomer = ordersPerDayPerCustomer. \
    reduceByKey(lambda x, y: x + y)
# (order_date, (customer_id, revenue))
revenuePerDayPerCustomerMap = revenuePerDayPerCustomer. \
    map(lambda rec: (rec[0][0], (rec[0][1], rec[1])))
# Customer with maximum revenue per day, using an inline lambda
topCustomerPerDaybyRevenue = revenuePerDayPerCustomerMap. \
    reduceByKey(lambda x, y: (x if x[1] >= y[1] else y))
# The same logic using a regular (named) function
def findMax(x, y):
    if x[1] >= y[1]:
        return x
    else:
        return y

topCustomerPerDaybyRevenue = revenuePerDayPerCustomerMap. \
    reduceByKey(lambda x, y: findMax(x, y))