Resilient Distributed Dataset (RDD) Transformations
Transformations are operations that create a new RDD from an existing RDD. They are evaluated lazily: Spark only records the lineage of transformations and runs them when an action (such as collect()) is called.
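Because this laziness is easy to miss at first, here is a minimal sketch (assuming a SparkSession named spark, as in the full program below): the map call only builds the lineage, and nothing actually executes until collect() is invoked.

// Minimal sketch: transformations are lazy.
// 'map' only records the lineage; nothing runs until an action is called.
val numbersRDD = spark.sparkContext.parallelize(Seq(1, 2, 3, 4))
val doubledRDD = numbersRDD.map(_ * 2)   // transformation: returns a new RDD, no work yet
doubledRDD.collect().foreach(println)    // action: triggers computation, prints 2 4 6 8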
mapPartitions
Similar to map, but the supplied function runs once per partition (block) of the RDD rather than once per element, i.e. it returns a new RDD by applying a function to the iterator of each partition of this RDD.

// RDD Transformations - mapPartitions
val resultRDD2 = namesRDD.mapPartitions(onePartition => {
  println("Processing Current Partition ... ")
  onePartition.map(element => element.size)
})
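A common reason to reach for mapPartitions instead of map is to pay a per-partition setup cost once instead of once per element. A hedged sketch follows, where expensiveSetup is a hypothetical stand-in for something genuinely costly, such as opening a database connection or building a parser:

// Hedged sketch: 'expensiveSetup' is a hypothetical placeholder for a
// costly resource (e.g. a DB connection); it is created once per partition.
def expensiveSetup(): String => Int = name => name.size

val sizesRDD = namesRDD.mapPartitions { onePartition =>
  val lookup = expensiveSetup()   // runs once per partition, not once per element
  onePartition.map(lookup)        // reuse the resource for every element
}
sizesRDD.collect().foreach(println)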
mapPartitionsWithIndex
Similar to mapPartitions, but the function also receives an integer representing the index of the partition, i.e. it returns a new RDD by applying a function to each partition of this RDD while tracking the index of the original partition.

// RDD Transformations - mapPartitionsWithIndex
val resultRDD3 = namesRDD.mapPartitionsWithIndex((index, onePartition) => {
  println("Partition Number(index): " + index)
  onePartition.map(element => "element: " + element + ", element size: " + element.size +
    ", Partition Number(index): " + index)
})
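To see exactly how parallelize laid the six names out across the three partitions, the built-in glom() transformation (which materializes each partition as an array) gives a quick driver-side view. This is a side check for experimenting, not part of the original demo:

// glom() turns each partition into an Array, making the layout visible.
namesRDD.glom().collect().zipWithIndex.foreach { case (partition, index) =>
  println(s"Partition $index: ${partition.mkString(", ")}")
}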
Full Program (RDDMapPartitionsMapPartitionsWithIndexDemo.scala)
package com.ctv.apache.spark.rdd

import org.apache.spark.sql.SparkSession

object RDDMapPartitionsMapPartitionsWithIndexDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("Apache Spark for Beginners using Scala | RDD Transformations | mapPartitions, mapPartitionsWithIndex | Demo")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val namesList = List("Arun", "Rohit", "Vijay", "Bala", "Aket", "Frank")
    val namesRDD = spark.sparkContext.parallelize(namesList, 3)

    // RDD Transformations - map
    val resultRDD1 = namesRDD.map(element => element.size)
    resultRDD1.collect().foreach(println)

    // RDD Transformations - mapPartitions
    val resultRDD2 = namesRDD.mapPartitions(onePartition => {
      println("Processing Current Partition ... ")
      onePartition.map(element => element.size)
    })
    resultRDD2.collect().foreach(println)

    // RDD Transformations - mapPartitionsWithIndex
    val resultRDD3 = namesRDD.mapPartitionsWithIndex((index, onePartition) => {
      println("Partition Number(index): " + index)
      onePartition.map(element => "element: " + element + ", element size: " + element.size +
        ", Partition Number(index): " + index)
    })
    resultRDD3.collect().foreach(println)

    spark.stop()
  }
}
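As an optional sanity check while experimenting (an addition, not part of the original listing), getNumPartitions confirms that parallelize honored the requested partition count. Note also that collect() ships the entire RDD back to the driver, which is fine for this six-element demo but not for large datasets.

// Optional check: confirm parallelize created the requested 3 partitions.
println("Number of partitions: " + namesRDD.getNumPartitions)  // expected: 3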
SBT Build File (build.sbt)
name := "spark_for_beginners"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"
Happy Learning !!!
