나는 카프카에게 데이터를 보내고 데이터베이스에 쓸 스프링 부트 아래 카프카 스프링 프로듀서를 만들고있다. 나는 그 모든 일을 하나의 거래로 원한다. 나는 카프카에 익숙하지 않고 봄에 대한 전문가도없고 어려움을 겪고 있습니다. 모든 포인터가 많이 감사합니다.스프링 카프카 (Spring Kafka) 글로벌 거래 ID는 프로그램이 끝난 후 계속 열어 둔다.
지금까지 나의 코드는 Kafka에 성공적으로 루프를 작성합니다. 나는 아직 DB 을 설정하지 않은,하지만 구성에서 producerFactory에 transactionIdPrefix을 추가하여 글로벌 transactioning을 설정하기 위해 진행 한 :
producerFactory.setTransactionIdPrefix("MY_SERVER");
을하고 카프카 보내 수행하는 방법에 @Transactional을 추가했다. 결국 같은 방법으로 DB 작업을 수행 할 계획입니다.
문제 : 처음에는 코드가 크게 실행됩니다. 그러나 프로그램을 멈추게해도 깔끔하게 코드가 @Transactional 메서드로 들어가 자마자 두 번째로 실행됩니다. @Transactional을 주석 처리하면 메소드에 들어가지만 kafa 템플릿 send()에 멈 춥니 다.
문제는 트랜잭션 ID 인 것 같습니다. 접두어를 변경하고 다시 실행하면 프로그램이 처음에는 잘 실행되지만 새 접두사를 선택할 때까지 다시 실행할 때 멈 춥니 다. 다시 시작한 후 트랜스 ID 카운터는 0에서 시작하기 때문에 트랜스 ID 접두어가 변경되지 않으면 다시 시작할 때 동일한 트랜스 ID가 사용됩니다.
원래 transID는 서버에서 여전히 열려 있고 결코 커밋되지 않은 것으로 보입니다. (콘솔 소비자를 사용하여 주제에서 데이터를 읽을 수는 있지만 커밋되지는 않습니다.) 그러나 그렇다면 어떻게 봄에 트랜스를 저주 할 수 있습니까? 나는 나의 구성이 잘못되었다고 생각하고 있습니다. 아니면 트랜스 ID가 결코 재사용 될 수없는 문제인가? (이 경우 어떻게 해결할 수 있습니까?)
다음은 관련 코드입니다. 구성은 다음과 같습니다
@SpringBootApplication
public class MYApplication {
@Autowired
private static ChangeSweeper changeSweeper;
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> producerFactory=new DefaultKafkaProducerFactory<>(configProps);
producerFactory.setTransactionIdPrefix("MY_SERVER");
return producerFactory;
}
@Bean
public KafkaTransactionManager<String, String> KafkaTransactionManager() {
return new KafkaTransactionManager<String, String>((producerFactory()));
}
@Bean(name="kafkaProducerTemplate")
public KafkaTemplate<String, String> kafkaProducerTemplate() {
return new KafkaTemplate<>(producerFactory());
}
그리고 트랜잭션을 수행하는 방법은 다음과 같습니다
@Transactional
public void send(final List<Record> records) {
logger.debug("sending {} records; batchSize={}; topic={}", records.size(),batchSize, kafkaTopic);
// Divide the record set into batches of size batchSize and send each batch with a kafka transaction:
for (int batchStartIndex = 0; batchStartIndex < records.size(); batchStartIndex += batchSize) {
int batchEndIndex=Math.min(records.size()-1, batchStartIndex+batchSize-1);
List<Record> nextBatch = records.subList(batchStartIndex, batchEndIndex);
logger.debug("## batch is from " + batchStartIndex + " to " + batchEndIndex);
for (Record record : nextBatch) {
kafkaProducerTemplate.send(kafkaTopic, record.getKey().toString(), record.getData().toString());
logger.debug("Sending> " + record);
}
// I will put the DB writes here
}
>'문제가 아마도 트랜스 ID 년대 reused'되지 않을 수 있다는 것을 필요로하기 때문에 - 우리가 initTransactions()''호출하기 때문에 문제가되지해야한다 - 그것의 javadoc '이전의 인턴 진행중인 거래로 실패했다면 취소 될 것입니다. "- 귀하의 문제를 재현 할 수 있는지 알게 될 것입니다. –
클러스터에 3 개의 브로커가 있습니까 (기본 요구 사항)? 하나의 브로커 만 가지고'initTransactions()'에 대한 응답을 얻었고 서버 로그에서 트랜잭션 상태 주제에 대해'살아있는 브로커의 수 '1이 필수 복제 인수'3 '을 충족시키지 못합니다 (트랜잭션 .state.log.replication.factor ')'-하지만 첫 번째 전송은 작동하지 않습니다. –
저는 브로커가 하나뿐입니다. 단지 로컬에서 개발 중입니다. 당신이 지적했듯이, send는 프로그램을 죽이고 재시작 할 때까지 일괄 적으로 일괄 적으로 작동합니다. 그래서 발송은 괜찮은 것 같지만 거래가 문제인 것처럼 보입니다. 내가 다른 문제를 발견 한 Wndows에서 지금 실행 중임에 유의하십시오. – hbhow