2017-12-21 18 views
1

새로운 소비자 (비단뱀 소비자) 목록이 있습니다. 나는이 명령으로 그룹을 검색 할 수 있습니다특정 카프카 주제에 연결된 (new-) 소비자를 보는 방법

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list 
각각 그들이

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group TheFoundGroupId 
심지어 모든 그룹 (바람직하게는 모든 소비자를 얻을 수 있습니다 어떻게
  1. 에 연결되어있는 주제를 내가 얻을 수

    그룹에 없을 때) 주제와 연결되어 있습니까?

  2. 쉘 명령으로 실행하는 것 외에 파이썬에서 액세스 할 수있는 방법이 있습니까?

답변

0

그것은 최선의 해결책은 아니지만 아무도 대답을 갖고있는 것 같아요 없기 때문에 나는 말 (소비자에게 그룹을 할당하고 대체 ___YOURGROUP____)에 그것을 해결하는 방법, 여기에 있습니다 :

import subprocess 
    import os 
    if "KAFKA_HOME" in os.environ: 
     kafkapath = os.environ["KAFKA_HOME"] 
    else: 
     kafkapath = oms_cfg.kafka_home 
     # error("Please set up $KAFKA_HOME environment variable") 
     # exit(-1) 

    instances = [] 
    # cmd = kafkapath + '/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server {} --list'.format(oms_cfg.bootstrap_servers) 
    # result = subprocess.run(cmd.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
    igr = ____YOURGROUP_____ # or run over all groups from the commented out command 
    print("Checking topics of consumer group {}".format(igr)) 
    topic_cmd = kafkapath + '/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server ' + oms_cfg.bootstrap_servers + ' --describe --group {gr}' 
    result = subprocess.run(topic_cmd.format(gr=igr).split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
    table = result.stdout.split(b'\n') 
    # You could add a loop over topic here 
    for iline in table[1:]: 
     iline = iline.split() 
     if not len(iline): 
      continue 
     topic = iline[0] 
     # we could check here for the topic. multiple consumers in same group -> only one will connect to each topic 
     # if topic != oms_cfg.topic_in: 
     #  continue 
     client = iline[-1] 
     instances.append(tuple([client, topic])) 
     # print("Client {} Topic {} is fine".format(client, topic)) 
    if len(instances): 
     error("Cannot start. There are currently {} instances running. Client/topic {}".format(len(instances), 
                            instances)) 
     exit(-1) 
1

감사에 대한 이 질문을합니다.

소비자가 어떤 주제에 등록되어 있는지 소비자 그룹 ID와 같은 모든 소비자 구성은 사육사에 저장됩니다. 2181

다음

LS를 실행/소비자

: 명령 아래

실행 연결

./bin/zookeeper-shell 로컬 호스트 사육사에 도착

귀하는 모든 소비자 그룹을 현재. 소비자 그룹도 제공하지 않는 경우. Kafka는 무작위 소비자 그룹을 배정합니다. 콘솔 소비자는

당신은 파이썬 코드 조각 아래에서 모든 소비자 그룹을 얻을 수

zookeeper python client

from kazoo.client import KazooClient 

zk = KazooClient(hosts='127.0.0.1:2181') 
zk.start() 


# get all consumer groups 
consumer_groups = zk.get_children("/consumers") 
print("There are %s consumer group(s) with names %s" % (len(consumer_groups), consumer_groups)) 

# get all consumers in group 
for consumer_group in consumer_groups: 
    consumers = zk.get_children("/consumers/"+consumer_group) 
    print("There are %s consumers in %s consumer group. consumer are : %s" % (len(consumers), consumer_group, consumers)) 

소비자 또는 연결되어 소비자 단체를 얻으려면 설치 콘솔 소비자-XXXXX ID를 할당합니다 이야기.

GET은/소비자/consumergroup_id/IDS/consumer_id/

소비자가 가입 한 모든 항목을 객체 가입에 따라

{"version":1,"subscription":{"test":1},"pattern":"white_list","timestamp":"1514218381246"} 

처럼 출력을 제공 할 것입니다. 귀하의 유스 케이스에 따라 로직을 구현하십시오.

+0

감사합니다. 목록이 비어 있습니다. –

+0

목록이 비어 있습니까? 소비자 또는 소비자 그룹? @ 콜린 -SBI – shakeel