2017-12-22 24 views
0

TLDR : 같은 주제의 다른 사람들보다 먼저 긴급한 메시지를 처리하고 싶습니다.kafka 바인더로 봄철 구름 스트림을 사용하여 메시지의 우선 순위를 지정하는 방법

더 설명 : 나는 몇 가지 작업을 위해 서로간에 이야기 할 수있는 카프카 - 바인더와 스프링 클라우드 스트림에 의존하는 많은 봄 microservices를 사용하여 상당히 큰 응용 프로그램에서 일하고 있어요

.

예약 된 작업으로 인해 많은 이벤트가 발생할 수 있습니다. 이러한 이벤트가 소비되면 중개 이벤트가 생성되어 전송 될 수 있으며 모든 초기 및 중개 메시지가 소비되고 모든 작업이 완료 될 때까지 상당한 시간이 걸릴 수 있습니다.

이 시간 동안 동일한 유형의 이벤트를 수동으로 보내려면 다른 모든 작업이 완료된 경우에만 대기하고 처리됩니다.

다른 모든 이벤트보다 우선적으로 이러한 "수동 이벤트"를 처리 할 수 ​​없는지 확인해야합니다.

일반적으로이 경우 @StreamListener을 복제하여 모든 주제 (예 : topic-normaltopic-urgent)를 복제하는 사용자를 볼 수 있습니다. 이것은 다소 복잡한 옵션으로 보입니다. 왜냐하면 여기에 관련된 많은 주제가 있기 때문입니다. 또한 이것을 고려하여 재 작성해야하는 카프카 - 스트림 집계가 있습니다.

동료 개발자들과 함께 긴급한 메시지가 필요한 각 주제에 대해 소비자 그룹 (consumerGroupNormal, consumerGroupUrgentusing)을 복제하려고 생각했습니다. 그런 다음 각 메시지에 우선 순위 헤더를 추가하고 StreamListener을 복제하고 수신기 중 하나에 priority: urgent 헤더가있는 메시지 만 사용하고 다른 수신기에는 priority:normal 메시지 만 사용합니다.

또한이 "우선 순위"헤더를 자동으로 추가하고 읽는 ChannelInterceptorAdapter를 만들 수도 있습니다 (a bit like Sleuth does it).

그러나 저는 여전히 바인더 작성을 위해 많은 구성을 추가해야하고 리스너를 복제하고 헤더를 검사하고 리스너 코드를 실행하지 않는 작은 코드를 추가해야합니다.

전혀 다른 사람에게 유용 할 수있는 아주 일반적이며 간단한 유스 케이스처럼 보이기 때문에이 작업을 수행하는 것이 쉽지 않은 것처럼 보입니다.

답변

1

각 주제에 대해 두 개의 소비자 그룹을 사용하면 거의 모든 작업을 수행 할 수 있으며 두 개의 수신기가 있습니다.

그러나 사용자 정의 채널 인터셉터가 필요하지는 않습니다. 과 같은 condition 속성을 사용할 수 있습니다.

+0

흠, 나는 ChannelInterceptorAdapter가 자동으로 긴급을 유지하기 위해 후속 메시지를 생성 할 때 헤더를 전파 할 것을 생각하고있었습니다. 이것은 Sleuth가 SpanId 헤더로하는 일입니다. 개발자는 우리가 트레이시드에 신경 쓸 필요가 없기 때문에 필요에 따라 자동으로 주입하고 읽습니다. –

+0

나는 조건 속성을 생각하지 않았지만, 이것은 'StreamListener'를 단순화합니다, 감사합니다! –

+0

스프링 클라우드 스트림의 github에서 이러한 기능을 요청할 때 문제가 될만한 가치가 있다고 생각합니까? –