2014-09-19 4 views
1

이 코드가 있으며 Stream에 반복을 중지시키고 누적 된 결과를 얻도록하고 싶습니다. 기본적으로, 반복은 그것이 작동Stream을 중지하여 다음 요소를 평가하고 누적 된 결과를 기능적 방식으로 얻는 방법

sealed trait Ele 

case class FailureEle() extends Ele 
case class SuccessEle() extends Ele 

type EitherResult = Either[IndexedSeq[Ele], Seq[FailureEle]] 

def parse(process: Process[Task, Ele], errorLimit: Int): EitherResult = { 

    val errorAccumulator = new ListBuffer[FailureEle] 
    val taskProcess = process.map(t => { 
    t match { 
     case x: FailureEle => errorAccumulator += x 
     case _ => 
    } 
    t 
    }).takeWhile(_ => !(errorAccumulator.size == errorLimit)) 

    val voSeq = taskProcess.runLog.run 

    if (errorAccumulator.isEmpty) { 
     Left(voSeq) 
    } else { 
     Right(errorAccumulator) 
    } 

} 
val result = Seq(FailureEle(), SuccessEle(), FailureEle(), SuccessEle(), SuccessEle(), FailureEle(), SuccessEle()) 

val adaptor = new SeqAdaptor[Ele](result) 

val process: Process[Task, Ele] = Process 
    .repeatEval(Task {adaptor.next()}).takeWhile(t => !t.shouldStop).map(_.get) 

parse(process, 1).isRight //no SuccessEle will be iterated 
parse(process, 2).isRight //only one SuccessEle will be iterated 
parse(process, 3).isRight //the last one SuccessEle will not be iterated 

errorLimit의 수에 따라,하지만 난 더 많은 기능으로 해석 방법을 리팩토링 할 몇 가지 문제가있다 :

  • ListBuffer은 필수적 방법

  • takeWhile 조건은 여전히 ​​ListBuffer 결과

을 사용하고, 현재의 구성 요소를 확인하는 어떤 논리가 없다

그래서 ListBuffer를 사용하여 명령 방식을 대신 할 꼬리 재귀 방식이 있는지 궁금합니다.

답변

0

scan 더 충분하지만,

 sealed trait Ele 
     case class FailureEle(e: Throwable) extends Ele 
     case class SuccessEle(r: String) extends Ele 

     def parse(p: Process[Task, Ele], error: Int): Process[Task, (Seq[SuccessEle], Seq[FailureEle])] = { 
      p.scan(Seq[SuccessEle]() -> Seq[FailureEle]()) { (r, e) => 
       val (s, f) = r 
       e match { 
       case fail: FailureEle => 
        s -> (f :+ fail) 
       case succ: SuccessEle => 
        (s :+ succ) -> f 
       } 
      }.dropWhile { case (succ, fail) => fail.size < error }.take(1) 
      } 

     def test() { 
      def randomFail = { 
       val nInt = scala.util.Random.nextInt() 
       println("getting" + nInt) 
       if(nInt % 5 == 0) 
       FailureEle(new Exception("fooo")) 
       else 
       SuccessEle(nInt.toString) 
      } 
      val infinite = Process.repeatEval(Task.delay(randomFail)) 
      val r = parse(infinite, 3).runLast.run 
      println(r) 
} 
작동하지 않을 수 있습니다