2017-10-18 6 views
0

SideOutputs의 결과로 두 개의 출력 PCollection을 만들고 일부 조건에 따라 BigQuery에 하나만 쓰고 싶다고 가정합니다. 이 작업을 수행하는 방법?특정 PCollection을 BigQuery에 작성하십시오.

기본적으로 내가 사용하는 경우 Write_Append 및 Write_Truncate를 동적으로 만들려고합니다. BigQuery에서 유지 관리하는 config 테이블의 정보 (append/truncate)를 가져옵니다. 따라서 내가 config 테이블에 가지고있는 것에 따라 Truncate 나 Append를 적용해야합니다.

그래서 SideOutputs를 사용하여 두 개의 PCollection (각각 Append와 Truncate)을 만들 수있었습니다.이 중 하나는 비어 있습니다. 모든 행을 포함하는 행을 BigQuery에 작성해야합니다. 이 접근법이 맞습니까? 내가 사용하고

코드 :

final TupleTag<TableRow> truncate = 
        new TupleTag<TableRow>(){}; 
       // Output that contains word lengths. 
       final TupleTag<TableRow> append = 
        new TupleTag<TableRow>(){}; 

       PCollectionTuple results = read.apply("convert to table row",ParDo.of(new DoFn<String,TableRow>(){ 
       @ProcessElement 
       public void processElement(ProcessContext c) 
       { 
        String value = c.sideInput(configView).get(0).toString(); 
        LOG.info("config: "+value); 
        if(value.equals("truncate")){ 
         LOG.info("outputting to truncate"); 
         c.output(new TableRow().set("color", c.element())); 
        } 
        else 
        { 
         LOG.info("outputting to append"); 
         c.output(append,new TableRow().set("color", c.element())); 
        } 
        //c.output(new TableRow().set("color", c.element())); 
       } 
      }).withSideInputs(configView).withOutputTags(truncate, 
        TupleTagList.of(append))); 

       results.get(truncate).apply("truncate",BigQueryIO.writeTableRows() 
         .to("projectid:datasetid.tableid") 
         .withSchema(schema) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

       results.get(append).apply("append",BigQueryIO.writeTableRows() 
         .to("projectid:datasetid.tableid") 
         .withSchema(schema) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

나는 두 가지 중 하나를 수행해야합니다. 두 테이블을 모두 처리하면 어쨌든 잘립니다.

P. 나는 자바 SDK (아파치 빔 2.1)

+0

"이 방법이 올바른지 여부"에 대한 일반적인 질문입니까? 또는 일부 코드 솔루션을 원하십니까? –

+0

@Marcin Zablocki 그럼 둘 다 ... 코드 솔루션을 좀 갖고 싶습니다. – rish0097

+0

두 개의 PCollection이 있다고 했으므로 문제가 무엇입니까? 분할과 쓰기의 접근법은 괜찮은 것 같습니다. –

답변

0

를 사용하고 난 데이터가 존재하지 않는 경우는 파이프 라인의 모든 WRITE_TRUNCATE와 BigQuery를 테이블에 쓰기에 를 포함하는 경우, 그 권리이며, 현재 테이블도 잘릴 것으로 예상 . 이 경우 더 구성 가능한 동작을 지원하려면 file a JIRA으로 자유롭게 이동하십시오.

조건부로 잘리지 않으려면 조건부로 쓰기 변환을 전혀 포함하지 않아야합니다. 해당 수준으로 조건을 푸시 할 수있는 방법이 있습니까? 아니면 실제로 파이프 라인의 다른 데이터에서 조건을 계산해야합니까?

(내가 생각할 수있는 유일한 대안은 DynamicDestinations를 사용하여 동적으로 테이블 이름을 잘라내어 대신 다른 더미 빈 테이블을 잘라내는 것입니다. 이전 단락에 대한 답을 작성한 후 더 자세히 설명 할 수 있습니다)

+0

안녕하세요 @ jkff ... 그 수준으로 조건을 밀어 방법은 내가 여기에 찾을 수있는 솔루션으로 기대하고있어 ... 그리고 예 그것은 데이터를 계산해야합니다 내가 config 테이블에서 검색 할 수 있습니다 APPEND 또는 TRUNCATE ... DynamicDestinations를 사용하는 해결 방법이 있다면 정말 알고 싶습니다 ... – rish0097

+0

주 프로그램에서 if (condition) {p.apply (... APPEND ...) } else {p.apply (... TRUNCATE ...)}? 또는 조건 자체가 파이프 라인에 의해 계산 된 데이터에 의존하며 파이프 라인을 구성하는 주 프로그램에 의해 평가 될 수 없습니까? – jkff

+0

파이프 라인으로 계산 된 데이터에 따라 다릅니다. – rish0097