0

다음은 카프카 제작자를위한 파이썬 코드입니다. 카프카 브로커에 게시 할 수있는 메시지인지 확실하지 않습니다. 소비자 측은 어떤 메시지도받지 못했기 때문입니다. 내 소비자 Python 프로그램은 제작자 콘솔 명령을 사용하여 테스트하는 동안 잘 작동합니다.카프카 소비자가 생산자로부터 아무런 메시지도받지 못했습니다.

from __future__ import print_function 

import sys 
from pyspark import SparkContext 
from kafka import KafkaClient, SimpleProducer 

if __name__ == "__main__": 

if len(sys.argv) != 2: 
    print("Usage:spark-submit producer1.py <input file>", file=sys.stderr) 
    exit(-1) 

sc = SparkContext(appName="PythonRegression") 

def sendkafka(messages): 
    ## Set broker port 
    kafka = KafkaClient("localhost:9092") 
    producer = SimpleProducer(kafka, async=True, batch_send_every_n=5, 
batch_send_every_t=10) 
    send_counts = 0 
    for message in messages: 
     try: 
      print(message) 
      ## Set topic name and push messages to the Kafka Broker 
      yield producer.send_messages('test', message.encode('utf-8')) 
     except Exception, e: 
      print("Error: %s" % str(e)) 
     else: 
      send_counts += 1 
    print("The count of prediction results which were sent IN THIS PARTITION 
is %d.\n" % send_counts) 

## Connect and read the file.  
rawData = sc.textFile(sys.argv[1]) 

## Find and skip the first row 
dataHeader = rawData.first() 
data = rawData.filter(lambda x: x != dataHeader) 

## Collect the RDDs. 
sentRDD = data.mapPartitions(sendkafka) 
sentRDD.collect() 

## Stop file connection 
sc.stop() 

from __future__ import print_function 
import sys 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

if len(sys.argv) < 3: 
print ("Program to pulls the messages from kafka brokers.") 
print("Usage: consume.py <zk> <topic>", file=sys.stderr) 

else: 
## Flow 
## Loads settings from system properties, for launching of spark-submit. 
sc = SparkContext(appName="PythonStreamingKafkaWordCount") 

## Create a StreamingContext using an existing SparkContext. 
ssc = StreamingContext(sc, 10) 

## Get everything after the python script name 
zkQuorum, topic = sys.argv[1:] 

## Create an input stream that pulls messages from Kafka Brokers. 
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", 
{topic: 1}) 

## 
lines = kvs.map(lambda x: x[1]) 

## Print the messages pulled from Kakfa Brokers 
lines.pprint() 

## Save the pulled messages as file 
## lines.saveAsTextFiles("OutputA") 

## Start receiving data and processing it 
ssc.start() 

## Allows the current process to wait for the termination of the context 
ssc.awaitTermination() 

답변

0

코딩 내 "소비자"파이썬이 나는 보통 당신이 생산 시도 항목에서 소비 카프카 콘솔 소비자 (아파치 카프카의 일부)를 사용하여 이러한 문제를 디버깅 할 수 있습니다. 콘솔 사용자가 메시지를 받으면 Kafka에 도착했음을 알게됩니다.

처음으로 제작자를 실행하고 마쳤 으면 소비자를 시작하면 소비자가 로그 끝에서 시작하여 추가 메시지를 기다리는 것이 문제 일 수 있습니다. 소비자를 먼저 시작하거나 처음 시작할 때 자동으로 시작하도록 구성하십시오 (미안하지만, 파이썬 클라이언트로 그렇게하는 방법을 모르는 경우). 그들이 생산 요청에 따라 증가하는 경우

+0

자신의 노트북과 함께 시도 전에 작동하지 않습니다 하지만 다른 서버에서 그 작업을 시도했습니다 어쨌든, 고마워요 –

0

당신은 항목의 메시지 수를 확인할 수 있습니다

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ 
--broker-list <Kafka_broker_hostname>:<broker_port> --topic Que1 \ 
--time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}' 

메시지의 수가 증가하는 경우, 그것은 생산자가 잘 작동을 의미합니다.

0

괜찮아요. 내 사육사 나 카프카에게 문제가 있다고 생각합니다. 다른 서버에서 테스트했기 때문에 완벽하게 작동합니다. 그러나, 나를 회신하는 사람들을위한 감사합니다;)