고정 된 수의 문자열 (테스트에 사용 된 800,000 개의 1KB)을 PubSub 항목에 넣고 다음 Apache Beam (2.1.0) 작업을에서 실행하는 경우 정확히 한 번 데이터 흐름은 예상대로 보존됩니다. (데이터 흐름 콘솔에서와 같이), 그리고 다시 출력 파일을 쫓겨 같은 작업이 실행되는 경우, 모든 요소 전에 배수PubSub에서 읽고 데이터를 Google Cloud Storage에 쓰는 데이터 흐름 작업
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
public interface PubSubToGsPipelineOptions extends PipelineOptions {
@Description("PubSub subscription")
String getInput();
void setInput(String input);
@Description("Google Cloud Storage output path")
String getOutput();
void setOutput(String output);
}
은 읽기
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
public class PubSubToGsSimpleJob {
public static void main(String[] args) {
PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PubSubToGsPipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readStrings().fromSubscription(options.getInput()))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()));
p.run();
}
}
PipelineOptions
구현 이하 PubSub 주제에 공개 된 원래 데이터 세트보다 적은 수의 레코드가 있습니다. 작업을 배수하고 교체하면 데이터 손실이 발생할 수 있음을 알 수 있습니다. 이는 this google cloud blog post에 적어도 에 의미가 있어야한다고 언급하기 때문에 이상합니다. 이 파이프 라인은 작업을 배수하고 교체 할 때 적어도 한 번 이상의 의미 (또는 한 번 더 정확하게 한 번 의미론)를 달성하도록 설계되어야합니까?
그렇다면 TextIO can/should가 위와 같은 경우를 더 잘 처리해야합니다. 출력 경로를 변경하면 모든 레코드가 보존되는지 확인하십시오. 그러면 Dataflow의 보증을 확인할 수 있습니다. –
로그 패턴과 일치하는 행을 찾을 수 없습니다. 별도의 출력에 쓰도록 대체 작업을 구성하면 적어도 한 번 의미가 생깁니다. PubSub을 소스로 사용하여 의미가 유지되지 않으면 정확하게 중복 레코드가 있습니다. 블로그 게시물에는'사용자 지정 소스가 정확히 한 번 배달을 보장하고 소스 쪽 버퍼링을 제공하는 경우 드레인 및 바꾸기가 정확히 한 번 의미를 제공 할 수 있습니다. 'PubSub 메시지가 ACK로 응답하기 전에 버퍼링 될 수 있으므로이 PubSub은 정확히 한 번 의미를 제공 할 수 있다고 제안합니까? – JonSim
해결 방법을 알려 주셔서 감사합니다. TextIO에 대한 버그를 제기 할 것입니다. 나는 당신이 중복 된 것을보고 놀랐습니다. PubSub은이 경우 정확히 한 번 의미를 제공해야합니다. 더 많은 정보 (그리고 할 수있는 job_id ID)를 제공 할 수 있다면 매우 유용 할 것입니다. 확실히 중복 소스를 더 파헤 치는 데 관심이 있습니다. –