2017-09-18 6 views
1

나는 어떻게되는지 알아 내려고합니다 : 여러 개의 socketTextStream에서 프로그램을 읽는 중이고이 텍스트 스트림이 다른 데이터 흐름으로 유입됩니다. 일). 내가 클러스터에서 작업을 실행할 때Flink SocketTextStream 소스를 단일 컴퓨터로 예약

for(int i =0; i< hosts.length; i++) { 

    DataStream<String> someStream = env.socketTextStream(hosts[i], ports[i]); 
    DataStream<Tuple2<String, String>> joinedAdImpressions = rawMessageStream.rebalance() ... 
} 

는 그러나, 나는이 기계의 성능에 대한 심각한 병목 현상 있도록 모든 소스 작업이 하나 개의 시스템에 예약 된 것을 발견 : 그것은 아래에 비슷한 보인다. 어떤 아이디어가 이런 일이 생길까요?

감사합니다.

+0

나는 socketTextStream을 많이 뒤죽박죽로 사용하지 않으므로 방향을 조사 할 것을 권장 할 수 있습니다. kafka 항목을 데이터 소스 (env.addSource (FlinkKafkaConsumer))로 사용하는 경우 클러스터에 파티션이 하나만있는 경우 kafka 데이터 소스에서받은 모든 데이터는 단일 시스템으로 만 전송됩니다. 따라서 병렬 처리가 3 인 경우 데이터는 3 중 하나를 통해 만 전달됩니다 (데이터가 한 컴퓨터에서만 흐른다는 의미인지 확인하고 싶음). 다른 종류의 데이터 소스를 사용하는 것과 비슷합니다. – Jicaar

답변

0

모든 다른 SocketTextStreamFunction 소스가 동일한 시스템에 예약 된 이유는 슬롯 공유 때문입니다. 슬롯 공유를 통해 Flink는 다른 운영자의 작업을 동일한 슬롯에 예약 할 수 있습니다. 이는 예를 들어, 서로 의존하는 작업 (예 : 빌드 - 사이드, 프로브 - 사이드 및 동일한 슬롯에서 실행중인 실제 조인 연산자)간에보다 나은 코 로케이션을 수행 할 수있게합니다. 또한 응용 프로그램에 필요한 슬롯 수에 대해 쉽게 추론 할 수 있습니다. 이는 작업의 최대 병렬 처리입니다.

그러나 작업의 독립적 인 구성 요소는 클러스터를 통해 확산되지 않지만 일반적으로 슬롯 공유로 인해 동일한 슬롯 (결과적으로 동일한 시스템)에있게됩니다.

명시 적으로 다른 슬롯 공유 그룹 이름을 설정하면 작업의 일부분에 대해 슬롯 공유를 비활성화 할 수 있습니다. 그런 다음 동일한 슬롯 공유 그룹에 할당 된 연산자 만 슬롯 공유의 대상이됩니다. 다운 스트림 운영자는 입력에서 슬롯 공유 그룹을 상속받습니다. 따라서, 당황스럽게 평행 한 일을하는 경우에는 출처의 슬롯 공유 그룹 만 설정하면 충분합니다.

for(int i =0; i< hosts.length; i++) { 
    DataStream<String> someStream = env 
     .socketTextStream(hosts[i], ports[i]) 
     .slotSharingGroup("socket_" + i); 

    DataStream<Tuple2<String, String>> joinedAdImpressions = rawMessageStream.rebalance() ... 
} 
+0

고마워, 그건 내 문제를 완벽하게 해결했다. –