2017-01-25 10 views
1

다른 클러스터 시스템에 분산 된 pub-sub를 만들려고합니다.하지만 시도 할 때마다 작동하지 않습니다.오류 : 액터에서 액터로의 메시지가 배달되지 않았습니다. [1] 죽은 글자가 발생했습니다. 분산 된 pub-sub가 작동하지 않는 클러스터에서 작동합니다.

내가하려는 것은 간단한 예제를 만드는 것입니다.

1) "content"라는 주제를 만듭니다.

2) jvm A의 한 노드가 토픽을 만들고, 구독하고, 퍼블리셔를 발표합니다.

3) 다른 노드에서 다른 포트의 jvm B라고 말하면 가입자를 만듭니다.

4) jvm A에서 주제에 대한 메시지를 보낸 경우 jvm B의 구독자가 동일한 주제에 가입 한 상태로 수신하도록합니다.

Java의 다른 포트에있는 다른 클러스터 시스템에있는 구독자와 게시자가있는 분산 된 pub sub의 간단한 예제가 도움이 될 것입니다.

여기 app1 및 config 파일의 코드입니다.

public class App1{ 

    public static void main(String[] args) { 

    System.setProperty("akka.remote.netty.tcp.port", "2551"); 
    ActorSystem clusterSystem = ActorSystem.create("ClusterSystem"); 
    ClusterClientReceptionist clusterClientReceptionist1 = ClusterClientReceptionist.get(clusterSystem); 
    ActorRef subcriber1=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber1"); 
    clusterClientReceptionist1.registerSubscriber("content", subcriber1); 
    ActorRef publisher1=clusterSystem.actorOf(Props.create(Publisher.class), "publisher1"); 
    clusterClientReceptionist1.registerSubscriber("content", publisher1); 
    publisher1.tell("testMessage1", ActorRef.noSender()); 

    } 
} 

APP2에 대한 app1.confi

akka { 
loggers = ["akka.event.slf4j.Slf4jLogger"] 
loglevel = "DEBUG" 
stdout-loglevel = "DEBUG" 
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 
actor { 
provider = "akka.cluster.ClusterActorRefProvider" 
} 
remote { 
log-remote-lifecycle-events = off 
enabled-transports = ["akka.remote.netty.tcp"] 
netty.tcp { 
    hostname = "127.0.0.1" 
    port = 2551 
    } 
} 
cluster { 
seed-nodes = [ 
    "akka.tcp://[email protected]:2551" 
] 
auto-down-unreachable-after = 10s 
} 
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub", 
"akka.contrib.pattern.ClusterReceptionistExtension"] 
    akka.cluster.pub-sub { 
name = distributedPubSubMediator 
role = "" 
routing-logic = random 
gossip-interval = 1s 
removed-time-to-live = 120s 
max-delta-elements = 3000 
use-dispatcher = "" 
} 

akka.cluster.client.receptionist { 
name = receptionist 
role = "" 
number-of-contacts = 3 
response-tunnel-receive-timeout = 30s 
use-dispatcher = "" 
heartbeat-interval = 2s 
acceptable-heartbeat-pause = 13s 
failure-detection-interval = 2s 
    } 
} 

코드와 그 설정 파일

public class App 
{ 
    public static Set<ActorPath> initialContacts() { 
    return new HashSet<ActorPath>(Arrays.asList(   
    ActorPaths.fromString("akka.tcp://[email protected]:2551/system/receptionist"))); 
} 

public static void main(String[] args) { 
    System.setProperty("akka.remote.netty.tcp.port", "2553"); 
    ActorSystem clusterSystem = ActorSystem.create("ClusterSystem2"); 
    ClusterClientReceptionist clusterClientReceptionist2 = ClusterClientReceptionist.get(clusterSystem); 
    final ActorRef clusterClient = clusterSystem.actorOf(ClusterClient.props(ClusterClientSettings.create(
      clusterSystem).withInitialContacts(initialContacts())), "client"); 
    ActorRef subcriber2=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber2"); 
    clusterClientReceptionist2.registerSubscriber("content", subcriber2); 
    ActorRef publisher2=clusterSystem.actorOf(Props.create(Publisher.class), "publisher2"); 
    publisher2.tell("testMessage2", ActorRef.noSender()); 
    clusterClient.tell(new ClusterClient.Send("/user/publisher1", "hello", true), null); 

} 
}    

app2.confi

