2017-03-09 12 views
1

객체를 입력 매개 변수로 사용하여 객체를 가져올 라이브러리에서 작업하고 있습니다. URL을 생성 한 다음 응용 프로그램 서버를 호출합니다. 아파치 http 클라이언트를 사용하여 우리의 라이브러리를 사용하는 고객에게 응답을 돌려 보낸다. 일부 고객은 executeSync 메서드를 호출하여 동일한 기능을 얻게되며 일부 고객은 executeAsync 메서드를 호출하여 데이터를 가져옵니다.다중 스레드 환경에서 각 하위 작업을 병렬로 실행하십시오.

  • executeSync() - 내가 결과있을 때까지 대기, 결과를 반환합니다.
  • - 필요하다면 다른 일이 처리 된 후 즉시 처리 할 수있는 미래를 즉시 반환합니다.

는 다음 두 가지 방법으로 위가 내 DataClient 클래스 : 아래

public class DataClient implements Client { 
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(16); 
    private CloseableHttpClient httpClientBuilder; 

    // initializing httpclient only once 
    public DataClient() { 
    try { 
     RequestConfig requestConfig = 
      RequestConfig.custom().setConnectionRequestTimeout(500).setConnectTimeout(500) 
       .setSocketTimeout(500).setStaleConnectionCheckEnabled(false).build(); 
     SocketConfig socketConfig = 
      SocketConfig.custom().setSoKeepAlive(true).setTcpNoDelay(true).build(); 

     PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = 
      new PoolingHttpClientConnectionManager(); 
     poolingHttpClientConnectionManager.setMaxTotal(300); 
     poolingHttpClientConnectionManager.setDefaultMaxPerRoute(200); 

     httpClientBuilder = 
      HttpClientBuilder.create().setConnectionManager(poolingHttpClientConnectionManager) 
       .setDefaultRequestConfig(requestConfig).setDefaultSocketConfig(socketConfig).build(); 
    } catch (Exception ex) { 
     // log error 
    } 
    } 

    @Override 
    public List<DataResponse> executeSync(DataRequest key) { 
    List<DataResponse> responsList = null; 
    Future<List<DataResponse>> responseFuture = null; 

    try { 
     responseFuture = executeAsync(key); 
     responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit()); 
    } catch (TimeoutException | ExecutionException | InterruptedException ex) { 
     responsList = 
      Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, 
       DataStatusEnum.ERROR)); 
     responseFuture.cancel(true); 
     // logging exception here 
    } 
    return responsList; 
    } 

    @Override 
    public Future<List<DataResponse>> executeAsync(DataRequest key) { 
    DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder); 
    return this.forkJoinPool.submit(task); 
    } 
} 

