2017-11-23 5 views
0

Maxwell's Daemon의 출력을 사용하여 MySQL 데이터베이스에서 발생한 변경 사항을 캡처하고 있습니다. 중첩 된 JSON 필드로 변경 사항을 표시합니다. 'data'에는 테이블의 최신 스냅 샷이 포함되고 'old'는 변경된 필드를 나타냅니다.Pyspark - null 이외의 값으로 누락 된 JSON 필드를 나타냅니다.

이 JSON을 Spark DataFrame으로 읽으면 'old'의 누락 된 모든 필드가 'null'로 설정됩니다.

필드가 'null'에서 '[some_value]'로 변경되었거나 행의 일부 필드가 변경되어 'null'이 표시되기 때문에 불행한 상황입니다. JSON의 누락 된 필드

from pyspark.sql.types import StructType, StructField, StringType, BooleanType, LongType 

custom_schema = StructType(
[StructField("type", StringType(), True), 
StructField("ts", LongType(), True), 
StructField("xid", LongType(), True), 
StructField("data", StructType([ 
    StructField("id", LongType(), True), 
    StructField("bought_by", StringType(), True), 
    StructField("userprofile_id", StringType(), True)]), True), 
StructField("old", StructType([ 
    StructField("id", LongType(), True), 
    StructField("bought_by", StringType(), True), 
    StructField("userprofile_id", StringType(), True)]), True)] 
) 

source_list = [ 
'{"type":"update","ts":1510901244,"xid":1,"data":{"id":1,"bought_by":"user:1","userprofile_id":1}, "old":{"userprofile_id":null}}', 
'{"type":"update","ts":1510901245,"xid":2,"data":{"id":1,"bought_by":"user:1","userprofile_id":null}, "old":{"userprofile_id":2}}', 
'{"type":"update","ts":1510901246,"xid":3,"data":{"id":1,"bought_by":"user:1","userprofile_id":1}, "old":{"userprofile_id":2}}', 
'{"type":"update","ts":1510901246,"xid":4,"data":{"id":1,"bought_by":"user:1","userprofile_id":1}, "old":{"bought_by":"user:2"}}', 
] 

df = spark.read.json(spark.sparkContext.parallelize(source_list), schema=custom_schema) 

df.show() 

이의 출력은 다음과 같습니다 : 나는 보냈다

+------+----------+---+---------------+--------------------+ 
| type|  ts|xid|   data|     old| 
+------+----------+---+---------------+--------------------+ 
|update|1510901244| 1| [1,user:1,1]| ['N/A','N/A',null]| 
|update|1510901245| 2|[1,user:1,null]|  ['N/A','N/A',2]| 
|update|1510901246| 3| [1,user:1,1]|  ['N/A','N/A',2]| 
|update|1510901246| 4| [1,user:1,1]|['N/A',user:2,'N/A']| 
+------+----------+---+---------------+--------------------+ 

:

+------+----------+---+---------------+------------------+ 
| type|  ts|xid|   data|    old| 
+------+----------+---+---------------+------------------+ 
|update|1510901244| 1| [1,user:1,1]| [null,null,null]| 
|update|1510901245| 2|[1,user:1,null]|  [null,null,2]| 
|update|1510901246| 3| [1,user:1,1]|  [null,null,2]| 
|update|1510901246| 4| [1,user:1,1]|[null,user:2,null]| 
+------+----------+---+---------------+------------------+ 

그러나 나는 이런 식으로 뭔가를 생산하기 위해 찾고 여기

은 예입니다 솔루션을 검색하는 데 꽤 오랜 시간이 걸렸지 만이 기사를 설명하는 기사 만 찾았습니다. 누락 된 필드를 나타내는 'null'값을 가진 ituation 및 'null'값을 다른 값으로 대체 할 수있는 솔루션은 있지만 내 상황에서는 도움이되지 않습니다. "userprofile_id에 널 (null) : 우리가 Gobblin를 사용하여 데이터를 섭취하고 있기 때문에

, 우리가

"userprofile_id "를 대체하는 규칙을 추가합니다 :

지금 내가 가지고있는 가장 가까운 해결책은 이것이다 "-1

또는 문자열 값에 대한

교체

"string_field": "string_field"와 널 (null) : "N/A"

그러나이 확장 될 너무 해키입니다.

이 문제를 해결할 수있는 도움을 주시면 감사하겠습니다. 감사!

+0

나는 못생긴 해결책이있어 https://stackoverflow.com/a/47531436/8432213 더 나은 것을 기다리고있다. –

답변

0

원본 파일에서 RDD로 읽었습니다. (기본값을 사용하여) 식별하려는 'null'필드에 대한 문자열 대체를 수행하고 결과를 임시 위치에 쓰고 내용을 다시 읽습니다. DataFrame으로. 그런 다음 나중에 내 코드에서 Null로 기본값을 처리합니다. 이것은 매우 못 생겼지 만 작동합니다. 내가 어떻게 DataFrame으로 그것을 읽을 때 누락 된 JSON 필드에 기본 값을 할당하는 단서를 얻을 경우

NULL_USERPROFILE_ID = -1234321 

in_file = spark.sparkContext.textFile(source + "*") 
rdd = in_file.map(lambda x: x.replace('"userprofile_id":null', '"userprofile_id":%d' % NULL_USERPROFILE_ID)) 
rdd.saveAsTextFile(destination) 

나는 그것을 리팩토링하실 수 있습니다.