2

파이썬 (2.7) 다중 처리를 사용하여 kafka-python (1.3.5) KafkaProducer를 사용하여 데이터를 Kafka 대기열로 푸시합니다.파이썬 다중 처리를 사용하여 데이터가 Kafka 대기열에 푸시되지 않음

from kafka import KafkaProducer 
import multiprocessing 
# other imports 


class TestClass(object): 
    def __init__(self, producer): 
     self.kafka_producer = producer 

    def main(self, conf, nthreads): 
     try: 
      for i in range(nthreads): 
       logger.info("Starting process number = %d " % (i + 1)) 
       p = Process(target=self.do_some_task, args=(conf, 2)) 
       p.start() 
       processes.append(p) 
      for p in processes: 
       logger.info("Joining process") 
      p.join() 
     except Exception, ex: 
      logger.error("Exception occurred : %s" % str(ex)) 

    def do_some_task(conf, retry): 
     # some task happening 
     self.record(arg1, arg2) 

    # pushing to kafka 
    def record(self, arg1, arg2) 
     message = json.dumps({"a": "arg1", "b": "arg2"}) 
     self.kafka_producer.send(KAFKA_TOPIC, message) 


if __name__ == '__main__': 
    kafka_producer = KafkaProducer(
     bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, 
     request_timeout_ms=60000, 
     retries=2) 
    obj = TestClass(kafka_producer) 

    try: 
     parser = argparse.ArgumentParser(description='Description') 
     parser.add_argument('-threads', type=int, default=1) # 20 threads 
     parser.add_argument('-debug', type=int, default=0) 
     args = parser.parse_args() 
     me = SingleInstance(args.src) 
     TestClass.main(CONF[args.src], args.threads) 

20 스레드가 내부에서 생성되어 kafka에 쓰여집니다. 나는 로그를보고 프로세스가 카프카로 작성된 메시지를 기다리는 것을 알았지 만 결국 카프카에게 편지를 쓰지 않고 계속 진행합니다. 예외는 없습니다.

파이썬 명령 줄에서 스레드없이 동일한 코드를 실행하면 모든 것이 예상대로 작동했습니다. 무엇이 문제 일 수 있습니다.

답변

0

프로세스를 포크 한 후에 kafka와의 연결을 생성하십시오. 연결을 닫고 연결 관련 오류가 발생하면 다시 연결하십시오.