2015-01-27 9 views
1

나는 SinkSource을 제공하는 SinkSource을 찾고 있습니다. 요소가 Sink으로 유입되면 해당 요소는 Source에 제공되어야합니다. 실행하면SinkSource [T]와 같은 것이 있습니까?

object SinkSource { 
    def apply[T] = new { 
    def sink: Sink[T] = ??? 
    def source: Source[T] = ??? 
    } 
} 
val flowgraph = FlowGraph { implicit fgb => 
    import FlowGraphImplicits._ 
    val sinksource = SinkSource[Int] 
    Source(1 to 5) ~> sinksource.sink 
        sinksource.source ~> Sink.foreach(print) 
} 
implicit val actorSystem = ActorSystem(name = "System") 
implicit val flowMaterializer = FlowMaterializer() 
val materializedMap = flowgraph.run() 

이 인쇄해야합니다 : 12345
그래서하는 SinkSource합니다 (API에서 그것을 보지 못했다) 존재 또는 누군가가 그것을 구현하는 방법을 알고 있지 않습니다 다음 코드는 내가 무슨 뜻인지 보여줍니다? 내가 Flow이 특정 형태의 솔루션 아니라고 SinkSource에 별개의 접근이 필요하다는 것을 언급해야한다 : 아이디어는 질문이 이미 요구 한 경우 마음에 와서

Source(1 to 5) ~> Flow[Int] ~> Sink.foreach(println) 

답변

1

과 같이 자주 : 그것은 밝혀졌다 , SinkSource, JunctionInPortJunctionOutPort이 필요하지 않습니다.
여기에 표시됩니다.

object SinkSource { 
    def apply[T](implicit fgb: FlowGraphBuilder) = new SinkSource[T] 
} 
class SinkSource[T](implicit fgb: FlowGraphBuilder) { 
    import FlowGraphImplicits._ 
    private val merge = Merge[T] 
    private val bcast = Broadcast[T] 
    Source.empty ~> merge 
    merge ~> bcast 
    bcast ~> Sink.ignore 
    def in: JunctionInPort[T] = merge 
    def out: JunctionOutPort[T] = bcast 
} 
val flowgraph = FlowGraph { implicit fgb => 
    import FlowGraphImplicits._ 
    val source = Source(1 to 5) 
    val sink = Sink.foreach(println) 
    val sinkSource = SinkSource[Int] 
    source ~> sinkSource.in 
      sinkSource.out ~> sink 
} 
implicit val actorSystem = ActorSystem(name = "System") 
implicit val flowMaterializer = FlowMaterializer() 
val materializedMap = flowgraph.run()