나는 두 가지 방법이 문제가 발생하고있어, 처음 예상대로 작동하지 :폐쇄는 scala.concurrent.Future에 대한 루프에서 두 번째로 다음과 같은 <code>Future</code>을 만드는 두 번째를 호출
public class WorkersCoordinator {
private static Logger LOGGER =
LoggerFactory.getLogger(WorkersCoordinator.class);
private final Timeout timeout;
private final ActorSystem system;
private final List<Class<? extends BaseWorker>> workers;
private final Map<Class, ActorRef> refMap;
private final WorkResultPackageQueue pkgQueue;
private final ActorFactory actorFactory;
@Autowired
public WorkersCoordinator(final ApplicationConfiguration config,
final ActorSystem system,
final ActorFactory actorFactory,
final WorkerFactory workerFactory,
final WorkResultPackageQueue pkgQueue) {
timeout = new Timeout(config.getTimeoutInMilliseconds(),
TimeUnit.MILLISECONDS);
this.system = system;
this.actorFactory = actorFactory;
this.pkgQueue = pkgQueue;
refMap = Map.newHashMap();
workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
}
public void delegateWorkers() {
for (Class<? extends BaseWorker> worker : workers) {
if (refMap.containsKey(worker) continue;
sendWork(worker);
}
}
private void sendWork(Class<? extends BaseWorker> worker) {
// GetDataActor extends AbstractActor
ActorRef actorRef = actorFactory.create(GetDataActor.class);
Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);
responseRef.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable failure) throws Throwable {
LOGGER.error("Worker {} encountered a problem - cancelling.",
worker.getSimpleName());
if (refMap.containsKey(worker)) {
refMap.remove(worker);
}
}
}, system.dispatcher());
responseRef.onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object msg) throws Throwable {
if (msg instanceof WorkResultPackage) {
final WorkResultPackage reportPackage = (WorkResultPackage) msg;
LOGGER.info(
"Received AssetDataPackage from {}, containing {} items",
reportPackage.getWorkReport().getFromWorker().getSimpleName(),
reportPackage.getPkg().getData().size());
pkgQueue.enqueue(reportPackage);
} else {
LOGGER.eror(
"Expected to receive WorkResultPackage Object but received: {}",
msg.getClass());
throw new UnknownObjectReceived(msg);
}
}
}, system.dispatcher());
refMap.put(worker, actorRef);
}
}
내가 생각하기에 클로저가 responseRef.onFailure
이 내가 기대했던 것처럼 행동하지 않는다는 것이 문제입니다. 내가 3 명의 노동자와 함께 이것을 부르면 그 중 하나는 실패로 처리되지만, 실패한 것으로 표시된 작업자는 일관되게 실패한 것으로보고되는 작업자에 관해서는 로깅이 불확실합니다. 저는이 기술 스택 (Java, Scala Futures 및 AKKA)에 익숙하지 않으며이 설정된 패턴을 찾을 수있는 확립 된 코드 기반을 사용하므로 Java/Scala Futures에서 클로저를 간과하거나 오해하고 있는지 여부를 알 수 없습니다. 여기서 주목해야 할 점은 어떤 작업자가 실패했는지보고하고 더 이상 프로세스에서 고려하지 않도록 refMap
에서 제거해야한다는 것입니다. 심지어 낯선 사람은 잘못된 작업자가 실패한 것으로보고 된 동안에도 모든 근로자가 완료되어 refMap
에서 제거 된 것처럼 보입니다.
업데이트 : 폐쇄가 작동하지 않는 이유에 대한 답을 얻기에 운이없는, 나는 몇 가지 조사를했고, 자바 (8)도 폐쇄 지원하는지 여부를 응답 또 다른 포스트 발견 후 :
짧은 대답을, 나는 그것을 믿는다. 그러나 final
또는 효과적으로 final
변수라고했습니다. 따라서 다음과 같이 코드를 업데이트했습니다. 다행히도 클로저를 이해하는 사람들이 익숙해 져서 왜 작동하지 않는지 이해하는 데 도움이되기를 바랍니다 (C# 및 JavaScript). 나는 아무런 노력을하지 않으려 고 노력한 것을 강조하기 위해서만 sendWork(...)
에 대한 업데이트를 게시하고 있습니다.
private void sendWork(Class<? extends BaseWorker> worker) {
// GetDataActor extends AbstractActor
ActorRef actorRef = actorFactory.create(GetDataActor.class);
Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);
Consumer<Throwable> onFailureClosure = (ex) -> {
LOGGER.error("Worker {} encountered a problem - cancelling.",
worker.getSimpleName());
if (refMap.containsKey(worker)) {
refMap.remove(worker);
}
}
responseRef.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable failure) throws Throwable {
onFailureClosure.accept(failure);
}
}, system.dispatcher());
responseRef.onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object msg) throws Throwable {
if (msg instanceof WorkResultPackage) {
final WorkResultPackage reportPackage = (WorkResultPackage) msg;
LOGGER.info(
"Received AssetDataPackage from {}, containing {} items",
reportPackage.getWorkReport().getFromWorker().getSimpleName(),
reportPackage.getPkg().getData().size());
pkgQueue.enqueue(reportPackage);
} else {
LOGGER.eror(
"Expected to receive WorkResultPackage Object but received: {}",
msg.getClass());
throw new UnknownObjectReceived(msg);
}
}
}, system.dispatcher());
refMap.put(worker, actorRef);
}
이것은 무엇 [: completable - 미래 태그] 함께 할 수 있는가? – shmosel
아마도 scala.concurrent.Future에 대한 태그가 없었기 때문에 그 주제에서 확실히 호출해야했습니다. 당신이 그것에 대해 강하게 생각한다면 그것을 편집하여 삭제하십시오. –
실험적으로 생각해 보면, 앞서 언급 한 Java 문서를 가리키는 훌륭한 사람들이 어떻게 이해 했는가에도 불구하고 클로저는 Java로 구현되지 않았으며, 적어도 완전히는 구현되지 않았습니다. 오히려 가변적 인 리프팅이 있지만 '최종'요구 사항에도 불구하고 비동기를 지원하지 않습니다. –