객체를 입력 매개 변수로 사용하여 객체를 가져올 라이브러리에서 작업하고 있습니다. URL을 생성 한 다음 응용 프로그램 서버를 호출합니다. 아파치 http 클라이언트를 사용하여 우리의 라이브러리를 사용하는 고객에게 응답을 돌려 보낸다. 일부 고객은 executeSync
메서드를 호출하여 동일한 기능을 얻게되며 일부 고객은 executeAsync
메서드를 호출하여 데이터를 가져옵니다.다중 스레드 환경에서 각 하위 작업을 병렬로 실행하십시오.
executeSync()
- 내가 결과있을 때까지 대기, 결과를 반환합니다.- - 필요하다면 다른 일이 처리 된 후 즉시 처리 할 수있는 미래를 즉시 반환합니다.
는 다음 두 가지 방법으로 위가 내 DataClient
클래스 : 아래
public class DataClient implements Client {
private final ForkJoinPool forkJoinPool = new ForkJoinPool(16);
private CloseableHttpClient httpClientBuilder;
// initializing httpclient only once
public DataClient() {
try {
RequestConfig requestConfig =
RequestConfig.custom().setConnectionRequestTimeout(500).setConnectTimeout(500)
.setSocketTimeout(500).setStaleConnectionCheckEnabled(false).build();
SocketConfig socketConfig =
SocketConfig.custom().setSoKeepAlive(true).setTcpNoDelay(true).build();
PoolingHttpClientConnectionManager poolingHttpClientConnectionManager =
new PoolingHttpClientConnectionManager();
poolingHttpClientConnectionManager.setMaxTotal(300);
poolingHttpClientConnectionManager.setDefaultMaxPerRoute(200);
httpClientBuilder =
HttpClientBuilder.create().setConnectionManager(poolingHttpClientConnectionManager)
.setDefaultRequestConfig(requestConfig).setDefaultSocketConfig(socketConfig).build();
} catch (Exception ex) {
// log error
}
}
@Override
public List<DataResponse> executeSync(DataRequest key) {
List<DataResponse> responsList = null;
Future<List<DataResponse>> responseFuture = null;
try {
responseFuture = executeAsync(key);
responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException | ExecutionException | InterruptedException ex) {
responsList =
Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT,
DataStatusEnum.ERROR));
responseFuture.cancel(true);
// logging exception here
}
return responsList;
}
@Override
public Future<List<DataResponse>> executeAsync(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder);
return this.forkJoinPool.submit(task);
}
}
또한 URL을함으로써 우리의 응용 프로그램 서버를 호출하는 정적 클래스 DataRequestTask
을 가지고 내 DataFetcherTask
클래스입니다 :
public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {
private final DataRequest key;
private final CloseableHttpClient httpClientBuilder;
public DataFetcherTask(DataRequest key, CloseableHttpClient httpClientBuilder) {
this.key = key;
this.httpClientBuilder = httpClientBuilder;
}
@Override
protected List<DataResponse> compute() {
// Create subtasks for the key and invoke them
List<DataRequestTask> requestTasks = requestTasks(generateKeys());
invokeAll(requestTasks);
// All tasks are finished if invokeAll() returns.
List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
for (DataRequestTask task : requestTasks) {
try {
responseList.add(task.get());
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
return Collections.emptyList();
}
}
return responseList;
}
private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
List<DataRequestTask> tasks = new ArrayList<>(keys.size());
for (DataRequest key : keys) {
tasks.add(new DataRequestTask(key));
}
return tasks;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
/** Inner class for the subtasks. */
private static class DataRequestTask extends RecursiveTask<DataResponse> {
private final DataRequest request;
public DataRequestTask(DataRequest request) {
this.request = request;
}
@Override
protected DataResponse compute() {
return performDataRequest(this.request);
}
private DataResponse performDataRequest(DataRequest key) {
MappingHolder mappings = DataMapping.getMappings(key.getType());
List<String> hostnames = mappings.getAllHostnames(key);
for (String hostname : hostnames) {
String url = generateUrl(hostname);
HttpGet httpGet = new HttpGet(url);
httpGet.setConfig(generateRequestConfig());
httpGet.addHeader(key.getHeader());
try (CloseableHttpResponse response = httpClientBuilder.execute(httpGet)) {
HttpEntity entity = response.getEntity();
String responseBody =
TestUtils.isEmpty(entity) ? null : IOUtils.toString(entity.getContent(),
StandardCharsets.UTF_8);
return new DataResponse(responseBody, DataErrorEnum.OK, DataStatusEnum.OK);
} catch (IOException ex) {
// log error
}
}
return new DataResponse(DataErrorEnum.SERVERS_DOWN, DataStatusEnum.ERROR);
}
}
}
각 DataRequest
오브젝트에는 DataResponse
오브젝트가 있습니다. 이제 누군가가 DataRequest
객체를 전달하여 라이브러리를 호출하면 내부적으로 List<DataRequest>
객체를 생성 한 다음 각 DataRequest
객체를 병렬로 호출하고 List<DataResponse>
을 반환합니다. 여기서 DataResponse
객체는 해당 DataRequest
객체에 대한 응답을 갖습니다. 다음은
- 고객은
DataRequest
객체를 전달하여DataClient
클래스를 호출합니다. 요구 사항에 따라executeSync()
또는executeAsync()
메서드를 호출 할 수 있습니다. - 이제 단일
DataRequest
인key
주어진 객체 (ForkJoinTask's
RecursiveTask
부속의 하나)에DataFetcherTask
클래스에서, I는List<DataRequest>
를 생성하고 목록 내의 각 객체DataRequest
병렬로 각 서브 태스크를 호출한다. 이러한 하위 작업은 상위 작업과 동일한ForkJoinPool
에서 실행됩니다. - 이제
DataRequestTask
클래스에서 각DataRequest
개체를 URL을 만들고 실행하여 해당DataResponse
개체를 반환합니다.
문제 설명 : 그것은 매우 빠른되어야합니다 있도록이 라이브러리는 매우 높은 처리량 환경에서 호출되고 있기 때문에
. 동기 호출의 경우 별도의 스레드에서 실행하는 것이 좋습니다. 이 경우 스레드의 컨텍스트 전환 비용과 함께 스레드에 대한 추가 비용과 자원이 필요하므로 약간 혼란 스럽습니다.또한 여기에 ForkJoinPool
을 사용하고 있는데, 여분의 스레드 풀을 사용하여 저를 절약 할 수 있습니다. 그러나 여기에 올바른 선택입니까?
같은 효과를 낼 수있는 효율적인 방법이 있습니까? 나는 Java 7을 사용하고 있으며 구아바 라이브러리에 액세스 할 수 있습니다. 그렇다면 어떤 것도 단순화 할 수 있다면 개방형입니다.
아주 무거운 상태에서 실행될 때 우리가 어떤 경합을하는 것처럼 보입니다. 매우 무거운 하중에서 실행될 때이 코드가 스레드 경합에 들어갈 수있는 방법이 있습니까?
[ThreadPool] (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html)과 같은 소리가 유용 할 수 있지만 조기 최적화는 모든 악의 근원 –
@ScaryWombat 동의하고 부하 테스트를 할 것이지만 질문은 ThreadPool의 특수 형식 인 ForkJoinPool 만 사용하면 합리적입니다. 그리고 executeSync 메서드를 사용하는 방식이 옳았는지 아닌지? – john
어떤 종류의 경쟁이 있습니까? 어쩌면 무거운 하중에 대한'새로운 ForkJoinPool (16);'충분하지 않다,'16'을 더 큰 값으로 늘리려고하자 – Teg