시간이 흐르면서 바뀔 수있는 열이있는 세션이라는 데이터 프레임이 있습니다. (수정을 위해 : 열에 대한 사례 클래스가 없습니다. 반영된 스키마 만 있습니다.) 추적 이벤트를 구성 할 수있는 다른 내부 및 외부 범위 열과 함께 외부 범위에 uuid 및 clientId를 일관되게 사용합니다. 그래서 ... 뭔가 같이 :행 개체에 묻혀있는 정의되지 않은 사례 클래스의 배열을 연결하는 UDF
val mergeTEs = udf((oldTEs : Seq[Row], newTEs : Seq[Row]) =>
// do some stuff - figure best way
// - to merge both groups of tracking events
// - remove duplicates tracker events structures
// - limit total tracking events < 500
return result // same type as UDF input params
)
UDF를 반환 :
root
|-- runtimestamp: long (nullable = true)
|-- clientId: long (nullable = true)
|-- uuid: string (nullable = true)
|-- oldTrackingEvents: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- timestamp: long (nullable = true)
| | |-- actionid: integer (nullable = true)
| | |-- actiontype: string (nullable = true)
| | |-- <tbd ... maps, arrays and other stuff matches sibling> section
...
|-- newTrackingEvents: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- timestamp: long (nullable = true)
| | |-- actionid: integer (nullable = true)
| | |-- actiontype: string (nullable = true)
| | |-- <tbd ... maps, arrays and other stuff matches sibling>
...
나는 코드 로직 해결 될 아직 - 투 - 지금이 매개 변수 등을 포함하는 UDF와 oldTrackingEvents 및 newTrackingEvents을 병합하고 싶습니다 결과는 연결된 두 f의 결과 목록 인 구조의 배열이됩니다. f ields.
질문 : 내 질문 구성하는 방법입니다 같은 UDF - 정확한 전달 된 매개 변수 유형, (2) UDF 내에서 이러한 컬렉션을 조작하는 방법을 (3) 명확한 방법 (1) 사용 컴파일러 오류가없는 값을 반환합니다. 나는 실패 val testUDF = udf((trackingEvents : Seq[Row]) => trackingEvents)
와 입력/출력 Seq[Row]
을 (테스트 trackingEvents
의 직접 반환에 대한 오류 java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported
을 받았다. 그러나, 나는되도록 컬렉션을 조작 할 수있는 가장 좋은 방법은 무엇 ... 대신 trackingEvents
의 Some(1)
을 반환하는 오류를 얻을 UDF에이 코멘트 섹션에서 활동을 사용하여 위에서 스키마에 의해 제안 나는 동일한 구조의 2 개 목록을 연결할 수 있습니다 목표는이 작업을 사용하는 것입니다.
sessions.select(mergeTEs('oldTrackingEvents, 'newTrackingEvents).as("cleanTrackingEvents"))
그리고 각 행의
... 돌아 'trackingEvents'구조의 단일 배열을 메모리/속도 효율적인 방식으로 전달합니다.보충 : 관련성이있는 경우
나에게 같이 질문을 보면 ...이 다른 게시물에 유용/관련 ... 아마 ..., Defining a UDF that accepts an Array of objects in a Spark DataFrame? ... To create struct function passed to udf has to return Product type (Tuple* or case class), not Row.
을 가능한 힌트가있다.
아마도 Select 문을 사용하여 새로운 열에서 Seq [Elem]의 두 열을 Seq [Elem]의 한 열로 연결하는 방법이 있을까요? – codeaperature
예. 'trackingEvents'(배열 'trackingEvent') 열에는 목록,지도 및 구조체가 있습니다. 이러한 변수가 변경되어 사례 클래스를 하드 코딩 할 수 없습니다. – codeaperature