내가 전화 휴대폰 다음 스키마 및 내용처럼 열을 폭발하려고했다와 Jsons의 목록에 열을 폭발 : Pyspark
(customer_external_id,StringType
phones,StringType)
customer_id phones
x8x46x5 [{"phone" : "(xx) 35xx4x80"},{"phone" : "(xx) xxxx46605"}]
xx44xx5 [{"phone" : "(xx) xxx3x8443"}]
4xxxxx5 [{"phone" : "(xx) x6xx083x3"},{"areaCode" : "xx"},{"phone" : "(xx) 3xxx83x3"}]
xx6564x [{"phone" : "(x3) x88xx344x"}]
xx8x4x0 [{"phone" : "(xx) x83x5x8xx"}]
xx0434x [{"phone" : "(0x) 3x6x4080"},{"areaCode" : "xx"}]
x465x40 [{"phone" : "(6x) x6x445xx"}]
x0684x8 [{"phone" : "(xx) x8x88x4x4"},{"phone" : "(xx) x8x88x4x4"}]
x84x850 [{"phone" : "(xx) 55x56xx4"}]
x0604xx [{"phone" : "(xx) x8x4xxx68"}]
4x6xxx0 [{"phone" : "(xx) x588x43xx"},{"phone" : "(xx) 5x6465xx"},{"phone" : "(xx) x835xxxx8"},{"phone" : "(xx) x5x6465xx"}]
x6x000x [{"phone" : "(xx) xxx044xx4"}]
5x65533 [{"phone" : "(xx) xx686x0xx"}]
x3668xx [{"phone" : "(5x) 33x8x3x4"},{"phone" : "(5x) 8040x8x6"}]
그래서 나는이 코드를 실행하려고하고 subsequential 오류가 발생했습니다 :
df.select('customer_external_id', explode(df.phones))
AnalysisException: u"cannot resolve 'explode(`phones`)' due to data type mismatch: input to function explode should be array or map type, not StringType;;
'Project [customer_external_id#293, explode(phones#296) AS List()]\n+- Relation[order_id#292,customer_external_id#293,name#294,email#295,phones#296,phones_version#297,classification#298,locale#299] parquet\n"
그리고 내가 브래킷을 제거하고 JSON으로 변환하는이 코드를 실행하므로 내 열이 StringType이었다 발견이 오류로 :
phones = df.select('customer_external_id', 'phones').rdd\
.map(lambda x: str(x).replace('[','')\
.replace(']','')\
.replace('},{', ','))\
.map(lambda x: json.loads(x).get('phone')\
.map(lambda x: Row(x))\
.toDF(df.select('customer_external_id','phones').schema)
phones.show()
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 4 times, most recent failure: Lost task 0.3 in stage 38.0 (TID 2740, 10.112.80.248, executor 8): org.apache.spark.api.python.PythonException: Traceback (most recent call last)
를
분명히 나는 Json으로 캐스팅 할 수없고 열을 폭발시킬 수 없습니다.
당신이 문자열을 배열로 변환하는from_json
방법을 사용한다 수행 할 작업을
+-----------+--------+--------------+
|customer_id|areaCode|phone |
+-----------+--------+--------------+
|x8x46x5 |null |(xx) 35xx4x80 |
|x8x46x5 |null |(xx) xxxx46605|
|xx44xx5 |null |(xx) xxx3x8443|
|4xxxxx5 |null |(xx) x6xx083x3|
|4xxxxx5 |xx |null |
|4xxxxx5 |null |(xx) 3xxx83x3 |
|xx6564x |null |(x3) x88xx344x|
|xx8x4x0 |null |(xx) x83x5x8xx|
|xx0434x |null |(0x) 3x6x4080 |
|xx0434x |xx |null |
|x465x40 |null |(6x) x6x445xx |
|x0684x8 |null |(xx) x8x88x4x4|
|x0684x8 |null |(xx) x8x88x4x4|
|x84x850 |null |(xx) 55x56xx4 |
|x0604xx |null |(xx) x8x4xxx68|
|4x6xxx0 |null |(xx) x588x43xx|
|4x6xxx0 |null |(xx) 5x6465xx |
|4x6xxx0 |null |(xx) x835xxxx8|
|4x6xxx0 |null |(xx) x5x6465xx|
|x6x000x |null |(xx) xxx044xx4|
|5x65533 |null |(xx) xx686x0xx|
|x3668xx |null |(5x) 33x8x3x4 |
|x3668xx |null |(5x) 8040x8x6 |
+-----------+--------+--------------+
왜 데이터 집합에 대한 스키마를 만든 다음 파일을 읽는 동안 전달할 수 있습니까? – ashwinids
왜'string.replace()'단계를 수행하고 있습니까? 'json.loads()'할 수 없습니까? 그러면'dicts'의'list'가 반환 될 것이고'flatMap()'을 호출 할 수 있어야합니다. 또한 원하는 출력물이 어떻게 보이는지 보여 줄 수 있습니까? – pault
제 대답을보십시오, https://stackoverflow.com/a/47944084/3251351 이것은 순수한 DataFrame API를 고수하고 JVM과 Python 간의 레코드 직렬화/비 직렬화를 피하고 처리를 위해 내장 된 최적화 된 기능을 사용합니다 JSON. – Silvio