2017-12-11 37 views
3

저는 Akka Distributed Pub/Sub를 사용하고 있으며 발행인과 가입자가 하나입니다. 내 발행인은 구독자보다 훨씬 빠릅니다. 특정 시점 이후에 게시자의 속도를 늦출 수있는 방법이 있습니까?Akka Distributed Pub/Sub 백 프레셔

게시자 코드 :

public class Publisher extends AbstractActor { 
    private ActorRef mediator; 

    static public Props props() { 
     return Props.create(Publisher.class,() -> new Publisher()); 
    } 

    public Publisher() { 
     this.mediator = DistributedPubSub.get(getContext().system()).mediator(); 
     this.self().tell(0, ActorRef.noSender()); 
    } 

    @Override 
    public Receive createReceive() { 
     return receiveBuilder() 
      .match(Integer.class, msg -> { 
       // Sending message to Subscriber 
       mediator.tell(
        new DistributedPubSubMediator.Send(
         "/user/" + Subscriber.class.getName(), 
         msg.toString(), 
         false), 
        getSelf()); 

       getSelf().tell(++msg, ActorRef.noSender()); 
      }) 
      .build(); 
    } 
} 

가입자 코드 :

public class Subscriber extends AbstractActor { 
    static public Props props() { 
     return Props.create(Subscriber.class,() -> new Subscriber()); 
    } 

    public Subscriber() { 
     ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); 
     mediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf()); 
    } 

    @Override 
    public Receive createReceive() { 
     return receiveBuilder() 
      .match(String.class, msg -> { 
       System.out.println("Subscriber message received: " + msg); 
       Thread.sleep(10000); 
      }) 
      .build(); 
    } 
} 

답변

1

불행하게도, 같은 현재 설계, 나는 원래 "배압"를 제공 할 수있는 방법이 있다고 생각하지 않는다 보내는 사람. ActorRef.tell을 사용하여 mediator으로 메시지를 보내므로 다운 스트림 수신기가 백업한다는 신호를받을 방법이 없습니다. 이는 사용중인 방법이 tell이고 void을 반환하기 때문입니다. 당신이 당신의 tell 최소한 특정 기간 내에 응답을받지 못한 때 알려드립니다 적절한 Timeout 값을 설정할 수 있습니다 ask로 전환하면

스위치

를 요청합니다. 스트림

"Back-pressure" is a primary feature of akka streams으로

전환합니다. 따라서 스트림 구현으로 전환하면 원하는 목표를 달성 할 수 있습니다. 가능 원본 데이터에서 스트림 Source을 만들 경우

, 당신은 mediator에서 Sink을 만들 Sink.actorRef을 사용하고 중재자로 흐름의 속도를 제어 할 수 Flow.throttle를 사용할 수 있습니다.