Broadcast join is very efficient for joining a large dataset with a small one. Spark splits data across the nodes of a cluster so that multiple computers can process it in parallel. A regular join usually has to shuffle rows from both sides across the network so that matching keys end up on the same node. Spark can instead
“broadcast” a small DataFrame by sending all of its data to every node in the cluster. Once the small DataFrame has been broadcast, Spark can perform the join without shuffling any of the data in the large DataFrame.
Suppose Table B is the smaller table, so it is broadcast to every executor node. Each partition of Table A fetches the Table B data through the block manager. For each Table A record, Spark then looks up the matching Table B records by the join key and combines them according to the join type.
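The steps above can be modeled in plain Scala, with no Spark involved. This is a simplified sketch, not Spark's implementation: the partitions of Table A are simulated as a Vector of Vectors, and Emp, Dept and broadcastHashJoin are hypothetical names. Table B becomes one read-only hash map keyed by the join key that every partition probes, while Table A's rows never leave their partitions. The join type decides what happens to Table A rows without a match.

```scala
// Simplified model of a broadcast hash join (hypothetical names, not Spark's API).
object BroadcastJoinModel {
  final case class Emp(name: String, depId: String)
  final case class Dept(id: String, name: String)

  // Joins every partition of the large side against the same lookup table
  // built from the small side; the large side is never repartitioned.
  def broadcastHashJoin(
      largePartitions: Vector[Vector[Emp]],
      small: Vector[Dept],
      leftOuter: Boolean
  ): Vector[(Emp, Option[Dept])] = {
    // The "broadcast" copy: small side grouped by join key
    val broadcastTable: Map[String, Vector[Dept]] = small.groupBy(_.id)
    largePartitions.flatMap { partition =>
      // Each partition works locally, probing the shared table
      partition.flatMap { emp =>
        broadcastTable.get(emp.depId) match {
          case Some(matches) => matches.map(d => (emp, Option(d)))
          case None if leftOuter => Vector((emp, Option.empty[Dept])) // keep unmatched left row
          case None => Vector.empty[(Emp, Option[Dept])] // inner join drops it
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val emps = Vector(
      Vector(Emp("Mary", "IT"), Emp("Paul", "IT"), Emp("Peter", "MKT")),
      Vector(Emp("Jon", "MKT"), Emp("Sarah", "IT"), Emp("Steve", "Intern")))
    val depts = Vector(
      Dept("IT", "IT Department"),
      Dept("MKT", "Marketing Department"),
      Dept("FIN", "Finance & Controlling"))
    println(broadcastHashJoin(emps, depts, leftOuter = false).size) // 5
    println(broadcastHashJoin(emps, depts, leftOuter = true).size) // 6
  }
}
```

Steve's "Intern" department has no match, so the inner join drops him while the left outer join keeps him with no department.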
- For Spark to broadcast a table automatically, its size must not exceed the value of spark.sql.autoBroadcastJoinThreshold (10 MB by default).
- The base (streamed) table cannot be broadcast. For example, in a left outer join only the right table can be broadcast.
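The second rule can be written down as a small lookup. The sketch below is a simplified model loosely based on the build-side rules Spark applies for broadcast hash joins; JoinType and its case objects are hypothetical stand-ins, not Spark's own classes. The reasoning: the side whose unmatched rows must be preserved has to be streamed, because if it were broadcast, no single partition of the other side could tell whether a row went unmatched across all partitions.

```scala
// Simplified model of which side may be broadcast (hypothetical types, not Spark's API).
object BuildSideRules {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  // The left side may be broadcast only when the left rows need no
  // "unmatched" bookkeeping: inner joins, or right outer joins.
  def canBroadcastLeft(jt: JoinType): Boolean = jt match {
    case Inner | RightOuter => true
    case _ => false
  }

  // The right side may be broadcast when the left side is the one streamed.
  def canBroadcastRight(jt: JoinType): Boolean = jt match {
    case Inner | LeftOuter | LeftSemi | LeftAnti => true
    case _ => false
  }
}
```

In this model a full outer join allows neither side to be broadcast, since both sides' unmatched rows must be kept.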
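// Run in spark-shell, which predefines `spark` (SparkSession) and `sc` (SparkContext)
import spark.implicits._ // enables .toDF and the $"col" syntax (auto-imported in spark-shell)
case class Employee(name: String, age: Int, depId: String)
case class Department(id: String, name: String)
// Large side: employees, each referencing a department id
val empRDD = sc.parallelize(Seq(
  Employee("Mary", 33, "IT"),
  Employee("Paul", 45, "IT"),
  Employee("Peter", 26, "MKT"),
  Employee("Jon", 34, "MKT"),
  Employee("Sarah", 29, "IT"),
  Employee("Steve", 21, "Intern")))
// Small side: the department lookup table, the candidate for broadcasting
val deptRDD = sc.parallelize(Seq(
  Department("IT", "IT Department"),
  Department("MKT", "Marketing Department"),
  Department("FIN", "Finance & Controlling")))
val empDF = empRDD.toDF
val deptDF = deptRDD.toDF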
Result without Broadcast Join
// Plain join: the physical plan typically shows a SortMergeJoin, which shuffles both DataFrames
empDF.join(deptDF, $"depId" === $"id", "inner").explain()
Result with Broadcast Join
import org.apache.spark.sql.functions.broadcast
// broadcast() marks deptDF for broadcasting regardless of the size threshold;
// the physical plan should show a BroadcastHashJoin
empDF.join(broadcast(deptDF), $"depId" === $"id", "inner").explain()
Configuring Spark Auto Broadcast Join
We can set the maximum size of a DataFrame that Spark will broadcast automatically when it plans a join. This threshold is controlled by the spark.sql.autoBroadcastJoinThreshold setting in the Spark SQL configuration. The right value depends mainly on available memory: the driver collects the small table before broadcasting it, and every executor holds a full copy of it.
// Enable automatic broadcast joins and set the maximum size, in bytes,
// of a DataFrame that Spark will broadcast automatically (104857600 bytes = 100 MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)
// Disable automatic broadcast joins (explicit broadcast() hints are still honored)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
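The size check behind this setting can be sketched in plain Scala. This is a simplified model with hypothetical names (AutoBroadcastCheck, canAutoBroadcast, mb), not Spark's API. It assumes, following our reading of Spark's planner, that a table qualifies when its estimated size is non-negative and at most the threshold, which is why a value of -1 switches automatic broadcasting off. It also shows where the number 104857600 comes from.

```scala
// Simplified model of the automatic-broadcast size check (hypothetical helper).
object AutoBroadcastCheck {
  // Spark's default threshold: 10 MB expressed in bytes
  val DefaultThreshold: Long = 10L * 1024 * 1024

  // Converts megabytes to bytes, the unit the threshold is expressed in
  def mb(n: Long): Long = n * 1024 * 1024

  // A table qualifies if its estimated size is known (non-negative) and
  // does not exceed the threshold; a negative threshold rejects everything.
  def canAutoBroadcast(estimatedSizeInBytes: Long, threshold: Long): Boolean =
    estimatedSizeInBytes >= 0 && estimatedSizeInBytes <= threshold

  def main(args: Array[String]): Unit = {
    println(mb(100)) // 104857600
    println(canAutoBroadcast(mb(50), mb(100))) // true
    println(canAutoBroadcast(mb(50), DefaultThreshold)) // false
    println(canAutoBroadcast(0L, -1L)) // false: -1 disables automatic broadcasting
  }
}
```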