JSON woes in Apache Spark
This Apache Spark feature has made us scratch our heads way too much.
The issue was null fields showing up where no nulls should appear, and then not being able to count them. It was very puzzling.
Let’s start with this JSON file:
{"foo": {"id": "32"}, "id": 42}
{"id": 43, "foo": {"id": "32"}}
{"id": 44, "foo": {"id": 32}}
The producer had some issue, maybe because of JavaScript, and some of the internal ids changed from the expected int to string. In other words, our Spark application expected
case class Foo(id: Option[Int])
case class Damn(foo: Foo, id: Int)
Now, take a moment to think: what do you expect should happen when you read these records as JSON and cast to this case class?
Let’s find out!
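If you want to follow along, the snippets below assume a SparkSession called spark with spark.implicits._ in scope, the two case classes above, and the three JSON lines saved as damn.jsonl. A minimal setup sketch (the local master and app name are my own choices, not part of the original setup):
import org.apache.spark.sql.SparkSession
// Assumption: a throwaway local session for experimenting. The snippets below
// expect it to be named `spark`, with implicits in scope for the .as[Damn] conversion.
val spark = SparkSession.builder()
.master("local[*]")
.appName("json-woes")
.getOrCreate()
import spark.implicits._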
We prepare a JSON reader, enforce the schema, and convert to a Dataset. Nice. Just for convenience, we register it as a view to run SQL directly, which is sometimes easier to write than Scala.
import org.apache.spark.sql.Encoders
val reader = spark.read.format("json")
val schema = Encoders.product[Damn].schema // derive the schema from the case class
val data = reader.schema(schema)
.json("damn.jsonl").as[Damn]
data.createOrReplaceTempView("data")
This one may be partially expected: there are wrong types in the data, right? We got one row.
spark.sql("select * from data where id is null")
.show
And here it is, a wrong row. Although… why one? Whatever, move on. The interesting part is next.
+----+----+
| foo| id|
+----+----+
|null|null|
+----+----+
What do you expect the following COUNT will return? Should be one, right?
spark.sql("select count(*) from data where id is null")
.show
Wrong.
+--------+
|count(1)|
+--------+
| 0|
+--------+
A WAT moment ensues.
This makes sense when you start digging into the execution plans for both queries.
For the first one, we have
spark.sql("select * from data where id is null")
.explain
while for the second one, we have
spark.sql("select count(*) from data where id is null")
.explain
Query plans usually contain a lot of noise, but here you should focus on the last part, the ReadSchema. I’ll put the two together for convenience.
// Finds nulls
ReadSchema: struct<foo:struct<id:int>,id:int>
// Finds no nulls
ReadSchema: struct<id:int>
When Spark reads the JSON file, it does not process it the same way for both queries. For the first one, it tries to parse each record against the full schema defined on the Dataset, and anything it cannot parse becomes null. For the second one, column pruning kicks in: only id is needed to answer the count, every id in the file parses fine, and so no nulls ever appear. And in this case, invalid means any data we could not parse. Let’s go back to the JSON file:
{"foo": {"id": "32"}, "id": 42}
{"id": 43, "foo": {"id": "32"}}
{"id": 44, "foo": {"id": 32}}
When reading row by row:
- The first row is fully invalid: foo comes first, and its string id breaks parsing before anything else can be read.
- The second row is only partially invalid: id comes first, so it is already processed by the time the parser trips over foo (the sketch after this list makes the field-order point explicit).
- The third row is fully valid.
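Here is a small sketch to check the field-order point (my own experiment, not from the original post): take the same bad string value but put id first, the way the second record has it, and parse it with the same schema. It should behave like the second row, keeping id = 42 while foo becomes null.
// Sketch: same bad string in foo.id, but with id appearing before foo.
val reordered = Seq("""{"id": 42, "foo": {"id": "32"}}""").toDS
spark.read.schema(schema).json(reordered).show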
To make it clearer, let’s select all the data.
spark.sql("select * from data").show
+----+----+
| foo| id|
+----+----+
|null|null|
|null| 43|
|{32}| 44|
+----+----+
And now do the actually crazy, mind-blowing query
spark.sql("select id from data").show
+---+
| id|
+---+
| 42|
| 43|
| 44|
+---+
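One way to convince yourself that the pruned read is the culprit (again my own sketch, not from the original post): materialise the permissive parse once, for example with cache, so later queries hit the stored rows instead of re-reading the file with a smaller schema. The count should then agree with the select * output and report one null id.
// Sketch: force a single full-schema read; later queries reuse the cached rows.
data.cache()
data.count() // materialises the cache
spark.sql("select count(*) from data where id is null").show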
What can we do? This is horrible to handle, right?
Handling this issue
There are two approaches you can take here:
- Fail hard.
- Track failures.
Obviously, I prefer Option 2.
Fail hard
You create a reader and set it to fail fast.
val reader2 = spark.read.format("json")
.option("mode", "FAILFAST")
val schema2 = Encoders.product[Damn].schema
val data2 = reader2.schema(schema2)
.json("damn.jsonl").as[Damn]
Now the following action will fail as soon as it hits a record that does not match the schema.
data2.show
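If you go this way you probably want to react to the failure somewhere. A minimal sketch of that handling (an assumption on my part, not from the original post):
import scala.util.{Failure, Success, Try}
// Sketch: in FAILFAST mode the first malformed record aborts the action,
// so wrap it if you prefer handling the error over letting the job die.
Try(data2.show()) match {
  case Success(_) => println("all records matched the schema")
  case Failure(e) => println(s"bad input: ${e.getMessage}")
}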
Track failures
This approach keeps track of the rows that violate the schema. Note that PERMISSIVE is the default mode, so you can skip that option.
import org.apache.spark.sql.types._
val reader3 = spark.read.format("json")
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "corrupted")
val schema3 = schema
.add(StructField("corrupted", StringType)) // extra column that will hold the raw malformed record
val data3 = reader3.schema(schema3)
.json("damn.jsonl").as[Damn]
data3.show
It will look like this:
+----+----+--------------------+
| foo| id| corrupted|
+----+----+--------------------+
|null|null|{"foo": {"id": "3...|
|null| 43|{"id": 43, "foo":...|
|{32}| 44| null|
+----+----+--------------------+
With this solution you can at least read your data and track how many rows had a schema violation.
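For example, a quick way to count them (a sketch of mine, not from the original post). Note that recent Spark versions refuse queries on raw JSON files that reference only the corrupt record column, which is why the parsed result is cached first:
import org.apache.spark.sql.functions.col
// Sketch: persist the parsed rows, then count the ones carrying a corrupt record.
// Caching avoids re-reading the raw file with only the corrupt column in scope.
data3.cache()
val badRows = data3.filter(col("corrupted").isNotNull).count()
println(s"$badRows rows violated the schema")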