2017-05-15 21 views
0

우분투 서버에서 Divolte Collector를 설정하여 웹 사이트에서 클릭 스트림 데이터를 수집합니다. 데이터는 divolte-data라는 Kafka 채널에 기록됩니다. 그럼 난 druid.io (스파크를 읽을 수있는)를 포함하여 일반적인 데이터베이스에 여러 커넥터가 에어 비앤비 상위 집합으로 데이터를 시각화하고 싶은Druid-Tranquility (수퍼 셋)를 사용하여 신분 데이터 카프카 채널을 읽는 방법은 무엇입니까?

V0:j2ive5p1:QHQbOuiuZFozAVQfKqNWJoNstJhEZE85V0:j2pz3aw7:sDHKs71nHrTB5b_1TkKvWWtQ_rZDrvc2D0:B4aEGBSVgTXgxqB85aj4dGeoFjCqpeEGbannerClickMozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36ChromiumChromium8Google Inc. and contributorsBrowser58.0.3029.96"Personal computer 
LinuxCanonical Ltd. 

: 카프카 소비자를 설정하여 나는 데이터가오고 볼 수 있습니다.

Divolte가 비정형 방식으로 카프카에 데이터를 저장하고있는 것으로 보입니다. 하지만 분명히 구조화 된 방식으로 데이터를 매핑 할 수 있습니다. 입력 데이터가 JSON으로 구조화되어야 하는가?

그리고 Druid-Tranquility에서 읽는 데이터는 카톨릭 채널 데이터에서받은 데이터입니까? conf 예제에서 채널 이름을 변경하려고 시도했지만이 소비자는 제로 메시지를받습니다.

답변

0

내가 발견 한 해결책은 예를 들어 Kafka Python 라이브러리 나 Confluent Kafka Python을 사용하여 Python에서 Kafka 메시지를 처리 ​​한 다음 Avro 판독기로 메시지를 디코딩 할 것입니다.

편집 : 여기에 내가 그것을 어떻게했는지에 대한 업데이트는 다음과 같습니다

내가 아 브로 라이브러리 브로 파일을 읽을 단지라고 생각하지만, 실제로는 다음과 디코딩 카프카 메시지의 문제를 해결 : 내가 먼저 라이브러리를 가져 스키마 파일을 매개 변수로 제공 한 다음 소비자 루프에서 사용할 수있는 사전에 메시지를 디코딩하는 함수를 만듭니다.

from confluent_kafka import Consumer, KafkaError 
from avro.io import DatumReader, BinaryDecoder 
import avro.schema 

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read()) 
reader = DatumReader(schema) 

def decode(msg_value): 
    message_bytes = io.BytesIO(msg_value) 
    decoder = BinaryDecoder(message_bytes) 
    event_dict = reader.read(decoder) 
    return event_dict 

c = Consumer() 
c.subscribe(topic) 
running = True 
while running: 
    msg = c.poll() 
    if not msg.error(): 
     msg_value = msg.value() 
     event_dict = decode(msg_value) 
     print(event_dict) 
    elif msg.error().code() != KafkaError._PARTITION_EOF: 
     print(msg.error()) 
     running = False