2016-07-10 6 views
0

질문 : RDD 구축에 걸리는 시간을 테스트하는 유효한 방법입니까?Apache Spark timing for JavaRDD에 대한 각각의 작업

여기서는 두 가지 작업을 수행하고 있습니다. 기본 접근법은 DropEvaluation 및 N DropResults라고하는 M 인스턴스를 사용한다는 것입니다. 우리는 각각의 N DropResult를 M DropEvaluations 각각과 비교해야합니다. 각 N은 각 M에 의해 보여 져야만합니다. 결국 M 결과를 제공해야합니다.

RDD가 빌드 된 후 .count()를 사용하지 않으면 드라이버는 다음 코드 행으로 넘어가 30 분 정도 걸리는 RDD를 빌드 할 시간이 거의 없다고 말합니다.

나는 단지 .count() 시간이 오래 걸리는 것처럼 뭔가 놓치지 않았는지 확인하고 싶습니다. Spark의 소스를 수정해야하는 .count() 시간을 맞춰야할까요?

M = 1000 또는 2000. N = 10^7.

사실상 데카르트 문제입니다. 누적 기는 각 M에 쓰기를해야하기 때문에 선택되었습니다. 나는 완전한 직교 RDD를 만드는 것은 또한 추한 것입니다.

우리는 M 누적 계산기 목록을 작성합니다 (Java에서 목록 누산기를 수행 할 수 없습니까?). 그런 다음 foreach를 사용하여 RDD에서 각각의 N을 반복합니다.

질문의 명확화 : 총 소요 시간이 정확하게 측정되었습니다. RDD의 .count()가 RDD가 완료 될 때까지 기다렸다가 카운트를 실행할 수 있는지 묻습니다. .count() 시간이 중요합니까?

가 여기에 우리의 코드 : 당신은 RDD에 count처럼 모든 조치를 호출 할 때까지

// assume standin exists and does it's thing correctly 

// this controls the final size of RDD, as we are not parallelizing something with an existing length 
List<Integer> rangeN = IntStream.rangeClosed(simsLeft - blockSize + 1, simsLeft).boxed().collect(Collectors.toList()); 

// setup bogus array of size N for parallelize dataSetN to lead to dropResultsN  
JavaRDD<Integer> dataSetN = context.parallelize(rangeN); 

// setup timing to create N 
long NCreationStartTime = System.nanoTime(); 

// this maps each integer element of RDD dataSetN to a "geneDropped" chromosome simulation, we need N of these: 
JavaRDD<TholdDropResult> dropResultsN = dataSetN.map(s -> standin.call(s)).persist(StorageLevel.MEMORY_ONLY()); 

// **** this line makes the driver wait until the RDD is done, right? 
long dummyLength = dropResultsN.count(); 


long NCreationNanoSeconds = System.nanoTime() - NCreationStartTime; 
double NCreationSeconds = (double)NCreationNanoSeconds/1000000000.0; 
double NCreationMinutes = NCreationSeconds/60.0; 

logger.error("{} test sims remaining", simsLeft); 

// now get the time for just the dropComparison (part of accumulable's add) 
long startDropCompareTime = System.nanoTime(); 

// here we iterate through each accumulator in the list and compare all N elements of dropResultsN RDD to each M in turn, our .add() is a custom AccumulableParam 
for (Accumulable<TholdDropTuple, TholdDropResult> dropEvalAccum : accumList) { 
    dropResultsN.foreach(new VoidFunction<TholdDropResult>() { 
        @Override 
        public void call(TholdDropResult dropResultFromN) throws Exception { 
          dropEvalAccum.add(dropResultFromN); 
        } 
       }); 
      } 

    // all the dropComparisons for all N to all M for this blocksize are done, check the time... 
    long dropCompareNanoSeconds = System.nanoTime() - startDropCompareTime; 
    double dropCompareSeconds = (double)dropCompareNanoSeconds/1000000000.0; 
    double dropCompareMinutes = dropCompareSeconds/60.0; 

    // write lines to indicate timing section 
    // log and write to file the time for the N-creation 

    ... 

} // end for that goes through dropAccumList 
+0

나는 Dikei의 대답에 대해 아래에서 언급했는데, 나는 + 1'd, 핵심 문제에 대답하지 않는다. 정확한 시간은 RDD를 만드는 데 유효한 방법 일까? 이 카운트가 RDD 생성 시간을 단축시키는 데 상당한 시간이 걸립니까? 타이밍의 좋은 예에 대한 링크는 무엇인가? – JimLohse

답변

1

스파크 프로그램이 게으른, 그것은 실행되지 않습니다. 이 경우에 dropResultsN이 계산 강제 count, 그래서 시간이 오래 걸릴거야, Spark's document

// **** this line makes the driver wait until the RDD is done, right? 
long dummyLength = dropResultsN.count(); 

예에서 일반적 행동의 목록을 찾을 수 있습니다. 두 번째로 count을 수행하면 RDD가 이미 계산되고 캐시되어 있기 때문에 매우 빠르게 반환됩니다.

+0

RDD가 계산되지 않았다면 나는 당신과 동의 할 것입니다. 내가'dropResultsN.foreach'를 호출하기 때문에 이것은 계산되게한다. Foreach가 연결된 목록에 있습니다. 그래서 반쯤 잘못 됐나 봐요? 또한 나는 캐시 된 것으로 생각했다 :'JavaRDD dropResultsN = dataSetN.map (s -> standin.call). persist (StorageLevel.MEMORY_ONLY());'... 아니면 dataSetN만을 지속 하는가? 와우 당신은 방금 내 마음을 날려 버린 것 같습니다 :) – JimLohse

+1

내 실수는, 나는 '지속'전화를 보지 못했어요. MEMORY_ONLY로'persist'를 호출하는 것은'cache'를 호출하는 것과 같습니다. – Dikei

+0

내 질문에 위의 의견을 참조하십시오, 당신은 첫 번째 .count() 시간이 오래 걸린다는 생각에 타격을받습니다. Spark 또는 일부 링크/예제 타이밍에 대한 유효한 접근법에 대한 다른 관찰이 있었다면이 대답을 받아 들일 수 있습니다. 그렇지 않으면 며칠 간 가고 누가 차임을하는지 보겠습니다. 고마워요. – JimLohse