Resilient Distributed Dataset (RDD) Transformations


Transformations are operations that produce a new RDD from an existing RDD. They are lazy: nothing is computed until an action such as collect() is called on the result.


mapPartitions

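Similar to map, but the function runs once per partition (block) of the RDD instead of once per element. The function receives an iterator over the elements of one partition and returns an iterator of results; the results from all partitions form the new RDD.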

// RDD Transformations - mapPartitions
val resultRDD2 = namesRDD.mapPartitions(onePartition => {
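  // Runs once per partition, not once per element
  println("Processing Current Partition ... ")
  // onePartition is an Iterator over the elements of this partition
  onePartition.map(element => element.size)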
})
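
To make the "once per partition" behaviour concrete without starting Spark, here is a minimal plain-Scala sketch that models an RDD as a list of partitions. The object name MapPartitionsSketch and the helper split are hypothetical and exist only for this illustration; this is not how Spark implements mapPartitions, and Spark's own slicing may place elements differently.

```scala
object MapPartitionsSketch {
  // Hypothetical helper: cut a list into `n` contiguous chunks.
  def split[T](data: List[T], n: Int): List[List[T]] = {
    val chunk = math.max(1, math.ceil(data.size.toDouble / n).toInt)
    data.grouped(chunk).toList
  }

  // Model of mapPartitions: `f` is called once per partition and
  // receives an Iterator over that partition's elements.
  def mapPartitions[T, U](parts: List[List[T]])(f: Iterator[T] => Iterator[U]): List[U] =
    parts.flatMap(p => f(p.iterator).toList)

  def main(args: Array[String]): Unit = {
    val names = List("Arun", "Rohit", "Vijay", "Bala", "Aket", "Frank")
    val parts = split(names, 3)
    var calls = 0
    val sizes = mapPartitions(parts) { it =>
      calls += 1 // stands in for per-partition setup work
      it.map(_.size)
    }
    // prints: function calls: 3, sizes: List(4, 5, 5, 4, 4, 5)
    println(s"function calls: $calls, sizes: $sizes")
  }
}
```

The function is called three times for six elements. This is why mapPartitions is a reasonable choice when some setup work should happen once per partition rather than once per element.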

mapPartitionsWithIndex

Similar to mapPartitions, but the function also receives an integer index identifying the partition it is processing. It returns a new RDD by applying the function to each partition of this RDD while tracking the index of the original partition.

// RDD Transformations - mapPartitionsWithIndex
val resultRDD3 = namesRDD.mapPartitionsWithIndex((index, onePartition) => {
  println("Partition Number(index): " + index)
  onePartition.map(element =>
    s"element: $element, element size: ${element.size}, Partition Number(index): $index")
})

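The role of the index can be sketched the same way in plain Scala, again modelling the RDD as a list of partitions. The object name MapPartitionsWithIndexSketch is hypothetical, and the layout of the six names across three partitions is an assumption made for the illustration, not something guaranteed by Spark.

```scala
object MapPartitionsWithIndexSketch {
  // Model of mapPartitionsWithIndex: `f` receives the 0-based position
  // of the partition together with an Iterator over its elements.
  def mapPartitionsWithIndex[T, U](parts: List[List[T]])(
      f: (Int, Iterator[T]) => Iterator[U]): List[U] =
    parts.zipWithIndex.flatMap { case (p, index) => f(index, p.iterator).toList }

  def main(args: Array[String]): Unit = {
    // Assumed layout of the six names across three partitions.
    val parts = List(List("Arun", "Rohit"), List("Vijay", "Bala"), List("Aket", "Frank"))
    val tagged = mapPartitionsWithIndex(parts) { (index, it) =>
      it.map(name => s"$index:$name") // tag each element with its partition index
    }
    // prints 0:Arun, 0:Rohit, 1:Vijay, 1:Bala, 2:Aket, 2:Frank (one per line)
    tagged.foreach(println)
  }
}
```

Tagging elements like this is a simple way to inspect how data is distributed across partitions.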


Full Program (RDDMapPartitionsMapPartitionsWithIndexDemo.scala)


package com.ctv.apache.spark.rdd

import org.apache.spark.sql.SparkSession

object RDDMapPartitionsMapPartitionsWithIndexDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Apache Spark for Beginners using Scala | RDD Transformations | mapPartitions, mapPartitionsWithIndex | Demo")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val namesList = List("Arun", "Rohit", "Vijay", "Bala", "Aket", "Frank")
    val namesRDD = spark.sparkContext.parallelize(namesList, 3)

    // RDD Transformations - map
    val resultRDD1 = namesRDD.map(element => element.size)

    resultRDD1.collect().foreach(println)

    // RDD Transformations - mapPartitions
    val resultRDD2 = namesRDD.mapPartitions(onePartition => {
      println("Processing Current Partition ... ")
      onePartition.map(element => element.size)
    })

    resultRDD2.collect().foreach(println)

    // RDD Transformations - mapPartitionsWithIndex
    val resultRDD3 = namesRDD.mapPartitionsWithIndex((index, onePartition) => {
      println("Partition Number(index): " + index)
      onePartition.map(element =>
        s"element: $element, element size: ${element.size}, Partition Number(index): $index")
    })

    resultRDD3.collect().foreach(println)

    spark.stop()
  }
}

SBT Build File (build.sbt)


name := "spark_for_beginners"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"

Happy Learning !!!