Datasets

A Dataset is a strongly typed data structure in Spark SQL that maps to a relational schema. It represents a structured query together with an encoder, and it is an extension of the DataFrame API. A Spark Dataset provides both type safety and an object-oriented programming interface. Datasets were introduced in Spark 1.6. The encoder is the central concept in Spark SQL's serialization and deserialization (SerDes) framework. Encoders translate between JVM objects and Spark's internal binary format. Spark's built-in encoders are very advanced: they generate bytecode to interact with off-heap data.

An encoder provides on-demand access to individual attributes without having to deserialize an entire object. Spark SQL uses this SerDes framework to make input and output both time and space efficient. Because the encoder knows the schema of a record, it can perform serialization and deserialization efficiently.
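As a small sketch (Person is a hypothetical case class used only for illustration), the built-in encoder for a case class can be inspected to see that it already carries the record's schema:

import org.apache.spark.sql.Encoders

// Hypothetical domain class used only for illustration
case class Person(name: String, age: Int)

// Built-in encoder for any Scala case class (Product type)
val personEncoder = Encoders.product[Person]

// Because the encoder knows the schema, Spark can read individual fields
// from the binary representation without deserializing the whole object
println(personEncoder.schema)
println(personEncoder.clsTag)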

A Spark Dataset is a structured, lazy query expression that is evaluated when an action is triggered. Internally, a Dataset represents a logical plan, which describes the computation needed to produce the data. The logical plan is a base Catalyst query plan; logical operators combine to form a logical query plan, and once it is analyzed and resolved, a physical query plan is produced.
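As an illustrative sketch (assuming the spark-shell, where a SparkSession named spark already exists), these stages can be inspected through explain or the queryExecution field:

import spark.implicits._

// A small structured query; nothing is computed until an action runs
val query = spark.range(1, 100).filter(_ % 2 == 0).select($"id")

// Prints the parsed and analyzed logical plans, the Catalyst-optimized
// logical plan, and the chosen physical plan
query.explain(true)

// The same pipeline is available programmatically
query.queryExecution.logical        // logical plan
query.queryExecution.analyzed       // resolved logical plan
query.queryExecution.optimizedPlan  // after Catalyst optimization
query.queryExecution.executedPlan   // physical plan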

A Dataset combines the features of RDDs and DataFrames. It provides:
  • The convenience of RDDs.
  • The performance optimizations of DataFrames.
  • The static type safety of Scala.

Thus, Datasets provide a more functional programming interface for working with structured data, as the sketch below illustrates.
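A small sketch of this combination (assuming the spark-shell and a hypothetical Employee case class): the same Dataset accepts RDD-style, type-safe lambdas as well as DataFrame-style relational operators that Catalyst can optimize.

import spark.implicits._

case class Employee(ename: String, dept_no: Int, salary: Int)

val ds = Seq(Employee("Ann", 10, 5000), Employee("Bob", 20, 4000)).toDS()

// RDD-like, type-safe functional operations (checked at compile time)
val wellPaid = ds.filter(e => e.salary > 4500).map(_.ename)

// DataFrame-like relational operations (optimized by Catalyst)
val avgByDept = ds.groupBy("dept_no").avg("salary")

wellPaid.show()
avgByDept.show()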

Need for Datasets in Spark
Datasets emerged to overcome the limitations of RDDs and DataFrames. A DataFrame offers no compile-time type safety, so data cannot be manipulated safely without knowing its structure. An RDD offers no automatic optimization, so any optimization has to be done manually when needed.

To create a Dataset we need:
  • SparkSession :- SparkSession is the entry point to Spark SQL. It is the very first object we create when developing a Spark SQL application that uses the fully typed Dataset abstraction. We create an instance with SparkSession.builder() and stop it with the stop method (spark.stop()).
  • QueryExecution :- QueryExecution represents the structured query execution pipeline of a Dataset and is accessed through the Dataset's queryExecution attribute. A QueryExecution is obtained by executing a logical plan in the SparkSession: executePlan(plan: LogicalPlan): QueryExecution executes the input LogicalPlan and produces a QueryExecution for the current SparkSession.
  • Encoder :- An encoder provides the conversion between the tabular representation and JVM objects. With the help of an encoder we serialize objects for processing or for transmission over the network (see the sketch after this list).
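A minimal sketch putting the three pieces together (names are illustrative, and local[*] is used only so the snippet can run on a single machine):

import org.apache.spark.sql.SparkSession

case class Department(dept_no: Int, d_name: String)

val spark = SparkSession
  .builder()
  .appName("Dataset Example")
  .master("local[*]")      // illustrative; usually omitted when submitting to a cluster
  .getOrCreate()

// Implicit encoders for case classes and common Scala types
import spark.implicits._

// createDataset (or .toDS()) picks up the implicit encoder for Department
val deptDS = spark.createDataset(Seq(Department(10, "SALES"), Department(20, "HR")))

deptDS.show()

// The structured-query execution pipeline behind the Dataset
println(deptDS.queryExecution)

spark.stop()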
Differences between Spark DataFrame and Dataset

Spark Release
DataFrame- DataFrames were introduced in the Spark 1.3 release.
DataSets- Datasets were introduced in the Spark 1.6 release.

Data Formats
DataFrame- A DataFrame organizes data into named columns. It can efficiently process both structured and unstructured data, and it lets Spark manage the schema.

DataSets- Like DataFrames, Datasets efficiently process structured and unstructured data. Data is represented as a collection of JVM objects (or Row objects), which encoders translate into a tabular form.

Data Representation
DataFrame- In a DataFrame, data is organized into named columns, much like a table in a relational database.

DataSets- A Dataset is an extension of the DataFrame API that provides the type-safe, object-oriented programming interface of the RDD API together with the performance benefits of the Catalyst query optimizer.

Compile-time type safety
DataFrame- If we try to access a column that does not exist in the table, the DataFrame API reports the error only at runtime; there is no compile-time check.

DataSets- Datasets offers compile-time type safety.
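A short sketch of the difference, assuming the spark-shell and a hypothetical Employee case class:

import spark.implicits._

case class Employee(ename: String, salary: Int)

val df = Seq(Employee("Ann", 5000)).toDF()
val ds = Seq(Employee("Ann", 5000)).toDS()

// DataFrame: the column name is only a string, so a typo is discovered
// at runtime with an AnalysisException
// df.select("salry").show()      // compiles, fails when executed

// Dataset: field access goes through the case class, so the same typo
// is rejected by the Scala compiler
// ds.map(e => e.salry)           // does not compile
ds.map(e => e.salary * 2).show()  // type-checked at compile time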

Data Sources API
DataFrame- It allows data to be processed in different formats, for example AVRO, CSV, and JSON, and read from storage systems such as HDFS, Hive tables, and MySQL.

DataSets- It also supports reading data from these different sources, as the sketch below shows.
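A brief sketch of the Data Sources API (the paths and the MySQL connection details are illustrative; the Avro and JDBC readers need the corresponding connector packages on the classpath):

// JSON and CSV readers are built in
val jsonDF = spark.read.json("/Data/people.json")
val csvDF  = spark.read.option("header", "true").csv("/Data/people.csv")

// Avro via the spark-avro package, Hive tables via spark.table,
// and MySQL via the JDBC data source
val avroDF = spark.read.format("avro").load("/Data/people.avro")
val hiveDF = spark.table("default.people")
val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb")   // illustrative connection string
  .option("dbtable", "people")
  .option("user", "user")
  .option("password", "password")
  .load()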

Immutability and Interoperability
DataFrame- Once a domain object is transformed into a DataFrame, the original domain object cannot be regenerated from it.

DataSets- Datasets overcome this drawback of DataFrames: the domain objects (or an RDD of them) can be regenerated from a Dataset. Datasets also allow existing RDDs and DataFrames to be converted into Datasets.
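For example (a sketch assuming the spark-shell), a generic DataFrame can be turned back into typed domain objects with as[T], and a Dataset exposes both its RDD and DataFrame views:

import spark.implicits._

case class Department(dept_no: Int, d_name: String)

val df = Seq((10, "SALES"), (20, "HR")).toDF("dept_no", "d_name")

// DataFrame -> Dataset: recover typed domain objects from the generic rows
val ds = df.as[Department]

// Dataset -> RDD of domain objects, or back to an untyped DataFrame
val rdd = ds.rdd
val dfAgain = ds.toDF()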

Efficiency/Memory use
DataFrame- It reduces overhead by using off-heap memory for serialization.
DataSets- It allows operations to be performed directly on serialized data, which improves memory usage.

Serialization
DataFrame- A DataFrame serializes data into off-heap storage in a binary format and then performs many transformations directly on that off-heap memory.

DataSets- The Dataset API introduces the concept of an encoder, which handles the conversion between JVM objects and the tabular representation. The tabular representation is stored in Spark's internal Tungsten binary format, which allows operations on serialized data and improves memory usage.

Lazy Evaluation
DataFrame- Like RDDs, Spark evaluates DataFrames lazily.

DataSets- Like RDDs and DataFrames, Datasets are also evaluated lazily, as the sketch below shows.
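For instance (a sketch assuming the spark-shell), the map below only defines work; a job is started only when the action at the end runs:

import spark.implicits._

// Transformation only: no job is started here
val doubled = spark.range(1, 1000000).map(_ * 2)

// The action triggers the actual computation
println(doubled.reduce(_ + _))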

Optimization
DataFrame- Optimization of a DataFrame takes place through the Spark Catalyst optimizer.

DataSets- Datasets use the same Catalyst optimizer as DataFrames to optimize the query plan.

Schema Projection
DataFrame- The schema is auto-discovered through the Hive metastore; we do not need to specify it manually.

DataSets- Because it uses the Spark SQL engine, it also auto-discovers the schema of the files.

Programming Language Support
DataFrame- DataFrames are available in four languages: Java, Python, Scala, and R.

DataSets- Datasets are only available in Scala and Java.

Usage of Datasets and DataFrames

DataFrame-
  • When low-level functionality is needed.
  • Also, when a high-level abstraction is required.

DataSets-
  • For a high degree of safety at runtime.
  • To take advantage of typed JVM objects.
  • Also, to take advantage of the Catalyst optimizer.
  • To save space.
  • When faster execution is required.



Spark SQL also supports reading and writing data stored in Apache Hive. However, since Hive has a large number of dependencies, these dependencies are not included in the default Spark distribution. If Hive dependencies can be found on the classpath, Spark will load them automatically. Note that these Hive dependencies must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.

Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) file in conf/.

When working with Hive, one must instantiate SparkSession with Hive support, including connectivity to a persistent Hive metastore, support for Hive SerDes, and Hive user-defined functions. Users who do not have an existing Hive deployment can still enable Hive support. When hive-site.xml is not configured, the context automatically creates metastore_db in the current directory and creates a directory configured by spark.sql.warehouse.dir, which defaults to the directory spark-warehouse in the current directory where the Spark application is started. Note that the hive.metastore.warehouse.dir property in hive-site.xml has been deprecated since Spark 2.0.0. Instead, use spark.sql.warehouse.dir to specify the default location of databases in the warehouse. You may need to grant write privileges to the user who starts the Spark application.

import java.io.File
import org.apache.spark.sql.SparkSession

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

// Create the SparkSession with Hive support
val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

A sample query that joins the employee and department tables and takes the maximum salary per group:

SELECT e.emp_no, e.ename, d.dept_no, d.d_name, MAX(e.salary)
FROM emp e
INNER JOIN dept d ON e.dept_no = d.dept_no
GROUP BY e.emp_no, e.ename, d.dept_no, d.d_name



val spark = SparkSession.builder().appName("Spark Hive Example").enableHiveSupport().getOrCreate()

import spark.implicits._   // needed for the .toDF() conversions below

case class employee(emp_no: Int, ename: String, job: String, mgr_id: Int, date_of_joining: String, salary: Int, bonus: Int, dept_no: Int)

case class department(dept_no: Int, d_name: String, l_loc: String)

// Parse the comma-delimited files into case class instances, then convert to DataFrames
val emp = spark.sparkContext.textFile("/Data/emp.txt").map(_.split(",")).map(e => employee(e(0).toInt, e(1), e(2), e(3).toInt, e(4), e(5).toInt, e(6).toInt, e(7).toInt)).toDF()

val dept = spark.sparkContext.textFile("/Data/dept.txt").map(_.split(",")).map(d => department(d(0).toInt, d(1), d(2))).toDF()


// Select a few columns and display them
emp.select("emp_no", "ename", "dept_no").show()
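With the two DataFrames registered as temporary views, the aggregate join query shown earlier can be run through spark.sql (a sketch; the max_salary alias is added only for readability):

emp.createOrReplaceTempView("emp")
dept.createOrReplaceTempView("dept")

val highestSalary = spark.sql("""
  SELECT e.emp_no, e.ename, d.dept_no, d.d_name, MAX(e.salary) AS max_salary
  FROM emp e
  INNER JOIN dept d ON e.dept_no = d.dept_no
  GROUP BY e.emp_no, e.ename, d.dept_no, d.d_name
""")

highestSalary.show()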

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import spark.implicits._   // provides the encoder needed by .as[AbTest]

case class AbTest(userId: String, color: String, count: Integer)

val Schema = StructType(Seq(StructField("userId", StringType, true), StructField("color", StringType, true), StructField("count", IntegerType, true)))

// Read the CSV file with an explicit schema and convert it into a typed Dataset
val result = spark.read.schema(Schema).option("delimiter", ",").csv("/Data/data2.txt").as[AbTest]

// Sum the counts per color; show() is an action and triggers the execution
val finalResult = result.groupBy("color").sum("count")
finalResult.show()

What is the difference between a DataFrame and a Dataset?
A DataFrame is untyped: a schema mismatch throws an exception only at runtime.
A Dataset is typed: a schema mismatch is caught at compile time.

What are the memory tuning parameters, and how do we achieve parallelism in Spark?

  • Leverage the Tungsten engine.
  • Analyse the Spark job execution plan.
  • Use caching, data broadcasting, and accumulators, together with the other optimization techniques Spark provides.
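As a hedged sketch, some of the commonly tuned settings look like this (the values are purely illustrative, not recommendations):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Tuning Example")
  // Memory-related settings
  .config("spark.executor.memory", "4g")           // heap size per executor
  .config("spark.memory.fraction", "0.6")          // fraction of heap shared by execution and storage
  .config("spark.memory.storageFraction", "0.5")   // portion of that fraction protected for caching
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Parallelism-related settings
  .config("spark.default.parallelism", "200")      // default partition count for RDD operations
  .config("spark.sql.shuffle.partitions", "200")   // partitions produced by DataFrame/Dataset shuffles
  .getOrCreate()

// Parallelism can also be adjusted per operation, e.g. by repartitioning
val ds = spark.range(0, 1000000).repartition(100)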
