Window Functions in Spark

Window aggregate functions (also known as window functions or windowed aggregates) perform a calculation over a group of records, called a window, that stand in some relation to the current record (for example, rows in the same partition or frame as the current row). They are also called "over" functions because they are applied with the over operator.
Spark SQL supports three kinds of window functions:
  • Aggregate functions: sum, avg, min, max, and count
  • Ranking functions: rank, dense_rank, percent_rank, row_number, and ntile
  • Analytic functions: cume_dist, lag, and lead
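As a rough illustration of the three kinds listed above, the sketch below applies one function of each kind over the same window. The sample rows, the column names (grp, seq, amount), and the local SparkSession are assumptions made for this example only; they are not part of the post's data. It uses col("...") rather than $"..." so that no implicits import is needed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a local session; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("window-kinds").getOrCreate()

// Hypothetical sample data, not taken from the post.
val df = spark.createDataFrame(Seq(
  ("a", 1, 10),
  ("a", 2, 20),
  ("b", 1, 5)
)).toDF("grp", "seq", "amount")

// The window: rows that share a grp value, ordered by seq.
val w = Window.partitionBy(col("grp")).orderBy(col("seq"))

val result = df
  .withColumn("running_sum", sum(col("amount")).over(w))    // aggregate function
  .withColumn("position", row_number().over(w))             // ranking function
  .withColumn("prev_amount", lag(col("amount"), 1).over(w)) // analytic function

result.orderBy(col("grp"), col("seq")).show()
```

Because the window is ordered, sum behaves as a running total within each group, and lag returns null for the first row of each group, which has no previous row.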
Packages to import
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// The $"col" syntax used below also needs spark.implicits._ (spark-shell imports it automatically)

Create DataFrame
val empDF = spark.createDataFrame(Seq(
      (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
      (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
      (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
      (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
      (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
      (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
      (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
      (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
      (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
      (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
      (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
    )).toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
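The SQL queries in this post read from a table named emp, but a DataFrame is not visible to SQL until it is registered. One possible way to do that is a temporary view, sketched below. To keep the block self-contained it builds a three-row subset (department 10 from the data above, trimmed to the columns the queries use) and its own local SparkSession; the names empSubset and ranked are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("emp-view").getOrCreate()

// Department 10 rows copied from the post's data, reduced to three columns.
val empSubset = spark.createDataFrame(Seq(
  (7369, 10, 800),
  (7782, 10, 2450),
  (7839, 10, 5000)
)).toDF("empno", "deptno", "sal")

// Register the DataFrame under the table name the SQL queries expect.
empSubset.createOrReplaceTempView("emp")

// Run a window query through the SQL interface.
val ranked = spark.sql(
  """select empno, deptno, sal,
    |       row_number() over (partition by deptno order by sal desc) as row_num
    |from emp""".stripMargin)

ranked.show()
```

With the full DataFrame from this post, the equivalent call would be empDF.createOrReplaceTempView("emp").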

row_number(): assigns a sequential number, starting at 1, to each row within a window partition.

row_number for each department
select
  empno,
  deptno,
  sal,
  row_number() over (partition by deptno order by sal desc) as row_num
from emp;

// Window: one partition per department, highest salary first
val partitionWindow = Window.partitionBy($"deptno").orderBy($"sal".desc)
empDF.withColumn("sequence_no", row_number().over(partitionWindow)).show()


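rank(): gives the rank of each row within a window partition. Rows that tie receive the same rank, and the ranks that follow skip ahead, leaving gaps.

Find rank for each department
select
  empno,
  deptno,
  sal,
  rank() over (partition by deptno order by sal desc) as sal_rank
from emp;

// Same window as before: one partition per department, highest salary first
val partitionWindow = Window.partitionBy($"deptno").orderBy($"sal".desc)
empDF.withColumn("sal_rank", rank().over(partitionWindow)).show()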

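dense_rank(): like rank(), but without gaps. Rows that tie receive the same rank, and the next distinct value gets the next consecutive rank.

Find dense rank for each department
select
  empno,
  deptno,
  sal,
  dense_rank() over (partition by deptno order by sal desc) as sal_dense_rank
from emp;

// Same window as before: one partition per department, highest salary first
val partitionWindow = Window.partitionBy($"deptno").orderBy($"sal".desc)
empDF.withColumn("sal_dense_rank", dense_rank().over(partitionWindow)).show()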
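The three ranking functions only differ when rows tie on the ordering column, so a side-by-side comparison may help. The sketch below uses the department 30 rows from the data above, where WARD and MARTIN both earn 1250. It orders by salary ascending (a choice made here, unlike the descending order used earlier) so that the tie comes first and the gap left by rank is visible. The names dept30, w, and compared, and the local SparkSession, are assumptions for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a local session; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("rank-compare").getOrCreate()

// Department 30 rows from the post's data; WARD and MARTIN tie on sal = 1250.
val dept30 = spark.createDataFrame(Seq(
  ("ALLEN", 30, 1600),
  ("WARD", 30, 1250),
  ("MARTIN", 30, 1250),
  ("BLAKE", 30, 2850),
  ("TURNER", 30, 1500)
)).toDF("ename", "deptno", "sal")

// Ascending order puts the tied salaries at the top of the partition.
val w = Window.partitionBy(col("deptno")).orderBy(col("sal").asc)

val compared = dept30
  .withColumn("row_number", row_number().over(w)) // always 1, 2, 3, ... even on ties
  .withColumn("rank", rank().over(w))             // ties share a rank, then a gap
  .withColumn("dense_rank", dense_rank().over(w)) // ties share a rank, no gap

compared.orderBy(col("sal"), col("ename")).show()
```

WARD and MARTIN both get rank 1 and dense_rank 1. TURNER, the next salary, gets rank 3 but dense_rank 2. row_number still numbers the two tied rows 1 and 2, and which of them gets 1 is not determined by the window ordering alone.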
