2016-06-18 7 views
12

각 줄을 처리하기 위해 akka 스트림을 사용하여 여러 개의 큰 파일을 읽고 싶습니다. 각 키가 ("식별자"-> "값")으로 구성된다고 상상해보십시오. 새로운 "식별자"가 발견되면 데이터베이스에 저장하고 "값"을 저장하려고합니다. 그렇지 않으면 식별자가 이미 스트림 스트림을 처리하는 동안 발견 된 경우 "값"만 저장하려고합니다. 이를 위해 이미 맵에서 발견 된 식별자를 유지하기 위해 일종의 재귀적인 상태 저장 플로우가 필요하다고 생각합니다. 나는이 흐름에서 한 쌍의 (newLine, contextWithIdentifiers)를받을 것이라고 생각한다.Akka Streams. 흐름의 상태 유지 상태

나는 방금 akka 스트림을 조사하기 시작했습니다. 나는 무국적 처리 물건을 관리 할 수 ​​있다고 생각하지만 "contextWithIdentifiers"를 유지하는 방법에 대한 단서가 없다. 나는 누군가가 좋은 방향으로 나를 가리킬 수없는 경우에 감사 할 것입니다.

저는 스칼라를 사용하고 있습니다.

+2

감사합니다. 그것은 매우 간단한 요청이지만 샘플 코드로 의미있는 대답을 찾는 것은 정교합니다. 이것은 내가 찾은 유일한 것입니다! – akauppi

답변

17

어쩌면 statefulMapConcat 같은 모양이 당신을 도울 수 있습니다.

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 
import scala.util.Random._ 
import scala.math.abs 
import scala.concurrent.ExecutionContext.Implicits.global 

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

//encapsulating your input 
case class IdentValue(id: Int, value: String) 
//some random generated input 
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere")) 

val stateFlow = Flow[IdentValue].statefulMapConcat{() => 
    //state with already processed ids 
    var ids = Set.empty[Int] 
    identValue => if (ids.contains(identValue.id)) { 
    //save value to DB 
    println(identValue.value) 
    List(identValue) 
    } else { 
    //save both to database 
    println(identValue) 
    ids = ids + identValue.id 
    List(identValue) 
    } 
} 

Source(identValues) 
    .via(stateFlow) 
    .runWith(Sink.seq) 
    .onSuccess { case identValue => println(identValue) } 
+0

코드를 제공해 주셔서 감사합니다. 나는() => ... 공장이 관련되어 있기 때문에 중간에 조금 더 많은 유형을 고맙게 생각합니다. '.statefulMap' 메쏘드가없는 이유를 아시겠습니까? – akauppi