Spark SQL provides a pivot() function that rotates the values of one column into multiple columns (transposing rows to columns). It is an aggregation in which the distinct values of one grouping column become individual columns. For example, to get the total amount of each product exported to each country, you group by Product, pivot on Country, and sum Amount. The Scala example below applies the same pattern to an employee table: it groups by job, pivots on deptno, and sums sal.
// Assumes a SparkSession named spark, as in spark-shell
// Columns: empno, ename, job, mgr, hiredate, sal, comm, deptno
val empDF = spark.createDataFrame(Seq(
  (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
  (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
  (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
  (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
  (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
  (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
  (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
  (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
  (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
  (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
  (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
)).toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
// One row per job, one column per distinct deptno; each cell is the total sal (null if none)
empDF.groupBy("job").pivot("deptno").sum("sal").show()
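To make the result concrete, the plain-Python sketch below (not Spark code) mimics empDF.groupBy("job").pivot("deptno").sum("sal") on the same eleven rows, keeping only job, sal and deptno. As Spark does when no pivot values are passed, it sorts the distinct pivot values to form the columns, and a cell with no matching rows is None, which Spark displays as null. The helper pivot_sum is hypothetical.

```python
from collections import defaultdict

# (job, sal, deptno) taken from the empDF rows in the Scala example
EMP = [
    ("CLERK", 800, 10), ("SALESMAN", 1600, 30), ("SALESMAN", 1250, 30),
    ("MANAGER", 2975, 20), ("SALESMAN", 1250, 30), ("MANAGER", 2850, 30),
    ("MANAGER", 2450, 10), ("ANALYST", 3000, 20), ("PRESIDENT", 5000, 10),
    ("SALESMAN", 1500, 30), ("CLERK", 1100, 20),
]


def pivot_sum(rows):
    """Hypothetical helper mimicking groupBy(job).pivot(deptno).sum(sal).

    Returns the sorted pivot values (the new column headers) and a dict that
    maps each job to one cell per pivot value; empty cells are None (null).
    """
    pivot_values = sorted({deptno for _, _, deptno in rows})
    totals = defaultdict(dict)
    for job, sal, deptno in rows:
        # Accumulate the salary total for this (job, deptno) cell
        totals[job][deptno] = totals[job].get(deptno, 0) + sal
    table = {job: [cells.get(d) for d in pivot_values] for job, cells in totals.items()}
    return pivot_values, table


columns, table = pivot_sum(EMP)
print(["job"] + columns)
for job in sorted(table):
    print([job] + table[job])
```

Spark does not guarantee the row order of a groupBy result, so the sketch sorts the jobs only for readability.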
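The same approach works in PySpark, and the pivot column can be computed on the fly. Here each day is prefixed with 'price_' so that the pivoted columns are named price_1 to price_4:
from pyspark.sql import functions as F
# Assumes an existing SparkSession named spark; columns are id, day, price, units
d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),
     (101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),
     (102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)]
mydf = spark.createDataFrame(d, ['id', 'day', 'price', 'units'])
mydf.show()
# Build column names price_1 ... price_4, then keep the price for each id/column pair
pvtdf = mydf.withColumn('combcol', F.concat(F.lit('price_'), mydf['day'])) \
    .groupby('id').pivot('combcol').agg(F.first('price'))
pvtdf.show()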
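The combcol approach can be mimicked in plain Python to show what ends up in each cell (a sketch, not Spark code; pivot_first is a hypothetical name). F.first keeps the first value it sees for each id/column pair. Here every (id, day) pair occurs exactly once, so each cell simply holds that day's price. If a pair occurred more than once, Spark's first() would be non-deterministic unless the row order is fixed, whereas this sketch always keeps the earliest row in the list.

```python
# Same rows as d in the PySpark example: (id, day, price, units)
D = [(100, 1, 23, 10), (100, 2, 45, 11), (100, 3, 67, 12), (100, 4, 78, 13),
     (101, 1, 23, 10), (101, 2, 45, 13), (101, 3, 67, 14), (101, 4, 78, 15),
     (102, 1, 23, 10), (102, 2, 45, 11), (102, 3, 67, 16), (102, 4, 78, 18)]


def pivot_first(rows):
    """Hypothetical helper mimicking
    withColumn('combcol', concat(lit('price_'), day))
        .groupby('id').pivot('combcol').agg(first('price')).
    """
    # Distinct combcol values, sorted as strings (the pivot column is a string)
    columns = sorted({f"price_{day}" for _, day, _, _ in rows})
    cells = {}
    for id_, day, price, _ in rows:
        # setdefault keeps only the first price seen for each (id, combcol) cell
        cells.setdefault(id_, {}).setdefault(f"price_{day}", price)
    table = {id_: [vals.get(c) for c in columns] for id_, vals in cells.items()}
    return columns, table


columns, table = pivot_first(D)
print(["id"] + columns)
for id_ in sorted(table):
    print([id_] + table[id_])
```

Because combcol is a string, the distinct values are ordered as strings: with a day 10 in the data, price_10 would sort before price_2. Passing an explicit list of values to pivot() fixes the column order.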
This article shows how to 'delete' rows from a Spark DataFrame using Python. I put the word 'delete' in quotes because we are not really deleting any data. Spark DataFrames are immutable and transformations are evaluated lazily, so instead of physically removing rows from an in-memory table, we apply a filter that returns a new DataFrame without those rows; the original DataFrame is left unchanged.
from pyspark.sql import SparkSession
appName = "Python Example - 'Delete' Data from DataFrame"
master = "local"
# Create Spark session
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.getOrCreate()
# Sample data: a list of dictionaries, one per row
data = [{"Category": 'Category A', "ID": 1, "Value": 12.40},
{"Category": 'Category B', "ID": 2, "Value": 30.10},
{"Category": 'Category C', "ID": 3, "Value": 100.01}
]
# Create data frame
df = spark.createDataFrame(data)
print(df.schema)
# 'Delete' the Category B rows by keeping every other row; df itself is unchanged
df2 = df.where("Category <> 'Category B'")
df2.show()
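One detail worth knowing about this filter: in Spark SQL, Category <> 'Category B' evaluates to null (not true) when Category is null, and where() keeps only rows for which the condition is true, so rows with a null Category are dropped as well. The plain-Python sketch below models that three-valued logic. The extra row with a null Category and the helpers sql_not_equal, null_safe_equal and where are hypothetical, added only to show the effect. Spark SQL's null-safe equality operator <=> avoids it, e.g. NOT (Category <=> 'Category B').

```python
from typing import Callable, Optional

# Sample rows from the article plus one hypothetical row with a null Category
ROWS = (
    {"Category": "Category A", "ID": 1, "Value": 12.40},
    {"Category": "Category B", "ID": 2, "Value": 30.10},
    {"Category": "Category C", "ID": 3, "Value": 100.01},
    {"Category": None, "ID": 4, "Value": 7.00},
)


def sql_not_equal(a: Optional[str], b: str) -> Optional[bool]:
    """SQL '<>': comparing with NULL yields NULL (None) instead of True/False."""
    if a is None:
        return None
    return a != b


def null_safe_equal(a: Optional[str], b: Optional[str]) -> bool:
    """SQL '<=>': NULL <=> NULL is True, NULL <=> value is False, never NULL."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def where(rows, predicate: Callable[[dict], Optional[bool]]) -> tuple:
    """Return a NEW tuple of rows whose predicate is exactly True; rows is untouched."""
    return tuple(r for r in rows if predicate(r) is True)


plain = where(ROWS, lambda r: sql_not_equal(r["Category"], "Category B"))
null_safe = where(ROWS, lambda r: not null_safe_equal(r["Category"], "Category B"))

print([r["ID"] for r in plain])      # [1, 3]  (the null row is gone too)
print([r["ID"] for r in null_safe])  # [1, 3, 4]
print(len(ROWS))                     # 4  (the input is unchanged)
```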
The PySpark DataFrame API provides two drop-related methods: drop and dropDuplicates (or its alias drop_duplicates). drop removes the specified column(s) from a DataFrame, while dropDuplicates removes duplicate rows, optionally comparing only a subset of columns. Both return a new DataFrame.
from pyspark.sql import SparkSession
appName = "PySpark drop and dropDuplicates"
master = "local"
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Create a DataFrame from an inline VALUES table; the first two rows are identical
df = spark.sql("""SELECT ACCT, TXN_DT, AMT FROM VALUES
(101,10.01, DATE'2021-01-01'),
(101,10.01, DATE'2021-01-01'),
(101,102.01, DATE'2021-01-01')
AS TXN(ACCT,AMT,TXN_DT)""")
print(df.schema)
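# drop returns a new DataFrame without the TXN_DT column; df itself is unchanged
df.drop('TXN_DT').show(truncate=False)
# dropDuplicates removes fully identical rows; pass a list to compare only those columns
df.dropDuplicates().show(truncate=False)
df.dropDuplicates(['ACCT', 'TXN_DT']).show(truncate=False)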
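The three sample rows make the results of df.drop('TXN_DT'), df.dropDuplicates() and df.dropDuplicates(['ACCT', 'TXN_DT']) easy to predict. The plain-Python sketch below (not Spark code; drop_columns and drop_duplicates are hypothetical helpers) mimics them: dropping TXN_DT leaves three rows, two of which are identical; dropDuplicates() on all columns removes the repeated row and leaves two; comparing only ACCT and TXN_DT leaves a single row. Spark does not promise which duplicate survives when a subset is given; the sketch simply keeps the first. As in Spark, dropping a column name that does not exist is a no-op.

```python
import datetime

DT = datetime.date(2021, 1, 1)

# The three rows from the VALUES clause in the example: ACCT, TXN_DT, AMT
ROWS = [
    {"ACCT": 101, "TXN_DT": DT, "AMT": 10.01},
    {"ACCT": 101, "TXN_DT": DT, "AMT": 10.01},
    {"ACCT": 101, "TXN_DT": DT, "AMT": 102.01},
]


def drop_columns(rows, *names):
    """Hypothetical mimic of df.drop(...): unknown column names are ignored."""
    return [{k: v for k, v in r.items() if k not in names} for r in rows]


def drop_duplicates(rows, subset=None):
    """Hypothetical mimic of df.dropDuplicates(subset); keeps the first row per key.

    Spark itself does not guarantee which duplicate is kept when subset is given.
    """
    seen = set()
    result = []
    for r in rows:
        # Compare all columns unless a subset is given
        key = tuple(r[c] for c in (subset or sorted(r)))
        if key not in seen:
            seen.add(key)
            result.append(r)
    return result


print(drop_columns(ROWS, "TXN_DT"))
print(len(drop_duplicates(ROWS)))                  # 2
print(drop_duplicates(ROWS, ["ACCT", "TXN_DT"]))   # only the first row survives
```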
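drop is also useful for cleaning up after a transformation. This Scala example explodes the array column FieldC into one row per element and then drops the original array column:
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF on an RDD and the $"col" syntax
val data = spark.sparkContext.parallelize(Seq(
  (1, "A", List(1, 2, 3)),
  (2, "B", List(3, 5))
)).toDF("FieldA", "FieldB", "FieldC")
data.show
// One output row per element of FieldC; the original array column is then dropped
data.withColumn("ExplodedField", explode($"FieldC")).drop("FieldC").show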
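explode emits one output row per array element and copies the other columns onto each of them. A row whose array is empty or null produces no output rows at all, while explode_outer keeps such a row with a null element instead. The plain-Python sketch below (not Spark code; explode_rows is a hypothetical helper, and the third input row with an empty list is added only for illustration) shows both behaviours, including the final drop of FieldC.

```python
# Rows from the Scala example: (FieldA, FieldB, FieldC), plus a hypothetical empty-array row
ROWS = [(1, "A", [1, 2, 3]), (2, "B", [3, 5]), (3, "C", [])]


def explode_rows(rows, outer=False):
    """Mimic withColumn("ExplodedField", explode(FieldC)).drop("FieldC").

    With outer=True it mimics explode_outer: an empty or None array yields one
    row whose exploded value is None instead of no row at all.
    """
    result = []
    for field_a, field_b, field_c in rows:
        items = field_c or []
        if not items and outer:
            items = [None]
        for item in items:
            # FieldC itself is not copied, matching the .drop("FieldC") step
            result.append((field_a, field_b, item))
    return result


print(explode_rows(ROWS))
# [(1, 'A', 1), (1, 'A', 2), (1, 'A', 3), (2, 'B', 3), (2, 'B', 5)]
print(explode_rows(ROWS, outer=True))
# same five rows, plus (3, 'C', None)
```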