2017-11-11 7 views
0

akka-streams의 문서를 읽으면 메시지의 순서와 같은 것을 분명히 알지 못합니다. chat-server를 위해 작성한 작은 코드로 내 질문 컨텍스트를 설정해 보겠습니다.akka 스트림을 사용할 때의 이벤트 순서

def flowShape(user: User) = GraphDSL 
    .create(Source.actorRef[ChatMessage](bufferSize = 5, OverflowStrategy.fail)) { 
    implicit builder => 
     implicit chatSource => 

     import GraphDSL.Implicits._ 

     val messageFromOutside = builder.add(Flow[String].map { 
     case msg: String => UserTextMessage(user, msg) 
     case _ => InvalidMessage 
     }) 

     val merge = builder.add(Merge[ChatMessage](2)) 
     // UPDATE --> this is where the change comes 
     // val merge = builder.add(Concat[ChatMessage](2)) 

     // val channelActorSink = Sink.actorRefWithAck(channelActor, ActorInitMessage, AckMessage, UserLeft(user)) 
     val channelActorSink = Sink.actorRef(channelActor, UserLeft(user)) 

     val actorAsSource = builder.materializedValue.map { actor => UserJoined(user, actor) } 

     actorAsSource ~> merge.in(0) 
     messageFromOutside.out ~> merge.in(1) 
     merge ~> channelActorSink 

     FlowShape(messageFromOutside.in, chatSource.out) 
} 

매우 간단한 소스와 싱크로이 흐름 모양을 사용합니다. 이와 비슷한 것 -

val source = Source(List[String]("hi", "hello", "what are you upto", "this is nice")) 
val sink = Sink.foreach[ChatMessage] { 
    case tm: UserTextMessage => println(s"${tm.user.username} :: ${tm.content}") 
    case ul: UserLeft => println(s"${ul.user.username} just left the channel") 
    case uj: UserJoined => println(s"${uj.user.username} just joined the channel") 
    case _ => println(s"do not know what I just received") 
} 

val mychatchannel = new Channel(420, myactorsystem) 

source.via(mychatchannel.chatFlow(User("sushruta"))).runWith(sink) 

이제 내 관심사가 있습니다. 터미널에 인쇄 된 이벤트의 순서는 전혀 잘못되었습니다. 그리고 그것을 고치는 방법을 모르겠습니다. 첫 번째 메시지 hi 출력에서 ​​누락

[INFO] [11/10/2017 17:42:20.431] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/10/2017 17:42:20.441] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] received a user joined message 
[INFO] [11/10/2017 17:42:20.443] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/10/2017 17:42:20.444] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 

- 여기에 내가 얻을 출력입니다. hi 메시지는 UserJoin message이 인쇄되기 전에 전송 된 것으로 보입니다.

actorRefWithAck (위 코드에서 주석 처리 한 것)을 사용하여 수정 (메시징에 대한 안전성 추가)을 시도했습니다. 비슷한 결과가 나타납니다. 일어나고있는 것 같다 분명히 어떤

[INFO] [11/11/2017 06:33:03.731] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] channel initialized and ready to take events 
[INFO] [11/11/2017 06:33:03.735] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.736] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a user joined message 
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-4] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message 
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a UserLeft message 

UserJoin 메시지가 전송되기 전에 소스가 메시지를 보내는 것입니다. 이 문제를 어떻게 해결할 수 있습니까? 개념적으로, 소스가 구체화 되 자마자 실제로 첫 번째 메시지를 보내기 전에 UserJoin message을 보내려고합니다. 그게 가능하니?

감사 수도관 등의 스트림의

답변

0

생각해 : 물이있을 때,이 흐를 것이다. 병합 연산자는 어느 쪽의 요소가오고 있는지 신경 쓰지 않습니다. 이러한 입력을 주문하려면 Concat을 대신 사용하여 Akka에 알려야합니다.

+0

감사합니다. 이것은 나를 도왔고 마침내 문제를 해결했습니다. – shashydhar