2017-02-03 6 views
2

Google 그룹에 이미 this 질문을했지만 아직 응답을받지 못했습니다. 다른 잠재 고객을 위해 여기에 게시하십시오.Reactive-Kafka : 예외적으로 소비자를 일시 중지하고 요청시 다시 시도하는 방법

우리는 Reactive-Kafka를 사용하고 있습니다. 다음과 같은 시나리오가 있습니다. 메시지를 처리하는 동안 예외가 발생하면 소비자에게 메시지를 보내지 않습니다. 메시지는 규정 된 시간 이후에 또는 소비자 측의 명시 적 요청 후에 재 시도되어야합니다. 현재의 접근 방식에 따르면, 소비자의 데이터베이스가 언젠가 다운 될 경우 카프카 (kafka)에서 읽으려고 시도하고 메시지를 처리하려고 시도하지만 DB 문제로 인해 처리가 실패합니다. 이렇게하면 응용 프로그램이 불필요하게 작동합니다. 이 대신에 우리는 소비자가 정해진 시간 동안 메시지를 받도록 일시 정지하려고합니다 (다시 말하면 30 분을 기다리십시오). 우리는이 사건을 어떻게 처리 할 지 잘 모릅니다.

동일한 작업을 수행 할 수 있습니까? 내가 놓친 게 있니?

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) 
     .mapAsync(1) { msg => 
     Future { 
      /** 
      * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down 
      */ 
     }.map(_ => msg.committableOffset).recover { 
      case ex => { 
      /** 
       * HOW TO DO ???????? 
       * On exception, I would like to tell stream to stop sending messages and pause the consumer and try again within stipulated time 
       * or on demand from the last committed offset 
       */ 
      throw ex 
      } 
     } 
     } 
     .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) => 
     batch.updated(elem) 
     } 
     .mapAsync(3)(_.commitScaladsl()) 
     .runWith(Sink.ignore) 

답변

0

이 목적을 위해 recoverWithRetries 콤비가 : 여기

는 반응 카프카에서 가져온 샘플 코드입니다. 참고로 this answerdocs을 참조하십시오.

당신은 당신의 소스

val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) 
     .mapAsync(1) { msg => 
     Future { 
      /** 
      * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down 
      */ 
     }.map(_ => msg.committableOffset) 

를 추출하고 할 수

src 
    .recoverWithRetries(attempts = -1, {case e: MyDatabaseException => 
    logger.error(e) 
    src.delay(30.minutes, DelayOverflowStrategy.backpressure)}) 
    ... 

+0

응답 해 주셔서 감사합니다. :) 그것을 밖으로 시도하자. –

1

주 (시도와 다시 시도가 = -1 재 시도 무기한을 의미한다), 당신은 아마지도를해야합니다 의 구체화 된 값은 :

val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) 
    .mapAsync(1) { msg => 
    Future { 
     /** 
     * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down 
     */ 
    }.map(_ => msg.committableOffset) 
    .mapMaterializedValue(_ => akka.NotUsed)