2017-05-05 7 views
0

저는 JMS 기반 시스템에서 새 시스템과 레거시 시스템 간의 트랜잭션을 동기화하는 책임이있는 Kafka로 이동하고 있습니다. 새 시스템에서 게시 한 모든 메시지는 구독자/소비자가 성공적으로 처리해야합니다. 나는 메시지의 순서에 대해 걱정할 필요가 없다. 레거시 시스템에서 일부 설계 문제 (비관적 인 잠금)로 인해 메시지가 도착할 때 트랜잭션이 거의 실패하지 않을 수 있습니다.이 경우 일부 지연 후에 메시지가 다시 나타나기를 바랍니다. 나는 카프카와 함께이 상황을 어떻게 처리 할 것인지를 고민하고있다.Confluent Kafka를 사용하여 소비자의 메시지 처리 오류를 처리하는 방법은 무엇입니까?

내 소스 및 대상 응용 프로그램은 .NET 4.6.1 및 C#에 있습니다. Confluent.Kafka v0.9.5 클라이언트 라이브러리를 사용하고 있습니다. 카프카는 0.10 버전입니다. 소비자 응용 프로그램에 대한

, 나는 장애인 자동 커밋 메시지가 성공적으로 처리 한 후 명시 적으로 commitAsync 메소드를 호출하면 오프셋 커밋해야합니다. 다음은 소비자를 만드는 방법입니다.

var config = new Dictionary<string, object>() 
       { 
        {"group.id", GroupId}, 
        {"client.id", ClientId}, 
        {"enable.auto.commit", false}, 
        {"bootstrap.servers", _consumerConnectionConfig.BrokerUrl}, 
        { 
         "default.topic.config", new Dictionary<string, object>() 
         { 
          {"auto.offset.reset", "latest"} 
         } 
        } 
       }; 
var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)); 

폴링 설정 방법입니다.

consumer.Subscribe(topics); 

while (!SubscriberCancellationToken.IsCancellationRequested) 
{ 
     consumer.Poll(TimeSpan.FromMilliseconds(1000)); 
} 

가 여기에 메시지를 처리하는 메시지가 성공적으로 처리 할 수있는 경우

consumer.OnMessage += (sender, message) => { 
    try 
    { 
    var payload = GetPayload(message); 
    if (_messageHandlerService.ProcessMessage(payload)) 
    {     
     consumer.CommitAsync(message).Wait(SubscriberCancellationToken); 
    } 
    else 
    { 
     //(case 1) Now what should I do????? 
    } 
    } 
    catch(Exception ex) 
    { 
     Log.Error("Unable to process xyz messages", ex); 
     throw; //??? (Case 2) should I throw the exception or should I not? 
    } 
}; 

에 대한 책임의 onMessage 이벤트, 난 commitAsync를 호출하고 그 매력처럼 작동합니다. 이제는 메시지를 처리 ​​할 수 ​​없을 때 (케이스 1) 또는 예외가 발생했을 때 (케이스 2) 제 질문은 무엇을해야합니까입니다. 이 두 가지 상황을 처리하기 위해 어떤 선택을해야합니까?

는 JMS의 세계에 대한 (사례 1), I는 지연 재 게시 전략을 적용했다. 기본적으로 1 분 동안 기다린 다음 동일한 주제로 메시지를 다시 게시하고 현재 메시지를 커밋하므로 다시 게시 된 메시지가 다시 표시됩니다. 어떤 이유로 재 게시 할 수 없으면 프로세스가 다시 시작될 때까지 또는 프로세스가 다시 시작될 때까지 계속 재 시도합니다. 다시 게시하기 전에 프로세스가 다시 시작되면 커밋되지 않은 메시지가 다시 돌아와서 다시 시작됩니다.

일단 다시 게시가 성공하면 이미 토픽에서 기다리고있는 다른 메시지가 있으면 이제부터까지 이동을 시작합니다. 기차는 모든 것이 처리 될 때까지 원을 그리며 나아 간다. 사례 1에서와 같이 메시지를 처리 ​​할 수 ​​없을 때마다 오류를 기록하고이를 기반으로 적절한 경고가 생성되므로 레거시 시스템에서 일부 데이터를 수정해야 할 경우에 대비하여 애플리케이션 지원 팀이 조치를 취할 수 있습니다. 그 때까지는 메시지가 계속 실패하고 다시 게시됩니다.

그리고 케이스 2

, 나는 세부 사항을 로그인하고 JMS 구현에서 예외를 throw했다. 지금 나는 사건 1처럼 처리 해야할지 궁금합니다.

이제 내 질문은 어떻게 카프카의 세계에서이 두 상황을 처리해야합니까? 더 좋은 옵션이 있습니까? 당신의 소비자에서

답변

1

몇 가지 옵션

하나는 그것으로 전용 소비자 거래를 다른 카프카 항목에 이러한 메시지를 밀어 수 있도록하는 것입니다, 또는,

시도는 특정 메시지가 처리됩니다 때까지 또는 특정 도달 문지방. 이것은 일시적으로 메시지 소비를 멈추기 위해 소비자의 pause 메소드를 호출하여 수행 할 수 있습니다 (Java 소비자 관점에서 이야기하고 있으며 pls는 C# 클라이언트와 확인) 재시도 논리를 실행합니다. 또한 설문 조사를 계속 호출해야합니다 (그렇지 않으면 소비자가 그룹에서 퇴장하여 파티션이 다시 균형을 잡을 것입니다).재 시도가 끝나면 재개 메서드를 호출하여 처리를 계속합니다. 이제 폴 메서드가 카프카에서 레코드를 반환합니다.

+0

감사합니다. 일시 중지 및 다시 시작 기능을 사용해 보겠습니다. 내가 C# 클라이언트 – Vinod

+0

지연된 응답을 위해 죄송합니다 결과를 게시 할 것입니다. Confluent.Kafka C# 클라이언트에는 아직 일시 중지 및 다시 시작 방법이 없습니다. – Vinod