0

배경는 :kafka에서 elasticsearch로 데이터를 가져올 때 가져 오기 진행률 및 오류 로그를 가져 오는 방법은 무엇입니까?

나는 elasticsearch하는 카프카에서 데이터를 가져 오기 위해 노력하고있어, 클라이언트 2 종류가있다. 하나는 웹 클라이언트이고 다른 하나는 에이전트 클라이언트입니다.

웹 클라이언트는 사용자가 업로드 할 때 csv 파일을 처리하고 웹 클라이언트는 csv 파일에서 매 10,000 행을 읽고 csv 총 행 수가 Producer 인 데이터 메시지를 보냅니다. 생산자가 kafka에게 메시지를 보낸 다음 소비자가 메시지를 가져 와서 elasticsearch로 데이터를 가져옵니다. 동시에 소비자는 데이터 메시지 길이 및 CSV 총계를 사용하여 작업 진행률을 업데이트하고 오류 로그가있는 경우이를 업데이트합니다. 마침내 우리 웹 클라이언트는 오류를 알고 진행 상황을 가져옵니다.

에이전트 클라이언트 감시 로그 파일이 변경되면 새로운 로그가 생성되면 웹 클라이언트와 동일한 메시지를 생산자에게 전송하지만 진행 상황은 신경 쓰지 않습니다. 로그는 항상 nginx 로그처럼 커집니다.

프레임 워크 : enter image description here

생산자와 소비자가 kafka-python을 사용하는 우리의 파이썬 프로그램입니다 : 여기

은 내가 사용하는 프레임 워크입니다.

문제 :

  1. 때때로 소비자는 자동 재시작 된 것이며, 다시 동일한 데이터를 다시 가져온, 추락했다.
  2. 언젠가 클라이언트가 너무 많은 메시지를 보내면 http 요청에 제한 사항이 있으므로 제작자가 일부 메시지를 놓칠 수도 있습니다.

질문 :

그 일을 할 수있는 더 나은 프레임 워크가 있습니까? kafka-connect-elasticsearch를 사용하면 스파크 스트리밍이 가능합니까?

+0

Logstash를 보셨습니까? 어떤 종류의 작업 제작자 및 소비자가 수행해야 할 것인가? 데이터 푸시 또는 사용자 정의 논리뿐입니다. 데이터에 어떤 형식이 사용됩니까? 질문에 대한 자세한 내용을 입력하십시오 –

+0

안녕하세요 알렉스, 지켜봐 줘서 고마워, 내 질문을 업데이 트했습니다. 그리고 전에 logstash를 사용하지 않았습니다. 설명하는 시나리오에 맞습니까? –

답변

1

예 - Kafka Connect Elasticsearch connector을 사용하십시오. 이것은 당신의 인생을 훨씬 쉽게 만들어 줄 것입니다. Kafka Connect API는 이러한 어려운 작업 (재시작, 오프셋 관리 등)을 모두 수행하도록 특별히 설계되었습니다. 최종 사용자는 구성 파일을 설정하기 만하면됩니다. Kafka Connect here을 사용하는 예를 읽을 수 있습니다.

카프카 커넥트는 아파치 카프카의 일부입니다. Elasticsearch 커넥터는 오픈 소스이며 github에서 사용할 수 있습니다. 또는 최신 버전의 카프카를 커넥터 (Elasticsearch, HDFS 등) 및 기타 유용한 도구로 번들로 제공하는 Confluent Platform을 다운로드하십시오.

+0

kafka connect elasticsearch를 사용하는 경우 데이터 가져 오기 진행률 및 오류 로그를 가져올 생각이 있습니까? –