map():
The map() transformation applies a function to every element of the source RDD and returns a new RDD with the results. Using the map() transformation we can pass in any function, and that function is applied to every element of the RDD. With map we also have the flexibility that the element type of the output RDD may differ from that of the input RDD. For example, the input RDD can hold Strings and, after applying the map() function, the returned RDD can hold Booleans.
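A minimal sketch of that String-to-Boolean case (the sample data here is made up for illustration):
// an RDD of Strings
val lines = sc.parallelize(Array("spark", "", "rdd", ""))
// map each String to a Boolean indicating whether the line is non-empty;
// the input is RDD[String], the result is RDD[Boolean]
val nonEmpty = lines.map(line => line.nonEmpty)
nonEmpty.collect   // Array(true, false, true, false)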
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
// add 2 to every element
val a = rdd.map(x => x + 2).collect   // Array(3, 4, 5, 6, 7)
Basic map example in Scala
val x = sc.parallelize(List("spark", "rdd", "example", "sample", "example"), 3)
// pair every word with the number 1
val y = x.map(x => (x, 1))
y.collect
val x = sc.parallelize(List("spark", "rdd", "example", "sample", "example"), 3)
// the same pairing written with placeholder syntax
val y = x.map((_, 1))
y.collect
val x = sc.parallelize(List("spark", "rdd", "example", "sample", "example"), 3)
val y = x.map(x => (x, x.length))
y.collect
val x = sc.parallelize(List("spark", "rdd", "example", "sample", "example"), 3)
val y = x.map(x => x.toUpperCase())
y.collect
val x = sc.parallelize(List("spark", "rdd", "example", "sample", "example"), 3)
val y = x.map(x => x.toLowerCase())
y.collect
Basic map example in Scala (standalone application)
import org.apache.spark.sql.SparkSession

object mapTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("mapExample").master("local").getOrCreate()
    // read the file as an RDD of lines via the session's SparkContext
    val data = spark.sparkContext.textFile("spark_test.txt")
    // pair every line with its length
    val mapFile = data.map(line => (line, line.length))
    mapFile.foreach(println)
    spark.stop()
  }
}
flatMap(): With the flatMap() function, each input element can produce zero or more elements in the output RDD. The simplest use of flatMap() is to split each input string into words. map() and flatMap() are similar in that both take a line from the input RDD and apply a function to it.
- The key difference between map() and flatMap() is that map() returns exactly one element per input element, while flatMap() can return a list of elements (0 or more) per input element; the output of flatMap() is also flattened, as the short sketch below illustrates.
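A quick, hedged illustration of that difference (the sample sentences are made up):
val sentences = sc.parallelize(Seq("hello spark", "hello rdd"))
// map() keeps one output element per input element, so the result is an RDD of arrays
sentences.map(s => s.split(" ")).collect      // Array(Array(hello, spark), Array(hello, rdd))
// flatMap() flattens those arrays into a single RDD of words
sentences.flatMap(s => s.split(" ")).collect  // Array(hello, spark, hello, rdd)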
val data = sc.textFile("spark_test.txt")
// split every line into words; the words from all lines end up in one flattened RDD
val flatmapFile = data.flatMap(lines => lines.split(" "))
flatmapFile.collect
// classic word count built from flatMap(), map() and reduceByKey()
val words = sc.textFile("/FileStore/tables/data.txt")
// split every line into words
val word1 = words.flatMap(x => x.split(" "))
// pair every word with the number 1
val word2 = word1.map(x => (x, 1))
// sum the counts for each distinct word
val word3 = word2.reduceByKey(_ + _)
word1.collect                     // the individual words
word2.collect.foreach(println)    // (word, 1) pairs
word3.collect.foreach(println)    // (word, count) pairs
Basic flatMap example in Scala
// every element is repeated three times and the results are flattened into one RDD
sc.parallelize(1 to 9, 3).flatMap(x => List(x, x, x)).collect
Basic map() vs flatMap() example in Scala
val rdd = sc.parallelize(Seq("Where is Mount Everest", "Himalayas India"))
rdd.collect
// map() keeps one output element (an array of words) per input line
rdd.map(x => x.split(" ")).collect
// flatMap() flattens those arrays into a single RDD of words
rdd.flatMap(x => x.split(" ")).collect
rdd.map(x => x.split(" ")).count()
// pair every word with its length
rdd.flatMap(x => x.split(" ")).map(x => (x, x.length)).collect
Filter: The Spark RDD filter() transformation returns a new RDD containing only the elements that satisfy a predicate. It is a narrow operation because it does not shuffle data from one partition to many partitions.
- filter() is a transformation in Spark, hence it is lazily evaluated
- It is a narrow operation, as it does not shuffle data from one partition to multiple partitions
- filter() accepts a predicate as an argument and keeps only the elements of the source RDD that satisfy that predicate
For example, suppose the RDD contains the first five natural numbers (1, 2, 3, 4 and 5) and the predicate checks for even numbers. The resulting RDD after the filter will contain only the even numbers, i.e. 2 and 4, as sketched below.
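A minimal sketch of that example:
val nums = sc.parallelize(List(1, 2, 3, 4, 5))
// keep only the elements that satisfy the predicate (even numbers)
val evens = nums.filter(n => n % 2 == 0)
evens.collect   // Array(2, 4)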
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)
// keep only the elements greater than 4
val x1 = x.filter(e => e > 4).collect
val num = sc.parallelize(List(1, 2, 3, 4, 5))
// remove the element 5
val rdd = num.filter(x => x != 5)
The same filter written in a different format, using placeholder syntax:
val num = sc.parallelize(List(1, 2, 3, 4, 5))
val rdd = num.filter(_ != 5)
Suppose you have a very large text file and you just want to check whether a particular keyword exists in it.
val line = spark.read.textFile("/FileStore/tables/data.txt")
val word = line.flatMap(_.split(","))
val output = word.filter { x => x.startsWith("S") } // keep only the words starting with the letter "S"
output.collect.foreach(println)
val output1 = line.filter(x => x.contains("Spark")) // keep only the lines containing the keyword "Spark"
output1.collect.foreach(println)
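If the goal is only to check whether the keyword exists at all, a hedged sketch that avoids collecting every matching line (take(1) lets Spark stop as soon as one match is found):
val exists = line.filter(x => x.contains("Spark")).take(1).nonEmpty
println(exists)   // true if the keyword occurs anywhere in the file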
val rdd = sc.parallelize(Seq("Where is Mount Everest","Himalayas India"))
rdd.filter(x=>x.contains("Himalayas")).collect
Keep only the even numbers:
sc.parallelize(1 to 15).filter(x => x % 2 == 0).collect
Keep only the odd numbers:
sc.parallelize(1 to 15).filter(x => x % 2 != 0).collect
Keep only the multiples of 5:
sc.parallelize(1 to 15).filter(_ % 5 == 0).collect
Combine flatMap() with filter():
sc.parallelize(1 to 9, 3).flatMap(x => List(x, x, x)).filter(x => x % 2 == 0).collect
mapPartitions():
mapPartitions() is similar to the map() transformation, but here the function runs once for each partition (block) of the RDD, unlike map() where it runs once for each element of a partition. Hence mapPartitions() is also useful when you are looking for a performance gain, since it calls your function once per partition rather than once per element. Suppose you have the elements 1 to 100 distributed among 10 partitions, i.e. 10 elements per partition. The map() transformation will call the function 100 times to process these 100 elements, whereas mapPartitions() will call it once per partition, i.e. 10 times. Secondly, mapPartitions() holds the partition's data in memory, i.e. it stores the result in memory until all the elements of the partition have been processed, and it returns the result only after it finishes processing the whole partition. Unlike map(), mapPartitions() takes an iterator as input and must also return an iterator.
sc.parallelize(1 to 9, 3).map(x=>(x, "Hello")).collect
sc.parallelize(1 to 9, 3).partitions.size
sc.parallelize(1 to 9, 3).mapPartitions(x=>(Array("Hello").iterator)).collect
sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next).iterator)).collect
In the first example, I have applied the map() transformation on a dataset distributed across 3 partitions, so you can see the function is called 9 times. In the second example, where we applied mapPartitions(), you will notice it ran only 3 times, i.e. once per partition. We had to wrap the string "Hello" in an iterator because mapPartitions() takes an iterator as input and returns one. In the third step, I fetched the iterator's next value to show you an element. Note that next always moves forward, so you can't step back.
sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next,x.next, "|").iterator)).collect
In the first call, the next value for partition 1 changed from 1 to 2, for partition 2 it changed from 4 to 5, and similarly for partition 3 it changed from 7 to 8. You can keep advancing like this until hasNext is false (hasNext is a property of the iterator that tells you whether the collection has ended; it returns true or false based on the items left in the collection). For example,
sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next, x.hasNext).iterator)).collect
You can see hasNext is true because there are elements left in each partition. Now suppose we access all three elements from each partition; then hasNext will return false. For example,
sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next, x.next, x.next, x.hasNext).iterator)).collect
Just for our understanding, if you try to access next a fourth time, you will get an error, which is expected.
sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next, x.next, x.next, x.next,x.hasNext).iterator)).collect
Think of the map() transformation as the special case of mapPartitions() where each partition holds just one element. Isn't it?
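Beyond the iterator mechanics, the once-per-partition behaviour is what makes mapPartitions() handy when your function needs expensive setup (for example, opening a database connection). A minimal sketch of a per-partition aggregation, assuming the same 1-to-9 dataset used above:
sc.parallelize(1 to 9, 3).mapPartitions { iter =>
  // any expensive setup would go here, executed once per partition rather than once per element
  val partitionSum = iter.sum   // consume the whole partition
  Iterator(partitionSum)        // mapPartitions() must return an iterator
}.collect                       // Array(6, 15, 24)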