2017-12-13 19 views
1

그래서 나는이 streming 데이터 프레임을 가지고 있으며이 'customer_ids'열을 간단한 문자열로 변환하려고합니다. 다음은이 변환하고자하는MapType (StringType, StringType)의 열을 StringType으로 변환하는 방법은 무엇입니까?

schema = StructType()\ 
    .add("customer_ids", MapType(StringType(), StringType()))\ 
    .add("date", TimestampType()) 

original_sdf = spark.readStream.option("maxFilesPerTrigger", 800)\ 
    .load(path=source, ftormat="parquet", schema=schema)\ 
    .select('customer_ids', 'date') 

original_sdf.groupBy('customer_ids')\ 
    .agg(max('date')) \ 
    .writeStream \ 
    .trigger(once=True) \ 
    .format("memory") \ 
    .queryName('query') \ 
    .outputMode("complete") \ 
    .start() 

처럼 최대 (날짜)에 의해이 열 및 agregate에 의해 그룹화 해,하지만 난 주조 수있는 방법이 예외

AnalysisException: u'expression `customer_ids` cannot be used as a grouping expression because its data type map<string,string> is not an orderable data type. 

있어 이런 종류의 스트리밍 DataFrame 열 또는 다른 방법으로 groupBy이 열?

+0

일부 입력/출력 데이터를 제공하십시오. – Suresh

답변

1

TL; DRgetItem 사용 방법은 MapType 열에 키당 값에 액세스하기.


진짜 문제는 어떤 키 (들) MapType 열이 키의 다양한있을 수 있기 때문에 당신이 groupBy하려는 것입니다. 모든 키는 맵 열의 값이있는 열이 될 수 있습니다.

의 getItem (키 : 모두) : 콜럼 배열에서 위치 순서에서 항목을 가져 식, 또는

당신은 Column.getItem 방법 (또는 유사한 파이썬 부두)를 사용하여 키에 액세스 할 수 MapType의 키 키로 값을 가져옵니다.

는 (I 스칼라를 사용하고 가정 운동으로 pyspark로 변환 떠납니다)

val ds = Seq(Map("hello" -> "world")).toDF("m") 
scala> ds.show(false) 
+-------------------+ 
|m     | 
+-------------------+ 
|Map(hello -> world)| 
+-------------------+ 

scala> ds.select($"m".getItem("hello") as "hello").show 
+-----+ 
|hello| 
+-----+ 
|world| 
+-----+