Resilient Distributed Dataset (RDD) Transformations
Transformations are operations that create a new RDD from an existing RDD.
map
Return a new RDD formed by passing each element of the source through a function func.
// RDD Transformations - map
val wordsCountPerLineRDD = linesRDD.map(line => line.split(" ").size)
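As a rough check (assuming linesRDD was loaded from the three-line test.txt shown later in this post), map replaces each line with its word count:

// Assumed sample input: the three-line test.txt shown later in this post
wordsCountPerLineRDD.collect()   // Array(11, 8, 3) - one count per input line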
filter
Return a new RDD formed by selecting those elements of the source on which func returns true.
// RDD Transformations - filter
val linesWithWordForRDD = linesRDD.filter(line => line.contains("for"))
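With the same sample input, only the first two lines contain the substring "for", so the third line ("and graph processing.") is dropped:

// Assumed sample input: the three-line test.txt shown later in this post
linesWithWordForRDD.collect().foreach(println)
// Apache Spark is a unified analytics engine for big data processing,
// with built-in modules for streaming, SQL, machine learning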
flatMap
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
// RDD Transformations - flatMap
val wordsRDD = linesWithWordForRDD.flatMap(line => line.split(" "))
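The difference from map is the flattening: map on the same input would keep one array per line, while flatMap merges all the arrays into a single RDD of words. A small sketch against the same sample input (wordArraysRDD is only for illustration and is not part of the full program):

// map keeps one Array[String] per line: RDD[Array[String]]
val wordArraysRDD = linesWithWordForRDD.map(line => line.split(" "))
// flatMap flattens those arrays into individual words: RDD[String]
wordsRDD.count()    // 19 - the 11 + 8 words of the two filtered lines
wordsRDD.take(3)    // Array(Apache, Spark, is)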
Full Program (RDDMapFilterFlatMapDemo.scala)
package com.ctv.apache.spark.rdd

import org.apache.spark.sql.SparkSession

object RDDMapFilterFlatMapDemo {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("Apache Spark for Beginners using Scala | RDD Transformations | map, filter, flatMap | Demo")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Reading text file from local file system
    val filePath = "file:///D://record_tech_videos//data//rdd_examples//test.txt"
    val linesRDD = spark.sparkContext.textFile(filePath)
    linesRDD.collect().foreach(println)

    // RDD Transformations - map
    val wordsCountPerLineRDD = linesRDD.map(line => line.split(" ").size)
    wordsCountPerLineRDD.collect().foreach(println)

    // RDD Transformations - filter
    val linesWithWordForRDD = linesRDD.filter(line => line.contains("for"))
    linesWithWordForRDD.collect().foreach(println)

    // RDD Transformations - flatMap
    val wordsRDD = linesWithWordForRDD.flatMap(line => line.split(" "))
    wordsRDD.collect().foreach(println)

    spark.stop()
  }
}
SBT Build File (build.sbt)
name := "spark_for_beginners"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"
Input File (test.txt)
Apache Spark is a unified analytics engine for big data processing,
with built-in modules for streaming, SQL, machine learning
and graph processing.
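For reference, running the full program against this input should print output along these lines: the original lines first, then the per-line word counts from map, then the two lines kept by filter, and finally the individual words produced by flatMap.

Apache Spark is a unified analytics engine for big data processing,
with built-in modules for streaming, SQL, machine learning
and graph processing.
11
8
3
Apache Spark is a unified analytics engine for big data processing,
with built-in modules for streaming, SQL, machine learning
Apache
Spark
is
a
unified
analytics
engine
for
big
data
processing,
with
built-in
modules
for
streaming,
SQL,
machine
learning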
Happy Learning !!!
3 Comments
Hi,
Can you explain how the lines below are getting executed, and their use in the above code:
val spark = SparkSession
.builder
.appName("Apache Spark for Beginners using Scala | RDD Transformations | map, filter, flatMap | Demo")
.master("local[*]")
.getOrCreate()
----------------------------
Hi Roman, here we are creating a SparkSession object called "spark" for one user (session). The builder function constructs the SparkSession with properties such as appName and master, and getOrCreate either creates a new SparkSession or returns an existing one. The SparkSession also internally creates the SparkContext and SQLContext used to work with the Spark cluster. I hope this helps you.
For more information:
https://spark.apache.org/docs/2.4.3/api/scala/index.html#org.apache.spark.sql.SparkSession
https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Thank you for your query; much appreciated. Happy Learning !!!
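To add to that, a quick way to see the "get or create" behaviour is to call getOrCreate a second time: it returns the session that already exists instead of building a new one (a small illustrative sketch, not part of the demo program):

// Illustrative only: getOrCreate returns the existing SparkSession on a second call
val sameSpark = SparkSession.builder.getOrCreate()
println(spark eq sameSpark)    // true - the same SparkSession instance
println(spark.sparkContext)    // the SparkContext created internally by the session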
Hi,
Thanks for sharing these useful transformations. Could you please add more transformations? Also, please provide examples of actions along with transformations.
Thanks,
Naveen.