Skip to main content
 首页 » 编程设计

python之处理 spark 数据帧中的非统一 JSON 列

2024年11月01日3davidwang456

我想知道将换行符分隔的 JSON 文件读入数据帧的最佳做法是什么。至关重要的是,每个记录中的(必填)字段之一映射到一个对象,该对象不能保证具有相同的子字段(即模式在所有记录中是不一致的)。

例如,输入文件可能如下所示:

{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}} 
{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}} 
{"id": 3, "type": "baz", "data": {"key3": "moo"}} 

在这种情况下,idtypedata 字段将出现在所有记录中,但 映射到的结构>data 将具有异构模式。

我有两种方法来处理 data 列的不均匀性:

  1. 让 Spark 推断模式:
df = spark.read.options(samplingRatio=1.0).json('s3://bucket/path/to/newline_separated_json.txt') 

这种方法的明显问题是需要对每个 记录进行采样以确定将成为最终模式的字段/模式的超集。考虑到数百万条记录的低 100 条数据集,这可能会非常昂贵?或者……

  1. 告诉 Spark 将数据字段强制转换为 JSON 字符串,然后只有一个由三个顶级字符串字段组成的模式,idtype数据。在这里,我不太确定最好的方法。例如,我假设只是将 data 字段声明为如下所示的字符串,这将不起作用,因为它没有明确地执行与 json.dumps 等效的操作?
schema = StructType([ 
    StructField("id", StringType(), true), 
    StructField("type", StringType(), true), 
    StructField("data", StringType(), true) 
]) 
df = spark.read.json('s3://bucket/path/to/newline_separated_json.txt', schema=schema) 

如果我想避免选项 1 产生的扫描完整数据集的成本,摄取此文件并将 data 字段保留为 JSON 字符串的最佳方法是什么?

谢谢

请您参考如下方法:

我认为您的尝试和总体思路是正确的。这里有两种基于内置选项的方法,也就是 get_json_object/from_json 通过数据帧 API 和使用 map 转换以及 python 的 json.dumps()json.loads() 通过 RDD API。

选项 1: get_json_object()/from_json()

首先让我们尝试使用不需要架构的 get_json_object():

import pyspark.sql.functions as f 
 
df = spark.createDataFrame([ 
  ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'), 
  ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'), 
  ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}') 
], StringType()) 
 
df.select(f.get_json_object("value", "$.id").alias("id"), \ 
          f.get_json_object("value", "$.type").alias("type"), \ 
           f.get_json_object("value", "$.data").alias("data")) 
 
# +---+----+-----------------------------+ 
# |id |type|data                         | 
# +---+----+-----------------------------+ 
# |1  |foo |{"key0":"foo","key2":"meh"}  | 
# |2  |bar |{"key2":"poo","key3":"pants"}| 
# |3  |baz |{"key3":"moo"}               | 
# +---+----+-----------------------------+ 

相反,from_json() 需要模式定义:

from pyspark.sql.types import StringType, StructType, StructField 
import pyspark.sql.functions as f 
 
df = spark.createDataFrame([ 
  ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'), 
  ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'), 
  ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}') 
], StringType()) 
 
schema = StructType([ 
    StructField("id", StringType(), True), 
    StructField("type", StringType(), True), 
    StructField("data", StringType(), True) 
]) 
 
df.select(f.from_json("value", schema).getItem("id").alias("id"), \ 
         f.from_json("value", schema).getItem("type").alias("type"), \ 
         f.from_json("value", schema).getItem("data").alias("data")) 
 
# +---+----+-----------------------------+ 
# |id |type|data                         | 
# +---+----+-----------------------------+ 
# |1  |foo |{"key0":"foo","key2":"meh"}  | 
# |2  |bar |{"key2":"poo","key3":"pants"}| 
# |3  |baz |{"key3":"moo"}               | 
# +---+----+-----------------------------+ 

选项 2:map/RDD API + json.dumps()

from pyspark.sql.types import StringType, StructType, StructField 
import json 
 
df = spark.createDataFrame([ 
  '{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}', 
  '{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}', 
  '{"id": 3, "type": "baz", "data": {"key3": "moo"}}' 
], StringType()) 
 
def from_json(data): 
  row = json.loads(data[0]) 
  return (row['id'], row['type'], json.dumps(row['data'])) 
 
json_rdd = df.rdd.map(from_json) 
 
schema = StructType([ 
    StructField("id", StringType(), True), 
    StructField("type", StringType(), True), 
    StructField("data", StringType(), True) 
]) 
 
spark.createDataFrame(json_rdd, schema).show(10, False) 
 
# +---+----+--------------------------------+ 
# |id |type|data                            | 
# +---+----+--------------------------------+ 
# |1  |foo |{"key2": "meh", "key0": "foo"}  | 
# |2  |bar |{"key2": "poo", "key3": "pants"}| 
# |3  |baz |{"key3": "moo"}                 | 
# +---+----+--------------------------------+ 
 

函数 from_json 会将字符串行转换为 (id, type, data) 的元组。 json.loads()将解析 json 字符串并返回一个字典,我们通过该字典生成并返回最终的元组。