### Aggregations – by key

Aggregations can be broadly categorized into total aggregations and by key aggregations. As part of this topic we will cover aggregations by key.

- Load data from HDFS and store results back to HDFS using Spark
- Join disparate datasets together using Spark
- **Calculate aggregate statistics (e.g., average or sum) using Spark**
- Filter data into a smaller dataset using Spark
- Write a query that produces ranked or sorted data using Spark

By key aggregations are extensively used to group data by a key and derive insights. The key could be time, category, department, etc.
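Conceptually, a by key aggregation takes (key, value) pairs and folds all values sharing a key into one result per key. The same idea can be sketched in plain Python, outside Spark (the department names and counts below are made-up sample data):

```python
from collections import defaultdict

# Hypothetical (key, value) pairs: (department, head_count)
pairs = [("sales", 3), ("hr", 2), ("sales", 5), ("it", 4), ("hr", 1)]

# Fold all values that share a key into a single total per key
totals = defaultdict(int)
for key, value in pairs:
    totals[key] += value

print(dict(totals))  # {'sales': 8, 'hr': 3, 'it': 4}
```

The by key transformations below (groupByKey, reduceByKey, aggregateByKey, combineByKey) all perform this kind of fold, but distributed across partitions.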

Here is the introduction video about by key operations. It covers the by key transformations such as groupByKey, reduceByKey, and aggregateByKey in detail.

- Number of orders by status using different by key operations
- Using countByKey

```python
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMap = ordersRDD.map(lambda rec: (rec.split(",")[3], 1))
for i in ordersMap.countByKey().items():
    print(i)
```

- Using groupByKey
- groupByKey is not efficient in this case: the output is very small compared to the input, yet every record is shuffled across the network before being aggregated.

```python
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMap = ordersRDD.map(lambda rec: (rec.split(",")[3], 1))
ordersByStatus = ordersMap.groupByKey().map(lambda t: (t[0], sum(t[1])))
for recs in ordersByStatus.collect():
    print(recs)
```

- Using reduceByKey. It uses a combiner internally.
- Input data and output data for reduceByKey need to be of the same type.
- The combiner is implicit and reuses the reduce logic.

```python
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMap = ordersRDD.map(lambda rec: (rec.split(",")[3], 1))
ordersByStatus = ordersMap.reduceByKey(lambda acc, value: acc + value)
for recs in ordersByStatus.collect():
    print(recs)
```

- Using aggregateByKey. It also uses a combiner internally.
- Input data and output data for aggregateByKey can be of different types.
- Also, the combiner logic can be different from the reduce logic.

```python
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMap = ordersRDD.map(lambda rec: (rec.split(",")[3], rec))
ordersByStatus = ordersMap.aggregateByKey(0,
    lambda acc, value: acc + 1,
    lambda acc, value: acc + value)
for recs in ordersByStatus.collect():
    print(recs)
```

- Using combineByKey. It also uses a combiner internally.
- Input data and output data for combineByKey can be of different types.
- It is almost the same as aggregateByKey and is less frequently used.

```python
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMap = ordersRDD.map(lambda rec: (rec.split(",")[3], rec))
ordersByStatus = ordersMap.combineByKey(
    lambda value: 1,
    lambda acc, value: acc + 1,
    lambda acc, value: acc + value)
for recs in ordersByStatus.collect():
    print(recs)
```

### Overview of Combiner

Here is the video which talks about the combiner. The combiner is the main difference between groupByKey and the other by key transformations such as reduceByKey, aggregateByKey, etc.

- For by key aggregations where the input data volume is significantly higher than the output data volume, we should not use groupByKey.
- groupByKey remains the option to reach for when we need to extend the functionality to other operations, such as sorting the values within each key.
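To see why the combiner matters, the shuffle volume can be simulated in plain Python. With groupByKey every (key, 1) pair crosses the network; with a map-side combiner (as in reduceByKey) each partition first collapses its pairs into one partial count per key. The partition contents below are made-up sample data:

```python
from collections import Counter

# Two hypothetical partitions of (order_status, 1) pairs
partitions = [
    [("CLOSED", 1), ("COMPLETE", 1), ("CLOSED", 1), ("CLOSED", 1)],
    [("COMPLETE", 1), ("COMPLETE", 1), ("CLOSED", 1)],
]

# groupByKey: every record is shuffled as-is
shuffled_without_combiner = sum(len(p) for p in partitions)  # 7 records

# reduceByKey: each partition is pre-aggregated map-side first
partials = []
for part in partitions:
    c = Counter()
    for key, value in part:
        c[key] += value
    partials.append(c)

# Only one partial count per key per partition is shuffled
shuffled_with_combiner = sum(len(c) for c in partials)  # 4 records

# Final reduce on the shuffled partial counts
final = Counter()
for c in partials:
    final.update(c)

print(shuffled_without_combiner, shuffled_with_combiner)  # 7 4
print(dict(final))  # {'CLOSED': 4, 'COMPLETE': 3}
```

Both approaches produce the same totals; the combiner only reduces how much data moves during the shuffle, which is why the gap grows with input volume.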

### Combiner in reduceByKey and aggregateByKey

Here is the video which talks about the combiner in the context of reduceByKey, aggregateByKey, etc.

- Once we understand the differences between the by key transformations, we need to apply them to new requirements
- Requirement: Number of orders by order date and order status
- For this requirement, the key is (order_date, order_status)
- We are doing aggregation, hence groupByKey is eliminated
- The combiner logic and the reduce logic can be the same, hence aggregateByKey and combineByKey are not needed
- We are left with countByKey and reduceByKey; we can use either of them.
- Here is the logic implemented using reduceByKey

```python
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersMapRDD = ordersRDD.map(lambda rec: ((rec.split(",")[1], rec.split(",")[3]), 1))
ordersByStatusPerDay = ordersMapRDD.reduceByKey(lambda v1, v2: v1 + v2)
for recs in ordersByStatusPerDay.collect():
    print(recs)
```
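The countByKey alternative only needs the composite key; the count is implicit. Since countByKey is an action that returns a plain dict of per-key counts, its behaviour can be sketched in plain Python on a few made-up order lines (the IDs, dates, and statuses below are sample data, not the real dataset):

```python
from collections import Counter

# Hypothetical lines in the orders file: order_id,order_date,customer_id,status
orders = [
    "1,2013-07-25,100,CLOSED",
    "2,2013-07-25,200,COMPLETE",
    "3,2013-07-25,300,CLOSED",
    "4,2013-07-26,400,CLOSED",
]

# Same composite key as the reduceByKey version: (order_date, order_status)
keys = [(rec.split(",")[1], rec.split(",")[3]) for rec in orders]

# countByKey counts occurrences per key, like Counter does here
counts = Counter(keys)
print(dict(counts))
# {('2013-07-25', 'CLOSED'): 2, ('2013-07-25', 'COMPLETE'): 1, ('2013-07-26', 'CLOSED'): 1}
```

Note that countByKey brings the result to the driver as a dict, so it is only appropriate when the number of distinct keys is small.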

### Aggregations – compute averages by key

Here is the video which explains aggregations such as average by key.

- Requirement: Generate average revenue per day
- Parse orders (key: order_id)
- Parse order_items (key: order_item_order_id)
- Join orders and order_items on the key
- Parse the joined data to get (order_date, order_id) as key and order_item_subtotal as value. Here the number of output records will be the same as the number of input records.
- Apply an aggregate function (reduceByKey) to get revenue per order. Here the number of output records will be the number of distinct orders in the order_items table.
- Parse the revenue per order and remove order_id from the key. The number of records does not change in this step.
- Apply an aggregate function (combineByKey or aggregateByKey) to get total revenue per day. Here the input and output types of the aggregate function are different and we also need custom combiner logic, hence reduceByKey cannot be used.
- Now the data will have order_date as key and (revenue_per_day, total_number_of_orders) as value. Apply a map function to divide the revenue by the total number of orders per day.

```python
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
ordersParsedRDD = ordersRDD.map(lambda rec: (rec.split(",")[0], rec))
orderItemsParsedRDD = orderItemsRDD.map(lambda rec: (rec.split(",")[1], rec))
ordersJoinOrderItems = orderItemsParsedRDD.join(ordersParsedRDD)
ordersJoinOrderItemsMap = ordersJoinOrderItems.map(
    lambda t: ((t[1][1].split(",")[1], t[0]), float(t[1][0].split(",")[4])))
revenuePerDayPerOrder = ordersJoinOrderItemsMap.reduceByKey(lambda acc, value: acc + value)
revenuePerDayPerOrderMap = revenuePerDayPerOrder.map(lambda rec: (rec[0][0], rec[1]))

# Using combineByKey
revenuePerDay = revenuePerDayPerOrderMap.combineByKey(
    lambda x: (x, 1),
    lambda acc, revenue: (acc[0] + revenue, acc[1] + 1),
    lambda total1, total2: (round(total1[0] + total2[0], 2), total1[1] + total2[1])
)

# Equivalent logic using aggregateByKey
revenuePerDay = revenuePerDayPerOrderMap.aggregateByKey(
    (0, 0),
    lambda acc, revenue: (acc[0] + revenue, acc[1] + 1),
    lambda total1, total2: (round(total1[0] + total2[0], 2), total1[1] + total2[1])
)

for data in revenuePerDay.collect():
    print(data)

avgRevenuePerDay = revenuePerDay.map(lambda x: (x[0], x[1][0] / x[1][1]))
for data in avgRevenuePerDay.collect():
    print(data)
```
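The three functions passed to combineByKey above can be hard to follow, so their roles can be traced in plain Python on made-up (order_date, revenue_per_order) pairs split across two hypothetical partitions (the dates and amounts are sample data):

```python
def create_combiner(x):
    # First value seen for a key in a partition becomes (running_total, count)
    return (x, 1)

def merge_value(acc, revenue):
    # Fold another value into the running (total, count) within a partition
    return (acc[0] + revenue, acc[1] + 1)

def merge_combiners(t1, t2):
    # Merge partial (total, count) pairs coming from different partitions
    return (t1[0] + t2[0], t1[1] + t2[1])

# Two hypothetical partitions of (order_date, revenue_per_order) pairs
partitions = [
    [("2013-07-25", 100.0)],
    [("2013-07-25", 50.0), ("2013-07-26", 200.0)],
]

# Map side: build partial (total, count) accumulators per partition
partials = []
for part in partitions:
    acc = {}
    for day, revenue in part:
        acc[day] = merge_value(acc[day], revenue) if day in acc else create_combiner(revenue)
    partials.append(acc)

# Reduce side: merge the partial accumulators across partitions
merged = {}
for acc in partials:
    for day, pair in acc.items():
        merged[day] = merge_combiners(merged[day], pair) if day in merged else pair

# Final map: divide total revenue by the number of orders per day
avg = {day: total / count for day, (total, count) in merged.items()}
print(avg)  # {'2013-07-25': 75.0, '2013-07-26': 200.0}
```

Carrying the (total, count) pair through the aggregation, rather than the average itself, is what makes the per-partition partial results safely mergeable.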

### Aggregations – compute max by key using SQL

Here is the video which explains the implementation of max by key. It also includes SQL, but SQL is not relevant for the CCA certification.

- Requirement: Get the customer_id with max revenue for each day
- customer_id is in the orders table and the revenue needs to be derived from the order_items table
- Apply a join between orders and order_items with order_id as the key
- Apply a map function to get (order_date, customer_id) as key and order_item_subtotal as value
- Apply an aggregate function to generate revenue per customer per day
- Apply another aggregate function using reduceByKey to get the customer with the maximum revenue per day. The logic is implemented both inline and as a named function invoked from the lambda of reduceByKey

```python
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
ordersParsedRDD = ordersRDD.map(lambda rec: (rec.split(",")[0], rec))
orderItemsParsedRDD = orderItemsRDD.map(lambda rec: (rec.split(",")[1], rec))
ordersJoinOrderItems = orderItemsParsedRDD.join(ordersParsedRDD)
ordersPerDayPerCustomer = ordersJoinOrderItems.map(
    lambda rec: ((rec[1][1].split(",")[1], rec[1][1].split(",")[2]), float(rec[1][0].split(",")[4])))
revenuePerDayPerCustomer = ordersPerDayPerCustomer.reduceByKey(lambda x, y: x + y)
revenuePerDayPerCustomerMap = revenuePerDayPerCustomer.map(
    lambda rec: (rec[0][0], (rec[0][1], rec[1])))
topCustomerPerDaybyRevenue = revenuePerDayPerCustomerMap.reduceByKey(
    lambda x, y: (x if x[1] >= y[1] else y))

# Using a regular function instead of the inline lambda
def findMax(x, y):
    if x[1] >= y[1]:
        return x
    else:
        return y

topCustomerPerDaybyRevenue = revenuePerDayPerCustomerMap.reduceByKey(
    lambda x, y: findMax(x, y))
```

Note: order_item_subtotal is the fifth field in the order_items dataset, so it is correctly accessed with index 4 (`rec[1][0].split(",")[4]`) in the map function above.
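The max-by-key pattern above is just a reduce where the combining function keeps the tuple with the larger revenue. It can be traced in plain Python with made-up per-customer revenues (the dates, customer IDs, and amounts are sample data):

```python
# Hypothetical (order_date, (customer_id, revenue)) records,
# as produced by revenuePerDayPerCustomerMap
records = [
    ("2013-07-25", ("100", 300.0)),
    ("2013-07-25", ("200", 500.0)),
    ("2013-07-26", ("300", 150.0)),
]

def find_max(x, y):
    # Keep whichever (customer_id, revenue) tuple has the higher revenue
    return x if x[1] >= y[1] else y

# Reduce per key: fold each day's tuples down to the top customer
top = {}
for day, cust_rev in records:
    top[day] = cust_rev if day not in top else find_max(top[day], cust_rev)

print(top)  # {'2013-07-25': ('200', 500.0), '2013-07-26': ('300', 150.0)}
```

Because find_max is associative and commutative over the revenue field, reduceByKey can safely apply it both map-side (as a combiner) and reduce-side.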