0

나는 데비지 변경 데이터 캡처로 mysql에서 데이터를 캡처하고 kafka connect jdbc sink를 사용하여 다른 MySQL로 데이터를 소비하는 문제가있다.어떻게 debezium 변경 데이터 캡처와 MySQL에서 데이터를 캡처하고 kafka에 jdbc 싱크대와 연결 소비?

deffzium이 kafka 주제로 생성하는 스키마 및 페이로드가 kafka connect jdbc sink가 기대하는 스키마와 호환되지 않기 때문에.

jdbc 싱크가 다른 MySQL에서 데이터를 소비하고 레코드를 생성하려고 할 때 예외가 발생합니다.

이 문제는 어떻게 해결해야합니까?

답변

5

메시지 구조가 Debezium 인 경우 실제로는 JDBC 싱크에서 예상 한 것과 다릅니다. JDBC 싱크는 메시지의 각 필드가 행의 필드와 일치 할 것으로 예상하므로 메시지는 행의 "이후"상태에 해당합니다. OTOH의 경우 Debezium MySQL connector은 변경 데이터 캡처를 수행합니다. 이는 단순히 행의 최신 상태를 포함하는 것 이상의 의미를 가짐을 의미합니다. 즉, 커넥터는 행의 기본 또는 고유 키 열을 포함하는 키 메시지를 출력하고, 메시지의 값을 갖는 엔벌 로프 구조를 포함 : 등이 삽입, 업데이트, 또는 삭제 여부로서

  • 작동
  • 변화는 변화가 서버의 메타 데이터를 포함 (삭제에 NULL)
  • 소스 별 정보를 발생 후 (삽입에 NULL) 행
  • 상태를 발생 전에 행 상태 트랜잭션 ID, 다 이벤트가 발견 된 약 tabase 테이블 명, 이벤트가 발생한 서버 소인 및 세부 커넥터 이벤트

이 차이를 해결하기위한 간단한 방법을 생성하는 등

  • 타임 스탬프 사용하는 Kafka 0.10.2.x (현재 최신 버전은 0.10.2.1)와 Kafka Connect의 새로운 Single Message Transforms (SMTs)입니다. 각 Kafka Connect 커넥터는 메시지가 Kafka에 쓰여지기 전에 원본 커넥터의 출력을 변환하거나 Kafka에서 읽은 메시지를 싱크 커넥터에 입력으로 전달하기 전에 변환 할 수있는 0 이상의 SMT 체인으로 구성 할 수 있습니다. SMT는 의도적으로 매우 단순하고 단일 메시지를 처리하며 외부 리소스에 액세스하거나 상태를 유지하면 안되므로 훨씬 더 강력한 Kafka Streams 또는 기타 스트림 처리 시스템을 대체 할 수는 없으며 여러 입력 스트림에 참여할 수 있습니다. 매우 복잡한 작업을 수행하고 여러 메시지에서 상태를 유지 관리합니다.

    Kafka Streams를 사용하여 모든 종류의 처리를 수행하는 경우 Kafka Streams 응용 프로그램에서 메시지 구조를 조작하는 것을 고려해야합니다. 그렇지 않다면 SMT는 문제를 해결할 수있는 좋은 방법입니다. 사실 SMT를 사용하여 메시지 구조를 조정하는 두 가지 방법이 있습니다.

    첫 번째 옵션은 Debezium 커넥터가있는 SMT를 사용하여 행의 "애프터"상태를 추출/유지하고 Kafka에 쓰여지기 전에 다른 모든 정보를 버리는 것입니다. 물론 카프카 (Kafka) 주제에 적은 정보를 저장하고 미래에 귀중한 CDC 정보를 버리는 것이 좋습니다.

    두 번째 및 IMO 기본 옵션은 원본 커넥터를 그대로두고 Kafka 항목의 모든 CDC 메시지를 유지하지만 싱크 커넥터와 함께 SMT를 사용하여 "after" 메시지를 JDBC 싱크 커넥터에 전달하기 전에 행의 상태를 확인하고 다른 모든 정보를 버립니다. Kafka Connect에 포함 된 기존 SMT 중 하나를 사용할 수는 있지만 원하는대로 정확히 SMT를 작성하는 것이 좋습니다.

  • +0

    대단한 답변은 Randall에게 감사드립니다. –