2017-11-30 18 views
1

유형 II를 들어, .CoGroupByKey()테이블 행 PCollection을 키, 값 PCollections로 변환하는 방법은 무엇입니까?

상황 기본적으로 나는 두 개의 큰 pCollections을 가지고 있고이 둘 사이의 차이를 찾을 수 있어야에 입력에 필요한 pCollections에 pCollections을 변환하는 방법에 대한 설명서가 없습니다 ETL이 변경되면 (pColl1에없는 경우 pColl2에서 중첩 된 필드에 추가) BigQuery에서 이러한 레코드의 기록을 유지할 수있게됩니다.

파이프 라인 아키텍처 : 2 pCollections에

  1. 읽기 BQ 테이블 : dwsku 및 제품.
  2. 두 세트에 CoGroupByKey()를 적용하여 결과를 반환합니다. -> 결과
  3. dwsku의 모든 변경 사항을 찾아내어 제품에 중첩합니다.

도움이 될 것입니다. 그래서 내가 성취해야 할 것과 동일한 일을하는 자바 링크를 발견했습니다 (그러나 파이썬 SDK에는 아무것도 없습니다).

Convert from PCollection<TableRow> to PCollection<KV<K,V>>

아파치 빔, 특히 파이썬 SDK에 대한 문서/지원이 있습니까? 데이터-CoGroupByKey() 작업을 진행하기 위해

답변

1

, 당신은 첫 번째 요소는 와 두 번째가 될 것입니다있는 tuplesPCollections을 가질 필요가있다.

BigQuerySource이 있으며 현재 Apache Beam 버전에서 PCollection of dictionaries (code)을 출력하며 모든 항목은 읽은 테이블의 행을 나타냅니다. 위에서 설명한 것처럼이 PCollection을 튜플에 매핑해야합니다. 이 ParDo를 사용하여 쉽게 할 수 있습니다 :

class MapBigQueryRow(beam.DoFn): 
    def process(self, element, key_column): 
     key = element.get(key_column) 
     yield key, element 


data1 = (p 
      | "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1")) 
      | "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1")) 

data2 = (p 
      | "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2")) 
      | "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2")) 

co_grouped = ({"data1": data1, "data2": data2} | beam.CoGroupByKey()) 

# do your processing with co_grouped here 

BTW, 아파치 빔 파이썬 SDK의 문서는 here를 찾을 수 있습니다.