Resilient Distributed Dataset (RDD) Transformations
Transformations are operations that create a new RDD from an existing RDD. They are lazy: Spark only records the lineage and executes it when an action (such as collect) is called.
mapPartitions
Similar to map, but runs separately on each partition (block) of the RDD, i.e. it returns a new RDD by applying a function to each partition of this RDD. The function receives an Iterator over the partition's elements and must return an Iterator, so any per-partition setup inside it runs once per partition rather than once per element.
// RDD Transformations - mapPartitions
val resultRDD2 = namesRDD.mapPartitions(onePartition => {
  println("Processing Current Partition ... ")
  onePartition.map(element => element.size)
})
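Because the function sees the whole partition as an iterator, it can also return a different number of elements than it received. As a minimal sketch (the name countsPerPartition is ours, not part of the demo), this emits one count per partition instead of one value per element:

// Emit a single value per partition: the number of elements it holds.
// Iterator.single wraps the count in a one-element iterator, since mapPartitions
// requires the function to return an Iterator.
val countsPerPartition = namesRDD.mapPartitions(onePartition =>
  Iterator.single(onePartition.size)
)
countsPerPartition.collect().foreach(println) // typically prints 2, 2, 2 for 6 names in 3 partitions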
mapPartitionsWithIndex
Similar to mapPartitions, but additionally provides the function with an integer representing the index of the partition, i.e. it returns a new RDD by applying a function to each partition of this RDD while tracking the index of the original partition.
// RDD Transformations - mapPartitionsWithIndex
val resultRDD3 = namesRDD.mapPartitionsWithIndex((index, onePartition) => {
  println("Partition Number(index): " + index)
  onePartition.map(element => "element: " + element +
    ", element size: " + element.size + ", Partition Number(index): " + index)
})
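A common use of the partition index is to treat one partition specially. For example, when a text file with a header row is read into an RDD, the header ends up as the first element of partition 0 and can be dropped there. A minimal sketch, assuming a hypothetical linesRDD of type RDD[String]:

// Hypothetical linesRDD: lines of a CSV-like file, header in partition 0.
// Drop the first element of partition 0 only; pass all other partitions through unchanged.
val dataRDD = linesRDD.mapPartitionsWithIndex((index, onePartition) =>
  if (index == 0) onePartition.drop(1) else onePartition
)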
Full Program (RDDMapPartitionsMapPartitionsWithIndexDemo.scala)
package com.ctv.apache.spark.rdd

import org.apache.spark.sql.SparkSession

object RDDMapPartitionsMapPartitionsWithIndexDemo {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("Apache Spark for Beginners using Scala | RDD Transformations | mapPartitions, mapPartitionsWithIndex | Demo")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val namesList = List("Arun", "Rohit", "Vijay", "Bala", "Aket", "Frank")
    val namesRDD = spark.sparkContext.parallelize(namesList, 3)

    // RDD Transformations - map
    val resultRDD1 = namesRDD.map(element => element.size)
    resultRDD1.collect().foreach(println)

    // RDD Transformations - mapPartitions
    val resultRDD2 = namesRDD.mapPartitions(onePartition => {
      println("Processing Current Partition ... ")
      onePartition.map(element => element.size)
    })
    resultRDD2.collect().foreach(println)

    // RDD Transformations - mapPartitionsWithIndex
    val resultRDD3 = namesRDD.mapPartitionsWithIndex((index, onePartition) => {
      println("Partition Number(index): " + index)
      onePartition.map(element => "element: " + element +
        ", element size: " + element.size + ", Partition Number(index): " + index)
    })
    resultRDD3.collect().foreach(println)

    spark.stop()
  }
}
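Since parallelize slices the list in order across the requested partitions, the six names typically land two per partition here, so "Processing Current Partition ... " prints three times rather than six. To see the partitioning directly, glom turns each partition into an array:

// glom() collects each partition into an Array, making the partition boundaries visible.
namesRDD.glom().collect().foreach(partition => println(partition.mkString(", ")))

Note that the println calls inside the transformations run in the tasks; with master local[*] they show up on the driver's console, but on a real cluster they would appear in the executors' stdout logs instead.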
SBT Build File (build.sbt)
name := "spark_for_beginners"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"
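Note that spark-sql transitively pulls in spark-core, which supplies the RDD API used above, so no separate spark-core dependency is needed. Assuming the standard sbt layout (source under src/main/scala), sbt run from the project root compiles the project and launches the demo; since the master is hard-coded to local[*], no cluster is required.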
Happy Learning !!!