Window aggregate functions (also known as window functions or windowed aggregates) perform a calculation over a group of records, called a window, that are related to the current record (for example, records in the same partition or frame as the current row). Window functions are also called "over" functions because they are applied using the over operator.
Spark SQL supports three kinds of window functions:
- Aggregate functions: sum, avg, min, max, count
- Ranking functions: rank, dense_rank, percent_rank, row_number, ntile
- Analytic functions: cume_dist, lag, lead
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
Create DataFrame
// Sample EMP data: one tuple per employee —
// (empno, ename, job, mgr, hiredate, sal, comm, deptno)
val empData = Seq(
  (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
  (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
  (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
  (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
  (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
  (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
  (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
  (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
  (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
  (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
  (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
)
// Column names matching the eight tuple fields, in order.
val empColumns = Seq("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
val empDF = spark.createDataFrame(empData).toDF(empColumns: _*)
row_number(): gives a sequential number, starting at one, for each row within a window partition.
row_number for each department
select
empno
,deptno
,sal,
row_number() over (partition by deptno order by sal desc) as row_num
from emp;
// Window spec: one partition per department, rows ordered by salary (highest first).
val partitionWindow = Window.partitionBy(col("deptno")).orderBy(col("sal").desc)
// row_number() assigns 1, 2, 3, ... within each partition; ties get distinct numbers.
empDF.withColumn("sequence_no", row_number().over(partitionWindow)).show()
Find rank for each department
select
empno
,deptno
,sal,
rank() over (partition by deptno order by sal desc) as rank
from emp;
// Window spec: partition by department, descending salary order.
val partitionWindow = Window.partitionBy(col("deptno")).orderBy(col("sal").desc)
// rank() leaves gaps after ties (1, 1, 3, ...), unlike dense_rank().
empDF.withColumn("Rank", rank().over(partitionWindow)).show()
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
Create DataFrame
// Build the sample EMP DataFrame from in-memory tuples:
// (empno, ename, job, mgr, hiredate, sal, comm, deptno)
val employeeRows = Seq(
  (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
  (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
  (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
  (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
  (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
  (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
  (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
  (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
  (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
  (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
  (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
)
val empDF = spark
  .createDataFrame(employeeRows)
  .toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
row_number(): gives a sequential number, starting at one, for each row within a window partition.
row_number for each department
select
empno
,deptno
,sal,
row_number() over (partition by deptno order by sal desc) as row_num
from emp;
// Per-department window, salaries ordered from highest to lowest.
val partitionWindow = Window.partitionBy(col("deptno")).orderBy(col("sal").desc)
// Number each employee 1, 2, 3, ... within their department.
empDF.withColumn("sequence_no", row_number().over(partitionWindow)).show()
Find rank for each department
select
empno
,deptno
,sal,
rank() over (partition by deptno order by sal desc) as rank
from emp;
// Per-department window, salaries ordered from highest to lowest.
val partitionWindow = Window.partitionBy(col("deptno")).orderBy(col("sal").desc)
// rank() skips positions after ties (1, 1, 3, ...).
empDF.withColumn("Rank", rank().over(partitionWindow)).show()
Find dense rank for each department
select
empno
,deptno
,sal,
dense_rank() over (partition by deptno order by sal desc) as dense_rank
from emp;
// Per-department window, salaries ordered from highest to lowest.
val partitionWindow = Window.partitionBy(col("deptno")).orderBy(col("sal").desc)
// dense_rank() does not skip positions after ties (1, 1, 2, ...).
empDF.withColumn("sequence_no", dense_rank().over(partitionWindow)).show()
No comments:
Post a Comment