2016-06-26 3 views
2

나는 다음과 같은 구성 요소가 있습니다결합 여러 CompletableFutures

private JobInfo aggregateJobInfo() { 
    final JobsResult jobsResult = restClient().getJobs(); 
    final List<String> jobIds = extractJobIds(jobsResult); 

    //fetch details, exceptions and config for each job 
    final List<JobDetails> jobDetails = jobIds.stream().map(jobId -> { 
     final JobDetailResult jobDetailResult = restClient().getJobDetails(jobId); 
     final JobExceptionsResult jobExceptionsResult = restClient().getJobExceptions(jobId); 
     final JobConfigResult jobConfigResult = restClient().getJobConfig(jobId); 
     return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult); 
    }).collect(Collectors.toList()); 
    return new JobInfo(jobsResult, jobDetails); 
} 

private static List<String> extractJobIds(final JobsResult jobsResult) { 
    final ArrayList<String> jobIds = new ArrayList<>(); 
    jobIds.addAll(jobsResult.getRunning()); 
    jobIds.addAll(jobsResult.getFinished()); 
    jobIds.addAll(jobsResult.getCanceled()); 
    jobIds.addAll(jobsResult.getFailed()); 
    return jobIds; 
} 

그냥 약간의 엔드 포인트를 호출하고 일부 데이터를 aggergates합니다. 지금은

private CompletableFuture<JobInfo> aggregateJobInfo() { 
    final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs(); 
    final CompletableFuture<List<String>> jobIdsFuture = jobsResultFuture.thenApply(JobInfoCollector::extractJobIds); 

    //fetch details, exceptions and config for each job 
    final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture = jobIdsFuture.thenApply(jobIds -> { 
     return jobIds.stream().map(jobId -> { 
      final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId); 
      final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId); 
      final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId); 
      return jobDetailsResultFuture.thenCompose(jobDetailResult -> { 
       return jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) -> { 
        return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult); 
       }); 
      }); 

     }).collect(Collectors.toList()); 
    }); 
    return null; 

내 문제는 JobInfo가`새로운 JobInfo (jobsResult, jobDetails 때 여기 CompletableFuture를 만드는 방법입니다 .. 정말 이전에 사용하지 않은 CompletableFutures를 사용하여 그 이외의 차단을 만들려고 해요)!

내가 말했듯이, 나는 이것에 익숙하지 않은 것 같다. 어쩌면 내 접근법이 나쁘고 더 좋은 해결책이 있을까?

에서

평가 모든 아이디어, 덕분에 첫 번째 작업 버전 :

private CompletableFuture<JobInfo> aggregateJobInfo() { 

    final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs(); 
    final CompletableFuture<List<String>> jobIdsFuture = jobsResultFuture.thenApply(JobInfoCollector::extractJobIds); 

    final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFutureListFuture = 
      jobIdsFuture.thenApply(jobIds -> jobIds.stream().map(jobId -> { 
       final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId); 
       final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId); 
       final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId); 
       return jobDetailsResultFuture.thenCompose(jobDetailResult -> 
         jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) -> 
           new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult))); 
      }).collect(Collectors.toList())); 

    return jobDetailsFutureListFuture.thenCompose(jobDetailsFutures -> 
      CompletableFuture.allOf(jobDetailsFutures.toArray(
        new CompletableFuture[jobDetailsFutures.size()])).thenApply(aVoid -> 
        jobDetailsFutures.stream() 
          .map(CompletableFuture::join) 
          .collect(Collectors.toList()))) 
      .thenApply(jobDetails -> jobsResultFuture.thenApply(jobsResult -> 
        new JobInfo(jobsResult, jobDetails))) 
      .join(); 
} 
+1

붙여 넣은 코드의 대부분이 질문과 관련이없는 것처럼 보입니다. 당신이 필요로하는 것의 최소한의 예로 그것을 줄일 수 있습니까? – the8472

+0

질문은 위의 미래 데이터를 "매핑"하는 방법이므로 CompletableFuture 이 반환됩니다. –

답변

4

당신은이 :

  • CompletableFuture<JobsResult> jobsResultFuture
  • CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture
  • JobInfo(JobsResult a, List<JobDetails> b)

당신은

CompletableFuture<JobInfo>

추가 관찰을 원하는 : jobsResultFuture가 완료되면 jobDetailsFuture 만 완료 할 수 있습니다. 포착 (thenApply

  • List<JobDetails> + CompletableFuture<JobsResult>를 통해>List<JobDetails> - (캡처 VAR 등)
  • Void + List<CompletableFuture<JobDetails>>thenCompose에서 allOf를 통해>Void -

    1. List<CompletableFuture<JobDetails>> :

      그래서 다음을 구현할 수 있습니다 var) ->JobInfo via thenApply

    해당 매퍼 함수 내에서 get()을 통해 선물을 간단하게 언 랩핑 할 수 있습니다. 왜냐하면 해당 시점에 선조 선물의 종속성으로 인해 해당 시점에 선물이 완료되었음을 보장하기 때문입니다.

    thenCombine을 사용하고 스트림을 줄이는 다른 방법도 가능하지만 좀 더 자세한 정보를 제공하고 더 많은 중간 선물을 만듭니다.

  • +0

    죄송합니다. 이해가 안되네 ... '기다려주십시오'와 '공허'가 무엇을 의미합니까? –

    +0

    은 [allOf] (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#allOf-java.util.concurrent.CompletableFuture...-)를 의미합니다.'CompletableFuture '을 반환하므로 데이터를 가져 오는 다음 단계는 – the8472

    +0

    여전히 이해할 수 없습니다. jobDetailsListFuture.thenCompose 시도 (jobDetailsFutures를 -> { 창 CompletableFuture.allOf (jobDetailsFutures.toArray ( 새로운 CompletableFuture [jobDetailsFutures.size()])) .thenApply (피하기 -> {} ???) }); 하지만 무효화로해야 할 일을 모릅니다. –