내가 통해 얻은 나가는 스트림 TCP 연결을 통해 메시지 스트림 코드를 테스트 해요 : 나는 더미 가입자와 결과 Subscriber[ByteString] 대체 어떤 보내는 메시지를 트리거, 내 테스트에서 (IO(StreamTcp) ? StreamTcp.Connect(settings, address))
.mapTo[StreamTcp.OutgoingTcpCo
API 응답에 의존하는 흐름이 있습니다. 응답이 예상 한 것과 일치하지 않으면 예외가 발생합니다. 이 전략은 Spray와 specs2를 사용한 직접 테스트 방법에 적합합니다. 그러나 예외를 throw하는 예외가있는 스트림을 사용하려고하면 흐름이 중단됩니다. 이것에 대한 Source(() => file)
.via(csvToSeq)
.vi
내 스트림에는 CPU 바인딩 스테이지와 IO 바인딩 스테이지가 균등하게 혼합되어 있습니다 (IO 스테이지마다 CPU 스테이지가 뒤 따른다). 내가하고 싶은 일은 IO 작업을 나머지 스트림과 다른 디스패처에 넣는 것입니다. 전통적인 액터 기반 Akka 응용 프로그램에서 많은 수의 스레드가있는 고정 된 스레드 풀 디스패처에 IO 액터를 넣을 수 있습니다. CP
내 응용 프로그램에서는 여러 HDFS 노드의 데이터를 가져 오는 스레드가 여러 개 있어야합니다. 이를 위해 나는 스레드 실 행자 풀과 포크 스레드를 사용하고 있습니다. 에서 포크 (fork) : val pathSuffixList = fileStatuses.getOrElse("FileStatus", List[Any]()).asInstanceOf[List[Ma
나는 다음과 같은 흐름이 있습니다합니다 (Sink.actorRef 하나)을 Actor 내 val actorSource = Source.actorRef(10000, OverflowStrategy.dropHead)
val targetSink = Flow[ByteString]
.map(_.utf8String)
.via(new JsonStag
나는 Sink 및 Source을 제공하는 SinkSource을 찾고 있습니다. 요소가 Sink으로 유입되면 해당 요소는 Source에 제공되어야합니다. 실행하면 object SinkSource {
def apply[T] = new {
def sink: Sink[T] = ???
def source: Source[T] = ???
실험적인 Akka Streams API를 조금 놀았으며 구현 방법을 알고 싶었습니다. 내 유스 케이스의 경우 입력 스트림을 내 서버 소켓에 바인딩 할 때 StreamTcp을 기반으로 Flow을 제공합니다. 내가 가진 흐름은 ByteString 데이터를 기반으로합니다. 들어오는 데이터에는 구분 기호가 있기 때문에 구분 기호 앞에있는 모든 내용을 하나의 메시지