Introduction to aggregating data sets using Scala – totals
Aggregations can be broadly categorized into totals and by key. As part of this topic we will be covering aggregations – totals.
- Load data from HDFS and store results back to HDFS using Spark
- Join disparate datasets together using Spark
- Calculate aggregate statistics (e.g., average or sum) using Spark
- Filter data into a smaller dataset using Spark
- Write a query that produces ranked or sorted data using Spark
Types of totals aggregations
- sum, average
- min, max
Spark provides actions such as count and reduce to compute counts and sums. To compute averages, we can perform the necessary operations by combining Spark transformations and actions.
Computing totals – aggregations
This video covers computing totals using actions such as count and reduce to get count and sum.
- Get the total number of records in a data set (e.g., orders)
val ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
ordersRDD.count()
- Get the total revenue from order_items
val orderItemsRDD = sc.textFile("/user/cloudera/sqoop_import/order_items")
val orderItemsMap = orderItemsRDD.map(rec => rec.split(",")(4).toDouble)
orderItemsMap.take(5).foreach(println)
val orderItemsReduce = orderItemsMap.reduce((acc, value) => acc + value)
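The reduce step above can be sketched without a cluster, since RDD.reduce has the same contract as reduce on Scala collections. A minimal stand-in, with made-up order_items rows (the field layout assumed here is order_item_id, order_id, product_id, quantity, subtotal, product_price):

```scala
// A plain Scala List stands in for the order_items RDD; rows are hypothetical.
val sampleLines = List(
  "1,1,957,1,299.98,299.98",
  "2,2,1073,1,199.99,199.99",
  "3,2,502,5,250.0,50.0"
)

// Same pipeline as above: extract field 4 (subtotal), then reduce to a sum.
val subtotals = sampleLines.map(rec => rec.split(",")(4).toDouble)
val totalRevenue = subtotals.reduce((acc, value) => acc + value)
```

Note that the function passed to reduce should be associative and commutative, because Spark applies it independently within and across partitions.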
- Get the max priced product from the products table. First clean up the one malformed record with product_id 685. We will look into filtering later.
hadoop fs -get /user/cloudera/sqoop_import/products
# Delete the record with product_id 685
hadoop fs -put -f products/part* /user/cloudera/sqoop_import/products
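Instead of editing the file by hand, the bad record could also be dropped with a filter. A minimal plain-Scala sketch of that idea (a List stands in for the RDD, and the rows, including the broken one, are made up for illustration):

```scala
// Hypothetical products rows; product_id is the first comma-separated field.
val rows = List(
  "684,36,Nike Shoes,,99.99,http://...",
  "685,36,bad record,,not_a_price,",   // the malformed record
  "686,36,Adidas Shoes,,79.99,http://..."
)

// Keep every record except product_id 685, mirroring the manual cleanup.
val cleaned = rows.filter(rec => rec.split(",")(0) != "685")
```

The same filter(rec => rec.split(",")(0) != "685") would work on the products RDD before the reduce below.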
- Get the max priced product using reduce; the comparison logic is implemented in the lambda function below
val productsRDD = sc.textFile("/user/cloudera/sqoop_import/products")
val productsMap = productsRDD.map(rec => rec)
productsMap.reduce((rec1, rec2) =>
  if (rec1.split(",")(4).toFloat >= rec2.split(",")(4).toFloat) rec1 else rec2
)
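The same max-by-price reduce can be checked on plain Scala collections. A small sketch with hypothetical products rows (assumed layout: product_id, category_id, name, description, price, image URL; the price sits in field 4):

```scala
// Plain Scala stand-in for the products RDD; rows are made up.
val productLines = List(
  "1,2,Quest Q64,,59.98,http://...",
  "2,2,Under Armour Tee,,129.99,http://...",
  "3,2,Nike Cap,,28.0,http://..."
)

// reduce compares records pairwise, keeping whichever has the higher
// price in field 4, until only the max-priced record remains.
val maxPriced = productLines.reduce((rec1, rec2) =>
  if (rec1.split(",")(4).toFloat >= rec2.split(",")(4).toFloat) rec1 else rec2
)
```

This is also why the malformed record had to be removed first: a non-numeric value in field 4 would make toFloat throw and fail the whole job.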
Computing average – aggregations
- Computing average revenue (total revenue / total distinct orders from order_items)
val revenue = sc.textFile("/user/cloudera/sqoop_import/order_items").
  map(rec => rec.split(",")(4).toDouble).
  reduce((rev1, rev2) => rev1 + rev2)
val totalOrders = sc.textFile("/user/cloudera/sqoop_import/order_items").
  map(rec => rec.split(",")(1).toInt).
  distinct().
  count()
revenue / totalOrders gives the average revenue per order. Since revenue is a Double, the division is floating-point even though count() returns a Long.
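The whole average computation can be traced on plain Scala collections the same way. A sketch with the same made-up order_items rows as before (order_id in field 1, subtotal in field 4):

```scala
// Hypothetical order_items rows; two of them belong to the same order.
val lines = List(
  "1,1,957,1,299.98,299.98",
  "2,2,1073,1,199.99,199.99",
  "3,2,502,5,250.0,50.0"
)

// Total revenue: sum of subtotals (field 4).
val revenue = lines.map(rec => rec.split(",")(4).toDouble).reduce(_ + _)

// Distinct order count: order_id is field 1, and one order can span
// several order_items rows, hence the distinct before counting.
val totalOrders = lines.map(rec => rec.split(",")(1).toInt).distinct.size

val averageRevenue = revenue / totalOrders
```

Here two of the three rows share order_id 2, so the sum of three subtotals is divided by two distinct orders, not three rows.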