내 코드는 기본적으로 공식 자습서를 따르며 주된 목적은 한 구독 (Constants.UNFINISHEDSUBID)에서 모든 메시지를 수집하고 다른 구독에 다시 게시하는 것입니다. 그러나 현재 나는 문제를 직면하고있다. 나는 그것을 풀 수 없다. 내 구현에서는 다음과 같은 예외 subscriber.stopAsync()를 호출하면 :Subscriber.stopAsync() 결과 RejectedExecutionException이 발생합니다.
Mai 04, 2017 4:59:25 PM com.google.common.util.concurrent.AbstractFuture executeListener
SCHWERWIEGEND: RuntimeException while executing runnable [email protected] with executor j[email protected]2f3c6ac4
java.util.concurrent.RejectedExecutionException: Task java.[email protected]60d40af2 rejected from [email protected][Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 320]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:613)
at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:458)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:437)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
는 또한 무작위의 종류, 때로는 모든 메시지를 발견하고 때로는 몇 여부를 하나 하나 수집 얻을. subscriber.stopAsync()를 올바른 방식으로 호출하지 않습니까?
내 현재의 구현 :
protected void pullUnfinished() throws Exception {
List<PubsubMessage> jobsToRepublish = new ArrayList<>();
SubscriptionName subscription =
SubscriptionName.create(Constants.PROJECTID, Constants.UNFINISHEDSUBID);
MessageReceiver receiver = new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
synchronized(jobsToRepublish){
jobsToRepublish.add(message);
}
String unfinishedJob = message.getData().toStringUtf8();
LOG.info("got message: {}", unfinishedJob);
consumer.ack();
}
};
Subscriber subscriber = null;
try {
ChannelProvider channelProvider = new PlainTextChannelProvider();
subscriber = Subscriber.defaultBuilder(subscription, receiver)
.setChannelProvider(channelProvider)
.build();
subscriber.addListener(new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
System.err.println(failure);
}
}, MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();
Thread.sleep(60000);
} finally {
if (subscriber != null) {
subscriber.stopAsync(); //Causes the exception
}
}
publishJobs(jobsToRepublish);
}
public class PlainTextChannelProvider implements ChannelProvider {
@Override
public boolean shouldAutoClose() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean needsExecutor() {
// TODO Auto-generated method stub
return false;
}
@Override
public ManagedChannel getChannel() throws IOException {
return NettyChannelBuilder.forAddress("localhost", 8085)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
}
@Override
public ManagedChannel getChannel(Executor executor) throws IOException {
return getChannel();
}
}
정지 신호를 추가하십시오. ['startSync']에서 샘플 코드보기 (http://googlecloudplatform.github.io/google-cloud-java/0.9.4/apidocs/com/google/cloud/pubsub/spi/v1/Subscriber.html#stopAsync- -) 구현 방법을 확인하십시오. 또한이 관련 [SO post] (http://stackoverflow.com/a/19006386/5995040)에서 언급했듯이 RejectedExecutionException은 큐가 꽉 차서 더 이상 스레드를 추가 할 수 없거나 ThreadPool이 종료 된 것으로 인해 발생합니다. 코드 구현을 확인하십시오. 희망이 도움이됩니다. –
이 (가) 이미 시도했지만 나에게 변경되지 않았습니다. 여전히 동일한 rejectedExecutionException이 발생합니다. 나는이 [스 니펫]을 시도했습니다 (https://github.com/GoogleCloudPlatform/google-cloud-java/blob/d476ef7904467233e83168b8d1f5a934a0aae711/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets). /ITPubSubSnippets.java) 아직 예외가 있지만 메시지를받는 것처럼 보입니다. 예외로 실행되는 경우가 있습니다. 다른 아이디어가 있습니까? –
subscriber.stopAsync()와 똑같은 문제가 있습니다. 최신 도서관에서 이걸 알아 냈어? – hubpixel