2016-10-20 3 views
1

병렬 처리를 사용하지만 특정 제한 시간 내에 여러 "값 비싼"결과를 검색하고 싶습니다.GPars를 사용하여 주어진 시간 내에 여러 개의 비동기 결과를 얻으려면 어떻게해야합니까?

Dataflow.task를 사용하고 있는데 모든 데이터 흐름 변수가 바인딩되어있을 때만 프로세스가 반환되기 때문에 뭔가 빠졌습니다.

def timeout = 500 
def mapResults = [] 
GParsPool.withPool(3) { 
    def taskWeb1 = Dataflow.task { 
     mapResults.web1 = new URL('http://web1.com').getText() 
    }.join(timeout, TimeUnit.MILLISECONDS) 
    def taskWeb2 = Dataflow.task { 
     mapResults.web2 = new URL('http://web2.com').getText() 
    }.join(timeout, TimeUnit.MILLISECONDS) 
    def taskWeb3 = Dataflow.task { 
     mapResults.web3 = new URL('http://web3.com').getText() 
    }.join(timeout, TimeUnit.MILLISECONDS) 
} 

나는 제한 시간 내에 가장 빠른 결과를 얻기 위해 선택을 사용하도록 GPars Timeouts doc 방식으로 표시했다. 그러나 주어진 시간 프레임에서 가능한 한 많은 결과를 검색하는 방법을 찾고 있습니다.

더 나은 "GPars"방법이 있나요? 또는 Java 8 Future/Callable을 사용 하시겠습니까? 당신이 너무 자바 (8)에 관심 기반 솔루션을 것 때문에

답변

0

, 여기에 그것을 할 수있는 방법 :

int timeout = 250; 
ExecutorService executorService = Executors.newFixedThreadPool(3); 
try { 
    Map<String, CompletableFuture<String>> map = 
     Stream.of("http://google.com", "http://yahoo.com", "http://bing.com") 
      .collect(
       Collectors.toMap(
        // the key will be the URL 
        Function.identity(), 
        // the value will be the CompletableFuture text fetched from the url 
        (url) -> CompletableFuture.supplyAsync(
         () -> readUrl(url, timeout), 
         executorService 
        ) 
       ) 
      ); 
    executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS); 

    //print the resulting map, cutting the text at 100 chars 
    map.entrySet().stream().forEach(entry -> { 
     CompletableFuture<String> future = entry.getValue(); 
     boolean completed = future.isDone() 
       && !future.isCompletedExceptionally() 
       && !future.isCancelled(); 
     System.out.printf("url %s completed: %s, error: %s, result: %.100s\n", 
      entry.getKey(), 
      completed, 
      future.isCompletedExceptionally(), 
      completed ? future.getNow(null) : null); 
    }); 
} catch (InterruptedException e) { 
    //rethrow 
} finally { 
    executorService.shutdownNow(); 
} 

이 당신에게 당신이 URL에 많은 Future의를 제공하지만, 당신이 볼 수있는 기회를 제공합니다 예외로 인해 실패한 태스크가있는 경우 당신은 이러한 예외, 성공적인 취득에서는의 내용에 관심이 있다면 코드는 간단하게 할 수있다 :

int timeout = 250; 
ExecutorService executorService = Executors.newFixedThreadPool(3); 
try { 
    Map<String, String> map = Collections.synchronizedMap(new HashMap<>()); 
    Stream.of("http://google.com", "http://yahoo.com", "http://bing.com") 
     .forEach(url -> { 
      CompletableFuture 
       .supplyAsync(
        () -> readUrl(url, timeout), 
        executorService 
       ).thenAccept(content -> map.put(url, content)); 
     }); 
    executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS); 

    //print the resulting map, cutting the text at 100 chars 
    map.entrySet().stream().forEach(entry -> { 
     System.out.printf("url %s completed, result: %.100s\n", 
      entry.getKey(), entry.getValue()); 
    }); 
} catch (InterruptedException e) { 
    //rethrow 
} finally { 
    executorService.shutdownNow(); 
} 

코드의 모두 (더에만 작은 조금 걸릴 것입니다 약 250 밀리 초 동안 대기의 때문에 결과를 인쇄하기 전에 실행 프로그램 서비스에 태스크 제출). 약 250 밀리 초가 내 네트워크에서 이러한 URL 중 일부를 가져올 수있는 임계 값이지만 전체는 아닙니다. 실험 시간 제한을 자유롭게 조정하십시오.

readUrl(url, timeout) 방법의 경우 Apache Commons IO과 같은 유틸리티 라이브러리를 사용할 수 있습니다. Executor 서비스에 제출 된 태스크는 사용자가 명시 적으로 timeout 매개 변수를 고려하지 않더라도 인터럽트 신호를받습니다. 구현을 제공 할 수는 있지만 문제의 주요 문제의 범위를 벗어났습니다.

+0

답장을 보내 주셔서 감사합니다. 실제로는 awaitTermination이 핵심입니다.) – Wavyx

+0

및 URL 읽기에 대한 유용한 정보 타임 아웃 – Wavyx