Introduction to sorting and ranking by key
Sorting can be broadly categorized into global sorting and sorting by key. As part of this topic we will cover sorting by key.
- Load data from HDFS and store results back to HDFS using Spark
- Join disparate datasets together using Spark
- Calculate aggregate statistics (e.g., average or sum) using Spark
- Filter data into a smaller dataset using Spark
- Write a query that produces ranked or sorted data using Spark
Here is the video which covers basic Python list operations as well as the Spark groupByKey transformation.
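As a quick refresher on the list operations used below, Python's built-in sorted() takes a key function and an optional reverse flag. A minimal sketch, using made-up records laid out like the products table (product_id,category_id,name,description,price,image):

```python
# Made-up records in the products-table layout:
# product_id,category_id,name,description,price,image
recs = [
    "1,2,Quest Tent,,59.98,img1",
    "2,2,Armour Shoe,,129.99,img2",
    "3,2,Nike Glove,,34.99,img3",
]

# sorted() returns a new list; the key function extracts the price field
by_price = sorted(recs, key=lambda r: float(r.split(",")[4]))
print(by_price)

# reverse=True sorts descending
by_price_desc = sorted(recs, key=lambda r: float(r.split(",")[4]), reverse=True)
print(by_price_desc[0])
```

The same sorted()-with-a-key pattern is what gets applied inside each group later on.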
Sorting and ranking by key – groupByKey
Here is the video which covers sorting and ranking by key using pyspark.
- Requirement: Sort products by price within each category
- Read data from HDFS and apply a map function to define the key, which is the category id
- Apply groupByKey to group all the products within each category
products = sc.textFile("/user/cloudera/sqoop_import/products")
productsMap = products.map(lambda rec: (rec.split(",")[1], rec))
productsGroupBy = productsMap.groupByKey()
for i in productsGroupBy.collect():
    print(i)
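To see what groupByKey produces without a cluster, the same grouping can be simulated in plain Python with a dict of lists (records are made up for illustration):

```python
from collections import defaultdict

# Made-up records: product_id,category_id,name,description,price,image
recs = [
    "1,2,Quest Tent,,59.98,img1",
    "2,2,Armour Shoe,,129.99,img2",
    "3,4,Nike Glove,,34.99,img3",
]

# Equivalent of map(lambda rec: (rec.split(",")[1], rec)) followed by
# groupByKey(): each category id maps to the list of its product records
grouped = defaultdict(list)
for rec in recs:
    grouped[rec.split(",")[1]].append(rec)

for category_id, products in grouped.items():
    print(category_id, products)
```

In Spark the grouped values arrive as an iterable per key, which is why the later steps iterate over the second element of each pair.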
- Get data sorted by product price per category
- Within each group, use the sorted function to sort the data in ascending or descending order
for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).take(100):
    print(i)

for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]), reverse=True)).take(100):
    print(i)
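The per-group sort can be checked locally before running it on an RDD. A sketch of the same logic on a plain dict of grouped records (sample data made up):

```python
# Groups keyed by category id, as groupByKey would produce them
grouped = {
    "2": ["1,2,Quest Tent,,59.98,img1", "2,2,Armour Shoe,,129.99,img2"],
    "4": ["3,4,Nike Glove,,34.99,img3", "4,4,Puma Cap,,12.99,img4"],
}

# Mirror of the map-over-groups step: sort each category's records
# by the price field (index 4), descending
ranked = {
    cat: sorted(prods, key=lambda k: float(k.split(",")[4]), reverse=True)
    for cat, prods in grouped.items()
}
print(ranked["2"][0])  # highest-priced product in category 2
```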
- Requirement: Get top 3 priced products in each category
- Develop a Python function which takes a grouped record and topN as parameters
- Compute top 3 priced products (if there are 10 products with top 3 prices, the RDD should give us all 10 products)
def getTopDenseN(rec, topN):
    # rec is a (category_id, iterable of product records) pair from groupByKey
    x = []
    topNPrices = []
    prodPrices = []
    prodPricesDesc = []
    for i in rec[1]:
        prodPrices.append(float(i.split(",")[4]))
    prodPricesDesc = list(sorted(set(prodPrices), reverse=True))
    import itertools
    topNPrices = list(itertools.islice(prodPricesDesc, 0, topN))
    for j in sorted(rec[1], key=lambda k: float(k.split(",")[4]), reverse=True):
        if float(j.split(",")[4]) in topNPrices:
            x.append(j)
    return (y for y in x)
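Since the dense top-N logic is plain Python, it can be exercised locally on a made-up (category, records) pair; a self-contained sketch of the same approach:

```python
import itertools

def get_top_dense_n(rec, top_n):
    # rec is a (category_id, records) pair, like groupByKey() output
    prices_desc = sorted({float(r.split(",")[4]) for r in rec[1]}, reverse=True)
    top_prices = list(itertools.islice(prices_desc, 0, top_n))
    return [r for r in sorted(rec[1], key=lambda r: float(r.split(",")[4]), reverse=True)
            if float(r.split(",")[4]) in top_prices]

group = ("2", [
    "1,2,Tent,,59.98,img",
    "2,2,Shoe,,129.99,img",
    "3,2,Glove,,129.99,img",  # ties with the shoe on the top price
    "4,2,Cap,,12.99,img",
])
# Two distinct top prices (129.99, 59.98) cover three records,
# which is the "dense" behavior described above
print(get_top_dense_n(group, 2))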
- Here is the code which invokes getTopDenseN
products = sc.textFile("/user/cloudera/sqoop_import/products")
productsMap = products.map(lambda rec: (rec.split(",")[1], rec))
for i in productsMap.groupByKey().flatMap(lambda x: getTopDenseN(x, 2)).collect():
    print(i)
Sorting and ranking using Spark SQL
Here is the video which covers ranking using Spark SQL with windowing functions. This might not be very important for the CCA certification.
- Different queries to perform sorting and ranking using Spark SQL.
#By key sorting
#Using order by is not efficient, it serializes
select * from products order by product_category_id, product_price desc;

#Using distribute by sort by (to distribute sorting and scale it up)
select * from products distribute by product_category_id sort by product_price desc;

#By key ranking (in Hive we can use windowing/analytic functions)
select * from (
  select p.*,
    dense_rank() over (partition by product_category_id order by product_price desc) dr
  from products p
  distribute by product_category_id
) q
where dr <= 2
order by product_category_id, dr;
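The dense_rank() query can be cross-checked in plain Python; a sketch of the same semantics on made-up (product_id, category_id, price) rows:

```python
from collections import defaultdict

# Made-up (product_id, category_id, price) rows
rows = [(1, 2, 59.98), (2, 2, 129.99), (3, 2, 129.99),
        (6, 2, 12.99), (4, 4, 34.99), (5, 4, 12.99)]

# partition by product_category_id
partitions = defaultdict(list)
for row in rows:
    partitions[row[1]].append(row)

# dense_rank(): tied prices share a rank and the next distinct
# price gets rank + 1 (no gaps)
ranked = []
for cat, part in partitions.items():
    prices_desc = sorted({r[2] for r in part}, reverse=True)
    rank_of = {p: i + 1 for i, p in enumerate(prices_desc)}
    for r in part:
        if rank_of[r[2]] <= 2:  # where dr <= 2
            ranked.append((r[0], cat, rank_of[r[2]]))

print(sorted(ranked, key=lambda t: (t[1], t[2])))
```

Product 6 has the third-highest distinct price in its category, so the dr <= 2 filter drops it, while both 129.99 products share rank 1.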