### Introduction to sorting and ranking by key

Sorting can be broadly categorized into global and by key. As part of this topic we will covering sorting – by key.

- Load data from HDFS and store results back to HDFS using Spark
- Join disparate datasets together using Spark
- Calculate aggregate statistics (e.g., average or sum) using Spark
- Filter data into a smaller dataset using Spark
**Write a query that produces ranked or sorted data using Spark**

Here is the video which covers basic python list operations as well as Spark groupByKey transformation.

### Sorting and ranking by key – groupByKey

Here is the video which covers by key sorting and ranking using pyspark.

- Requirement: Sort products by price with in each category
- Read data from HDFS and apply map function to define key which is category
- Apply groupByKey to group all the products in the category

products = sc.textFile("/user/cloudera/sqoop_import/products") productsMap = products.map(lambda rec: (rec.split(",")[1], rec)) productsGroupBy = productsMap.groupByKey() for i in productsGroupBy.collect(): print(i)

- Get data sorted by product price per category

- With in each group use sorted function to sort the data in ascending or descending order

for i in productsGroupBy. map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))). take(100): print(i) for i in productsGroupBy. map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]), reverse=True)). take(100): print(i)

- Requirement: Get top 3 priced products in each category
- Develop python function which get RDD and topN as parameters
- Compute top 3 priced products (if there are 10 products with top 3 prices, the RDD should give us all 10 products)

def getTopDenseN(rec, topN): x = [ ] topNPrices = [ ] prodPrices = [ ] prodPricesDesc = [ ] for i in rec[1]: prodPrices.append(float(i.split(",")[4])) prodPricesDesc = list(sorted(set(prodPrices), reverse=True)) import itertools topNPrices = list(itertools.islice(prodPricesDesc, 0, topN)) for j in sorted(rec[1], key=lambda k: float(k.split(",")[4]), reverse=True): if(float(j.split(",")[4]) in topNPrices): x.append(j) return (y for y in x)

- Here is the code which invokes getTopDenseN

products = sc.textFile("/user/cloudera/sqoop_import/products") productsMap = products.map(lambda rec: (rec.split(",")[1], rec)) for i in productsMap. groupByKey(). flatMap(lambda x: getTopDenseN(x, 2)). collect(): print(i)

### Sorting and ranking using spark sql

Here is the video which covers ranking using Spark SQL using windowing functions. This might not be very important for CCA certification.

- Different queries to perform sorting and ranking using Spark SQL.

#By key sorting #Using order by is not efficient, it serializes select * from products order by product_category_id, product_price desc; #Using distribute by sort by (to distribute sorting and scale it up) select * from products distribute by product_category_id sort by product_price desc; #By key ranking (in Hive we can use windowing/analytic functions) select * from (select p.*, dense_rank() over (partition by product_category_id order by product_price desc) dr from products p distribute by product_category_id) q where dr <= 2 order by product_category_id, dr;

Ivan says

Hi,

write functions as getTopDenseN is important for certification?

Regards

Training Itversity says

No, it is not important for the certification.

Manish Gupta says

Question in regard to DenseRank Computation :-

Why do we have to use flatMap instead of map here. Our Input data is already groupByKey and thus we could have done same thing using simply map. My logic works if i follow your video to do flatMap but does not work with map in last step. Here is my code :-

productsBaseRdd=sc.textFile(Practice_Read_Db_Data_Loc + “products/”).filter(lambda rec:int(rec.split(“,”)[0]) != 685)

productGroupByCategoryId = productsBaseRdd.map(lambda rec: ( int(rec.split(“,”)[1]), rec ) ).groupByKey()

works with flatMap but not with map.

topNPricedProductsPerCategoryId = productGroupByCategoryId.flatMap(lambda rec : getTopNPricedProductsPerCategoryId(rec,3))

Referred Video = https://www.youtube.com/watch?v=BkAPD0xSaEU&list=PLf0swTFhTI8rT3ApjBqt338MCO0ZvReFt&index=91