Spark Action

Spark API action functions produce their results back to the Spark driver program. Common actions include the following (a few are illustrated right after this list):

  • aggregate
  • countApprox*
  • countByValue*
  • foreachPartition
  • max
  • min
  • saveAs* actions, e.g. saveAsTextFile, saveAsHadoopFile
  • takeSample
  • toLocalIterator
  • top
  • treeAggregate
  • treeReduce
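
As a quick illustration, here are a few of these actions run in the spark-shell. The takeSample output is random, so the array shown is only an example:

val nums = sc.parallelize(1 to 100, 4)
nums.max                    // 100 -- the result comes back to the driver
nums.min                    // 1
nums.takeSample(false, 3)   // three random elements, e.g. Array(42, 7, 91)
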
collect(): The action collect() is the most common and simplest operation; it returns the entire content of the RDD to the driver program. A typical application of collect() is unit testing, where the entire RDD is expected to fit in memory.
 
val rdd = sc.parallelize(List(1, 2, 3))
rdd.map(x => (x, x, x)).collect   // Array((1,1,1), (2,2,2), (3,3,3))


val rdd = sc.parallelize(List(1, 2, 3)).flatMap(x => List(x, x, x))
rdd.collect   // Array(1, 1, 1, 2, 2, 2, 3, 3, 3)


foreach(): It is an action. Unlike other actions, foreach() does not return any value; it simply operates on all the elements of the RDD. foreach() can be used in situations where we do not want to return any result but want to initiate a computation. A good example is inserting the elements of an RDD into a database (a sketch of this follows the example below). Let us look at an example for foreach().

val rdd = sc.parallelize(1 to 5, 3)
// foreach runs on the executors; in local mode the output appears in the console
rdd.foreach(x => println(x))
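
Below is a minimal sketch of the database use case mentioned above. DbConnection, openConnection, and insert are hypothetical stand-ins for a real database client; foreachPartition is used so that one connection is opened per partition rather than per element:

class DbConnection {                                          // hypothetical stub
  def insert(rec: String): Unit = println(s"inserted $rec")
  def close(): Unit = ()
}
def openConnection(): DbConnection = new DbConnection         // hypothetical stub

val records = sc.parallelize(Seq("a", "b", "c"))
records.foreachPartition { part =>
  val conn = openConnection()             // one connection per partition
  part.foreach(rec => conn.insert(rec))
  conn.close()
}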


first(): Returns the first element of the RDD.
val names = sc.parallelize(List("apple", "beatty", "beatrice"))
names.first   // apple


sc.parallelize(1 to 20, 4).first   // 1

top(): top() and takeOrdered() are actions that return N elements based on the default ordering or a custom ordering provided by us. top() returns the N largest elements in descending order, while takeOrdered() returns the N smallest in ascending order.
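
For example, top() picks the largest elements, and takeOrdered() with a reversed Ordering behaves the same way:

val seq = Seq(3, 9, 2, 3, 5, 4)
sc.parallelize(seq, 2).top(3)                                  // Array(9, 5, 4)
sc.parallelize(seq, 2).takeOrdered(3)(Ordering[Int].reverse)   // same as top(3)
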
take(n): Returns an array of the first n elements of the RDD.
val names = sc.parallelize(List("apple", "beatty", "beatrice"))
names.take(2)   // Array(apple, beatty)


sc.parallelize(1 to 20, 4).take(5)   // Array(1, 2, 3, 4, 5)


takeOrdered(n): Returns the first n elements of the RDD in ascending order (or in the order given by an optional custom Ordering).
val seq = Seq(3, 9, 2, 3, 5, 4)
val rdd = sc.parallelize(seq, 2)
rdd.takeOrdered(4)   // Array(2, 3, 3, 4)


count(): Returns the number of elements in the RDD.
val rdd = sc.parallelize(Seq(("math", 55), ("math", 56), ("english", 57), ("english", 58), ("science", 59), ("science", 54)))
rdd.count   // 6



countByKey(): Available only on RDDs of (K, V) pairs. It counts the values for each key and returns the result to the driver as a Map of (key, count) pairs.
val rdd = sc.parallelize(Seq(("math", 55), ("math", 56), ("english", 57), ("english", 58), ("science", 59), ("science", 54)))
rdd.countByKey()   // Map(math -> 2, english -> 2, science -> 2)

val rdd=sc.parallelize(Array("Apple","Banana","Grapes","Oranges","Grapes","Banana"))
val rdd1=rdd.map(k=>(k,1))
rdd1.countByKey()



countByValue():
  • It is an action.
  • It returns the count of each unique value in the RDD as a local Map of (value, count) pairs.
val rdd = sc.parallelize(Seq(10, 4, 3, 3))
rdd.countByValue()   // Map(10 -> 1, 4 -> 1, 3 -> 2)

val rdd1 = sc.parallelize(Seq(("HR",5),("RD",4),("ADMIN",5),("SALES",4),("SER",6),("MAN",8),("HR",5)))
rdd1.countByValue()


reduce(): The Spark RDD reduce() aggregate action function is used to calculate the min, max, or total of the elements in a dataset. It reduces the elements of the RDD using the specified commutative and associative binary operator.
  • reduce() first combines the elements within each partition and then merges the per-partition results into a single value.
  • It accepts a function which takes two arguments and returns a single element; the function should be commutative and associative in the mathematical sense.
  • Intuitively, this means the function produces the same result when repeatedly applied to the same RDD data with multiple partitions, irrespective of the order of the elements.

Here we compute the cumulative sum of the numbers from 1 to 10 using the reduce function. The reduce method accepts a function (a, n) => a + n, which takes the running total a and the next element n, and returns the final value once all elements of the RDD x have been processed. reduce returns this final value rather than another RDD.

Important points to note are:
  • reduce is an action operation in Spark, hence it triggers execution of the DAG and is executed on the final RDD.
  • It combines partial results from multiple partitions into a single value; no new RDD is produced.
  • It accepts a commutative and associative function as an argument.
  • The parameter function should have two arguments of the same data type.
  • The return type of the function must also be the same as the argument types.
val x = sc.parallelize(1 to 10, 2)
val y = x.reduce((a, n) => a + n)   // y: Int = 55


val x = sc.parallelize(1 to 10, 2)
val y = x.reduce(_ + _)   // 55 -- the same sum using placeholder syntax


val x = sc.parallelize(1 to 10, 2)
val y = x.reduce(_ * _)   // 3628800 -- the product of 1 to 10
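
By contrast, a function that is not commutative and associative can give inconsistent results, because the order in which elements and partition results are combined is not guaranteed. A minimal sketch using subtraction:

// Subtraction is neither commutative nor associative, so the result of
// reduce(_ - _) depends on how elements and partitions are combined.
sc.parallelize(1 to 4, 1).reduce(_ - _)   // ((1-2)-3)-4 = -8 with one partition
sc.parallelize(1 to 4, 2).reduce(_ - _)   // may differ with two partitions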



val names = sc.parallelize(List("abe", "abby", "apple"))
names.reduce((t1,t2) => t1 + t2)
names.flatMap(k => List(k.size) ).reduce((t1,t2) => t1 + t2)
val names1 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, a.size))
names1.flatMap(t => Array(t._2)).reduce(_ + _)

val total = sc.parallelize(1 to 15).reduce(_ + _)   // 120

val rdd = sc.parallelize(Array("Hello", "Dataneb", "Spark")).reduce(_ + _)
val lengths = sc.parallelize(Array("Hello", "Dataneb", "Spark")).map(x => (x, x.length)).flatMap(l => List(l._2))
lengths.reduce(_ + _)             // 5 + 7 + 5 = 17
lengths.reduce((x, y) => x + y)   // 17 -- the same reduce with explicit syntax
 
fold(): It is similar to reduce() except that it takes a 'zero value' (think of it as a kind of initial value), which is used in the initial call on each partition.

val empData = List(("Jack",1000.0),("Bob",2000.0),("Carl",7000.0))
val empRDD = sc.parallelize(empData)
val dummyEmp = ("dummy",0.0);
val maxSalEmp = empRDD.fold(dummyEmp)((acc,emp) => {
if(acc._2 < emp._2) emp else acc})
println("employee with maximum salary is"+maxSalEmp)
