2017-11-13 4 views
0

시간이 흐르면서 바뀔 수있는 열이있는 세션이라는 데이터 프레임이 있습니다. (수정을 위해 : 열에 대한 사례 클래스가 없습니다. 반영된 스키마 만 있습니다.) 추적 이벤트를 구성 할 수있는 다른 내부 및 외부 범위 열과 함께 외부 범위에 uuid 및 clientId를 일관되게 사용합니다. 그래서 ... 뭔가 같이 :행 개체에 묻혀있는 정의되지 않은 사례 클래스의 배열을 연결하는 UDF

val mergeTEs = udf((oldTEs : Seq[Row], newTEs : Seq[Row]) => 
     // do some stuff - figure best way 
     // - to merge both groups of tracking events 
     // - remove duplicates tracker events structures 
     // - limit total tracking events < 500 
     return result // same type as UDF input params 
    ) 

UDF를 반환 :

root 
|-- runtimestamp: long (nullable = true) 
|-- clientId: long (nullable = true) 
|-- uuid: string (nullable = true) 
|-- oldTrackingEvents: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- timestamp: long (nullable = true) 
| | |-- actionid: integer (nullable = true) 
| | |-- actiontype: string (nullable = true) 
| | |-- <tbd ... maps, arrays and other stuff matches sibling> section 
... 
|-- newTrackingEvents: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- timestamp: long (nullable = true) 
| | |-- actionid: integer (nullable = true) 
| | |-- actiontype: string (nullable = true) 
| | |-- <tbd ... maps, arrays and other stuff matches sibling>  
... 

나는 코드 로직 해결 될 아직 - 투 - 지금이 매개 변수 등을 포함하는 UDF와 oldTrackingEvents 및 newTrackingEvents을 병합하고 싶습니다 결과는 연결된 두 f의 결과 목록 인 구조의 배열이됩니다. f ields.

질문 : 내 질문 구성하는 방법입니다 같은 UDF - 정확한 전달 된 매개 변수 유형, (2) UDF 내에서 이러한 컬렉션을 조작하는 방법을 (3) 명확한 방법 (1) 사용 컴파일러 오류가없는 값을 반환합니다. 나는 실패 val testUDF = udf((trackingEvents : Seq[Row]) => trackingEvents)와 입력/출력 Seq[Row]을 (테스트 trackingEvents의 직접 반환에 대한 오류 java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported을 받았다. 그러나, 나는되도록 컬렉션을 조작 할 수있는 가장 좋은 방법은 무엇 ... 대신 trackingEventsSome(1)을 반환하는 오류를 얻을 UDF에이 코멘트 섹션에서 활동을 사용하여 위에서 스키마에 의해 제안 나는 동일한 구조의 2 개 목록을 연결할 수 있습니다 목표는이 작업을 사용하는 것입니다.

sessions.select(mergeTEs('oldTrackingEvents, 'newTrackingEvents).as("cleanTrackingEvents")) 

그리고 각 행의

... 돌아 'trackingEvents'구조의 단일 배열을 메모리/속도 효율적인 방식으로 전달합니다.

보충 : 관련성이있는 경우

나에게 같이 질문을 보면 ...이 다른 게시물에 유용/관련 ... 아마 ..., Defining a UDF that accepts an Array of objects in a Spark DataFrame? ... To create struct function passed to udf has to return Product type (Tuple* or case class), not Row. 을 가능한 힌트가있다.

+0

아마도 Select 문을 사용하여 새로운 열에서 Seq [Elem]의 두 열을 Seq [Elem]의 한 열로 연결하는 방법이 있을까요? – codeaperature

+0

예. 'trackingEvents'(배열 'trackingEvent') 열에는 목록,지도 및 구조체가 있습니다. 이러한 변수가 변경되어 사례 클래스를 하드 코딩 할 수 없습니다. – codeaperature

답변

1

나는 question you've linked이 그 모든 것을 설명한다고 생각하기 때문에 되풀이합니다. udf 작업시 : StructType에 대한

  • 입력 표현은 객체를 입력 약하게된다.
  • StructType의 출력 유형은 Scala Product이어야합니다. 개체를 반환 할 수 없습니다.

훨씬 부담, 당신은 강하게 TSession 스키마를 나타내는 대수 데이터 형식입니다

val f: T => U 
sessions.as[T].map(f): Dataset[U] 

Dataset을하게 분류해야하고, U는 결과를 나타내는 대수 데이터 형식이됩니다.

+0

당신이 암시하는 것을 이해하기 시작했습니다. 나는이 문제가 '대수 데이터 타입'을 가지고 있지 않다라고 생각한다 ... 나는 단지'sessions.select ('oldTrackingEvents) .schema' (이것은 유연한 타입 임)에 의해 유추 된 스키마를 가지고 있으며 이것을 생각하고있다. 스키마를 scala.product로 변환해야합니다. BTW - 나는 또한 Spark 1.6을 사용해야한다. – codeaperature

0

...일부 임의의 행 구조/스키마 시퀀스를 일부 조작으로 병합하는 것이 목표 인 경우 파티셔닝 토크를 피할 수있는 대체로 일반적으로 언급 된 방법입니다.

마스터 데이터 프레임에서 각 trackingEvents 섹션에 대한 데이터 프레임을 만들려면 newold. 각각에 대해 '추적 이벤트'섹션의 열을 선택하십시오. 이 val 데이터 프레임 선언을 newTEoldTE으로 저장하십시오.

이 선택됩니다 열 oldTrackingEvents의 배열의 각 추적 이벤트에 고유하며 각 등 newTrackingEventsuuid, clientId 및 이벤트 timestamp의 또 다른 dataframe를 만듭니다. 귀하의 의사 스키마는 다음과 같습니다

(uuid: String, clientId : Long, newTE : Seq[Long], oldTE : Seq[Long])

이 구조의 두 가지 간단한 시퀀스를 결합하기 위하여 UDF를 사용하여, 예를 들어 '는 검증되지 않은 같은'모두 Seq[Long] :

val limitEventsUDF = udf { (newTE: Seq[Long], oldTE: Seq[Long], limit: Int, tooOld: Long) => { 
    (newTE ++ oldTE).filter(_ > tooOld).sortWith(_ > _).distinct.take(limit) 
}} 

UDF 정리 된 추적 이벤트의 데이터 프레임을 반환합니다. & 이제는 제거 된 이벤트가있는 매우 슬림 한 데이터 프레임을 사용하여 서로 합쳐진 후 분해 된 newTEoldTE 프레임으로 자체 조인 할 수 있습니다.

GroupBy 필요하면 collect_list를 사용하십시오.

아직도 ... 이것은 많은 작업처럼 보입니다. 이걸로 투표해야합니까? "the answer" - 확실하지 않습니다.