2017-05-04 12 views
0

내 코드는 기본적으로 공식 자습서를 따르며 주된 목적은 한 구독 (Constants.UNFINISHEDSUBID)에서 모든 메시지를 수집하고 다른 구독에 다시 게시하는 것입니다. 그러나 현재 나는 문제를 직면하고있다. 나는 그것을 풀 수 없다. 내 구현에서는 다음과 같은 예외 subscriber.stopAsync()를 호출하면 :Subscriber.stopAsync() 결과 RejectedExecutionException이 발생합니다.

Mai 04, 2017 4:59:25 PM com.google.common.util.concurrent.AbstractFuture executeListener 
SCHWERWIEGEND: RuntimeException while executing runnable [email protected] with executor j[email protected]2f3c6ac4 
java.util.concurrent.RejectedExecutionException: Task java.[email protected]60d40af2 rejected from [email protected][Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 320] 
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326) 
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) 
    at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622) 
    at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) 
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817) 
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753) 
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:613) 
    at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:458) 
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:437) 
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428) 
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76) 
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514) 
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431) 
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546) 
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52) 
    at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

는 또한 무작위의 종류, 때로는 모든 메시지를 발견하고 때로는 몇 여부를 하나 하나 수집 얻을. subscriber.stopAsync()를 올바른 방식으로 호출하지 않습니까?

내 현재의 구현 :

protected void pullUnfinished() throws Exception { 
    List<PubsubMessage> jobsToRepublish = new ArrayList<>(); 
    SubscriptionName subscription = 
      SubscriptionName.create(Constants.PROJECTID, Constants.UNFINISHEDSUBID); 

    MessageReceiver receiver = new MessageReceiver() { 
     @Override 
     public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { 
      synchronized(jobsToRepublish){ 
       jobsToRepublish.add(message); 
      } 
      String unfinishedJob = message.getData().toStringUtf8(); 
      LOG.info("got message: {}", unfinishedJob); 
      consumer.ack(); 
     } 
    }; 

    Subscriber subscriber = null; 
    try { 
     ChannelProvider channelProvider = new PlainTextChannelProvider(); 
     subscriber = Subscriber.defaultBuilder(subscription, receiver) 
           .setChannelProvider(channelProvider) 
           .build(); 
     subscriber.addListener(new Subscriber.Listener() { 
      @Override 
      public void failed(Subscriber.State from, Throwable failure) { 
       System.err.println(failure); 
      } 
     }, MoreExecutors.directExecutor()); 
     subscriber.startAsync().awaitRunning(); 
     Thread.sleep(60000); 
    } finally { 
     if (subscriber != null) { 
      subscriber.stopAsync(); //Causes the exception 
     } 
    } 
    publishJobs(jobsToRepublish); 
} 

public class PlainTextChannelProvider implements ChannelProvider { 

    @Override 
    public boolean shouldAutoClose() { 
     // TODO Auto-generated method stub 
     return false; 
    } 

    @Override 
    public boolean needsExecutor() { 
     // TODO Auto-generated method stub 
     return false; 
    } 

     @Override 
     public ManagedChannel getChannel() throws IOException { 
     return NettyChannelBuilder.forAddress("localhost", 8085) 
      .negotiationType(NegotiationType.PLAINTEXT) 
      .build(); 
     } 

     @Override 
     public ManagedChannel getChannel(Executor executor) throws IOException { 
     return getChannel(); 
     } 
} 
+0

정지 신호를 추가하십시오. ['startSync']에서 샘플 코드보기 (http://googlecloudplatform.github.io/google-cloud-java/0.9.4/apidocs/com/google/cloud/pubsub/spi/v1/Subscriber.html#stopAsync- -) 구현 방법을 확인하십시오. 또한이 관련 [SO post] (http://stackoverflow.com/a/19006386/5995040)에서 언급했듯이 RejectedExecutionException은 큐가 꽉 차서 더 이상 스레드를 추가 할 수 없거나 ThreadPool이 종료 된 것으로 인해 발생합니다. 코드 구현을 확인하십시오. 희망이 도움이됩니다. –

+0

이 (가) 이미 시도했지만 나에게 변경되지 않았습니다. 여전히 동일한 rejectedExecutionException이 발생합니다. 나는이 [스 니펫]을 시도했습니다 (https://github.com/GoogleCloudPlatform/google-cloud-java/blob/d476ef7904467233e83168b8d1f5a934a0aae711/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets). /ITPubSubSnippets.java) 아직 예외가 있지만 메시지를받는 것처럼 보입니다. 예외로 실행되는 경우가 있습니다. 다른 아이디어가 있습니까? –

+0

subscriber.stopAsync()와 똑같은 문제가 있습니다. 최신 도서관에서 이걸 알아 냈어? – hubpixel

답변

0

나는 JUnit 테스트에서 비슷한 코드를 실행하는 동일한 문제가 있고, 수신기는 여전히 참조하는 동안 ThreadPool이가 닫혀 있음을 시사 일반적으로 멀티 스레딩에이 related answer 발견 그것에. 또한 GitHub의 Subscriber.java 코드를 살펴본 후 startAsync()에서 JavaDoc에있는 여러 메시지를 수신하여 stopAsync()가 종료 될 때까지 기다리는 것이 좋습니다.

에 한번 나를 위해 일한

subscriber.stopAsync(); 

subscriber.stopAsync().awaitTerminated(); 

을 변경.