How to select column(s) from the DataFrame
df.select($"street",$"citiy",$"zip").show
How to select column(s) from the DataFrame using selectExpr
df.selectExpr("street","city","cast(zip as string) zip").show
How to use the isin function with DataFrames
val items = List(101,102,103)
df1.filter($"id".isin(items:_*)).show
How to do casting in a DataFrame
import org.apache.spark.sql.DataFrame
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("sep",",")
.load("/FileStore/tables/real_state.csv")
val df1=df.select($"street",$"zip".cast("String"),$"state")
df1.printSchema
val df2=df.selectExpr("cast(zip as String) zip","street")
df2.show
val df1 = spark.range(5).toDF("id")
df1.select ($"id".cast("String")).show
Spark DataFrame Replace String
emp.select( $"Name",translate($"Name","J","Z").as("New_Name")).show()
emp.select( $"Name",translate($"Name","Jhon","Martin").as("New_Name")).show()
Concatenate two strings
import org.apache.spark.sql.functions.{concat, lit}
df1.select(concat($"id",lit("-"),$"name")).show
import org.apache.spark.sql.functions.{concat_ws}
df1.select(concat_ws("-",$"id",$"name")).show
Spark DataFrame Explode
import org.apache.spark.sql.functions.explode
val df = Seq((1,"abc",Array("p","q","r")),(2,"def",Array("x","y","z"))).toDF("id","col1","col2")
df.printSchema()
df.withColumn("col2",explode($"col2")).show()
Spark DataFrame pivot with when/otherwise
import org.apache.spark.sql.functions.{col, when}
val df = spark.read.option("header","true").option("inferSchema","true").csv("country_prod.csv")
val output = df.withColumn("country",when(col("COUNTRY")==="IN","INDIA").otherwise("SINGAPORE")).groupBy("PRODUCT").pivot("country").sum("revenue")
output.show()
Spark DF Subtraction
Below is a small, self-contained example that demonstrates DataFrame subtraction in Spark Scala.
Here we create two DataFrames, df1 and df2, and run the subtraction on them:
val df1 = sc.parallelize(Seq(("a" -> 1), ("b" -> 2), ("c" -> 3), ("d" -> 4))).toDF("c1", "c2")
val df2 = sc.parallelize(Seq(("a" -> 1), ("b" -> 1), ("e" -> 4))).toDF("c1", "c2")
df1.registerTempTable("temp_table_df1")
df2.registerTempTable("temp_table_df2")
df1.show
+---+---+
| c1| c2|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
|  d|  4|
+---+---+
df2.show
+---+---+
| c1| c2|
+---+---+
|  a|  1|
|  b|  1|
|  e|  4|
+---+---+
df1.except(df2).show
+---+---+
| c1| c2|
+---+---+
|  d|  4|
|  c|  3|
|  b|  2|
+---+---+
val df3=df1.drop("c2")
df3.show
+---+
| c1|
+---+
|  a|
|  b|
|  c|
|  d|
+---+
val df4=df2.drop("c2")
df4.show
+---+
| c1|
+---+
|  a|
|  b|
|  e|
+---+
df3.except(df4).show
+---+
| c1|
+---+
|  c|
|  d|
+---+
df3.except(df4).count()
Long = 2
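Note that except returns distinct rows only. On Spark 2.4 and later there is also exceptAll, which keeps duplicate rows; a minimal sketch with the same df1 and df2 (assumes Spark 2.4+):
// exceptAll keeps duplicate rows that except would collapse (Spark 2.4+)
df1.exceptAll(df2).show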
Regexp_replace
regexp_replace(column, pattern, replacement)
Replaces every substring of the given column that matches the regular expression pattern with the replacement string.
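A minimal Scala sketch of regexp_replace, reusing the emp DataFrame from the replace-string example above (the Name column and its values are assumed for illustration):
import org.apache.spark.sql.functions.regexp_replace
// replace the whole substring "Jhon" with "Martin", rather than mapping characters the way translate does
emp.select($"Name", regexp_replace($"Name", "Jhon", "Martin").as("New_Name")).show()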
How to get the summary statistics (mean, standard deviation, min, max, count) of numerical columns in a DataFrame?
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("sep",",")
.load("/FileStore/tables/real_state.csv")
df.describe().show
Extract part of an email address with SQL string functions (SUBSTR/INSTR):
select SUBSTR('pundrikv@gmail.com', INSTR('pundrikv@gmail.com', '@'), INSTR('pundrikv@gmail.com', '.'))
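A DataFrame-side sketch for pulling the domain out of an email uses substring_index; the df and its email column are assumed here for illustration:
import org.apache.spark.sql.functions.substring_index
// everything after the last '@' in the (assumed) email column
df.select(substring_index($"email", "@", -1).as("domain")).show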
df.select("name").show()
Create Data Frame from Different Source file
df.select("name","age","sal","address").show()
Using createDataFrame from SparkSession
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val df = spark.createDataFrame(data).toDF("language", "users_count")
DataFrames loaded from any data source type can be converted into other types using this syntax. You can also use their short names (json, parquet, jdbc, orc, libsvm, csv, text), as in the sketch below.
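For example, format(...).load(...) and the short-name reader methods are interchangeable; a small sketch, assuming a people.json file at the path shown (the paths are illustrative):
// generic form
val jsonDF = spark.read.format("json").load("examples/src/main/resources/people.json")
// equivalent short-name form
val jsonDF2 = spark.read.json("examples/src/main/resources/people.json")
// convert to another type on write
jsonDF2.write.format("parquet").save("peopleFromJson.parquet")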
Create DataFrame: read an ORC file and write it out as JSON and ORC
val usersDF = spark.read.format("orc").load("/Data/nameorc")
usersDF.select("id","first_name","last_name").write.format("json").save("namejson")
// write to a different path than the one being read (output path is illustrative)
usersDF.select("id","first_name","last_name").write.format("orc").save("/Data/nameorc_out")
val usersDF = spark.read.load("/D/users.parquet")
usersDF.select("name","favorite_color").write.save("namesAndFavColors")
Note:-
This stores the data in the Parquet file format, because Parquet is a columnar format supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files and automatically preserves the schema of the original data. When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.
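To confirm the schema round-trips, the saved Parquet output can be read back; a quick sketch reusing the namesAndFavColors path written above:
// Parquet stores the schema with the data, so it is recovered on read
val reloaded = spark.read.parquet("namesAndFavColors")
reloaded.printSchema()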
Save Modes
Save operations can optionally take a SaveMode, which specifies how to handle existing data if present. It is important to realize that these save modes do not use any locking and are not atomic. Additionally, when performing an Overwrite, the existing data is deleted before the new data is written out (see the sketch after the list below).
SaveMode.ErrorIfExists (default):- When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
SaveMode.Append:- When saving a DataFrame to a data source, if data/table already exists, the contents of the DataFrame are expected to be appended to the existing data.
SaveMode.Overwrite:- Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, the existing data is expected to be overwritten by the contents of the DataFrame.
SaveMode.Ignore:- Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to CREATE TABLE IF NOT EXISTS in SQL.
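A save mode is passed to DataFrameWriter.mode either as the SaveMode enum or as a string; a minimal sketch reusing usersDF and the Parquet path from above:
import org.apache.spark.sql.SaveMode
// append to the existing Parquet output instead of failing with ErrorIfExists
usersDF.select("name","favorite_color").write.mode(SaveMode.Append).save("namesAndFavColors")
// the string form is equivalent
usersDF.select("name","favorite_color").write.mode("overwrite").save("namesAndFavColors")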
Store DataFrame Value into Table
There are different ways to store a DataFrame into a Hive table:
- df.write.mode("overwrite").saveAsTable("schemaName.tableName")
- df.select(df.col("col1"),df.col("col2"),df.col("col3")).write.mode("overwrite").saveAsTable("schemaName.tableName")
- df.write.mode(SaveMode.Overwrite).saveAsTable("dbName.tableName")
- spark.sql("create table mytable as select * from mytempTable")
How to alter a column in an existing DataFrame
val df1=df.withColumn("Total_Sal",df.col("sal")+500)
df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
val df2=df.withColumn("TotalSal", 'Salary+100).show
val df2=df.withColumn("TotalSal", col("Salary")+ lit(100)).show
Using the lit() function in a DataFrame select creates a new column with a default value.
val data = Seq(("111",5,50000),("222",6,60000),("333",7,60000))
val df = data.toDF("EmpId","Experience","Salary")
val df2 = df.select(col("EmpId"),col("Experience"),col("Salary"),lit("1").as("NewColumn"))
Lit() can also be used on withColumn to derive a new column based on existing columns.
val df3 = df2.withColumn("Level",
when(col("Experience") > 1 && col("Experience") < 6, lit("100").cast(IntegerType))
.otherwise(lit("200").cast(IntegerType))
val initialDf = ....
val dfAfterDrop = initialDf.drop("column1").drop("column2")
val dfAfterColRename = dfAfterDrop.withColumnRenamed("oldColumnName","newColumnName")
val df1 = sc.parallelize((1 to 100).map(a=>(s"user $a", a*0.123, a))).toDF("name", "score", "user_id")
val df2 = sc.parallelize(List(2,3,4,5,6)).toDF("valid_id")
df1.join(df2,df1("user_id") === df2("valid_id")).show
employee.withColumn("Tota_sal",($"sal" + $"comm")).select("*").show
employee.withColumn("Tota_sal",($"sal" + $"comm")).drop($"sal").select("*").show
val session = org.apache.spark.sql.SparkSession.builder.master("local").appName("Spark CSV Reader").getOrCreate()
Save the data in a table by registering it in a temp table:
df.registerTempTable("hospital_charges")
Select everybody, but increment the age by 1:
df.select(df("name"), df("age") + 1).show()
Count people by age:
df.groupBy("age").count().show()
df.createOrReplaceTempView("people")
// Register the DataFrame as a temporary view, then create a Hive table from it
myDf.createOrReplaceTempView("mytempTable")
sqlContext.sql("create table mytable as select * from mytempTable")
df.write.saveAsTable(tableName)
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
peopleDFCsv.select("name","age").write.format("parquet").save("namesAndAges.parquet")
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
Examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object checkDFSchema extends App {
val cc = new SparkConf;
val sc = new SparkContext(cc)
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
//First option for creating hive table through dataframe
val DF = sparkSession.sql("select * from salary")
DF.createOrReplaceTempView("tempTable")
sparkSession.sql("Create table yourtable as select * form tempTable")
//Second option for creating hive table from schema
val oldDFF = sparkSession.sql("select * from salary")
//Generate the schema out of dataframe
val schema = oldDFF.schema
//Generate RDD of you data
val rowRDD = sc.parallelize(Seq(Row(100, "a", 123)))
//Creating new DF from data and schema
val newDFwithSchema = sparkSession.createDataFrame(rowRDD, schema)
newDFwithSchema.createOrReplaceTempView("tempTable")
sparkSession.sql("create table FinalTable AS select * from tempTable")
}
Word count using RDDs
val rdd = sc.textFile("path to your file")
rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_ + _).collect