0

유스 케이스 달성을 위해 카프카 스트림을 사용하려고합니다. MySQL에는 사용자 및 계정이라는 두 개의 테이블이 있습니다. 그리고 MySQL에서 Kafka MySQL 커넥터를 사용하여 Kafka로 이벤트를 가져 왔습니다.Apache Kafka를 사용하여 MySQL 테이블 쿼리하기

카프카 자체에서 계정 내의 모든 사용자 ID를 가져와야합니다. 그래서 MySQL 출력 항목에 KStream을 사용하여 출력을 구성하고이를 처리하여 account-id로 Key 키를, 쉼표 (,)로 구분 된 userIds 값으로 항목을 게시합니다. 는 그럼 난 ReadOnlyKeyValueStore 클래스의 의 get() 방법, 계정 ID를 사용하는 모든 사용자 ID를 얻을 수 대화 형 쿼리를 사용할 수 있습니다. 이렇게하는 것이 올바른 방법입니까? 더 좋은 방법이 있습니까? 여기에서 KSQL을 사용할 수 있습니까?

+0

당신이 당신이 데이터와 함께 할 싶은 것입니다 것을 요약 할 수 있나요? MySQL에 두 개의 테이블이 있다는 것을 알고 있고, 카프카로 가져오고 싶습니다. 거기에서 그들과 함께 무엇을할까요? 즉 요구 사항을 설명 할 수 있습니까? –

+0

실시간 스트리밍 응용 프로그램에서이 데이터가 필요한 카프카 스트림을 작성했습니다. HTTP 호출을하거나 MySql에서 데이터를 검색하는 것은 시간이 오래 걸릴 것입니다. 따라서 카프카 자체의 모든 데이터를 쿼리 가능한 형식으로 저장하는 것이 좋습니다. – Adarshlal

+1

KSQL에서 "대화식 쿼리"를 사용할 수 없습니다 atm. –

답변

3

Kafka Connect를 사용하여 MySQL에서 데이터를 스트리밍 할 수 있습니다 (예 : Debezium을 사용하십시오. 여기에서 KStreams 또는 KSQL을 사용하여 데이터를 변형 할 수 있습니다. 여기에서 수행하려는 것으로 생각되는 키 다시 지정은 물론 다른 스트림에 참여할 수도 있습니다. MySQL의 데이터를 로그 압축이 설정된 항목으로 처리하면 항목의 모든 키에 대해 항상 최신 값을 유지할 수 있습니다.

+0

유스 케이스를 만들기 위해 Kstreams를 사용하려고했습니다. 여기서 변환 출력은 키가 계정 ID이고 사용자 ID가 쉼표로 구분 된 키 - 값 쌍이어야합니다. KStream이 debezium 사용자 테이블 항목을 사용 중입니다. 문제는 각 insert 이벤트의 기존 문자열에 새 userId를 추가해야한다는 것입니다. 기존 문자열을 가져 오기 위해 출력 항목에 액세스해야합니다. 'stream.map()'메서드 내부에서 출력 항목에 액세스하기 위해 상태 저장소를 사용해야합니까? – Adarshlal