또한 URL을함으로써 우리의 응용 프로그램 서버를 호출하는 정적 클래스 DataRequestTask을 가지고 내 DataFetcherTask 클래스입니다 :

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> { 
    private final DataRequest key; 
    private final CloseableHttpClient httpClientBuilder; 

    public DataFetcherTask(DataRequest key, CloseableHttpClient httpClientBuilder) { 
    this.key = key; 
    this.httpClientBuilder = httpClientBuilder; 
    } 

    @Override 
    protected List<DataResponse> compute() { 
    // Create subtasks for the key and invoke them 
    List<DataRequestTask> requestTasks = requestTasks(generateKeys()); 
    invokeAll(requestTasks); 

    // All tasks are finished if invokeAll() returns. 
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size()); 
    for (DataRequestTask task : requestTasks) { 
     try { 
     responseList.add(task.get()); 
     } catch (InterruptedException | ExecutionException e) { 
     Thread.currentThread().interrupt(); 
     return Collections.emptyList(); 
     } 
    } 
    return responseList; 
    } 

    private List<DataRequestTask> requestTasks(List<DataRequest> keys) { 
    List<DataRequestTask> tasks = new ArrayList<>(keys.size()); 
    for (DataRequest key : keys) { 
     tasks.add(new DataRequestTask(key)); 
    } 
    return tasks; 
    } 

    // In this method I am making a HTTP call to another service 
    // and then I will make List<DataRequest> accordingly. 
    private List<DataRequest> generateKeys() { 
    List<DataRequest> keys = new ArrayList<>(); 
    // use key object which is passed in contructor to make HTTP call to another service 
    // and then make List of DataRequest object and return keys. 
    return keys; 
    } 

    /** Inner class for the subtasks. */ 
    private static class DataRequestTask extends RecursiveTask<DataResponse> { 
    private final DataRequest request; 

    public DataRequestTask(DataRequest request) { 
     this.request = request; 
    } 

    @Override 
    protected DataResponse compute() { 
     return performDataRequest(this.request); 
    } 

    private DataResponse performDataRequest(DataRequest key) { 
     MappingHolder mappings = DataMapping.getMappings(key.getType()); 
     List<String> hostnames = mappings.getAllHostnames(key); 

     for (String hostname : hostnames) { 
     String url = generateUrl(hostname); 
     HttpGet httpGet = new HttpGet(url); 
     httpGet.setConfig(generateRequestConfig()); 
     httpGet.addHeader(key.getHeader()); 

     try (CloseableHttpResponse response = httpClientBuilder.execute(httpGet)) { 
      HttpEntity entity = response.getEntity(); 
      String responseBody = 
       TestUtils.isEmpty(entity) ? null : IOUtils.toString(entity.getContent(), 
        StandardCharsets.UTF_8); 

      return new DataResponse(responseBody, DataErrorEnum.OK, DataStatusEnum.OK); 
     } catch (IOException ex) { 
      // log error 
     } 
     } 
     return new DataResponse(DataErrorEnum.SERVERS_DOWN, DataStatusEnum.ERROR); 
    } 
    } 
} 

DataRequest 오브젝트에는 DataResponse 오브젝트가 있습니다. 이제 누군가가 DataRequest 객체를 전달하여 라이브러리를 호출하면 내부적으로 List<DataRequest> 객체를 생성 한 다음 각 DataRequest 객체를 병렬로 호출하고 List<DataResponse>을 반환합니다. 여기서 DataResponse 객체는 해당 DataRequest 객체에 대한 응답을 갖습니다. 다음은

