SideOutputs의 결과로 두 개의 출력 PCollection을 만들고 일부 조건에 따라 BigQuery에 하나만 쓰고 싶다고 가정합니다. 이 작업을 수행하는 방법?특정 PCollection을 BigQuery에 작성하십시오.
기본적으로 내가 사용하는 경우 Write_Append 및 Write_Truncate를 동적으로 만들려고합니다. BigQuery에서 유지 관리하는 config 테이블의 정보 (append/truncate)를 가져옵니다. 따라서 내가 config 테이블에 가지고있는 것에 따라 Truncate 나 Append를 적용해야합니다.
그래서 SideOutputs를 사용하여 두 개의 PCollection (각각 Append와 Truncate)을 만들 수있었습니다.이 중 하나는 비어 있습니다. 모든 행을 포함하는 행을 BigQuery에 작성해야합니다. 이 접근법이 맞습니까? 내가 사용하고
코드 :
final TupleTag<TableRow> truncate =
new TupleTag<TableRow>(){};
// Output that contains word lengths.
final TupleTag<TableRow> append =
new TupleTag<TableRow>(){};
PCollectionTuple results = read.apply("convert to table row",ParDo.of(new DoFn<String,TableRow>(){
@ProcessElement
public void processElement(ProcessContext c)
{
String value = c.sideInput(configView).get(0).toString();
LOG.info("config: "+value);
if(value.equals("truncate")){
LOG.info("outputting to truncate");
c.output(new TableRow().set("color", c.element()));
}
else
{
LOG.info("outputting to append");
c.output(append,new TableRow().set("color", c.element()));
}
//c.output(new TableRow().set("color", c.element()));
}
}).withSideInputs(configView).withOutputTags(truncate,
TupleTagList.of(append)));
results.get(truncate).apply("truncate",BigQueryIO.writeTableRows()
.to("projectid:datasetid.tableid")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
results.get(append).apply("append",BigQueryIO.writeTableRows()
.to("projectid:datasetid.tableid")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
나는 두 가지 중 하나를 수행해야합니다. 두 테이블을 모두 처리하면 어쨌든 잘립니다.
P. 나는 자바 SDK (아파치 빔 2.1)
"이 방법이 올바른지 여부"에 대한 일반적인 질문입니까? 또는 일부 코드 솔루션을 원하십니까? –
@Marcin Zablocki 그럼 둘 다 ... 코드 솔루션을 좀 갖고 싶습니다. – rish0097
두 개의 PCollection이 있다고 했으므로 문제가 무엇입니까? 분할과 쓰기의 접근법은 괜찮은 것 같습니다. –