2017-11-13 4 views
0

Apache Beam JavaSDK를 사용하여 PubSub에 쓸 수 없습니다.Apache 빔 PubSubIO 쓰기

빔을 사용하여 PubSub에서 읽고 처리하고 PubSub 항목에 데이터를 쓰려고하는데 PubSub에 쓰는 방법에 대한 작업 예제를 찾을 수 없습니다.

누군가 PubSub 주제에 글을 쓰는 데 적합한 변형을 도울 수 있습니까?

.apply("Create pubsub messages", ParDo.of(new DoFn<String, PubsubMessage>() { 
    @DoFn.ProcessElement 
    public void processElement(ProcessContext c) throws Exception { 
     PubsubMessage pubsubMessage = new PubsubMessage(c.element()); 
     c.output(pubsubMessage); 
    } 
    })) 
.apply("Write messages to topic",PubsubIO.writeMessages().to("projects/project_id/topics/topic_name")) 

나는 현재 GCS에서 읽기의 예는 컴파일 오류가 여기에

[ERROR] COMPILATION ERROR : 
[INFO] ------------------------------------------------------------- 
[ERROR] /home/username/src/main/java/com/domain/JavaClass.java:[336,1] no suitable method found for apply(java.lang.String,org.apache.beam.sdk.transforms.ParDo.SingleOutput<java.lang.String,org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage>) 
    method org.apache.beam.sdk.values.PCollection.<OutputT>apply(org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.Integer>>,OutputT>) is not applicable 
     (cannot infer type-variable(s) OutputT 
     (actual and formal argument lists differ in length)) 
    method org.apache.beam.sdk.values.PCollection.<OutputT>apply(java.lang.String,org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.Integer>>,OutputT>) is not applicable 
     (inference variable InputT has incompatible bounds 
     equality constraints: java.lang.String 
     lower bounds: org.apache.beam.sdk.values.KV<java.lang.String,java.lang.Integer>) 
[ERROR] /home/username//src/main/java/com/domain/JavaClass.java:[339,39] constructor PubsubMessage in class org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage cannot be applied to given types; 
    required: byte[],java.util.Map<java.lang.String,java.lang.String> 
    found: java.lang.String 
    reason: actual and formal argument lists differ in length 
+0

하지하는 설명하기 위해 귀하의 질문을 수정하시기 바랍니다 포함 된 코드 스 니펫으로 작업 할 수 있습니다. – jkff

+0

이 sinpet 전에 파이프 라인 부분을 게시 할 수 있습니까? 즉, "pubsub 메시지 작성"변환에 어떤 PCollection을 사용하고 있습니까? snipet 자체는 괜찮아 보입니다. 나는 비슷한 것을 사용합니다. 잘 작동합니다. – Arqu

답변

0

에게 있습니다 받고 술집/하위에 쓰기 오전 :

package ... 

import java.io.IOException; 

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; 
import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.PipelineResult; 
import org.apache.beam.sdk.io.TextIO; 
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; 
import org.apache.beam.sdk.options.Default; 
import org.apache.beam.sdk.options.Description; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.values.PCollection; 

public class Gcs2PubSub { 

    public interface Pubsub2DatastoreOptions extends DataflowPipelineOptions { 
     @Description("GCP project name") 
     @Default.String("gcp_project_name") 
     String getProjectId(); 
     void setProjectId(String value); 
    } 

    public static void main(String[] args) throws IOException { 

     Pubsub2DatastoreOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() 
       .as(Pubsub2DatastoreOptions.class); 

     Pipeline p = Pipeline.create(options); 

     PCollection<String> line = p.apply("Read GCS",TextIO.read().from("gs://<bucket>/*")); 
     line.apply("Sending Pub/sub",PubsubIO.writeStrings().to("<topic>")); 

     PipelineResult result = p.run(); 
     try { 
      result.waitUntilFinish(); 
     } catch (Exception exc) { 
      result.cancel(); 
     } 
    } 
}