Resilient Distributed Dataset (RDD) Transformations


Transformations are operations that create a new RDD from an existing RDD. They do not modify the source RDD, and they are lazily evaluated: Spark only computes the result when an action such as collect() is called.
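A minimal sketch of this behaviour, assuming the SparkSession named spark from the full program below (numbersRDD and doubledRDD are hypothetical names used only for illustration):

// Hypothetical example - transformations are lazy
val numbersRDD = spark.sparkContext.parallelize(Seq(1, 2, 3, 4))
val doubledRDD = numbersRDD.map(n => n * 2)   // nothing is executed yet
doubledRDD.collect().foreach(println)         // the action triggers execution: prints 2, 4, 6, 8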


map

Return a new RDD formed by passing each element of the source through a function func.

// RDD Transformations - map
val wordsCountPerLineRDD = linesRDD.map(line => line.split(" ").size)

filter

Return a new RDD formed by selecting those elements of the source on which func returns true.

// RDD Transformations - filter
val linesWithWordForRDD = linesRDD.filter(line => line.contains("for"))

flatMap

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

// RDD Transformations - flatMap
val wordsRDD = linesWithWordForRDD.flatMap(line => line.split(" "))
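To see why flatMap is used here instead of map, compare the two on the same split. This is a minimal sketch using the linesRDD defined in the full program below; wordArraysRDD and flatWordsRDD are hypothetical names for illustration:

// map keeps one output element per input line -> RDD[Array[String]]
val wordArraysRDD = linesRDD.map(line => line.split(" "))

// flatMap flattens the per-line arrays into individual words -> RDD[String]
val flatWordsRDD = linesRDD.flatMap(line => line.split(" "))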



Full Program (RDDMapFilterFlatMapDemo.scala)


package com.ctv.apache.spark.rdd

import org.apache.spark.sql.SparkSession

object RDDMapFilterFlatMapDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Apache Spark for Beginners using Scala | RDD Transformations | map, filter, flatMap | Demo")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Reading text file from local file system
    val filePath = "file:///D://record_tech_videos//data//rdd_examples//test.txt"
    val linesRDD = spark.sparkContext.textFile(filePath)

    linesRDD.collect().foreach(println)

    // RDD Transformations - map
    val wordsCountPerLineRDD = linesRDD.map(line => line.split(" ").size)

    wordsCountPerLineRDD.collect().foreach(println)

    // RDD Transformations - filter
    val linesWithWordForRDD = linesRDD.filter(line => line.contains("for"))

    linesWithWordForRDD.collect().foreach(println)

    // RDD Transformations - flatMap
    val wordsRDD = linesWithWordForRDD.flatMap(line => line.split(" "))

    wordsRDD.collect().foreach(println)

    spark.stop()
  }
}

SBT Build File (build.sbt)


name := "spark_for_beginners"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"

Input File (test.txt)


Apache Spark is a unified analytics engine for big data processing, 
with built-in modules for streaming, SQL, machine learning 
and graph processing.

Happy Learning !!!