2017-04-07 7 views
2

나는 카프카를 통해 매우 간단한 JSON 객체를 보내고 파이썬과 카프카 - 파이썬을 사용하여 반대편에서 읽으려고한다. 나는 몇 가지 조사를 수행 한이 오류의 가장 일반적인 원인은 JSON은 잘못된 것입니다카프카에서 JSON 메시지를 소비 할 수 없다. 카프카 - 파이썬의 디시리얼라이저 사용하기

2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback 
Traceback (most recent call last): 
    File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs 
    f(value) 
    File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response 
    unpacked = list(self._unpack_message_set(tp, messages)) 
    File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set 
    tp.topic, msg.value) 
    File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize 
    return f(bytes_) 
    File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda> 
    value_deserializer=lambda m: json.loads(m).decode('utf-8')) 
    File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads 
    return _default_decoder.decode(s) 
    File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode 
    obj, end = self.raw_decode(s, idx=_w(s, 0).end()) 
    File "C:\Anaconda2\lib\json\decoder.py", line 382, in raw_decode 
    raise ValueError("No JSON object could be decoded") 
ValueError: No JSON object could be decoded 

: 그러나, 나는 다음과 같은 오류를보고 계속. JSON을 인쇄하기 전에 코드에 다음을 추가하고 JSON을 오류없이 인쇄 해 보았습니다.

while True: 
     json_obj1 = json.dumps({"dataObjectID": "test1"}) 
     print json_obj1 
     producer.send('my-topic', {"dataObjectID": "test1"}) 
     producer.send('my-topic', {"dataObjectID": "test2"}) 
     time.sleep(1) 

이것은 내가 json을 생산할 수는 있지만 소비하지 않을 것이라고 생각하게합니다. 나는 성공적으로 보내고 나는 value_serializer 및 value_deserializer을 제거하는 경우 문자열을받을 수 있습니다

import threading 
import logging 
import time 
import json 

from kafka import KafkaConsumer, KafkaProducer 


class Producer(threading.Thread): 
    daemon = True 

    def run(self): 
     producer = KafkaProducer(bootstrap_servers='localhost:9092', 
           value_serializer=lambda v: json.dumps(v).encode('utf-8')) 

     while True: 
      producer.send('my-topic', {"dataObjectID": "test1"}) 
      producer.send('my-topic', {"dataObjectID": "test2"}) 
      time.sleep(1) 


class Consumer(threading.Thread): 
    daemon = True 

    def run(self): 
     consumer = KafkaConsumer(bootstrap_servers='localhost:9092', 
           auto_offset_reset='earliest', 
           value_deserializer=lambda m: json.loads(m).decode('utf-8')) 
     consumer.subscribe(['my-topic']) 

     for message in consumer: 
      print (message) 


def main(): 
    threads = [ 
     Producer(), 
     Consumer() 
    ] 

    for t in threads: 
     t.start() 

    time.sleep(10) 

if __name__ == "__main__": 
    logging.basicConfig(
     format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:' + 
       '%(levelname)s:%(process)d:%(message)s', 
     level=logging.INFO 
    ) 
    main() 

:

여기 내 코드입니다. . 내가 그 코드를 실행하면 나는 여기에 짧은 snipit이다 나는에 전송하고있는 JSON을 볼 수

ConsumerRecord(topic=u'my-topic', partition=0, offset=5742, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test1"}', checksum=-1301891455, serialized_key_size=-1, serialized_value_size=25) 
ConsumerRecord(topic=u'my-topic', partition=0, offset=5743, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test2"}', checksum=-1340077864, serialized_key_size=-1, serialized_value_size=25) 
ConsumerRecord(topic=u'my-topic', partition=0, offset=5744, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4) 
ConsumerRecord(topic=u'my-topic', partition=0, offset=5745, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17) 
ConsumerRecord(topic=u'my-topic', partition=0, offset=5746, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4) 
ConsumerRecord(topic=u'my-topic', partition=0, offset=5747, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17) 

그래서 내가 소비자로부터 value_deserializer을 제거하는 노력이, 그 코드는 메시지가 나오는 디시리얼라이저하지 않고 있지만 실행 String이 필요합니다. 그렇다면 왜 value_deserializer가 작동하지 않습니까? 내가 사용해야하는 Kafka 메시지에서 JSON을 가져 오는 다른 방법이 있습니까?

답변

1

value_deserializer=lambda m: json.loads(m).decode('utf-8')의 디코드 부분이 바뀌면 value_deserializer=lambda m: json.loads(m)으로 바뀌면 Kafka에서 읽은 객체의 유형이 현재 사전으로 표시됩니다. 파이썬의 JSON 문서에서 다음과 같은 정보를 기반으로 어느 것이 올바른 :

|---------------------|------------------| 
|  JSON   |  Python  | 
|---------------------|------------------| 
|  object   |  dict  | 
|---------------------|------------------| 
|  array   |  list  | 
|---------------------|------------------| 
|  string   |  unicode  | 
|---------------------|------------------| 
|  number (int) |  int, long | 
|---------------------|------------------| 
|  number (real) |  float  | 
|---------------------|------------------| 
|  true   |  True  | 
|---------------------|------------------| 
|  false   |  False  | 
|---------------------|------------------| 
|  null   |  None  | 
|---------------------|------------------| 
4

내 문제는 UTF-8로 먼저 메시지를 디코딩 한 후 해결 한 후, json.load/그것을 덤프 :

value_deserializer=lambda m: json.loads(m.decode('utf-8')) 

대신 :

value_deserializer=lambda m: json.loads(m).decode('utf-8') 

희망이 또한 생산자의 측면

을 위해 작동합니다