2016-11-21 3 views
0

consumer.poll() 메서드를 사용하는 동안 문제가 발생합니다. poll() 메서드를 사용하여 데이터를 가져온 다음에는 소비자가 커밋 할 데이터가 없으므로 카프카 항목에서 특정 줄 수를 제거하는 데 도움이됩니다.파이썬을 사용하거나 inbuilt 메서드를 사용하여 kafka 항목에서 특정 줄 수를 삭제하는 방법은 무엇입니까?

+0

질문을 이해할 수 없습니다. 그러나 카프카 주제는 추가 만되며 수동으로 삭제할 수는 없습니다. 데이터가 삭제되는 유일한 방법은 로그 보존 또는 로그 압축을 사용하는 것입니다. –

+0

@Matthias J. Sax에 응답 해 주셔서 감사합니다. 하지만 실제로 내 문제는 consumer.poll()을 사용하는 동안입니다. 특정 양의 데이터를 가져 오지만, 프로그램이 실패하면 새 서버가 와드의 첫 줄에서 읽기 시작하고 "auto commit "이 True이면 한 서버에 오류가 발생하면 데이터가 손실됩니다. – surya

답변

0

소비자가 실패 할 경우를 대비하여 "데이터 손실"을 피하기 위해 커밋하기 전에 데이터가 완전히 처리되었는지 확인해야합니다. 당신이 auto.commit을 사용하는 경우 각 poll() 암시 이전 poll()에서 모든 데이터를 커밋 때문에 다음 poll()을 실행하기 전에

따라서, 완전히 poll() 후 모든 데이터를 처리 있는지 확인합니다.

허용되지 않는 경우 consumer.commit(...)을 통해 데이터를 완전히 처리 한 후에 auto.commit을 사용 중지하고 수동으로 커밋해야합니다. 이를 위해서는 각 메시지를 개별적으로 커밋 할 필요가 없으며 오프셋이 X 인 커밋은 모든 메시지를 암시 적으로 < X 오프셋을 사용하여 커밋합니다 (예 : 오프셋 5 메시지를 처리 ​​한 후 커밋 된 오프셋 마지막으로 성공적으로 처리 된 메시지는 아니지만 처리 할 다음 메시지). 그리고 6 번 오프셋 커밋은 0부터 5까지의 오프셋으로 모든 메시지를 커밋합니다. 따라서 더 작은 오프셋을 가진 모든 메시지가 완전히 처리되기 전에 오프셋 6을 실행해서는 안됩니다.

+0

감사합니다. poll() 메소드가 1000 개의 행을 가져와 자동으로 1000 개의 행을 제거한다고 가정 해 보겠습니다. 그러면 500 번째 행에서 서버가 실패하면 다른 서버가 처음 500 줄을 다시 처리합니다. 하지만 제 상황에서는 100 줄마다 물통을 만드는 것처럼 양동이로 나눠서 버킷을 만든 다음 처리하도록 보냅니다. 그래서이 경우 출력의 데이터가 복제됩니다. – surya

+0

예. Kafka는 최소 한 번만 처리하고 실패한 경우 중복 될 수 있습니다. 아직 정확히 한 번 처리가 없습니다. IIRC, 이것도 여기에서 논의됩니다 : http://docs.confluent.io/current/clients/consumer.html –

+0

Btw : 당신은 아무것도 삭제하지 않습니다 ... 커밋을 한 후에도 데이터가 여전히 카프카에 있습니다. seek()을 해당 오프셋 (들)로 바꾸면 다시 읽을 수 있습니다. –