저는 JMS 기반 시스템에서 새 시스템과 레거시 시스템 간의 트랜잭션을 동기화하는 책임이있는 Kafka로 이동하고 있습니다. 새 시스템에서 게시 한 모든 메시지는 구독자/소비자가 성공적으로 처리해야합니다. 나는 메시지의 순서에 대해 걱정할 필요가 없다. 레거시 시스템에서 일부 설계 문제 (비관적 인 잠금)로 인해 메시지가 도착할 때 트랜잭션이 거의 실패하지 않을 수 있습니다.이 경우 일부 지연 후에 메시지가 다시 나타나기를 바랍니다. 나는 카프카와 함께이 상황을 어떻게 처리 할 것인지를 고민하고있다.Confluent Kafka를 사용하여 소비자의 메시지 처리 오류를 처리하는 방법은 무엇입니까?
내 소스 및 대상 응용 프로그램은 .NET 4.6.1 및 C#에 있습니다. Confluent.Kafka v0.9.5 클라이언트 라이브러리를 사용하고 있습니다. 카프카는 0.10 버전입니다. 소비자 응용 프로그램에 대한
, 나는 장애인 자동 커밋 메시지가 성공적으로 처리 한 후 명시 적으로 commitAsync 메소드를 호출하면 오프셋 커밋해야합니다. 다음은 소비자를 만드는 방법입니다.var config = new Dictionary<string, object>()
{
{"group.id", GroupId},
{"client.id", ClientId},
{"enable.auto.commit", false},
{"bootstrap.servers", _consumerConnectionConfig.BrokerUrl},
{
"default.topic.config", new Dictionary<string, object>()
{
{"auto.offset.reset", "latest"}
}
}
};
var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8));
폴링 설정 방법입니다.
consumer.Subscribe(topics);
while (!SubscriberCancellationToken.IsCancellationRequested)
{
consumer.Poll(TimeSpan.FromMilliseconds(1000));
}
가 여기에 메시지를 처리하는 메시지가 성공적으로 처리 할 수있는 경우
consumer.OnMessage += (sender, message) => {
try
{
var payload = GetPayload(message);
if (_messageHandlerService.ProcessMessage(payload))
{
consumer.CommitAsync(message).Wait(SubscriberCancellationToken);
}
else
{
//(case 1) Now what should I do?????
}
}
catch(Exception ex)
{
Log.Error("Unable to process xyz messages", ex);
throw; //??? (Case 2) should I throw the exception or should I not?
}
};
에 대한 책임의 onMessage 이벤트, 난 commitAsync를 호출하고 그 매력처럼 작동합니다. 이제는 메시지를 처리 할 수 없을 때 (케이스 1) 또는 예외가 발생했을 때 (케이스 2) 제 질문은 무엇을해야합니까입니다. 이 두 가지 상황을 처리하기 위해 어떤 선택을해야합니까?
는 JMS의 세계에 대한 (사례 1), I는 지연 재 게시 전략을 적용했다. 기본적으로 1 분 동안 기다린 다음 동일한 주제로 메시지를 다시 게시하고 현재 메시지를 커밋하므로 다시 게시 된 메시지가 다시 표시됩니다. 어떤 이유로 재 게시 할 수 없으면 프로세스가 다시 시작될 때까지 또는 프로세스가 다시 시작될 때까지 계속 재 시도합니다. 다시 게시하기 전에 프로세스가 다시 시작되면 커밋되지 않은 메시지가 다시 돌아와서 다시 시작됩니다.일단 다시 게시가 성공하면 이미 토픽에서 기다리고있는 다른 메시지가 있으면 이제부터까지 이동을 시작합니다. 기차는 모든 것이 처리 될 때까지 원을 그리며 나아 간다. 사례 1에서와 같이 메시지를 처리 할 수 없을 때마다 오류를 기록하고이를 기반으로 적절한 경고가 생성되므로 레거시 시스템에서 일부 데이터를 수정해야 할 경우에 대비하여 애플리케이션 지원 팀이 조치를 취할 수 있습니다. 그 때까지는 메시지가 계속 실패하고 다시 게시됩니다.
그리고 케이스 2
, 나는 세부 사항을 로그인하고 JMS 구현에서 예외를 throw했다. 지금 나는 사건 1처럼 처리 해야할지 궁금합니다.이제 내 질문은 어떻게 카프카의 세계에서이 두 상황을 처리해야합니까? 더 좋은 옵션이 있습니까? 당신의 소비자에서
감사합니다. 일시 중지 및 다시 시작 기능을 사용해 보겠습니다. 내가 C# 클라이언트 – Vinod
지연된 응답을 위해 죄송합니다 결과를 게시 할 것입니다. Confluent.Kafka C# 클라이언트에는 아직 일시 중지 및 다시 시작 방법이 없습니다. – Vinod