실험적인 Akka Streams API를 조금 놀았으며 구현 방법을 알고 싶었습니다. 내 유스 케이스의 경우 입력 스트림을 내 서버 소켓에 바인딩 할 때 StreamTcp
을 기반으로 Flow
을 제공합니다. 내가 가진 흐름은 ByteString
데이터를 기반으로합니다. 들어오는 데이터에는 구분 기호가 있기 때문에 구분 기호 앞에있는 모든 내용을 하나의 메시지로 취급해야하며 모든 메시지는 다음 메시지로 다음 분리 문자까지 처리해야합니다. 그래서 어떤 소켓과 정적 인 텍스트를 사용하지 않고, 간단한 예와 함께 장난이 내가 생각 해낸 것입니다 :Akka Streams를 사용하여 구분 문자에서 인바운드 스트림을 분할하는 방법
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString
object BasicTransformation {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
Flow(data).
splitWhen(c => c == '.').
foreach{producer =>
Flow(producer).
filter(c => c != '.').
fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
map(_.toString).
filter(!_.isEmpty).
foreach(println(_)).
consume(FlowMaterializer(MaterializerSettings()))
}.
onComplete(FlowMaterializer(MaterializerSettings())) {
case any =>
system.shutdown
}
}
}
내 목표는 다음 생산 splitWhen
을했다 달성하기 위해 발견 된 Flow
의 주요 기능 추가 서브 플로우는 각 메시지에 대해 하나씩 해당 .
분리 문자입니다. 그런 다음 각 하위 흐름을 다른 단계의 파이프 라인으로 처리하고 최종적으로 개별 메시지를 인쇄합니다.
이 모든 것이 매우 단순하고 일반적인 사용 사례로 생각되는 것을 성취하기 위해 약간 장황 해 보입니다. 그래서 내 질문에, 거기에 깨끗하고 덜 자세한 방법이 일을하거나 올바른 및 선호하는 방법을 구분 기호로 스트림을 분할하는 것입니다?
완벽! 이것은 받아 들여진 대답이어야합니다. 청크에서도 작동합니다.그것을 시도 : '브로 제 = ByteString을'' 브로 제 ByteString = (("로렘 입숨가 printing.And 조판의 simply.Dummy 텍스트 인") "는 industry.More by.a 기간 text.delimited. ")' –