2016-07-19 3 views
0

단순한 Spark 작업과 관련하여 문제가 발생했습니다. 이 코드Spark 작업이 반환 된 직후에 투기 작업을 계속 실행할 수 있습니까?

JavaRDD<ObjectNode> rdd = pullAndProcessData(); 
ManifestFilesystem fs = getOutputFS(); 
List<WriteObjectResult> writeObjectResults = rdd.mapPartitions(fs::write).collect(); 
fs.writeManifest(Manifest.makeManifest(writeObjectResults)); 

내 기대는 무슨 일이 생기면 모든 작업을 완료하고 성공적으로 S3에 자신의 파티션을 작성하고 경우에만 경우, writeManifest를 호출 할 예정이다. 문제는 분명히 발생해야하는 매니 페스트 후에 일부 작업이 S3에 작성하는 것입니다. (있는 경우) 일반 워크 플로를해야하기 때문에

ManifestFilesystem.write, 나는 그것을 무효로 기존 매니페스트를 삭제합니다

  • 가 S3
  • 에 매니페스트 쓰기 S3에 모든 파티션을 쓰기 나는 그것이 때문에 추측 작업으로 일어날 수 의심하고있어

다음 시나리오 :

    ,
  • 일부 작업이 speculatable 표시 및
  • 모든 추측 작업들이 보내졌다 적어도 하나 개의 슬레이브에 반환하는 다른 노예에 다시 보내지 만, 그들 중 일부는 느린 노예에서 계속 실행하는 작업을 방해하지 않는
  • 스파크 작업이
  • 추측 작업을 중단하기 전에 자신의 파티션

쓰기 전에 여전히 ManifestTimeslice.write을 실행하고 매니페스트를 삭제 마지막으로 실행 된 드라이버에 collect의 결과를 반환 뭔가 일어날 수인가요? 아무도 그러한 행동에 대한 또 다른 가설을 가지고 있습니까? 사실 난 내 직감을 확인하는 경향이 this을 발견,하지만 여전히 확인 때문에을 가지고 좋은 것 : 내장 데이터 게시 방법은 옵션

2 주 아닙니다 사용 :

주 표준 HDFS 또는 S3 읽기/쓰기 메서드를이 질문의 범위를 벗어나는 이유로 사용하지 않습니다.

+0

안녕하세요, 당신은 깊은 다이빙을하는 전체 코드와 로그를 게시하시기 바랍니다 수 있습니까? –

+0

안녕 Praveen, 제안 주셔서 감사합니다,하지만 난 멀리이 질문에 가지 않을 것입니다. 나는 로그에서 내 자신의 깊은 다이빙을했고 이것은 내가이 가설을 생각해내는 방법이다. 나는 그게 근본 원인이라고 증명할 수 없었고, 그것이 합당한 설명인지를 확인하기를 원했다. 부작용 작업 기능과 추측 작업 간의 이러한 종류의 상호 작용을 처음 경험하게됩니다. – Dici

답변

0

나는 Spark의 관점에서 그 주위에 방법이 없다는 것을 깨닫고 나 자신의 질문에 답할 것입니다. 완료 할 시간이 오기 전에 모든 투기 작업을 어떻게 죽일 수 있는지 어떻게 생각합니까? 실제로 파일이 완전히 실행되도록하는 것이 좋습니다. 그렇지 않으면 파일에 쓰는 동안 파일이 삭제되어 잘릴 수 있습니다.

이 다른 가능한 방법은 다음과 같습니다 this thread

  • 몇 가지 메시지를 하나의 공통 연습 그것은을 때문에 대부분의 파일 시스템에 싼 원자 이름 바꾸기를 (수행하기 전에 임시 시도 파일에 기록하는 것을 제안 단순한 포인터 스위치).추측 적 작업이 임시 파일의 이름을 기존 이름으로 변경하려고 시도 할 경우 (작업이 아톰 인 경우 동시에 발생하지 않음) 이름 변경 요청은 무시되고 임시 파일이 삭제됩니다.

  • 제 생각에는 S3은 원자 이름을 바꾸지 않습니다. 또한 위에 설명 된 프로세스가 구현하기가 쉽지만 현재는 가정용 솔루션을 최대로 제한하고 시스템을 간단하게 유지하려고합니다. 따라서 최종 해결책은 jobId (예 : 작업이 시작된 시간 소인)을 사용하여 슬레이브로 전달하고 매니페스트에 기록하는 것입니다. FS에 파일을 기록 할 때, 다음과 같은 논리가 적용됩니다 :

    public WriteObjectResult write(File localTempFile, long jobId) { 
        // cheap operation to check if the manifest is already there 
        if (manifestsExists()) { 
         long manifestJobId = Integer.parseInt(getManifestMetadata().get("jobId")); 
         if (manifestJobId == jobId) { 
          log.warn("Job " + jobId + " has already completed successfully and published a manifest. Ignoring write request." 
          return null; 
         } 
         log.info("A manifest has already been published by job " + jobId + " for this dataset. Invalidating manifest."); 
         deleteExistingManifest(); 
        }  
        return publish(localTempFile); 
    } 
    
1

스파크는 추측 적 작업을 사전에 죽이지 않습니다. 작업이 끝날 때까지 기다렸다가 결과를 무시합니다. collect 호출 후에는 투기 작업이 계속 쓰여질 수 있다고 생각합니다.

+0

그래, 내가 생각했던대로 = (나는 그것이 도움이 될 수 없다고 생각한다. 그런 엣지 경우를 피할 방법이 없다. 나는 이것을 위해 사용자 정의 논리를 추가해야 할 것이다. – Dici