akka { 
loggers = ["akka.event.slf4j.Slf4jLogger"] 
loglevel = "DEBUG" 
stdout-loglevel = "DEBUG" 
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 
actor { 
provider = "akka.cluster.ClusterActorRefProvider" 
} 
remote { 
log-remote-lifecycle-events = off 
enabled-transports = ["akka.remote.netty.tcp"] 
netty.tcp { 
    hostname = "127.0.0.1" 
    port = 2553 
    } 
} 
cluster { 
seed-nodes = [ 
    "akka.tcp://[email protected]:2553" 
] 
auto-down-unreachable-after = 10s 
} 
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub", 
"akka.contrib.pattern.ClusterReceptionistExtension"] 
    akka.cluster.pub-sub { 
name = distributedPubSubMediator 
role = "" 
routing-logic = random 
gossip-interval = 1s 
removed-time-to-live = 120s 
max-delta-elements = 3000 
use-dispatcher = "" 
} 

akka.cluster.client.receptionist { 
name = receptionist 
role = "" 
number-of-contacts = 3 
response-tunnel-receive-timeout = 30s 
use-dispatcher = "" 
heartbeat-interval = 2s 
acceptable-heartbeat-pause = 13s 
failure-detection-interval = 2s 
    } 
} 

게시자와 구독자 클래스는 모두 응용 프로그램에 대해 동일합니다 이것은 아래에 주어진다.

제작사 :

가입자
public class Publisher extends UntypedActor { 
private final ActorRef mediator = 
     DistributedPubSub.get(getContext().system()).mediator(); 

@Override 
public void onReceive(Object msg) throws Exception { 
    if (msg instanceof String) { 
     mediator.tell(new DistributedPubSubMediator.Publish("events", msg), getSelf()); 
    } else { 
     unhandled(msg); 
    } 
} 

} 

: 두 응용 프로그램을 실행하는 동안

public class Subscriber extends UntypedActor { 
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); 

public Subscriber(){ 

    ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); 
    mediator.tell(new DistributedPubSubMediator.Subscribe("events", getSelf()), getSelf()); 

} 

public void onReceive(Object msg) throws Throwable { 
    if (msg instanceof String) { 
     log.info("Got: {}", msg); 
    } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) { 
     log.info("subscribing"); 
    } else { 
     unhandled(msg); 
    } 
} 
} 

내가 수신기 측 응용 프로그램에서이 오류가 발생했습니다. 죽은 문자

[ClusterSystem-akka.actor.default-dispatcher-21] INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ClusterSystem/system/receptionist/akka.tcp%3A%2F%2FClusterSystem2%40127.0.0.1%3A2553%2FdeadLetters#188707926] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#1119990682] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 

와 발신자 측 응용 프로그램 메시지 로그에 표시됩니다 성공적으로 보내 encounterd.

[ClusterSystem2-akka.actor.default-dispatcher-22] DEBUG akka.cluster.client.ClusterClient - Sending buffered messages to receptionist 

답변

0

정말 이해가되지 않는 방법으로 ClusterClient를 사용하고, 분산 술집 서브를 사용하여 두하여 노드가 클러스터의 일부로서 당신은 단지 분산 술집 서브를 사용할 수와 함께 할 아무것도하지 않습니다 API를 직접.여기

는 예상대로 작동이 정확한 게시자와 구독자 배우를 사용하여 두 개의 노드 클러스터를 만드는 간단한 주를 포함하여 구성이다 : 배달의 보증을 제공하지 않습니다 술집 하위 분산

public static void main(String[] args) throws Exception { 

    final Config config = ConfigFactory.parseString(
    "akka.actor.provider=cluster\n" + 
    "akka.remote.netty.tcp.port=2551\n" + 
    "akka.cluster.seed-nodes = [ \"akka.tcp://[email protected]:2551\"]\n"); 

    ActorSystem node1 = ActorSystem.create("ClusterSystem", config); 
    ActorSystem node2 = ActorSystem.create("ClusterSystem", 
    ConfigFactory.parseString("akka.remote.netty.tcp.port=2552") 
     .withFallback(config)); 

    // wait a bit for the cluster to form 
    Thread.sleep(3000); 

    ActorRef subscriber = node1.actorOf(
    Props.create(Subscriber.class), 
    "subscriber"); 

    ActorRef publisher = node2.actorOf(
    Props.create(Publisher.class), 
    "publisher"); 

    // wait a bit for the subscription to be gossiped 
    Thread.sleep(3000); 

    publisher.tell("testMessage1", ActorRef.noSender()); 
} 

주, 그렇다면 중재자가 서로 접촉하기 전에 메시지를 보내면 메시지가 사라집니다 (따라서 Thread.sleep 문, 실제 코드에서 수행해야하는 것은 아닙니다).