다른 클러스터 시스템에 분산 된 pub-sub를 만들려고합니다.하지만 시도 할 때마다 작동하지 않습니다.오류 : 액터에서 액터로의 메시지가 배달되지 않았습니다. [1] 죽은 글자가 발생했습니다. 분산 된 pub-sub가 작동하지 않는 클러스터에서 작동합니다.
내가하려는 것은 간단한 예제를 만드는 것입니다.
1) "content"라는 주제를 만듭니다.
2) jvm A의 한 노드가 토픽을 만들고, 구독하고, 퍼블리셔를 발표합니다.
3) 다른 노드에서 다른 포트의 jvm B라고 말하면 가입자를 만듭니다.
4) jvm A에서 주제에 대한 메시지를 보낸 경우 jvm B의 구독자가 동일한 주제에 가입 한 상태로 수신하도록합니다.
Java의 다른 포트에있는 다른 클러스터 시스템에있는 구독자와 게시자가있는 분산 된 pub sub의 간단한 예제가 도움이 될 것입니다.
여기 app1 및 config 파일의 코드입니다.
는public class App1{
public static void main(String[] args) {
System.setProperty("akka.remote.netty.tcp.port", "2551");
ActorSystem clusterSystem = ActorSystem.create("ClusterSystem");
ClusterClientReceptionist clusterClientReceptionist1 = ClusterClientReceptionist.get(clusterSystem);
ActorRef subcriber1=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber1");
clusterClientReceptionist1.registerSubscriber("content", subcriber1);
ActorRef publisher1=clusterSystem.actorOf(Props.create(Publisher.class), "publisher1");
clusterClientReceptionist1.registerSubscriber("content", publisher1);
publisher1.tell("testMessage1", ActorRef.noSender());
}
}
APP2에 대한 app1.confi
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://[email protected]:2551"
]
auto-down-unreachable-after = 10s
}
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
"akka.contrib.pattern.ClusterReceptionistExtension"]
akka.cluster.pub-sub {
name = distributedPubSubMediator
role = ""
routing-logic = random
gossip-interval = 1s
removed-time-to-live = 120s
max-delta-elements = 3000
use-dispatcher = ""
}
akka.cluster.client.receptionist {
name = receptionist
role = ""
number-of-contacts = 3
response-tunnel-receive-timeout = 30s
use-dispatcher = ""
heartbeat-interval = 2s
acceptable-heartbeat-pause = 13s
failure-detection-interval = 2s
}
}
코드와 그 설정 파일
public class App
{
public static Set<ActorPath> initialContacts() {
return new HashSet<ActorPath>(Arrays.asList(
ActorPaths.fromString("akka.tcp://[email protected]:2551/system/receptionist")));
}
public static void main(String[] args) {
System.setProperty("akka.remote.netty.tcp.port", "2553");
ActorSystem clusterSystem = ActorSystem.create("ClusterSystem2");
ClusterClientReceptionist clusterClientReceptionist2 = ClusterClientReceptionist.get(clusterSystem);
final ActorRef clusterClient = clusterSystem.actorOf(ClusterClient.props(ClusterClientSettings.create(
clusterSystem).withInitialContacts(initialContacts())), "client");
ActorRef subcriber2=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber2");
clusterClientReceptionist2.registerSubscriber("content", subcriber2);
ActorRef publisher2=clusterSystem.actorOf(Props.create(Publisher.class), "publisher2");
publisher2.tell("testMessage2", ActorRef.noSender());
clusterClient.tell(new ClusterClient.Send("/user/publisher1", "hello", true), null);
}
}
app2.confi
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2553
}
}
cluster {
seed-nodes = [
"akka.tcp://[email protected]:2553"
]
auto-down-unreachable-after = 10s
}
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
"akka.contrib.pattern.ClusterReceptionistExtension"]
akka.cluster.pub-sub {
name = distributedPubSubMediator
role = ""
routing-logic = random
gossip-interval = 1s
removed-time-to-live = 120s
max-delta-elements = 3000
use-dispatcher = ""
}
akka.cluster.client.receptionist {
name = receptionist
role = ""
number-of-contacts = 3
response-tunnel-receive-timeout = 30s
use-dispatcher = ""
heartbeat-interval = 2s
acceptable-heartbeat-pause = 13s
failure-detection-interval = 2s
}
}
게시자와 구독자 클래스는 모두 응용 프로그램에 대해 동일합니다 이것은 아래에 주어진다.
제작사 :
가입자public class Publisher extends UntypedActor {
private final ActorRef mediator =
DistributedPubSub.get(getContext().system()).mediator();
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof String) {
mediator.tell(new DistributedPubSubMediator.Publish("events", msg), getSelf());
} else {
unhandled(msg);
}
}
}
: 두 응용 프로그램을 실행하는 동안
public class Subscriber extends UntypedActor {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public Subscriber(){
ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
mediator.tell(new DistributedPubSubMediator.Subscribe("events", getSelf()), getSelf());
}
public void onReceive(Object msg) throws Throwable {
if (msg instanceof String) {
log.info("Got: {}", msg);
} else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
log.info("subscribing");
} else {
unhandled(msg);
}
}
}
내가 수신기 측 응용 프로그램에서이 오류가 발생했습니다. 죽은 문자
[ClusterSystem-akka.actor.default-dispatcher-21] INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ClusterSystem/system/receptionist/akka.tcp%3A%2F%2FClusterSystem2%40127.0.0.1%3A2553%2FdeadLetters#188707926] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#1119990682] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
와 발신자 측 응용 프로그램 메시지 로그에 표시됩니다 성공적으로 보내 encounterd.
[ClusterSystem2-akka.actor.default-dispatcher-22] DEBUG akka.cluster.client.ClusterClient - Sending buffered messages to receptionist