akka-stream

    2

    1답변

    내가 통해 얻은 나가는 스트림 TCP 연결을 통해 메시지 스트림 코드를 테스트 해요 : 나는 더미 가입자와 결과 Subscriber[ByteString] 대체 어떤 보내는 메시지를 트리거, 내 테스트에서 (IO(StreamTcp) ? StreamTcp.Connect(settings, address)) .mapTo[StreamTcp.OutgoingTcpCo

    4

    2답변

    API 응답에 의존하는 흐름이 있습니다. 응답이 예상 한 것과 일치하지 않으면 예외가 발생합니다. 이 전략은 Spray와 specs2를 사용한 직접 테스트 방법에 적합합니다. 그러나 예외를 throw하는 예외가있는 스트림을 사용하려고하면 흐름이 중단됩니다. 이것에 대한 Source(() => file) .via(csvToSeq) .vi

    5

    1답변

    내 스트림에는 CPU 바인딩 스테이지와 IO 바인딩 스테이지가 균등하게 혼합되어 있습니다 (IO 스테이지마다 CPU 스테이지가 뒤 따른다). 내가하고 싶은 일은 IO 작업을 나머지 스트림과 다른 디스패처에 넣는 것입니다. 전통적인 액터 기반 Akka 응용 프로그램에서 많은 수의 스레드가있는 고정 된 스레드 풀 디스패처에 IO 액터를 넣을 수 있습니다. CP

    0

    1답변

    내 응용 프로그램에서는 여러 HDFS 노드의 데이터를 가져 오는 스레드가 여러 개 있어야합니다. 이를 위해 나는 스레드 실 행자 풀과 포크 스레드를 사용하고 있습니다. 에서 포크 (fork) : val pathSuffixList = fileStatuses.getOrElse("FileStatus", List[Any]()).asInstanceOf[List[Ma

    2

    1답변

    TLS에서 TCP 연결을 설정하려고하지만 최근 문서를 찾을 수 없습니다. I found something about Akka streams 2.2는 2.4.x 용으로 뭔가를 찾을 수있었습니다. 나는 this 문서 감사에서 찾고있다!

    0

    2답변

    나는 다음과 같은 흐름이 있습니다합니다 (Sink.actorRef 하나)을 Actor 내 val actorSource = Source.actorRef(10000, OverflowStrategy.dropHead) val targetSink = Flow[ByteString] .map(_.utf8String) .via(new JsonStag

    1

    1답변

    나는 Sink 및 Source을 제공하는 SinkSource을 찾고 있습니다. 요소가 Sink으로 유입되면 해당 요소는 Source에 제공되어야합니다. 실행하면 object SinkSource { def apply[T] = new { def sink: Sink[T] = ??? def source: Source[T] = ???

    10

    4답변

    실험적인 Akka Streams API를 조금 놀았으며 구현 방법을 알고 싶었습니다. 내 유스 케이스의 경우 입력 스트림을 내 서버 소켓에 바인딩 할 때 StreamTcp을 기반으로 Flow을 제공합니다. 내가 가진 흐름은 ByteString 데이터를 기반으로합니다. 들어오는 데이터에는 구분 기호가 있기 때문에 구분 기호 앞에있는 모든 내용을 하나의 메시지