DataFrame Operations


How to select column(s) from the DataFrame
df.select($"street",$"citiy",$"zip").show

How to select column(s) from the DataFrame
df.selectExpr($"street",$"citiy",$"cast("zip as zip").show


How to isin function DataFrames
val items =  List(101,102,103)
df1.filter($"id" in(items:_*)).show

How to do the casting in DataFrame 
import org.apache.spark.sql.DataFrame
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("sep",",")
  .load("/FileStore/tables/real_state.csv")


val df1=df.select($"street",$"zip".cast("String"),$"state").printSchema

val df2=df.selectExpr("cast(zip as String) zip","street").show
val df1 = spark.range(5).toDF("id")
df1.select ($"id".cast("String")).show

Spark DataFrame Replace String
emp.select( $"Name",translate($"Name","J","Z").as("New_Name")).show()  
emp.select( $"Name",translate($"Name","Jhon","Martin").as("New_Name")).show() 

Concat two string 
import org.apache.spark.sql.functions.{concat, lit}
df1.select(concat($"id",lit("-"),$"name")).show


import org.apache.spark.sql.functions.{concat_ws}
df1.select(concat_ws("-",$"id",$"name")).show




Spark DataFrame Explode
val df = Seq((1,"abc",Array("p","q","r")),(2,"def",Array("x","y","z"))).toDF("id","col1","col2")
df.withColumn("col2",explode($"col2")).show()



val df = spark.read.option("header","true").option("inferschema","true").csv("country_prod.csv")
val output = df.withColumn("country",when(col("COUNTRY")==="IN","INDIA").otherwise("SINGAPORE") ).groupBy("PRODUCT").pivot("country").sum("revenue")
output.show()

Spark DF Subtraction
Below is a simple code which self sufficiently explains Data Frame Subtraction in Spark Scala:
Here we have created two data frames df1 & df2 & done operation on them:

val df1 = sc.parallelize(Seq(("a" -> 1), ("b" -> 2), ("c" -> 3), ("d" -> 4))).toDF("c1", "c2")
val df2 = sc.parallelize(Seq((“a” -> 1), (“b” -> 1), (“e” -> 4))).toDF(“c1”, “c2”)

df1.registerTempTable("temp_table_df1")
df2.registerTempTable(“temp_table_df2”)

df1.show
+—+—+
| c1| c2|
+—+—+
| a| 1|
| b| 2|
| c| 3|
| d| 4|
+—+—+

df2.show
+—+—+
| c1| c2|
+—+—+
| a| 1|
| b| 1|
| e| 4|
+—+—+

df1.except(df2).show
+—+—+
| c1| c2|
+—+—+
| d| 4|
| c| 3|
| b| 2|
+—+—+

val df3=df1.drop(“c2”)
df3.show
+—+
| c1|
+—+
| a|
| b|
| c|
| d|
+—+

val df4=df2.drop(“c2”)
df4.show
+—+
| c1|
+—+
| a|
| b|
| e|
+—+

df3.except(df4).show

+—+
| c1|
+—+
| c|
| d|
+—+

df3.except(df4).count()
Long = 2



Explode in Spark
val df = Seq((1,"abc",Array("p","q","r")),(2,"def",Array("x","y","z"))).toDF("id","col1","col2")
df.printSchema()
df.show()
df.withColumn("col2",explode($"col2")).show()

Regexe_replace
regexp_replace(x, pattern, replacement)
## S4 method for signature 'Column,character,character'
regexp_replace(x, pattern, replacement)


How to get the summary statistics (mean, standard deviance, min ,max, count) of numerical columns in a DataFrame?

   val df = spark.read.format("csv") 
  .option("header", "true") 
  .option("inferSchema", "true") 
  .option("sep",",") 
  .load("/FileStore/tables/real_state.csv")

  df.describe().show

select SUBSTR('pundrikv@gmail.com', INSTR('pundrikv@gmail.com', '@'),INSTR('pundrikv@gmail.com', '.'))


df.select("name").show()
df.select("name","age","sal","address").show()
using createDataFrame from sparkSession
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val df=spark.createDataFrame(data)

Create Data Frame from  Different Source file 
Data Frames loaded from any data source type can be converted into other types using this syntax.you can also use their short names (json, parquet, jdbc, orc, libsvm, csv, text)

Create DataFrame Read Json File
val userDF=spark.read.format("orc").load("/Data/nameorc")
usersDF.select("id","first_name","last_name").write.format("json").save("namejson")
usersDF.select("id","first_name","last_name").write.format("orc").save("/Data/nameorc")
val usersDF = spark.read.load("/D/users.parquet")
usersDF.select("name","favorite_color").write.save("namesAndFavColors")

Note:- This store the data in parquet File format. Because Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.


Save Modes
Save operations can optionally take a SaveMode, that specifies how to handle existing data if present. It is important to realize that these save modes do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data.

SaveMode.ErrorIfExists (default) :- When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.

SaveMode.Append:-When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

SaveMode.Overwrite:- Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.

SaveMode.Ignore:-Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL.
Store DataFrame Value into Table
There are different ways to store the DataFram into the hive table.
  • df.write().mode("overwrite").saveAsTable("schemaName.tableName");
  • df.select(df.col("col1"),df.col("col2"),df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
  • df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
  • spark.sql("create table mytable as select * from mytempTable");
How to alter column in existing data dataframe

val df1=df.withColumn("Total_Sal",df.col("sal")+500)
df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
val df2=df.withColumn("TotalSal", 'Salary+100).show
val df2=df.withColumn("TotalSal", col("Salary")+ lit(100)).show
Using lit() function on DataFrame select create a new column with default value.
val data = Seq(("111",5,50000),("222",6,60000),("333",7,60000))
val df = data.toDF("EmpId","Experience","Salary")
val df2 = df.select(col("EmpId"),col("Experience"),col("Salary"),lit("1").as("NewColumn"))

Lit() can also be used on withColumn to derive a new column based on existing columns.

val df3 = df2.withColumn("Level",
      when(col("Experience") > 1 && col("Experience") < 6, lit("100").cast(IntegerType))
        .otherwise(lit("200").cast(IntegerType))

val initialDf= ....
val dfAfterDrop=initialDf.drop("column1").drop("coumn2")

val dfAfterColRename= dfAfterDrop.withColumnRenamed("oldColumnName","new ColumnName")
val df1 = sc.parallelize((1 to 100).map(a=>(s"user $a", a*0.123, a))).toDF("name", "score", "user_id")
val df2 = sc.parallelize(List(2,3,4,5,6)).toDF("valid_id")
df1.join(df2,df1("user_id") === df2("valid_id")).show
employee.withColumn("Tota_sal",($"sal" + $"comm")).select("*").show
employee.withColumn("Tota_sal",($"sal" + $"comm")).drop($"sal").select("*").show

val session = org.apache.spark.sql.SparkSession.builder.master("local").appName("Spark CSV Reader").getOrCreate;

save the data in a table by registering it in a temp table

df.registerTempTable("hospital_charges")

Select everybody, but increment the age by 1:

df.select(df("name"), df("age") + 1).show()

Count people by age:
df.groupBy("age").count().show()
df.createOrReplaceTempView("people")

 // Register the DataFrame as a SQL  Hive table

myDf.createOrReplaceTempView("mytempTable")
sqlContext.sql("create table mytable as select * from mytempTable");
df.write().saveAsTable(tableName)

val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")


peopleDFCsv.select("name","age").write.format("parquet").save("namesAndAges.parquet")


// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._



Examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object checkDFSchema extends App {
  val cc = new SparkConf;
  val sc = new SparkContext(cc)
  val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
  //First option for creating hive table through dataframe
  val DF = sparkSession.sql("select * from salary")
  DF.createOrReplaceTempView("tempTable")
  sparkSession.sql("Create table yourtable as select * form tempTable")
  //Second option for creating hive table from schema
  val oldDFF = sparkSession.sql("select * from salary")
  //Generate the schema out of dataframe
  val schema = oldDFF.schema
  //Generate RDD of you data
  val rowRDD = sc.parallelize(Seq(Row(100, "a", 123)))
  //Creating new DF from data and schema
  val newDFwithSchema = sparkSession.createDataFrame(rowRDD, schema)
  newDFwithSchema.createOrReplaceTempView("tempTable")
  sparkSession.sql("create table FinalTable AS select * from tempTable")
}

val rdd = sc.textFile("address of your file")
rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_ + _).collect

No comments:

Post a Comment