실험용 카프카 환경을 3 개의 브로커와 3 개의 파티션이있는 주제로 설정했습니다. 제작자와 소비자가 있습니다. 특정 소비자를위한 파티션의 오프셋을 수정하고 싶습니다. kafka 문서에서 소비자 커밋/페치 API 인 kafka가 특정 오프셋을 커밋하거나 소비자가 읽은 최신 오프셋을 가져올 수 있음을 읽었습니다. 나는 특정 소비자에서 오프셋을 가져 오기 위해 내 코드를 작성하기 위해 아래 페이지에서 코드를 사용한kafka 소비자 반입 API가 올바른 오프셋 값을 반환하지 않습니다.
: 여기 API에 대한 링크입니다. 그러나 fetch API는 요청 된 오프셋에 대해 "-1"값을 반환합니다.
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
나는 또한 첫 번째 링크에서 읽은 어떤 브로커가 (오류 코드를 설정하지 않는 소비자 그룹에서 주제 파티션과 관련이 상쇄되는 경우 는 "이 없기 때문에 것을 : 여기 예제 코드는 실제로 오류가 아님), 빈 메타 데이터를 반환하고 오프셋 필드를 -1로 설정합니다. "
그러나 일부 메시지를 생성 했으므로 소비자가 메시지를 소비하고 각 읽음 메시지에 대한 오프셋을 출력했습니다.
누구든지 도움을받을 수 있다면 정말 감사 할 것입니다. 내 코드의 어느 부분이 잘못된 것인지 알고 싶습니다. 아니면 API에 문제가있을 수 있습니다. 유용한 의견을 말하기를 주저하지 마십시오. 내 코드는 내가 제공 한 링크의 코드와 정확히 같습니다. 그러나 만약 당신이 내 코드를 볼 필요가 여기에 넣어 말해.
중개인 : 1 : 포트 9093
브로커 2 : 포트 9094
브로커 3 : 포트 9095
카프카 버전은 내 카프카의 설정은 0.10.2.0
입니다
주제 : "testpic3"
.......... ............
소비자 구성은 :
props.put("group.id", "test");
props.put("client.id", "MyConsumer");
................ 여기
내 코드입니다 :public class KafkaOffsetManage {
public static void main(String[] args) {
BlockingChannel channel = new BlockingChannel("localhost", 9095,
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
final String MY_GROUP = "test";
final String MY_CLIENTID = "MyConsumer";
int correlationId = 0;
final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3",0);
final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3",1);
final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3",2);
channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
System.out.println("+++++++++++++++++++++++++++");
System.out.println(metadataResponse.errorCode());
if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
Broker offsetManager = metadataResponse.coordinator();
// if the coordinator is different, from the above channel's host then reconnect
channel.disconnect();
channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
System.out.println("Connected to Offset Manager");
System.out.println(offsetManager.host() + ", Port:"+ offsetManager.port());
} else {
// retry (after backoff)
}
// How to fetch offsets
List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
partitions.add(testPartition0);
//partitions.add(testPartition1);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
MY_GROUP,
partitions,
(short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
correlationId,
MY_CLIENTID);
try {
channel.send(fetchRequest.underlying());
OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
short offsetFetchErrorCode = result.error();
if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
channel.disconnect();
// Go to step 1 and retry the offset fetch
} else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
// retry the offset fetch (after backoff)
} else {
long retrievedOffset = result.offset();
String retrievedMetadata = result.metadata();
System.out.println("The retrieved offset is:"+ Long.toString(retrievedOffset));
System.out.println(retrievedMetadata);
System.out.println(result.toString());
}
}
catch (Exception e) {
channel.disconnect();
// Go to step 1 and then retry offset fetch after backoff
}
}
}
코드의 출력은 여기에 있습니다 :
+++++++++++++++++++++++++++
0
Connected to Offset Manager
user-virtual-machine, Port:9093
------------------------
The retrieved offset is:-1
OffsetMetadataAndError[-1,,3]
Process finished with exit code 0
하나 이상한 것은 내가 Kafka 의존성에 관한 것입니다. 나는이 종속성을 추가 할 때, 내 코드는 프로그램의 일부 클래스를 인식하지 않습니다 "ConsumerMetadataRequest"와 "ConsumerMetadataResponse"는 인식되지 않습니다
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
클래스를.
그래서 내가 대신이 종속성을 추가 :<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
감사합니다,
코드의 관련 부분을 제공하십시오. 포함 된 링크가 어느 시점에서 작동을 멈추고 질문이 쓸모 없게 될 수도 있습니다. –
Patrick 대단히 감사합니다. 여기에 내 코드가있다 – Farhad9660