는 우리의 IoT 플랫폼에서 인바운드 메시지 흐름이다 고객 기반.카프카 생산자 할당량 여기
현재 전략은 각 장치가 로컬로 캐시 된 인스턴스를 가져 오는 Guava의 RateLimiter를 사용합니다. 장치 메시지가 수신되면 해당 deviceId에 매핑 된 RateLimiter가 캐시에서 페치되고 tryAquire()
메서드가 호출됩니다. 허가가 성공적으로 획득되면 튜플은 평소처럼 카프카로 전달되고, 그렇지 않으면 할당량이 초과되고 메시지는 자동으로 삭제됩니다. 이 방법은 다소 번거롭고 어느 시점에서 실패하거나 병목 현상이 될 수 있습니다.
저는 Kafka의 바이트 속도 할당량을 읽었으며 Kafka 클라이언트가 동적으로 구성 될 수 있기 때문에 이것이 우리의 경우 완벽하게 작동한다고 생각합니다. 우리 플랫폼에 가상 장치가 생성되면 client.id == deviceId
인 곳에 새로운 client.id가 추가되어야합니다.
- 관리자가이 가상 장치 생성 : 습도 & 온도 센서를
- 규칙은 위의 장치 을위한 카프카의 새로운 사용자/클라이언트 ID 항목을 작성 해고
- 은 두 디바이스는 인바운드 이벤트 메시지를 방출 카프카 CLI
- 를 통해 자신의 생산 할당량 값을 설정
- ...?
여기 내 질문이 있습니다. 단일 Producer 인스턴스를 사용하는 경우 send()
을 호출하기 전에 ProducerRecord 또는 Producer의 어딘가에 client.id
을 지정할 수 있습니까? 프로듀서에 client.id
이 하나만 허용되면 각 장치마다 고유 한 프로듀서가 있어야합니까? 일대일 매핑 만 허용되는 경우 수천 개가 아닌 수천 개의 Producer 인스턴스 (각 장치에 하나씩)를 잠재적으로 캐시하는 것이 좋습니다. 내가 아직 알지 못하는 더 나은 접근법이 있습니까?
참고 : Google 플랫폼은 '개방형 시스템'으로 클라이언트가 '요금 초과'와 같은 오류 응답 또는 그 문제에 대한 오류를 다시 보내지 않음을 의미합니다. 그것은 최종 사용자에게 모두 투명합니다. 이런 이유로 RabbitMQ의 데이터를 간섭하거나 다른 대기열로 메시지를 다시 라우팅 할 수는 없습니다.이 옵션을 통합하는 유일한 방법은 스톰 또는 카프카 사이에 있습니다.
로 넣을 수 있습니다 _ 당신이에 정교한 수 입력하고 expla "카프카 메시지 헤더는 다음 실제로 데이터를 생산하는 장치를 식별하는 데 사용할 수 있습니다." 그것을 구현하는 방법에? 비록 내가 사용자 단위로 했더라도, 나는 카프카에게 클라이언트 X가 보낸 메시지 X, 클라이언트 Y가 보낸 메시지 Y 등등을 어떻게 알려주는지 알아 내야한다. 모든 것은 * * 싱글 **, 프로듀서 인스턴스 공유. – user2208562