Google 그룹에 이미 this 질문을했지만 아직 응답을받지 못했습니다. 다른 잠재 고객을 위해 여기에 게시하십시오.Reactive-Kafka : 예외적으로 소비자를 일시 중지하고 요청시 다시 시도하는 방법
우리는 Reactive-Kafka를 사용하고 있습니다. 다음과 같은 시나리오가 있습니다. 메시지를 처리하는 동안 예외가 발생하면 소비자에게 메시지를 보내지 않습니다. 메시지는 규정 된 시간 이후에 또는 소비자 측의 명시 적 요청 후에 재 시도되어야합니다. 현재의 접근 방식에 따르면, 소비자의 데이터베이스가 언젠가 다운 될 경우 카프카 (kafka)에서 읽으려고 시도하고 메시지를 처리하려고 시도하지만 DB 문제로 인해 처리가 실패합니다. 이렇게하면 응용 프로그램이 불필요하게 작동합니다. 이 대신에 우리는 소비자가 정해진 시간 동안 메시지를 받도록 일시 정지하려고합니다 (다시 말하면 30 분을 기다리십시오). 우리는이 사건을 어떻게 처리 할 지 잘 모릅니다.
동일한 작업을 수행 할 수 있습니까? 내가 놓친 게 있니?
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
Future {
/**
* Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
*/
}.map(_ => msg.committableOffset).recover {
case ex => {
/**
* HOW TO DO ????????
* On exception, I would like to tell stream to stop sending messages and pause the consumer and try again within stipulated time
* or on demand from the last committed offset
*/
throw ex
}
}
}
.batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
batch.updated(elem)
}
.mapAsync(3)(_.commitScaladsl())
.runWith(Sink.ignore)
응답 해 주셔서 감사합니다. :) 그것을 밖으로 시도하자. –