### Introduction to sorting and ranking by key

Sorting can be broadly categorized into global sorting and sorting by key. As part of this topic, we will cover sorting by key.

- Load data from HDFS and store results back to HDFS using Spark
- Join disparate datasets together using Spark
- Calculate aggregate statistics (e.g., average or sum) using Spark
- Filter data into a smaller dataset using Spark
- **Write a query that produces ranked or sorted data using Spark**

Here is the video, which covers basic Python list operations as well as the Spark groupByKey transformation.
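As a quick refresher (this example is illustrative, not from the video), Python's built-in `sorted` accepts a `key` function; this is the same mechanism the PySpark examples below rely on. The sample records here are made up, in the same comma-delimited layout as the products dataset:

```python
# Made-up sample records: product_id, category_id, name, description, price, image
records = [
    "1,2,Quest Slant Leg Instant Tent,,59.98,url",
    "2,2,Under Armour Highlight MC Cleat,,129.99,url",
    "3,2,Under Armour Renegade D Mid Cleat,,89.99,url",
]

# Sort records by the price field (index 4 after splitting), ascending
by_price = sorted(records, key=lambda rec: float(rec.split(",")[4]))
print([rec.split(",")[4] for rec in by_price])  # → ['59.98', '89.99', '129.99']
```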

### Sorting and ranking by key – groupByKey

Here is the video, which covers sorting and ranking by key using PySpark.

- Requirement: sort products by price within each category
- Read data from HDFS and apply a map function to define the key, which is the category
- Apply groupByKey to group all the products in each category

```python
products = sc.textFile("/user/cloudera/sqoop_import/products")
productsMap = products.map(lambda rec: (rec.split(",")[1], rec))
productsGroupBy = productsMap.groupByKey()
for i in productsGroupBy.collect():
    print(i)
```

- Get the data sorted by product price per category
- Within each group, use the sorted function to sort the data in ascending or descending order

```python
# Ascending by product price (5th field)
for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).take(100):
    print(i)

# Descending by product price
for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]), reverse=True)).take(100):
    print(i)
```
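To see the same logic without a Spark cluster, here is a plain-Python sketch of what groupByKey followed by a per-group `sorted` computes; the records are made-up samples in the same comma-delimited layout:

```python
from collections import defaultdict

# Made-up sample records: product_id, category_id, name, description, price, image
records = [
    "1,2,Product A,,59.98,url",
    "2,2,Product B,,129.99,url",
    "3,3,Product C,,21.99,url",
    "4,3,Product D,,12.99,url",
]

# Group records by category id (field index 1), like groupByKey
grouped = defaultdict(list)
for rec in records:
    grouped[rec.split(",")[1]].append(rec)

# Within each group, sort descending by price (field index 4)
ranked = {
    cat: sorted(recs, key=lambda k: float(k.split(",")[4]), reverse=True)
    for cat, recs in grouped.items()
}
for cat, recs in sorted(ranked.items()):
    print(cat, [r.split(",")[4] for r in recs])
```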

- Requirement: get the top 3 priced products in each category
- Develop a Python function that takes an RDD record and topN as parameters
- Compute the top 3 priced products (if there are 10 products at the top 3 price points, the RDD should give us all 10 products)

```python
import itertools

def getTopDenseN(rec, topN):
    x = []
    prodPrices = []
    for i in rec[1]:
        prodPrices.append(float(i.split(",")[4]))
    # Distinct prices in descending order
    prodPricesDesc = list(sorted(set(prodPrices), reverse=True))
    # Keep only the topN distinct price points
    topNPrices = list(itertools.islice(prodPricesDesc, 0, topN))
    # Emit every record whose price is among the topN price points
    for j in sorted(rec[1], key=lambda k: float(k.split(",")[4]), reverse=True):
        if float(j.split(",")[4]) in topNPrices:
            x.append(j)
    return (y for y in x)
```

- Here is the code that invokes getTopDenseN

```python
products = sc.textFile("/user/cloudera/sqoop_import/products")
productsMap = products.map(lambda rec: (rec.split(",")[1], rec))
for i in productsMap.groupByKey().flatMap(lambda x: getTopDenseN(x, 2)).collect():
    print(i)
```
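Because getTopDenseN only needs a (key, iterable-of-records) pair, it can be exercised locally without Spark. The function is repeated here so the snippet is self-contained, and the group below is a made-up sample; note that both products priced 100.0 survive, which is the "dense" part of dense ranking:

```python
import itertools

def getTopDenseN(rec, topN):
    # Distinct prices in the group, descending
    prodPrices = [float(i.split(",")[4]) for i in rec[1]]
    prodPricesDesc = list(sorted(set(prodPrices), reverse=True))
    # Keep only the topN distinct price points
    topNPrices = list(itertools.islice(prodPricesDesc, 0, topN))
    # Emit every record whose price is among the topN price points
    x = []
    for j in sorted(rec[1], key=lambda k: float(k.split(",")[4]), reverse=True):
        if float(j.split(",")[4]) in topNPrices:
            x.append(j)
    return (y for y in x)

# Made-up sample group: category "2" with a price tie at 100.0
rec = ("2", [
    "1,2,Product A,,100.0,url",
    "2,2,Product B,,100.0,url",
    "3,2,Product C,,90.0,url",
    "4,2,Product D,,80.0,url",
])
top2 = list(getTopDenseN(rec, 2))
print(top2)  # three records: both 100.0 products plus the 90.0 product
```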

### Sorting and ranking using Spark SQL

Here is the video, which covers ranking using Spark SQL windowing functions. This might not be very important for the CCA certification.

- Here are different queries that perform sorting and ranking using Spark SQL.

```sql
-- By key sorting
-- Using ORDER BY is not efficient, as it serializes the sort
select * from products
order by product_category_id, product_price desc;

-- Using DISTRIBUTE BY ... SORT BY (to distribute the sort and scale it up)
select * from products
distribute by product_category_id
sort by product_price desc;

-- By key ranking (in Hive we can use windowing/analytic functions)
select * from (
  select p.*,
         dense_rank() over (partition by product_category_id
                            order by product_price desc) dr
  from products p
  distribute by product_category_id
) q
where dr <= 2
order by product_category_id, dr;
```
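To illustrate the dense_rank semantics the last query relies on, here is a small plain-Python sketch (not Spark SQL, values made up): equal prices share a rank, and no rank numbers are skipped afterwards:

```python
# Prices within one category, already sorted descending
prices = [129.99, 129.99, 99.95, 99.95, 59.98]

# dense_rank: rank by distinct value, so ties share a rank
distinct_desc = sorted(set(prices), reverse=True)
rank_of = {p: i + 1 for i, p in enumerate(distinct_desc)}
dense_ranks = [rank_of[p] for p in prices]
print(dense_ranks)  # → [1, 1, 2, 2, 3]
```

With `where dr <= 2`, all four records priced 129.99 or 99.95 would survive, matching the behavior of getTopDenseN above.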

Ivan says

Hi,

Is writing functions such as getTopDenseN important for the certification?

Regards

Training Itversity says

No, it is not important for the certification.