Introduction to aggregating data sets using pyspark – totals
Aggregations can be broadly categorized into totals and aggregations by key. This topic covers aggregations – totals.
- Load data from HDFS and store results back to HDFS using Spark
- Join disparate datasets together using Spark
- Calculate aggregate statistics (e.g., average or sum) using Spark
- Filter data into a smaller dataset using Spark
- Write a query that produces ranked or sorted data using Spark
Types of totals aggregations
- sum, average
- min, max
Spark provides actions such as count and reduce to compute totals such as record counts and sums. There is no single action used here for averages, so to compute an average we combine Spark transformations (such as map) with actions (such as reduce and count).
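As a rough illustration of combining a transformation with an action, here is a minimal sketch of the common "(sum, count)" averaging pattern. It uses plain Python and `functools.reduce` so it runs without a cluster; the revenue values are hypothetical. In PySpark the same lambdas would be passed to `rdd.map` and `rdd.reduce`.

```python
from functools import reduce

# Hypothetical revenue values standing in for an RDD of floats.
revenues = [299.98, 199.99, 250.0, 129.99]

# "map" step: pair each value with a count of 1.
pairs = [(rev, 1) for rev in revenues]

# "reduce" step: add the sums and the counts pairwise. The function is
# associative and commutative, which Spark's reduce requires so that
# partial results from different partitions can be combined in any order.
total, count = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), pairs)

# Divide once at the end to get the average.
average = total / count
print(total, count, average)
```

Carrying the count alongside the sum lets a single reduce produce both numbers the average needs, instead of two separate passes over the data.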
Computing totals – aggregations
This video covers computing totals using actions such as count and reduce to get counts and sums.
- Get the total number of records in a data set (e.g., orders)
# sc is the SparkContext provided by the pyspark shell
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersRDD.count()
- Get the total revenue from order_items
orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
# order_item_subtotal is assumed to be the 5th field (index 4)
orderItemsMap = orderItemsRDD.map(lambda rec: float(rec.split(",")[4]))
for i in orderItemsMap.take(5):
    print(i)
orderItemsReduce = orderItemsMap.reduce(lambda rev1, rev2: rev1 + rev2)
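To check the map/reduce logic on a few rows without a cluster, the sketch below applies the same parsing and the same reduce function in plain Python. The sample records are hypothetical, and the column order (order_item_id, order_item_order_id, order_item_product_id, order_item_quantity, order_item_subtotal, order_item_product_price) is an assumption about the order_items files.

```python
from functools import reduce

# Hypothetical order_items records (assumed column order, see above).
order_items = [
    "1,1,957,1,299.98,299.98",
    "2,2,1073,1,199.99,199.99",
    "3,2,502,5,250.0,50.0",
    "4,2,403,1,129.99,129.99",
]

SUBTOTAL_IDX = 4  # assumed position of order_item_subtotal

# Same work as orderItemsRDD.map(...): extract the subtotal as a float.
subtotals = [float(rec.split(",")[SUBTOTAL_IDX]) for rec in order_items]

# Same work as .reduce(lambda rev1, rev2: rev1 + rev2).
total_revenue = reduce(lambda rev1, rev2: rev1 + rev2, subtotals)
print(total_revenue)
```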
- Get the max priced product in the products table. First, clean up the one problematic record, product_id 685. We will look into filtering later.
hadoop fs -get /user/cloudera/sqoop_import/products
# Delete the record with product_id 685 from the local products/part* files, then:
hadoop fs -put -f products/part* /user/cloudera/sqoop_import/products
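If you prefer not to edit the part files by hand, a small local script can drop the record between the `-get` and `-put` steps. This is a minimal sketch; `drop_product` and `clean_part_file` are hypothetical helpers, and it assumes product_id is the first comma-separated field of each line.

```python
def drop_product(lines, product_id):
    """Return the records whose first field is not product_id (hypothetical helper)."""
    keep = []
    for line in lines:
        # Compare the whole first field so that, e.g., 6850 is not dropped for 685.
        first_field = line.split(",", 1)[0]
        if first_field != str(product_id):
            keep.append(line)
    return keep


def clean_part_file(path, product_id):
    """Rewrite one local part file in place without the given product (hypothetical helper)."""
    with open(path) as f:
        lines = f.read().splitlines()
    with open(path, "w") as f:
        for line in drop_product(lines, product_id):
            f.write(line + "\n")


# Hypothetical sample records.
sample = [
    "684,31,Sample Glove,,29.99,http://example.com/a.jpg",
    "685,31,Sample Item,,99.99,http://example.com/b.jpg",
    "6850,31,Sample Ball,,9.99,http://example.com/c.jpg",
]
cleaned = drop_product(sample, 685)
print(cleaned)
```

You would call `clean_part_file` on each local `products/part*` file before running `hadoop fs -put -f`.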
- Get the max priced product using reduce; see below for the comparison implemented in the lambda function
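productsRDD = sc.textFile("/user/cloudera/sqoop_import/products")
productsMap = productsRDD.map(lambda rec: rec)
# product_price is assumed to be the 5th field (index 4); a record with an
# empty price never wins the comparison, so it cannot be returned as the max
productsMap.reduce(lambda rec1, rec2: (
    rec1 if rec2.split(",")[4] == "" or
            (rec1.split(",")[4] != "" and
             float(rec1.split(",")[4]) >= float(rec2.split(",")[4]))
    else rec2))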
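The comparison is easier to read and test as a named function. The sketch below is one way to write it, run here with `functools.reduce` on hypothetical records; `price` and `max_priced` are hypothetical helper names, and the price position (index 4) is an assumption about the products files. A named function like `max_priced` could also be passed directly to `productsRDD.reduce`.

```python
from functools import reduce

PRICE_IDX = 4  # assumed position of product_price


def price(rec):
    """Parse the price field; return None when it is empty (hypothetical helper)."""
    field = rec.split(",")[PRICE_IDX]
    return float(field) if field != "" else None


def max_priced(rec1, rec2):
    """Keep whichever record has the higher price, skipping empty prices."""
    p1, p2 = price(rec1), price(rec2)
    if p1 is None:
        return rec2
    if p2 is None:
        return rec1
    return rec1 if p1 >= p2 else rec2


# Hypothetical products records; the second one has an empty price.
products = [
    "1,2,Sample Shoe,,59.98,http://example.com/1.jpg",
    "2,2,Sample Jersey,,,http://example.com/2.jpg",
    "3,2,Sample Bike,,1999.99,http://example.com/3.jpg",
    "4,3,Sample Cap,,24.99,http://example.com/4.jpg",
]

most_expensive = reduce(max_priced, products)
print(most_expensive)
```

Treating an empty price as "no value" means such a record simply drops out of the comparison rather than displacing a priced one.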
Computing average – aggregations
Average revenue is computed as total revenue divided by the number of distinct orders.
- Compute average revenue (total revenue / number of distinct orders in order_items)
# order_item_subtotal (index 4) and order_item_order_id (index 1) are assumed column positions
revenue = (sc.textFile("/user/cloudera/sqoop_import/order_items")
           .map(lambda rec: float(rec.split(",")[4]))
           .reduce(lambda rev1, rev2: rev1 + rev2))
totalOrders = (sc.textFile("/user/cloudera/sqoop_import/order_items")
               .map(lambda rec: int(rec.split(",")[1]))
               .distinct()
               .count())
revenue / totalOrders
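revenue / totalOrders gives the average revenue per order. Because revenue is a float, the division returns a float in both Python 2 and Python 3.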
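The same calculation can be traced by hand on a few rows. This minimal sketch mirrors the two Spark pipelines in plain Python: a set plays the role of `distinct()` and `len` the role of `count()`. The records and the column positions (subtotal at index 4, order id at index 1) are hypothetical assumptions.

```python
from functools import reduce

# Hypothetical order_items records; order 2 has three line items.
order_items = [
    "1,1,957,1,299.98,299.98",
    "2,2,1073,1,199.99,199.99",
    "3,2,502,5,250.0,50.0",
    "4,2,403,1,129.99,129.99",
]

# Total revenue: map to subtotal, then reduce with addition.
revenue = reduce(lambda rev1, rev2: rev1 + rev2,
                 [float(rec.split(",")[4]) for rec in order_items])

# Distinct orders: map to order id, de-duplicate, then count.
total_orders = len({int(rec.split(",")[1]) for rec in order_items})

avg_revenue = revenue / total_orders
print(revenue, total_orders, avg_revenue)
```

Dividing by distinct orders rather than by the number of order_items rows gives revenue per order, not revenue per line item.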