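The examples below calculate salary aggregates (sum, max, min, and average) per department and per job using Spark SQL aggregate window functions and a WindowSpec. Aggregate window functions do not need an orderBy clause: with only partitionBy, each aggregate covers the whole partition, whereas adding orderBy would switch the default frame to a running (cumulative) aggregate. Ranking functions such as row_number or dense_rank, by contrast, require an ordered window.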
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
Create the employee DataFrame
// Sample employee data; tuple fields are, in order:
// (empno, ename, job, mgr, hiredate, sal, comm, deptno).
// NOTE(review): SMITH's comm (20) and deptno (10) look like they may be
// mis-entered or swapped — confirm against the intended dataset.
val empDF = {
  val empRows = Seq(
    (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
    (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
  )
  // Column names applied positionally to the tuple fields above.
  val empCols = Seq("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
  spark.createDataFrame(empRows).toDF(empCols: _*)
}
Find the total salary for each department
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Total salary per department. The window is partitioned by deptno with no
// orderBy, so sum("sal") covers every row of the department. Each row of a
// department therefore carries the same total, and distinct() collapses the
// result to one row per deptno. distinct() does not guarantee an order, so the
// output is sorted by deptno to make it reproducible.
empDF
  .withColumn("total_sal", sum("sal").over(Window.partitionBy($"deptno")))
  .select($"deptno", $"total_sal")
  .distinct()
  .orderBy($"deptno")
  .show()
Find the maximum salary for each department
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Maximum salary per department. The window is partitioned by deptno with no
// orderBy, so max("sal") sees the whole department. Every row of a department
// gets the same max_sal, and distinct() keeps one row per deptno. The result is
// sorted by deptno because distinct() output order is not guaranteed.
empDF
  .withColumn("max_sal", max("sal").over(Window.partitionBy($"deptno")))
  .select($"deptno", $"max_sal")
  .distinct()
  .orderBy($"deptno")
  .show()
Compute the total, maximum, minimum, and average salary for each job
import org.apache.spark.sql.expressions.Window
// Unordered window per job: each aggregate below sees every row of the job's
// partition (no orderBy, so no running/cumulative frame).
val jobWindow = Window.partitionBy($"job")
// All four aggregates are identical on every row of a partition, so only one
// row per job is kept. row_number requires an ordered window, hence the
// orderBy; which row survives does not affect the selected columns.
// The selected name "total_sal" matches the created column exactly, so this
// also works when spark.sql.caseSensitive=true.
empDF
  .withColumn("total_sal", sum($"sal").over(jobWindow))
  .withColumn("max_sal", max($"sal").over(jobWindow))
  .withColumn("min_sal", min($"sal").over(jobWindow))
  .withColumn("avg_sal", avg($"sal").over(jobWindow))
  .withColumn("row_no", row_number().over(jobWindow.orderBy($"sal".desc)))
  .where($"row_no" === 1)
  .select("job", "total_sal", "max_sal", "min_sal", "avg_sal")
  .show()
Find the third-highest salary in each department
// Third-highest salary per department, ranking salaries from highest to lowest.
// dense_rank gives tied salaries the same rank without gaps, so rank 3 is the
// third-highest distinct salary, and every employee earning it is returned.
// row_number would instead pick an arbitrary row when salaries tie.
// A distinct name is used so this does not redefine an earlier window val,
// which is a compile error outside the REPL.
val deptWindow = Window.partitionBy($"deptno").orderBy($"sal".desc)
empDF
  .withColumn("sal_rank", dense_rank().over(deptWindow))
  .where($"sal_rank" === 3)
  .show()
No comments:
Post a Comment