Maxwell's Daemon의 출력을 사용하여 MySQL 데이터베이스에서 발생한 변경 사항을 캡처하고 있습니다. 중첩 된 JSON 필드로 변경 사항을 표시합니다. 'data'에는 테이블의 최신 스냅 샷이 포함되고 'old'는 변경된 필드를 나타냅니다.Pyspark - null 이외의 값으로 누락 된 JSON 필드를 나타냅니다.
이 JSON을 Spark DataFrame으로 읽으면 'old'의 누락 된 모든 필드가 'null'로 설정됩니다.
필드가 'null'에서 '[some_value]'로 변경되었거나 행의 일부 필드가 변경되어 'null'이 표시되기 때문에 불행한 상황입니다. JSON의 누락 된 필드
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, LongType
custom_schema = StructType(
[StructField("type", StringType(), True),
StructField("ts", LongType(), True),
StructField("xid", LongType(), True),
StructField("data", StructType([
StructField("id", LongType(), True),
StructField("bought_by", StringType(), True),
StructField("userprofile_id", StringType(), True)]), True),
StructField("old", StructType([
StructField("id", LongType(), True),
StructField("bought_by", StringType(), True),
StructField("userprofile_id", StringType(), True)]), True)]
)
source_list = [
'{"type":"update","ts":1510901244,"xid":1,"data":{"id":1,"bought_by":"user:1","userprofile_id":1}, "old":{"userprofile_id":null}}',
'{"type":"update","ts":1510901245,"xid":2,"data":{"id":1,"bought_by":"user:1","userprofile_id":null}, "old":{"userprofile_id":2}}',
'{"type":"update","ts":1510901246,"xid":3,"data":{"id":1,"bought_by":"user:1","userprofile_id":1}, "old":{"userprofile_id":2}}',
'{"type":"update","ts":1510901246,"xid":4,"data":{"id":1,"bought_by":"user:1","userprofile_id":1}, "old":{"bought_by":"user:2"}}',
]
df = spark.read.json(spark.sparkContext.parallelize(source_list), schema=custom_schema)
df.show()
이의 출력은 다음과 같습니다 : 나는 보냈다
+------+----------+---+---------------+--------------------+
| type| ts|xid| data| old|
+------+----------+---+---------------+--------------------+
|update|1510901244| 1| [1,user:1,1]| ['N/A','N/A',null]|
|update|1510901245| 2|[1,user:1,null]| ['N/A','N/A',2]|
|update|1510901246| 3| [1,user:1,1]| ['N/A','N/A',2]|
|update|1510901246| 4| [1,user:1,1]|['N/A',user:2,'N/A']|
+------+----------+---+---------------+--------------------+
:
+------+----------+---+---------------+------------------+
| type| ts|xid| data| old|
+------+----------+---+---------------+------------------+
|update|1510901244| 1| [1,user:1,1]| [null,null,null]|
|update|1510901245| 2|[1,user:1,null]| [null,null,2]|
|update|1510901246| 3| [1,user:1,1]| [null,null,2]|
|update|1510901246| 4| [1,user:1,1]|[null,user:2,null]|
+------+----------+---+---------------+------------------+
그러나 나는 이런 식으로 뭔가를 생산하기 위해 찾고 여기
은 예입니다 솔루션을 검색하는 데 꽤 오랜 시간이 걸렸지 만이 기사를 설명하는 기사 만 찾았습니다. 누락 된 필드를 나타내는 'null'값을 가진 ituation 및 'null'값을 다른 값으로 대체 할 수있는 솔루션은 있지만 내 상황에서는 도움이되지 않습니다. "userprofile_id에 널 (null) : 우리가 Gobblin를 사용하여 데이터를 섭취하고 있기 때문에
, 우리가
"userprofile_id "를 대체하는 규칙을 추가합니다 :
지금 내가 가지고있는 가장 가까운 해결책은 이것이다 "-1
또는 문자열 값에 대한
교체"string_field": "string_field"와 널 (null) : "N/A"
그러나이 확장 될 너무 해키입니다.
이 문제를 해결할 수있는 도움을 주시면 감사하겠습니다. 감사!
나는 못생긴 해결책이있어 https://stackoverflow.com/a/47531436/8432213 더 나은 것을 기다리고있다. –