2
파이썬 (2.7) 다중 처리를 사용하여 kafka-python (1.3.5) KafkaProducer를 사용하여 데이터를 Kafka 대기열로 푸시합니다.파이썬 다중 처리를 사용하여 데이터가 Kafka 대기열에 푸시되지 않음
from kafka import KafkaProducer
import multiprocessing
# other imports
class TestClass(object):
def __init__(self, producer):
self.kafka_producer = producer
def main(self, conf, nthreads):
try:
for i in range(nthreads):
logger.info("Starting process number = %d " % (i + 1))
p = Process(target=self.do_some_task, args=(conf, 2))
p.start()
processes.append(p)
for p in processes:
logger.info("Joining process")
p.join()
except Exception, ex:
logger.error("Exception occurred : %s" % str(ex))
def do_some_task(conf, retry):
# some task happening
self.record(arg1, arg2)
# pushing to kafka
def record(self, arg1, arg2)
message = json.dumps({"a": "arg1", "b": "arg2"})
self.kafka_producer.send(KAFKA_TOPIC, message)
if __name__ == '__main__':
kafka_producer = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
request_timeout_ms=60000,
retries=2)
obj = TestClass(kafka_producer)
try:
parser = argparse.ArgumentParser(description='Description')
parser.add_argument('-threads', type=int, default=1) # 20 threads
parser.add_argument('-debug', type=int, default=0)
args = parser.parse_args()
me = SingleInstance(args.src)
TestClass.main(CONF[args.src], args.threads)
20 스레드가 내부에서 생성되어 kafka에 쓰여집니다. 나는 로그를보고 프로세스가 카프카로 작성된 메시지를 기다리는 것을 알았지 만 결국 카프카에게 편지를 쓰지 않고 계속 진행합니다. 예외는 없습니다.
파이썬 명령 줄에서 스레드없이 동일한 코드를 실행하면 모든 것이 예상대로 작동했습니다. 무엇이 문제 일 수 있습니다.