0
Apache Beam JavaSDK를 사용하여 PubSub에 쓸 수 없습니다.Apache 빔 PubSubIO 쓰기
빔을 사용하여 PubSub에서 읽고 처리하고 PubSub 항목에 데이터를 쓰려고하는데 PubSub에 쓰는 방법에 대한 작업 예제를 찾을 수 없습니다.
누군가 PubSub 주제에 글을 쓰는 데 적합한 변형을 도울 수 있습니까?
.apply("Create pubsub messages", ParDo.of(new DoFn<String, PubsubMessage>() {
@DoFn.ProcessElement
public void processElement(ProcessContext c) throws Exception {
PubsubMessage pubsubMessage = new PubsubMessage(c.element());
c.output(pubsubMessage);
}
}))
.apply("Write messages to topic",PubsubIO.writeMessages().to("projects/project_id/topics/topic_name"))
나는 현재 GCS에서 읽기의 예는 컴파일 오류가 여기에
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /home/username/src/main/java/com/domain/JavaClass.java:[336,1] no suitable method found for apply(java.lang.String,org.apache.beam.sdk.transforms.ParDo.SingleOutput<java.lang.String,org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage>)
method org.apache.beam.sdk.values.PCollection.<OutputT>apply(org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.Integer>>,OutputT>) is not applicable
(cannot infer type-variable(s) OutputT
(actual and formal argument lists differ in length))
method org.apache.beam.sdk.values.PCollection.<OutputT>apply(java.lang.String,org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.Integer>>,OutputT>) is not applicable
(inference variable InputT has incompatible bounds
equality constraints: java.lang.String
lower bounds: org.apache.beam.sdk.values.KV<java.lang.String,java.lang.Integer>)
[ERROR] /home/username//src/main/java/com/domain/JavaClass.java:[339,39] constructor PubsubMessage in class org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage cannot be applied to given types;
required: byte[],java.util.Map<java.lang.String,java.lang.String>
found: java.lang.String
reason: actual and formal argument lists differ in length
하지하는 설명하기 위해 귀하의 질문을 수정하시기 바랍니다 포함 된 코드 스 니펫으로 작업 할 수 있습니다. – jkff
이 sinpet 전에 파이프 라인 부분을 게시 할 수 있습니까? 즉, "pubsub 메시지 작성"변환에 어떤 PCollection을 사용하고 있습니까? snipet 자체는 괜찮아 보입니다. 나는 비슷한 것을 사용합니다. 잘 작동합니다. – Arqu