2017-11-24 7 views
1

나는 사용하고있다 Akka Stream 2.5.5. 나는 Stream을 가지고 있습니다.Stream 구독자는 구독을 언제 취소합니까?

  • 액터가 스트림에 대한 소스 역할을합니다. 나는 이것을 사용했다 : Source.actorPublisher
  • 스트림에는 여러 단계가있다. map, collect 등의 경우, 발생할 수있는 예외를 처리하기 위해 Recover을 사용했습니다. 내가 스트림을 실행하면 이제 .withAttributes(supervisionStrategy(resumingDecider)))

, 내 소스 배우로 전파 akka.stream.actor.ActorPublisherMessage.Cancel 무엇입니까 : mapAsync()에서 오류를 처리하기위한

  • , 나는 같은 감독의 전략을 사용하고 있습니다. 워드 프로세서에서 :

    /** 
        * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the 
        * subscription. 
        */ 
        final case object Cancel extends Cancel with NoSerializationVerificationNeeded 
        sealed abstract class Cancel extends ActorPublisherMessage 
    

    는 놀랍게도, 스트림의 모든 stage 던져에는 어떤 Exception이 없습니다. 그래서 나는 이해하지 못하고있다. why the stream subscriber cancels the subscription. 따라서 내 스트림이 실패한 이유에 대해 정확히 Cause 또는 Error을 찾을 수 없습니다.

    이 시나리오에 대한 통찰력과 추론은 매우 유용합니다.

  • +0

    '싱크'이외의 모든 것을 설명했습니다. 스트림에 대한 코드도 함께 보여 주면 도움이 될 것입니다. –

    답변

    0

    마침내 내 스트림의 마지막 단계로 recover stage을 사용하여 원인을 발견했습니다. 이전에는 오류를 기록하고 마지막으로 원하는 값을 전달하는 스트림의 각 단계 이후에 여러 개의 recover 단계를 사용했습니다.

    지금, 내가 좋아 아래에 보이는 내 스트림의 마지막에 하나의 recover stage를 사용하는이를 추가 한 후

    .recover { 
          case e => 
          println("****************************") 
          println("Throwing Exception. Cause: "+e.getMessage) 
          println("****************************") 
          throw e 
         } 
    

    , 실제 예외가 발생하고 난 실제 원인을 발견했다. 제 경우에는 무대의 하나가 예외를 던진 코드를 가지고있었습니다.