2017-12-22 26 views
1

내가 전화 휴대폰 다음 스키마 및 내용처럼 열을 폭발하려고했다와 Jsons의 목록에 열을 폭발 : Pyspark

(customer_external_id,StringType 
phones,StringType) 

customer_id phones 
x8x46x5  [{"phone" : "(xx) 35xx4x80"},{"phone" : "(xx) xxxx46605"}] 
xx44xx5  [{"phone" : "(xx) xxx3x8443"}] 
4xxxxx5  [{"phone" : "(xx) x6xx083x3"},{"areaCode" : "xx"},{"phone" : "(xx) 3xxx83x3"}] 
xx6564x  [{"phone" : "(x3) x88xx344x"}] 
xx8x4x0  [{"phone" : "(xx) x83x5x8xx"}] 
xx0434x  [{"phone" : "(0x) 3x6x4080"},{"areaCode" : "xx"}] 
x465x40  [{"phone" : "(6x) x6x445xx"}] 
x0684x8  [{"phone" : "(xx) x8x88x4x4"},{"phone" : "(xx) x8x88x4x4"}] 
x84x850  [{"phone" : "(xx) 55x56xx4"}] 
x0604xx  [{"phone" : "(xx) x8x4xxx68"}] 
4x6xxx0  [{"phone" : "(xx) x588x43xx"},{"phone" : "(xx) 5x6465xx"},{"phone" : "(xx) x835xxxx8"},{"phone" : "(xx) x5x6465xx"}] 
x6x000x  [{"phone" : "(xx) xxx044xx4"}] 
5x65533  [{"phone" : "(xx) xx686x0xx"}] 
x3668xx  [{"phone" : "(5x) 33x8x3x4"},{"phone" : "(5x) 8040x8x6"}] 

그래서 나는이 코드를 실행하려고하고 subsequential 오류가 발생했습니다 :

df.select('customer_external_id', explode(df.phones)) 

AnalysisException: u"cannot resolve 'explode(`phones`)' due to data type mismatch: input to function explode should be array or map type, not StringType;; 
'Project [customer_external_id#293, explode(phones#296) AS List()]\n+- Relation[order_id#292,customer_external_id#293,name#294,email#295,phones#296,phones_version#297,classification#298,locale#299] parquet\n" 

그리고 내가 브래킷을 제거하고 JSON으로 변환하는이 코드를 실행하므로 내 열이 StringType이었다 발견이 오류로 :

