Prerequisites
- Apache Spark (this walk-through uses Spark 2.4.4 with Scala 2.12.8, per the build.sbt below)
- IntelliJ IDEA Community Edition
Walk-through
In this article, I will walk you through how to create a Spark DataFrame from a nested (complex) JSON file in an Apache Spark application, using IntelliJ IDEA Community Edition. We will work with the following sample file.
sample_nested_json_file.json
{ "IFAM":"EQR", "KTM": 1548176931466, "COL": 21, "DATA": [{ "MLrate": "31", "Crate": [{ "key": "k1", "value": "v1" }, { "key": "k2", "value": "v2" }] },{ "MLrate": "33", "Crate": [{ "key": "k3", "value": "v3" }, { "key": "k4", "value": "v4" }] }], "CHECK": { "Check1": 1, "Check2": "TWO" } }
build.sbt
name := "apache_spark_dataframe_practical_tutorial"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
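With this build definition in place, the application below can be compiled and run from the project root (assuming sbt is installed and spark-sql is a regular compile dependency, as above; the runMain target is the fully qualified name of the object defined next):

sbt compile
sbt "runMain com.datamaking.apache.spark.dataframe.part_4_create_dataframe_from_nested_json_file"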
part_4_create_dataframe_from_nested_json_file.scala
package com.datamaking.apache.spark.dataframe

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StructType}
import org.apache.spark.sql.functions.{col, explode}

object part_4_create_dataframe_from_nested_json_file {

  // Flattens one level of nesting: explodes ArrayType columns and
  // expands StructType columns into their individual fields.
  def expand_nested_column(json_data_df_temp: DataFrame): DataFrame = {
    var json_data_df: DataFrame = json_data_df_temp
    var select_clause_list = List.empty[String]

    // Iterate over each column to check whether it still holds nested JSON data
    for (column_name <- json_data_df.schema.names) {
      println("Outside isInstanceOf loop: " + column_name)
      // Check whether the column type is ArrayType
      if (json_data_df.schema(column_name).dataType.isInstanceOf[ArrayType]) {
        println("Inside isInstanceOf loop: " + column_name)
        // Extract the nested elements using the explode function
        json_data_df = json_data_df.withColumn(column_name,
          explode(json_data_df(column_name)).alias(column_name))
        select_clause_list :+= column_name
      } else if (json_data_df.schema(column_name).dataType.isInstanceOf[StructType]) {
        println("Inside isInstanceOf loop of StructType: " + column_name)
        // Select every field of the struct as its own column
        for (field <- json_data_df.schema(column_name).dataType.asInstanceOf[StructType].fields) {
          select_clause_list :+= column_name + "." + field.name
        }
      } else {
        select_clause_list :+= column_name
      }
    }

    // Alias "parent.child" columns as "parent_child" to keep the names unambiguous
    val columnNames = select_clause_list.map(name => col(name).alias(name.replace('.', '_')))
    // Select the flattened columns from the DataFrame: json_data_df
    json_data_df.select(columnNames: _*)
  }

  def main(args: Array[String]): Unit = {
    println("Apache Spark Application Started ...")
    val spark = SparkSession.builder()
      .appName("Create DataFrame from JSON File")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // Code Block 1 Starts Here
    val json_file_path = "D:\\apache_spark_dataframe\\data\\json\\sample_nested_json_file.json"
    var json_data_df = spark.read.option("multiline", true).json(json_file_path)
    json_data_df.show(10, false)
    json_data_df.printSchema()

    // Process the nested structure:
    // run the while loop until nested_column_count drops to zero (0)
    var nested_column_count = 1
    while (nested_column_count != 0) {
      println("Printing nested_column_count: " + nested_column_count)
      var nested_column_count_temp = 0
      // Count the columns that still contain nested JSON data
      for (column_name <- json_data_df.schema.names) {
        println("Iterating DataFrame Columns: " + column_name)
        // Check whether the column type is ArrayType or StructType
        if (json_data_df.schema(column_name).dataType.isInstanceOf[ArrayType]
            || json_data_df.schema(column_name).dataType.isInstanceOf[StructType]) {
          nested_column_count_temp += 1
        }
      }
      if (nested_column_count_temp != 0) {
        json_data_df = expand_nested_column(json_data_df)
        json_data_df.show(10, false)
      }
      println("Printing nested_column_count_temp: " + nested_column_count_temp)
      nested_column_count = nested_column_count_temp
    }
    // Code Block 1 Ends Here

    json_data_df.show(10, false)
    json_data_df.printSchema()

    spark.stop()
    println("Apache Spark Application Completed.")
  }
}
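For the sample file, the loop finishes after four flattening passes (the CHECK struct and the DATA array, then the DATA struct, then the inner Crate array, then its struct), and the final show(10, false) should print something close to the following (reconstructed by hand from the logic above, so treat it as illustrative):

+------------+------------+---+--------------+----------------+-----------+----+-------------+
|CHECK_Check1|CHECK_Check2|COL|DATA_Crate_key|DATA_Crate_value|DATA_MLrate|IFAM|KTM          |
+------------+------------+---+--------------+----------------+-----------+----+-------------+
|1           |TWO         |21 |k1            |v1              |31         |EQR |1548176931466|
|1           |TWO         |21 |k2            |v2              |31         |EQR |1548176931466|
|1           |TWO         |21 |k3            |v3              |33         |EQR |1548176931466|
|1           |TWO         |21 |k4            |v4              |33         |EQR |1548176931466|
+------------+------------+---+--------------+----------------+-----------+----+-------------+

Each element of DATA is exploded once, and each entry of the inner Crate array is exploded again, so the single input record fans out into four flat rows.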
Summary
In this article, we learned how to create a Spark DataFrame from a nested (complex) JSON file in an Apache Spark application. Please work through these steps, share your feedback, and post any queries or doubts you have. Thank you. Happy learning!
Comments
Great stuff, any chance you have this in python?
Can you please write the above code in Python?

I will do it in the future and will let you know.
Can you share the same thing in PySpark?
Getting an error in nested ArrayType. Can you help me?
Loved the detailed explanation!!!
Getting an error while parsing nested StructType and nested ArrayType.
Very nice code, thanks brother. But it gives a DataFrame count of zero for the scenario below:
ReplyDelete{
"id": "0001",
"type": "donut",
"name": "Cake",
"ppu": 0.55,
"batters": {
"batter": [
]
},
"topping": [
{
"id": "5001",
"type": "None"
},
{
"id": "5002",
"type": "Glazed"
},
{
"id": "5005",
"type": "Sugar"
},
{
"id": "5007",
"type": "Powdered Sugar"
},
{
"id": "5006",
"type": "Chocolate with Sprinkles"
},
{
"id": "5003",
"type": "Chocolate"
},
{
"id": "5004",
"type": "Maple"
}
]
}
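A likely cause: explode drops every row whose array column is null or empty, and the empty "batter" array above triggers exactly that, so the whole record disappears. Since Spark 2.2, explode_outer keeps such rows with a null in place of the exploded element. Below is a minimal, self-contained sketch of the difference (the demo object name and toy data are illustrative, not from the original post):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, explode_outer}

object explode_vs_explode_outer_demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("explode vs explode_outer")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // One record with an empty array, mirroring the empty "batter" array above
    val df = Seq(("0001", Seq.empty[String])).toDF("id", "batter")

    // explode drops the whole row when the array is empty: prints 0
    println(df.withColumn("batter", explode($"batter")).count())

    // explode_outer keeps the row with a null element: prints 1
    println(df.withColumn("batter", explode_outer($"batter")).count())

    spark.stop()
  }
}

Swapping explode for explode_outer inside expand_nested_column should therefore preserve such records, at the cost of null values in the flattened columns.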