Joining two RDDs using join RDD transformation in PySpark | PySpark 101 | Part 16



Prerequisites

  • Apache Spark
  • PyCharm Community Edition

Walk-through

In this article, I will walk you through how to use the join RDD transformation in a PySpark application using PyCharm Community Edition.

join: Returns an RDD containing all pairs of elements with matching keys in the current RDD and another RDD.
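
Before running this on Spark, it can help to see what the definition means on plain Python lists. The following is only a minimal sketch that models the join semantics without Spark; the helper name inner_join is hypothetical and is not part of the PySpark API, and the sample pairs are the same ones used in the application code in this article.

```python
from collections import defaultdict


def inner_join(left, right):
    """Hypothetical helper: models RDD join semantics on lists of (key, value) pairs."""
    # Group the right-hand values by key so that lookups are cheap.
    right_by_key = defaultdict(list)
    for key, w in right:
        right_by_key[key].append(w)
    # Emit one (key, (v, w)) tuple for every matching pair of values.
    # Keys present on only one side produce no output at all.
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]


orders = [(1, "2019-10-01"), (2, "2019-10-02"), (3, "2019-10-03")]
order_details = [(1, 12.5), (2, 15.3), (2, 10.0), (4, 20.5)]

joined = inner_join(orders, order_details)
print(joined)
# [(1, ('2019-10-01', 12.5)), (2, ('2019-10-02', 15.3)), (2, ('2019-10-02', 10.0))]
```

Key 2 appears twice because it has two matching values on the right side, while keys 3 and 4 are dropped because each exists on one side only. On a real RDD the same pairs come back from collect(), but their order is not guaranteed.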

# Importing Spark Related Packages
from pyspark.sql import SparkSession

# Importing Python Related Packages

if __name__ == "__main__":
    print("PySpark 101 Tutorial")

    # join - Return an RDD containing all pairs of elements with matching keys in self and other
    # Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.
    # When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
    # Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

    spark = SparkSession \
            .builder \
            .appName("Part 16 - Joining two RDDs using join RDD transformation in PySpark | PySpark 101") \
            .master("local[*]") \
            .enableHiveSupport() \
            .getOrCreate()

    orders_kv_list = [(1, "2019-10-01"), (2, "2019-10-02"), (3, "2019-10-03")]
    print("Printing orders_kv_list: ")
    print(orders_kv_list)

    orders_dtl_kv_list = [(1, 12.5), (2, 15.3), (2, 10.0), (4, 20.5)]
    print("Printing orders_dtl_kv_list: ")
    print(orders_dtl_kv_list)

    orders_kv_rdd = spark.sparkContext.parallelize(orders_kv_list)

    orders_dtl_kv_rdd = spark.sparkContext.parallelize(orders_dtl_kv_list)

    orders_join_rdd = orders_kv_rdd.join(orders_dtl_kv_rdd)
    print("After Join: ")
    print(orders_join_rdd.collect())

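    # Total amount per order: keep (order_id, amount) from each joined pair, then sum the amounts by key
    print(orders_join_rdd.map(lambda e: (e[0], e[1][1])).reduceByKey(lambda x, y: x + y).collect())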

    customers_input_file_path = "file:///home/dmadmin/datamaking/data/pyspark101/input/customers_orders/customers_tbl_data.csv"
    orders_input_file_path = "file:///home/dmadmin/datamaking/data/pyspark101/input/customers_orders/orders_tbl_data.csv"

    # Read the customers file and drop the header row
    customers_rdd = spark.sparkContext.textFile(customers_input_file_path)
    customers_rdd = customers_rdd.filter(lambda row: row != 'customer_id,customer_name,gender,country')

    # Build (customer_id, country) pairs, splitting each row only once
    customers_kv_rdd = customers_rdd.map(lambda row: row.split(',')).map(lambda cols: (cols[0], cols[3]))

    # Read the orders file and drop the header row
    orders_rdd = spark.sparkContext.textFile(orders_input_file_path)
    orders_rdd = orders_rdd.filter(lambda row: row != 'order_id,item_code,order_quantity,unit_price,customer_id,order_date')

    # Build (customer_id, order_amount) pairs, where order_amount = order_quantity * unit_price
    orders_kv_rdd = orders_rdd.map(lambda row: row.split(',')).map(lambda cols: (cols[4], int(cols[2]) * float(cols[3])))

    customers_orders_join_rdd = customers_kv_rdd.join(orders_kv_rdd)
    print("Printing Join Results: ")
    print(customers_orders_join_rdd.collect())

    # Besides join, pair RDDs also provide leftOuterJoin, rightOuterJoin and fullOuterJoin

    print("Printing Aggregated Results: ")
    # Total order amount per customer_id: drop the country, then sum the amounts by key
    print(customers_orders_join_rdd.map(lambda e: (e[0], e[1][1])).reduceByKey(lambda x, y: x + y).collect())

    print("Stopping the SparkSession object")
    spark.stop()
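
The comments in the code above mention that outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. The sketch below models how those variants differ from join, again on plain Python lists and without Spark. The helper name outer_join and its how parameter are hypothetical and exist only for this illustration; on a real pair RDD you would call the three methods directly. Keys are sorted here only to make the printed output stable, which Spark itself does not promise.

```python
def outer_join(left, right, how):
    """Hypothetical helper: models leftOuterJoin / rightOuterJoin / fullOuterJoin."""
    left_by_key, right_by_key = {}, {}
    for key, v in left:
        left_by_key.setdefault(key, []).append(v)
    for key, w in right:
        right_by_key.setdefault(key, []).append(w)

    # The join type decides which keys survive.
    if how == "left":
        keys = set(left_by_key)
    elif how == "right":
        keys = set(right_by_key)
    elif how == "full":
        keys = set(left_by_key) | set(right_by_key)
    else:
        raise ValueError("how must be 'left', 'right' or 'full'")

    result = []
    for key in sorted(keys):
        # A key missing on one side contributes a single None placeholder.
        for v in left_by_key.get(key, [None]):
            for w in right_by_key.get(key, [None]):
                result.append((key, (v, w)))
    return result


orders = [(1, "2019-10-01"), (2, "2019-10-02"), (3, "2019-10-03")]
order_details = [(1, 12.5), (2, 15.3), (2, 10.0), (4, 20.5)]

left_result = outer_join(orders, order_details, "left")
right_result = outer_join(orders, order_details, "right")
full_result = outer_join(orders, order_details, "full")

print(left_result)   # keeps order 3, paired with None
print(right_result)  # keeps detail key 4, paired with None
print(full_result)   # keeps both 3 and 4
```

With the sample data, the left variant adds (3, ('2019-10-03', None)) to the inner join result, the right variant adds (4, (None, 20.5)), and the full variant adds both.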


Summary

In this article, we used the join RDD transformation in a PySpark application using PyCharm Community Edition. Please go through the steps above, share your feedback, and post any questions or doubts you have. Thank you.

Happy Learning !!!
