2017-12-27 41 views
0

Spring Cloud Streams + Kafka Binding을 사용하여 Apache Kafka에서 "정확히 하나의 전달"개념으로 PoC를 수행하려고합니다.Spring Cloud Stream 카프카 바인더 : "IN_TRANSACTION 상태에서 IN_TRANSACTION 상태로의 변환이 잘못되었습니다."

Apache Kafka "kafka_2.11-1.0.0"을 설치하고 제작자에 "transactionIdPrefix"를 정의했습니다. 봄 카프카에서 트랜잭션을 사용하려면이 작업을 수행해야합니다.하지만 그렇게 할 때 동일한 응용 프로그램 내에서 단순한 소스 & 싱크 바인딩을 실행하면 일부 메시지가 수신되어 소비자에게 인쇄되고 일부는 오류가 발생하는 것을 볼 수 있습니다.

예를 들어, 메시지 # 6 수신 : 무엇을

2017-12-27 16:15:07.405 ERROR 7731 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafk[email protected]7d3bbc0b]; nested exception is org.apache.kafka.common.KafkaException: TransactionalId my-transaction-3: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION, failedMessage=GenericMessage [payload=byte[13], headers={my-transaction-id=my-id-7, id=d31656af-3286-99b0-c736-d53aa57a5e65, contentType=application/json, timestamp=1514384107399}] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153) 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:575) 
  • :

    [49] Received message [Payload String content=FromSource1 6][Headers={kafka_offset=1957, scst_nativeHeadersPresent=true, [email protected]695c9a9, kafka_timestampType=CREATE_TIME, my-transaction-id=my-id-6, id=302cf3ef-a154-fd42-6b43-983778e275dc, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=test10, kafka_receivedTimestamp=1514384106395, timestamp=1514384106419}] 
    

    하지만 메시지 # 7에 오류가 있었다 "잘못된 전환 상태 IN_TRANSACTION에 상태 IN_TRANSACTION에서 시도" 이 오류는 무엇을 의미합니까?

  • 구성에 뭔가가 누락 되었습니까?
  • 트랜잭션을 사용할 때 소스 또는 싱크를 다르게 구현해야합니까?

UPDATE : 내가 프로젝트의 GitHub의에 문제가있을 토론을 참조하시기 바랍니다 열었다.


카프카 + Trasanctions 봄 부트 버전으로 생성하는 간단한 받는다는 프로젝트에 필요 재현하려면

을 활성화 바인딩 봄 클라우드 스트림을 사용하는 방법의 예를 찾을 수 없습니다 "2.0.0.M5

,536 :

server: 
    port: 8082 
spring: 
    kafka: 
    producer: 
     retries: 5555 
     acks: "all" 
    cloud: 
    stream: 
     kafka: 
     binder: 
      autoAddPartitions: true 
      transaction: 
      transactionIdPrefix: my-transaction- 
     bindings: 
     output1: 
      destination: test10 
      group: test111 
      binder: kafka 
     input1: 
      destination: test10 
      group: test111 
      binder: kafka 
      consumer: 
      partitioned: true 

나는 또한 간단한 소스 및 싱크 클래스를 생성 :이 구성으로 간단한 응용 프로그램 작성에 "와"스프링 클라우드 스트림 종속 "버전"Elmhurst.M3 "

@EnableBinding(SampleSink.MultiInputSink.class) 
public class SampleSink { 

    @StreamListener(MultiInputSink.INPUT1) 
    public synchronized void receive1(Message<?> message) { 
     System.out.println("["+Thread.currentThread().getId()+"] Received message " + message); 
    } 

    public interface MultiInputSink { 
     String INPUT1 = "input1"; 

     @Input(INPUT1) 
      SubscribableChannel input1(); 

    } 
} 

과 :

@EnableBinding(SampleSource.MultiOutputSource.class) 
public class SampleSource { 

    AtomicInteger atomicInteger = new AtomicInteger(1); 

    @Bean 
    @InboundChannelAdapter(value = MultiOutputSource.OUTPUT1, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) 
    public synchronized MessageSource<String> messageSource1() { 
     return new MessageSource<String>() { 
      public Message<String> receive() { 
       String message = "FromSource1 "+atomicInteger.getAndIncrement(); 
       m.put("my-transaction-id","my-id-"+ UUID.randomUUID()); 
       return new GenericMessage(message, new MessageHeaders(m)); 
      } 
     }; 
    } 

    public interface MultiOutputSource { 
     String OUTPUT1 = "output1"; 

     @Output(OUTPUT1) 
      MessageChannel output1(); 

    } 
} 

답변

0

나는 프로젝트의 GitHub의에 그에서 티켓을 열었다.

https://github.com/spring-cloud/spring-cloud-stream/issues/1166

을하지만, 첫 번째 대답이 있었다 : 거기에 대한 답변 및 토론을 참조하십시오

바인더는 현재 생산 개시 트랜잭션을 지원하지 않습니다.

트랜잭션은 소비자가 트랜잭션을 시작하고 프로듀서가 해당 트랜잭션에 참여하는 프로세서에서 지원됩니다.

소비자가없는 경우 spring-kafka를 직접 사용하여 생산자 측에서 트랜잭션을 시작할 수 있어야합니다.