2017-11-12 3 views
0

지연을 피하고 프로세스 속도를 높이기 위해 스파크 스트리밍에서 스레드 풀을 빌드합니다.스파크 스트리밍 프로그램에서 스레드 풀 구축

stream.foreachRDD(rdd=> { 
    rdd.foreachPartition { rddPartition => { 
    val client: Client = ESClient.getInstance.getClient 
    var num = Random.nextInt() 
    val threadPool: ExecutorService = Executors.newFixedThreadPool(5) 
    val confs = new Configuration() 
    rddPartition.foreach(x => { 
     threadPool.execute(new esThread(x._2, num, client, confs)) 
    }  )  }  } } ) 

esThread의 기능은 첫째, 우리는 elasticsearch 문의 있다는 것입니다 우리는 마침내 우리가 HDFS에 결과를 기록, ES의 쿼리 결과를 얻을, 다음과 같이 메인 프로그램이 나열됩니다. 그러나 HDFS에서 결과 파일의 데이터가 많이 부족하다는 사실을 발견했습니다. 이는 약간 남았습니다. 스파크 스트리밍에서 쓰레드 풀을 만들 수 있는지 궁금합니다. 스파크 스트리밍의 스레드 풀이 일부 데이터가 누락 되었습니까?

귀하의 도움에 감사드립니다.

답변

0

분할 영역은 이미 별도의 스레드에서 처리되며 이전 분할 영역이 완료 될 때까지 다음 배치로 진행되지 않습니다. 그래서 당신에게 무엇이든 사주는 것은 아니며 자원 사용 추적을 덜 투명하게 만듭니다.

동시에 코드가 구현됨에 따라 데이터가 손실 될 가능성이 있습니다. threadPoolawaitTermination이 아니기 때문에 모든 데이터가 처리되기 전에 상위 스레드가 종료 될 수 있습니다.

전반적으로 유용한 접근법이 아닙니다. 처리량을 늘리려면 파티션 수와 컴퓨팅 리소스 양을 조정해야합니다.

+0

답변을 주셔서 감사합니다. 또한 질문이 있습니다 .1. '부모 스레드'가 파티션 스레드를 참조합니까? 2. threadPool이 대기 중이 지 않다는 것을 의미합니까? threadPool awaitTermination을 만들면 접근 방식이 작동합니까? ? 내가 얻을 수있는 컴퓨팅 리소스의 한계 때문에, 처리량을 높이기 위해 더 많은 스레드를 만들고 싶습니다. – mumumuyanyanyan