2017-10-06 5 views
1

Akka websockets을 사용하여 일부 클라이언트에 데이터를 푸시합니다. 액자를 사용하여 Akka 웹 소켓에 데이터 전송

내가 지금까지 무엇을했는지이다! '안녕하세요'클라이언트 측에서
import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.concurrent.CompletionStage; 
import java.util.concurrent.TimeUnit; 

import akka.NotUsed; 
import akka.actor.ActorSystem; 
import akka.http.javadsl.ConnectHttp; 
import akka.http.javadsl.Http; 
import akka.http.javadsl.ServerBinding; 
import akka.http.javadsl.model.HttpRequest; 
import akka.http.javadsl.model.HttpResponse; 
import akka.http.javadsl.model.ws.Message; 
import akka.http.javadsl.model.ws.WebSocket; 
import akka.japi.Function; 
import akka.stream.ActorMaterializer; 
import akka.stream.Materializer; 
import akka.stream.javadsl.Flow; 
import akka.stream.javadsl.Sink; 
import akka.stream.javadsl.Source; 

public class Server { 

    public static HttpResponse handleRequest(HttpRequest request) { 
    System.out.println("Handling request to " + request.getUri()); 
    if (request.getUri().path().equals("/greeter")) { 
     final Flow<Message, Message, NotUsed> greeterFlow = greeterHello(); 
     return WebSocket.handleWebSocketRequestWith(request, greeterFlow); 
    } else { 
     return HttpResponse.create().withStatus(404); 
    } 
    } 

    public static void main(String[] args) throws Exception { 
    ActorSystem system = ActorSystem.create(); 

    try { 
     final Materializer materializer = ActorMaterializer.create(system); 

     final Function<HttpRequest, HttpResponse> handler = request -> handleRequest(request); 
     CompletionStage<ServerBinding> serverBindingFuture = Http.get(system).bindAndHandleSync(handler, 
      ConnectHttp.toHost("localhost", 8080), materializer); 

     // will throw if binding fails 
     serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); 
     System.out.println("Press ENTER to stop."); 
     new BufferedReader(new InputStreamReader(System.in)).readLine(); 
    } finally { 
     system.terminate(); 
    } 
    } 

    public static Flow<Message, Message, NotUsed> greeterHello() { 
    return Flow.fromSinkAndSource(Sink.ignore(), 
     Source.single(new akka.http.scaladsl.model.ws.TextMessage.Strict("Hello!"))); 
    } 
} 

, 나는 성공적으로 수신하고 메시지. 그러나 지금은 이런 식으로, (액터 바람직) 동적으로 뭔가를 데이터를 전송하려면 :

import akka.actor.ActorRef; 
import akka.actor.UntypedActor; 

public class PushActor extends UntypedActor { 
    @Override 
    public void onReceive(Object message) { 
    if (message instanceof String) { 
     String statusChangeMessage = (String) message; 
     // How to push this message to a socket ?? 
    } else { 
     System.out.println(String.format("'%s':\nReceived unknown message '%s'!", selfActorPath, message)); 
    } 
    } 

} 

나는이 온라인에 관한 예를 찾을 수 없습니다입니다. 다음

사용하는 소프트웨어 스택이다 :

답변

0

한 - 반드시 매우 우아하지 -이 일의 방법은 Source.actorRef을 사용하고 구체화을 보낼 수 있습니다 배우 어딘가에 (아마도 라우터 액터?) 귀하의 요구 사항에 따라.

public static Flow<Message, Message, NotUsed> greeterHello() { 
    return Flow.fromSinkAndSourceMat(Sink.ignore(), 
     Source.actorRef(100, OverflowStrategy.fail()), 
     Keep.right()).mapMaterializedValue(/* send your actorRef to a router? */); 
} 

연결된 클라이언트의 actorRef를받는 사람은 누구나 메시지를 라우팅해야합니다.