phones = df.select('customer_external_id', 'phones').rdd\ 
    .map(lambda x: str(x).replace('[','')\ 
         .replace(']','')\ 
         .replace('},{', ','))\ 
    .map(lambda x: json.loads(x).get('phone')\ 
    .map(lambda x: Row(x))\ 
    .toDF(df.select('customer_external_id','phones').schema) 
phones.show() 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 4 times, most recent failure: Lost task 0.3 in stage 38.0 (TID 2740, 10.112.80.248, executor 8): org.apache.spark.api.python.PythonException: Traceback (most recent call last) 

분명히 나는 ​​Json으로 캐스팅 할 수없고 열을 폭발시킬 수 없습니다.

당신이 문자열을 배열로 변환하는 from_json 방법을 사용한다 수행 할 작업을
+-----------+--------+--------------+ 
    |customer_id|areaCode|phone   | 
    +-----------+--------+--------------+ 
    |x8x46x5 |null |(xx) 35xx4x80 | 
    |x8x46x5 |null |(xx) xxxx46605| 
    |xx44xx5 |null |(xx) xxx3x8443| 
    |4xxxxx5 |null |(xx) x6xx083x3| 
    |4xxxxx5 |xx  |null   | 
    |4xxxxx5 |null |(xx) 3xxx83x3 | 
    |xx6564x |null |(x3) x88xx344x| 
    |xx8x4x0 |null |(xx) x83x5x8xx| 
    |xx0434x |null |(0x) 3x6x4080 | 
    |xx0434x |xx  |null   | 
    |x465x40 |null |(6x) x6x445xx | 
    |x0684x8 |null |(xx) x8x88x4x4| 
    |x0684x8 |null |(xx) x8x88x4x4| 
    |x84x850 |null |(xx) 55x56xx4 | 
    |x0604xx |null |(xx) x8x4xxx68| 
    |4x6xxx0 |null |(xx) x588x43xx| 
    |4x6xxx0 |null |(xx) 5x6465xx | 
    |4x6xxx0 |null |(xx) x835xxxx8| 
    |4x6xxx0 |null |(xx) x5x6465xx| 
    |x6x000x |null |(xx) xxx044xx4| 
    |5x65533 |null |(xx) xx686x0xx| 
    |x3668xx |null |(5x) 33x8x3x4 | 
    |x3668xx |null |(5x) 8040x8x6 | 
    +-----------+--------+--------------+ 
+0

왜 데이터 집합에 대한 스키마를 만든 다음 파일을 읽는 동안 전달할 수 있습니까? – ashwinids

+0

왜'string.replace()'단계를 수행하고 있습니까? 'json.loads()'할 수 없습니까? 그러면'dicts'의'list'가 반환 될 것이고'flatMap()'을 호출 할 수 있어야합니다. 또한 원하는 출력물이 어떻게 보이는지 보여 줄 수 있습니까? – pault

+1

제 대답을보십시오, https://stackoverflow.com/a/47944084/3251351 이것은 순수한 DataFrame API를 고수하고 JVM과 Python 간의 레코드 직렬화/비 직렬화를 피하고 처리를 위해 내장 된 최적화 된 기능을 사용합니다 JSON. – Silvio

답변

2

후 폭발 :

from pyspark.sql.functions import * 
from pyspark.sql.types import * 

phone_schema = ArrayType(StructType([StructField("phone", StringType())])) 

converted = inputDF\ 
    .withColumn("areaCode", get_json_object("phones", "$[*].areaCode"))\ 
    .withColumn("phones", explode(from_json("phones", phone_schema)))\ 
    .withColumn("phone", col("phones.phone"))\ 
    .drop("phones")\ 
    .filter(~isnull("phone")) 

converted.show() 
+1

이것은 대단하다. 나는 'flatMap()'을 Spark와 함께 수행하는 방법을 찾고있다. 나는 이것이 방법이라고 생각한다! – pault

1
그래서 내가 어떻게 제대로이 출력을 얻기 위해 이러한 종류의 데이터를 처리 할 수

replace()을 사용하지 않고 직접 json.loads()으로 전화 할 수 있어야한다고 생각합니다.

  1. stringArrayType(MapType())json.loads()를 사용하여지도.
  2. flatMap()을 호출하여 배열의 각 요소에 대해 Row()을 새로 만듭니다.
  3. 이 행을 원하는 출력에 매핑하십시오.

다음의 예를 살펴 보자 :

from StringIO import StringIO 
from pyspark.sql import Row 
import json 
import pandas as pd 

# mock up some sample data 
data = StringIO("""customer_id\tphones 
x8x46x5\t[{"phone" : "(xx) 35xx4x80"},{"phone" : "(xx) xxxx46605"}] 
xx44xx5\t[{"phone" : "(xx) xxx3x8443"}] 
4xxxxx5\t[{"phone" : "(xx) x6xx083x3"},{"areaCode" : "xx"},{"phone" : "(xx) 3xxx83x3"}] 
xx6564x\t[{"phone" : "(x3) x88xx344x"}] 
xx8x4x0\t[{"phone" : "(xx) x83x5x8xx"}] 
xx0434x\t[{"phone" : "(0x) 3x6x4080"},{"areaCode" : "xx"}] 
x465x40\t[{"phone" : "(6x) x6x445xx"}] 
x0684x8\t[{"phone" : "(xx) x8x88x4x4"},{"phone" : "(xx) x8x88x4x4"}] 
x84x850\t[{"phone" : "(xx) 55x56xx4"}] 
x0604xx\t[{"phone" : "(xx) x8x4xxx68"}] 
4x6xxx0\t[{"phone" : "(xx) x588x43xx"},{"phone" : "(xx) 5x6465xx"},{"phone" : "(xx) x835xxxx8"},{"phone" : "(xx) x5x6465xx"}] 
x6x000x\t[{"phone" : "(xx) xxx044xx4"}] 
5x65533\t[{"phone" : "(xx) xx686x0xx"}] 
x3668xx\t[{"phone" : "(5x) 33x8x3x4"},{"phone" : "(5x) 8040x8x6"}]""") 

pandas_df = pd.read_csv(data, sep="\t") 
df = sqlCtx.createDataFrame(pandas_df) # convert pandas to spark df 

# run the steps outlined above 
df.rdd\ 
    .map(lambda x: Row(customer_id=x['customer_id'], phones=json.loads(x['phones'])))\ 
    .flatMap(lambda x: [Row(customer_id=x['customer_id'], phone=phone) for phone in x['phones']])\ 
    .map(lambda x: Row(customer_id=x['customer_id'], phone=x['phone'].get('phone'), areaCode=x['phone'].get('areaCode')))\ 
    .toDF()\ 
    .select('customer_id', 'areaCode', 'phone')\ 
    .show(truncate=False, n=100) 

출력 :

+-----------+--------+--------------+ 
|customer_id|areaCode|phone   | 
+-----------+--------+--------------+ 
|x8x46x5 |null |(xx) 35xx4x80 | 
|x8x46x5 |null |(xx) xxxx46605| 
|xx44xx5 |null |(xx) xxx3x8443| 
|4xxxxx5 |null |(xx) x6xx083x3| 
|4xxxxx5 |xx  |null   | 
|4xxxxx5 |null |(xx) 3xxx83x3 | 
|xx6564x |null |(x3) x88xx344x| 
|xx8x4x0 |null |(xx) x83x5x8xx| 
|xx0434x |null |(0x) 3x6x4080 | 
|xx0434x |xx  |null   | 
|x465x40 |null |(6x) x6x445xx | 
|x0684x8 |null |(xx) x8x88x4x4| 
|x0684x8 |null |(xx) x8x88x4x4| 
|x84x850 |null |(xx) 55x56xx4 | 
|x0604xx |null |(xx) x8x4xxx68| 
|4x6xxx0 |null |(xx) x588x43xx| 
|4x6xxx0 |null |(xx) 5x6465xx | 
|4x6xxx0 |null |(xx) x835xxxx8| 
|4x6xxx0 |null |(xx) x5x6465xx| 
|x6x000x |null |(xx) xxx044xx4| 
|5x65533 |null |(xx) xx686x0xx| 
|x3668xx |null |(5x) 33x8x3x4 | 
|x3668xx |null |(5x) 8040x8x6 | 
+-----------+--------+--------------+ 

나는 이것이 당신이 바라고 출력이 있는지 확실하지 않습니다,하지만이 도움이 될 것입니다 너 거기 도착 해.