2014-09-02 9 views
10

실험적인 Akka Streams API를 조금 놀았으며 구현 방법을 알고 싶었습니다. 내 유스 케이스의 경우 입력 스트림을 내 서버 소켓에 바인딩 할 때 StreamTcp을 기반으로 Flow을 제공합니다. 내가 가진 흐름은 ByteString 데이터를 기반으로합니다. 들어오는 데이터에는 구분 기호가 있기 때문에 구분 기호 앞에있는 모든 내용을 하나의 메시지로 취급해야하며 모든 메시지는 다음 메시지로 다음 분리 문자까지 처리해야합니다. 그래서 어떤 소켓과 정적 인 텍스트를 사용하지 않고, 간단한 예와 함께 장난이 내가 생각 해낸 것입니다 :Akka Streams를 사용하여 구분 문자에서 인바운드 스트림을 분할하는 방법

import akka.actor.ActorSystem 
import akka.stream.{ FlowMaterializer, MaterializerSettings } 
import akka.stream.scaladsl.Flow 
import scala.util.{ Failure, Success } 
import akka.util.ByteString 

object BasicTransformation { 

    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("Sys") 

    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 

    Flow(data). 
     splitWhen(c => c == '.'). 
     foreach{producer => 
     Flow(producer). 
      filter(c => c != '.'). 
      fold(new StringBuilder)((sb, c) => sb.append(c.toChar)). 
      map(_.toString). 
      filter(!_.isEmpty). 
      foreach(println(_)). 
      consume(FlowMaterializer(MaterializerSettings())) 
     }. 
     onComplete(FlowMaterializer(MaterializerSettings())) { 
     case any => 
      system.shutdown 
     } 
    } 
} 

내 목표는 다음 생산 splitWhen을했다 달성하기 위해 발견 된 Flow의 주요 기능 추가 서브 플로우는 각 메시지에 대해 하나씩 해당 . 분리 문자입니다. 그런 다음 각 하위 흐름을 다른 단계의 파이프 라인으로 처리하고 최종적으로 개별 메시지를 인쇄합니다.

이 모든 것이 매우 단순하고 일반적인 사용 사례로 생각되는 것을 성취하기 위해 약간 장황 해 보입니다. 그래서 내 질문에, 거기에 깨끗하고 덜 자세한 방법이 일을하거나 올바른 및 선호하는 방법을 구분 기호로 스트림을 분할하는 것입니다?

답변

1

Akka 사용자 그룹에이 질문을 게시 한 후 Endre Varga와 Viktor Klang (https://groups.google.com/forum/#!topic/akka-user/YsnwIAjQ3EE)으로부터 몇 가지 제안을 받았습니다. Endre가 제안한 Transformer을 입력 한 다음 Flowtransform 메서드를 사용했습니다. 내 이전 예제의 약간 수정 된 버전은 아래에 포함되어 있습니다 : PeriodDelimitedTransformer의 정의 인으로

import akka.actor.ActorSystem 
import akka.stream.{ FlowMaterializer, MaterializerSettings } 
import akka.stream.scaladsl.Flow 
import scala.util.{ Failure, Success } 
import akka.util.ByteString 
import akka.stream.Transformer 
import akka.util.ByteStringBuilder 

object BasicTransformation { 

    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("Sys")       
    implicit val mater = FlowMaterializer(MaterializerSettings()) 

    val data = List(
     ByteString("Lorem Ipsum is"), 
     ByteString(" simply.Dummy text of.The prin"), 
     ByteString("ting.And typesetting industry.") 
    ) 
    Flow(data).transform(new PeriodDelimitedTransformer).foreach(println(_)) 
    } 
} 

다음 : 그래서 내 이전 솔루션의 복잡성 중 일부는이 Transformer에 말려

class PeriodDelimitedTransformer extends Transformer[ByteString,String]{ 
    val buffer = new ByteStringBuilder 

    def onNext(msg:ByteString) = {  
    val msgString = msg.utf8String 
    val delimIndex = msgString.indexOf('.') 
    if (delimIndex == -1){ 
     buffer.append(msg) 
     List.empty 
    } 
    else{ 
     val parts = msgString.split("\\.") 
     val endsWithDelim = msgString.endsWith(".") 

     buffer.putBytes(parts.head.getBytes()) 
     val currentPiece = buffer.result.utf8String    
     val otherPieces = parts.tail.dropRight(1).toList 

     buffer.clear 
     val lastPart = 
     if (endsWithDelim){ 
      List(parts.last) 
     } 
     else{ 
      buffer.putBytes(parts.last.getBytes()) 
      List.empty 
     }   


     val result = currentPiece :: otherPieces ::: lastPart 
     result 
    } 

    } 
} 

을 , 그러나 이것은 최선의 접근법처럼 보입니다. 필자의 초기 솔루션에서는 스트림이 여러 개의 하위 스트림으로 분할되는 결과를 낳았습니다.

10

API가 최근에 개선되어 akka.stream.scaladsl.Framing이 포함 된 것으로 보입니다. 설명서에는 사용 방법에 대한 example도 포함되어 있습니다.

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Framing, Source} 
import akka.util.ByteString 
import com.typesafe.config.ConfigFactory 

object TcpDelimiterBasedMessaging extends App { 
    object chunks { 
    val first = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 
    val second = ByteString("More text.delimited by.a period.") 
    } 

    implicit val system = ActorSystem("delimiter-based-messaging", ConfigFactory.defaultReference()) 
    implicit val dispatcher = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    Source(chunks.first :: chunks.second :: Nil) 
    .via(Framing.delimiter(ByteString("."), Int.MaxValue)) 
    .map(_.utf8String) 
    .runForeach(println) 
    .onComplete(_ => system.terminate()) 
} 

다음과 같은 출력을 생성합니다 : 특정 문제에 관한 Lorem Ipsum is simply Dummy text of the printing And typesetting industry More text delimited by a period

+0

완벽! 이것은 받아 들여진 대답이어야합니다. 청크에서도 작동합니다.그것을 시도 : '브로 제 = ByteString을'' 브로 제 ByteString = (("로렘 입숨가 printing.And 조판의 simply.Dummy 텍스트 인") "는 industry.More by.a 기간 text.delimited. ")' –

0

나는 Framing의 안드레이의 사용이 질문에 대한 가장 좋은 해결책입니다하지만 난 비슷한 문제를 가지고 너무 제한하는 Framing을 찾은 것 같아요. 내가 대신 statefulMapConcat을 사용하여 원하는 규칙을 사용하여 입력 ByteString을 그룹화 할 수 있습니다. 다음 경우에 귀하의 질문에 대한 코드의 IT 도움이 사람 :

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Flow, Source} 
import akka.util.ByteString 

object BasicTransformation extends App { 

    implicit val system = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 
    implicit val dispatcher = system.dispatcher 
    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 

    val grouping = Flow[Byte].statefulMapConcat {() => 
    var bytes = ByteString() 
    byt => 
     if (byt == '.') { 
     val string = bytes.utf8String 
     bytes = ByteString() 
     List(string) 
     } else { 
     bytes :+= byt 
     Nil 
     } 
    } 

    Source(data).via(grouping).runForeach(println).onComplete(_ => system.terminate()) 
} 

생산 : Lorem Ipsum is simply Dummy text of the printing And typesetting industry