는 흐름이다 :

  • 고객은 DataRequest 객체를 전달하여 DataClient 클래스를 호출합니다. 요구 사항에 따라 executeSync() 또는 executeAsync() 메서드를 호출 할 수 있습니다.
  • 이제 단일 DataRequestkey 주어진 객체 (ForkJoinTask'sRecursiveTask 부속의 하나)에 DataFetcherTask 클래스에서, I는 List<DataRequest>를 생성하고 목록 내의 각 객체 DataRequest 병렬로 각 서브 태스크를 호출한다. 이러한 하위 작업은 상위 작업과 동일한 ForkJoinPool에서 실행됩니다.
  • 이제 DataRequestTask 클래스에서 각 DataRequest 개체를 URL을 만들고 실행하여 해당 DataResponse 개체를 반환합니다.

문제 설명 : 그것은 매우 빠른되어야합니다 있도록이 라이브러리는 매우 높은 처리량 환경에서 호출되고 있기 때문에

. 동기 호출의 경우 별도의 스레드에서 실행하는 것이 좋습니다. 이 경우 스레드의 컨텍스트 전환 비용과 함께 스레드에 대한 추가 비용과 자원이 필요하므로 약간 혼란 스럽습니다.또한 여기에 ForkJoinPool을 사용하고 있는데, 여분의 스레드 풀을 사용하여 저를 절약 할 수 있습니다. 그러나 여기에 올바른 선택입니까?

같은 효과를 낼 수있는 효율적인 방법이 있습니까? 나는 Java 7을 사용하고 있으며 구아바 라이브러리에 액세스 할 수 있습니다. 그렇다면 어떤 것도 단순화 할 수 있다면 개방형입니다.

아주 무거운 상태에서 실행될 때 우리가 어떤 경합을하는 것처럼 보입니다. 매우 무거운 하중에서 실행될 때이 코드가 스레드 경합에 들어갈 수있는 방법이 있습니까?

+0

[ThreadPool] (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html)과 같은 소리가 유용 할 수 있지만 조기 최적화는 모든 악의 근원 –

+0

@ScaryWombat 동의하고 부하 테스트를 할 것이지만 질문은 ThreadPool의 특수 형식 인 ForkJoinPool 만 사용하면 합리적입니다. 그리고 executeSync 메서드를 사용하는 방식이 옳았는지 아닌지? – john

+0

어떤 종류의 경쟁이 있습니까? 어쩌면 무거운 하중에 대한'새로운 ForkJoinPool (16);'충분하지 않다,'16'을 더 큰 값으로 늘리려고하자 – Teg

답변

0

비동기 http 호출을 사용하는 것이 더 좋다고 생각합니다 (링크 : HttpAsyncClient). 그리고 당신은 쓰레드 풀을 사용할 필요가 없습니다. executeAsync 방법에

가 비어 CompletableFuture <DataResponse>()를 생성하고 클라이언트 호출에 전달, (예외 올릴 경우 또는 completeExceptionally) 그것에 대한 완전한 호출하여 completableFuture의 결과가 콜백 호출에 설정합니다. ExecuteSync 메서드 구현이 좋아 보인다.

편집 : 자바 7의

단지 ListenableFuture 같은 구아바의 구현, 또는

0

ForkJoinPool가 올바른지 사용하는 선택 유사한 것을 약속하는 completableFuture를 교체 할 필요가있어, 효율성을 위해 설계 모든 스레드를을 찾을 수있는 풀 시도 :

ForkJoinPool 주로 작업 훔치기를 사용하는 덕분에 ExecutorService를 다른 종류의 다릅니다 많은 작은 작업에 d 풀에 제출 된 작업 및/또는 다른 활성 작업에 의해 생성 된 작업을 실행합니다 (존재하지 않으면 작업 대기를 막습니다). 대부분의 태스크가 대부분의 ForkJoinTasks처럼 다른 하위 태스크를 생성 할 때뿐만 아니라 많은 작은 태스크가 외부 클라이언트로부터 풀에 제출 될 때 효율적 처리가 가능합니다. 특히 생성자에서 asyncMode를 true로 설정하면 ForkJoinPools가 결합되지 않은 이벤트 스타일 작업에 적합 할 수도 있습니다.

나는 작업이 가입되지 않습니다 귀하의 경우 이후 생성자에서 asyncMode = true을 시도하는 것이 좋습니다 : 들어

public class DataClient implements Client { 
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(16, ForkJoinPool.ForkJoinWorkerThreadFactory, null, true); 
... 
} 

executeSync() 당신이 forkJoinPool.invoke(task)을 사용할 수 있습니다,이 동기를 할 수있는 관리 방법 자원 최적화를위한 풀에서 작업 실행 :

@Override 
public List<DataResponse> executeSync(DataRequest key) { 
    DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder); 
    return this.forkJoinPool.invoke(task); 
} 

자바 8을 사용할 수있는 경우는 이미 최적화 된 공통 풀이 :

+0

'asyncMode = true'를 사용해야하는 예제를 알려줄 수 있습니까? 또한'executeSync()'메소드는 어떻게 생겼을까요? 이것에 대해 조금 혼란스러워합니다. – john

+0

나는 대답 – Teg

+0

에 몇 가지 예제를 추가했다. 그래서'executeSync' 메서드 내에서'executeAsync' 메서드를 호출 할 필요가 없으며 단지 제안한 것을 할 필요가 없다는 것을 의미합니까? 그렇다면 시간 제한이 현재 사진에 어떻게 적용됩니까? 나는 타임 아웃 응답을 되 돌리려 고했다. 여기서 어떻게 작동할까요? – john