It's not the best solution, but since nobody seems to have an answer, here is how I solved it in the end (assign a group to the consumer and replace ____YOURGROUP_____):
import os
import subprocess
import sys

# `oms_cfg` (configuration) and `error()` (logging) are not defined in this
# snippet; they come from the surrounding project.
if "KAFKA_HOME" in os.environ:
    kafkapath = os.environ["KAFKA_HOME"]
else:
    kafkapath = oms_cfg.kafka_home
    # error("Please set up $KAFKA_HOME environment variable")
    # exit(-1)

instances = []
# cmd = kafkapath + '/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server {} --list'.format(oms_cfg.bootstrap_servers)
# result = subprocess.run(cmd.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
igr = ____YOURGROUP_____  # replace with your group name (a string), or loop over all groups from the commented-out command
print("Checking topics of consumer group {}".format(igr))
topic_cmd = kafkapath + '/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server ' + oms_cfg.bootstrap_servers + ' --describe --group {gr}'
result = subprocess.run(topic_cmd.format(gr=igr).split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# stdout is bytes; decode it so topics and clients print as plain strings
table = result.stdout.decode().split('\n')
# You could add a loop over topics here
for iline in table[1:]:  # skip the header row
    iline = iline.split()
    if not iline:
        continue
    topic = iline[0]
    # We could filter by topic here. With multiple consumers in the same group,
    # each partition is assigned to only one of them.
    # if topic != oms_cfg.topic_in:
    #     continue
    client = iline[-1]
    instances.append((client, topic))
    # print("Client {} Topic {} is fine".format(client, topic))
if instances:
    error("Cannot start. There are currently {} instances running. Client/topic {}".format(len(instances), instances))
    sys.exit(-1)
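If you want to check the parsing step without a running broker, it can be pulled out into a small function. This is only a minimal sketch: the name `parse_describe_output` is made up for illustration, and it relies on the same assumption as the script above, namely that the first line of the `--describe` output is a header, the first column is the topic, and the last column identifies the client. The column layout differs between Kafka versions, so verify it against your own output.

```python
def parse_describe_output(stdout):
    """Return (client, topic) pairs from `--describe` output.

    Assumption (not guaranteed for every Kafka version): line 1 is a header,
    column 1 is the topic and the last column identifies the client.
    """
    if isinstance(stdout, bytes):
        stdout = stdout.decode()
    pairs = []
    for line in stdout.splitlines()[1:]:  # skip the header row
        fields = line.split()
        if not fields:  # ignore blank lines
            continue
        pairs.append((fields[-1], fields[0]))
    return pairs
```

You would call it as `parse_describe_output(result.stdout)` and treat a non-empty result as "another instance is already running".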
Thanks. The list is empty. –
Which list is empty? The consumers or the consumer groups? @Colin-SBI – shakeel