Module 3.4: Building Data Pipeline using Spark Structured Streaming with Scala

Prerequisite

  • Java 1.8
  • Scala 2.12.8
  • Apache Spark 2.4.4
  • Apache Hadoop
  • Apache Kafka
  • MongoDB
  • MySQL
  • IntelliJ IDEA Community Edition

Walk-through

In this article, we discuss how to consume Meetup.com's RSVP JSON messages in Spark Structured Streaming, store the raw JSON messages in a MongoDB collection, and then store the processed (aggregated) data in a MySQL table, building the "Real-Time Meetup.com RSVP Message Processing Application".
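
At a high level, the data flows as follows:

Meetup.com RSVP stream --> Kafka topic (meetuprsvptopic) --> Spark Structured Streaming --> MongoDB (raw RSVP documents) and MySQL (aggregated RSVP counts)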



stream_processing_app.scala



package com.datamaking.stream.processing

// --class com.datamaking.stream.processing.stream_processing_app

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object stream_processing_app {
  def main(args: Array[String]): Unit = {
    println("Stream Processing Application Started ...")

    // Code Block 1 Starts Here

    // Code Block 1 Ends Here

    // Code Block 2 Starts Here

    // Code Block 2 Ends Here

    // Code Block 3 Starts Here

    // Code Block 3 Ends Here

    // Code Block 4 Starts Here

    // Code Block 4 Ends Here

    // Code Block 5 Starts Here

    // Code Block 5 Ends Here

    // Code Block 6 Starts Here

    // Code Block 6 Ends Here

    // Code Block 7 Starts Here

    // Code Block 7 Ends Here

    // Code Block 8 Starts Here

    // Code Block 8 Ends Here

    println("Stream Processing Application Completed.")
  }
}


Module 3.4.1: Building Data Pipeline using Spark Structured Streaming for Data Ingestion



    // Code Block 1 Starts Here
    val kafka_topic_name = "meetuprsvptopic"
    val kafka_bootstrap_servers = "localhost:9092"

    val mysql_host_name = "localhost"
    val mysql_port_no = "3306"
    val mysql_user_name = "root"
    val mysql_password = "root"
    val mysql_database_name = "meetup_rsvp_db"
    val mysql_driver_class = "com.mysql.cj.jdbc.Driver" // Connector/J 8.x driver class (com.mysql.jdbc.Driver is deprecated)
    val mysql_table_name = "meetup_rsvp_message_agg_detail_tbl"
    val mysql_jdbc_url = "jdbc:mysql://" + mysql_host_name + ":" + mysql_port_no + "/" + mysql_database_name

    val mongodb_host_name = "localhost"
    val mongodb_port_no = "27017"
    val mongodb_user_name = "admin"
    val mongodb_password = "admin"
    val mongodb_database_name = "meetup_rsvp_db"
    val mongodb_collection_name = "meetup_rsvp_message_detail_tbl"
    // Code Block 1 Ends Here
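
Before running the application, the Kafka topic should already exist. A minimal setup sketch, assuming a local single-broker Kafka installation (on Kafka versions older than 2.2, pass --zookeeper localhost:2181 instead of --bootstrap-server):

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 1 --topic meetuprsvptopic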


    // Code Block 2 Starts Here
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Stream Processing Application")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")
    // Code Block 2 Ends Here


    // Code Block 3 Starts Here
    // Stream meetup.com RSVP Message Data from Kafka
    val meetup_rsvp_df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
      .option("subscribe", kafka_topic_name)
      .option("startingOffsets", "latest")
      .load()

    println("Printing Schema of transaction_detail_df: ")
    meetup_rsvp_df.printSchema()
    // Code Block 3 Ends Here
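
For reference, a Kafka source DataFrame always has the same fixed schema, so the printSchema() call above should print:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

The actual RSVP JSON arrives in the value column as bytes, which is why Code Block 4 casts it to a string and parses it with from_json against an explicit schema.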


    // Code Block 4 Starts Here
    // Define a schema for the message_detail data
    val meetup_rsvp_message_schema = StructType(Array(
      StructField("venue", StructType(Array(
        StructField("venue_name", StringType),
        StructField("lon", StringType),
        StructField("lat", StringType),
        StructField("venue_id", StringType)
      ))),
      StructField("visibility", StringType),
      StructField("response", StringType),
      StructField("guests", StringType),
      StructField("member", StructType(Array(
        StructField("member_id", StringType),
        StructField("photo", StringType),
        StructField("member_name", StringType)
      ))),
      StructField("rsvp_id", StringType),
      StructField("mtime", StringType),
      StructField("event", StructType(Array(
        StructField("event_name", StringType),
        StructField("event_id", StringType),
        StructField("time", StringType),
        StructField("event_url", StringType)
      ))),
      StructField("group", StructType(Array(
        StructField("group_topics", ArrayType(StructType(Array(
          StructField("urlkey", StringType),
          StructField("topic_name", StringType)
        )), true)),
        StructField("group_city", StringType),
        StructField("group_country", StringType),
        StructField("group_id", StringType),
        StructField("group_name", StringType),
        StructField("group_lon", StringType),
        StructField("group_urlname", StringType),
        StructField("group_state", StringType),
        StructField("group_lat", StringType)
      )))
    ))

    val meetup_rsvp_df_1 = meetup_rsvp_df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

    val meetup_rsvp_df_2 = meetup_rsvp_df_1.select(from_json(col("value"), meetup_rsvp_message_schema)
        .as("message_detail"), col("timestamp"))

    val meetup_rsvp_df_3 = meetup_rsvp_df_2.select("message_detail.*", "timestamp")

    val meetup_rsvp_df_4 = meetup_rsvp_df_3.select(col("group.group_name"),
      col("group.group_country"), col("group.group_state"), col("group.group_city"),
      col("group.group_lat"), col("group.group_lon"), col("group.group_id"),
      col("group.group_topics"), col("member.member_name"), col("response"),
      col("guests"), col("venue.venue_name"), col("venue.lon"), col("venue.lat"),
      col("venue.venue_id"), col("visibility"), col("member.member_id"),
      col("member.photo"), col("event.event_name"), col("event.event_id"),
      col("event.time"), col("event.event_url")
    )

    println("Printing Schema of meetup_rsvp_df_4: ")
    meetup_rsvp_df_4.printSchema()
    // Code Block 4 Ends Here
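
To smoke-test the pipeline without a live Meetup.com feed, you can publish a message shaped like the schema above through the Kafka console producer. The values below are illustrative placeholders, not a real Meetup.com payload:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic meetuprsvptopic

{"venue": {"venue_name": "Tech Hub", "lon": "-0.12", "lat": "51.50", "venue_id": "101"}, "visibility": "public", "response": "yes", "guests": "0", "member": {"member_id": "201", "photo": "https://example.com/photo.jpg", "member_name": "Jane"}, "rsvp_id": "301", "mtime": "1569858600000", "event": {"event_name": "Spark Meetup", "event_id": "ev101", "time": "1570000000000", "event_url": "https://www.meetup.com/example-event"}, "group": {"group_topics": [{"urlkey": "apache-spark", "topic_name": "Apache Spark"}], "group_city": "London", "group_country": "gb", "group_id": "401", "group_name": "London Spark Meetup", "group_lon": "-0.12", "group_urlname": "london-spark", "group_state": "", "group_lat": "51.50"}}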


Module 3.4.2: Building Data Pipeline to store raw data into MongoDB data store using Spark Structured Streaming





    // Code Block 5 Starts Here
    // Writing Meetup RSVP DataFrame into MongoDB Collection Starts Here
    val spark_mongodb_output_uri = "mongodb://" + mongodb_user_name + ":" + mongodb_password + "@" + mongodb_host_name + ":" + mongodb_port_no + "/" + mongodb_database_name + "." + mongodb_collection_name
    println("Printing spark_mongodb_output_uri: " + spark_mongodb_output_uri)

    meetup_rsvp_df_4.writeStream
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .outputMode("update")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        val batchDF_1 = batchDF.withColumn("batch_id", lit(batchId))
        // Transform batchDF and write it to sink/target/persistent storage
        // Write data from spark dataframe to database

        batchDF_1.write
        .format("mongo")
        .mode("append")
        .option("uri", spark_mongodb_output_uri)
        .option("database", mongodb_database_name)
        .option("collection", mongodb_collection_name)
        .save()
      }.start()
    // Writing Meetup RSVP DataFrame into MongoDB Collection Ends Here
    // Code Block 5 Ends Here
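
A note on foreachBatch: some Spark 2.4 / Scala 2.12 combinations reject the lambda above with an "overloaded method value foreachBatch" compile error (a known Spark issue). If you hit it, binding the function to a typed val first usually resolves the ambiguity. A sketch (writeToMongo is a name introduced here for illustration):

    val writeToMongo: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      // same body as the foreachBatch block above
    }

    meetup_rsvp_df_4.writeStream
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .outputMode("update")
      .foreachBatch(writeToMongo)
      .start()

Once the query is running, you can verify the raw documents from the mongo shell:

> use meetup_rsvp_db
> db.meetup_rsvp_message_detail_tbl.find().limit(1).pretty()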


    // Code Block 6 Starts Here
    // Simple aggregate - find response_count by grouping group_name,
    // group_country, group_state, group_city, group_lat, group_lon, response
    val meetup_rsvp_df_5 = meetup_rsvp_df_4.groupBy("group_name", "group_country",
      "group_state", "group_city", "group_lat", "group_lon", "response")
      .agg(count(col("response")).as("response_count"))

    println("Printing Schema of meetup_rsvp_df_5: ")
    meetup_rsvp_df_5.printSchema()
    // Code Block 6 Ends Here
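
If you prefer SQL, the same aggregation can be expressed by registering the stream as a temporary view and querying it. A sketch (meetup_rsvp_view is a hypothetical view name used only here):

    meetup_rsvp_df_4.createOrReplaceTempView("meetup_rsvp_view")

    val meetup_rsvp_df_5_sql = spark.sql(
      """SELECT group_name, group_country, group_state, group_city,
        |       group_lat, group_lon, response,
        |       COUNT(response) AS response_count
        |FROM meetup_rsvp_view
        |GROUP BY group_name, group_country, group_state, group_city,
        |         group_lat, group_lon, response""".stripMargin)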


    // Code Block 7 Starts Here
    // Write final result into console for debugging purpose
    val meetup_rsvp_write_stream = meetup_rsvp_df_5
      .writeStream
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()
    // Code Block 7 Ends Here
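
In update output mode, the console sink prints only the rows whose aggregates changed in each micro-batch. The output looks roughly like this (the values shown are placeholders):

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-------------+-----------+----------+---------+---------+--------+--------------+
|group_name         |group_country|group_state|group_city|group_lat|group_lon|response|response_count|
+-------------------+-------------+-----------+----------+---------+---------+--------+--------------+
|London Spark Meetup|gb           |           |London    |51.50    |-0.12    |yes     |1             |
+-------------------+-------------+-----------+----------+---------+---------+--------+--------------+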


Module 3.4.3: Building Data Pipeline to store processed data into MySQL database using Spark Structured Streaming





    // Code Block 8 Starts Here
    // Writing Aggregated Meetup RSVP DataFrame into MySQL Database Table Starts Here
    val mysql_properties = new java.util.Properties
    mysql_properties.setProperty("driver", mysql_driver_class)
    mysql_properties.setProperty("user", mysql_user_name)
    mysql_properties.setProperty("password", mysql_password)

    println("mysql_jdbc_url: " + mysql_jdbc_url)

    meetup_rsvp_df_5.writeStream
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .outputMode("update")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        val batchDF_1 = batchDF.withColumn("batch_id", lit(batchId))
        // Transform batchDF and write it to sink/target/persistent storage
        // Write data from spark dataframe to database
        batchDF_1.write.mode("append").jdbc(mysql_jdbc_url, mysql_table_name, mysql_properties)
    }.start()
    // Writing Aggregated Meetup RSVP DataFrame into MySQL Database Table Ends Here

    meetup_rsvp_write_stream.awaitTermination()
    // Code Block 8 Ends Here
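
Spark's JDBC writer creates the target table on the first append if it does not already exist, but the database itself must be created up front (the JDBC URL above points at meetup_rsvp_db). A minimal setup and verification from the mysql client, assuming the credentials from Code Block 1:

mysql> CREATE DATABASE IF NOT EXISTS meetup_rsvp_db;
mysql> USE meetup_rsvp_db;
mysql> SELECT * FROM meetup_rsvp_message_agg_detail_tbl LIMIT 5;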


build.sbt



name := "meetup_dot_com_rsvp_stream_processing"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.4"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.3.1"

// https://mvnrepository.com/artifact/mysql/mysql-connector-java
libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.18"

// https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1"


Summary

In this article, we consumed Meetup.com's RSVP JSON messages in Spark Structured Streaming, stored the raw JSON messages in a MongoDB collection, and then stored the processed (aggregated) data in a MySQL table. Please work through all the steps, share your feedback, and post any queries or doubts you have. Thank you.



You can build this project and run it on your own Spark and Hadoop cluster, or reach out to us for a FREE Spark and Hadoop VM.


Happy Learning !!!
