2016-09-30 2 views
2

병렬 처리가 8 인 MapStream이 있습니다. 두 개의 싱크를 DataStream에 추가합니다. 하나는 느리고 (Elasticsearch) 다른 하나는 빠릅니다 (HDFS). 그러나 내 이벤트는 ES에 플러시 된 후에 만 ​​HDFS에 기록되므로 ES가없는 경우보다 ES의 경우 시간이 오래 걸립니다.Apache Flink에서 싱크에 병렬 쓰기 방법

dataStream.setParallelism(8); 
dataStream.addSink(elasticsearchSink); 
dataStream.addSink(hdfsSink); 

두 싱크 모두 동일한 스레드를 사용합니다. 가능하다면 두 개의 싱크가있는 동일한 소스를 사용하거나 출력을 병렬로 쓰려면 싱크 싱크를 위해 다른 작업을 추가해야합니까?

지도에서 Map (1/8)에서 Map (8/8)으로 전개되고 데이터가 수신되는지 로그에서 확인했습니다.

답변

1

Elasticsearch 싱크가 입력 속도를 따라 잡지 못하면 입력 연산자가 느려집니다. 이 개념은 배압 (backpressure)이라 불리는데, 이는 느린 소비자가 빠른 생산자가 처리하는 것을 막는다는 것을 의미합니다.

예상대로 프로그램을 동작시키는 유일한 방법은 HDFS 싱크가 작성했지만 Elasticsearch 싱크가 아직 작성하지 않은 모든 레코드를 버퍼링하는 것입니다. Elasticsearch 싱크가 지속적으로 느려지면 어느 시점에서 메모리/디스크 공간이 부족합니다.

느린 소비자 문제를 해결하기위한 Flink의 접근 방식은 배후 압력입니다.

  1. 을 늘 ElasticsearchSink의 병렬 :

    나는이 문제를 해결하는 두 가지 방법을 참조하십시오. Elasticsearch 설정의 기능에 따라 도움이 될 수도 있고 도움이되지 않을 수도 있습니다.

  2. 두 작업을 모두 독립적 인 파이프 라인으로 실행하십시오. 이 경우 모든 결과를 두 번 계산해야합니다.
+0

역압에 대해 생각해 보면 같은 방향으로 돌고있었습니다. –