2016-09-26 9 views
2

재생 프레임 워크 Iteratee를 사용하여 파일을 읽습니다. 이 파일 청크를 청크 (각 단계마다)로 처리하고 싶습니다.Play Iteratees를 사용하여 프로세스의 각 단계에 대해 청크별로 파일 청크를 읽고 처리하는 방법

I는 다음과 같이 구성 :

  • groupByLines: Enumeratee[Array[Byte], List[String]]
  • turnIntoLines: Enumeratee[List[String], List[Line]]

      (나는 case class Line(number: Int, value: String) 정의)
    • parseChunk: Iteratee[List[Line], Try[List[T]]] (예 : CSV 구문 분석)

    내가 Iteratee.fold를 사용해야 groupByLines을 정의하려면 이전 청크의 마지막 라인을 현재 청크의 첫 번째 라인과 연결합니다.

    문제는 이것이 파일의 모든 라인을 포함하는 단일 청크를 생성한다는 것입니다.

    하지만 청크 파일을 처리하고 싶습니다. 예를 들어, groupByLines은 200 줄의 청크를 생성해야합니다.

    turnIntoLine과 동일한 문제가 발생합니다. 또한 fold을 사용하여 선을 만듭니다. 줄 번호와 줄 내용을 압축하려면 누적 기 (fold이 제공)를 사용해야합니다.

    나는 iteratee를 처음 접하는 자입니다.

    여기 내 코드입니다 :

    여기
    val chunkSize = 1024 * 8 
    
    val enumerator: Enumerator[Array[Byte]] = Enumerator.fromFile(file, chunkSize) 
    
    def isLastChunk(chunk: Array[Byte]): Boolean = { 
        chunk.length < chunkSize 
    } 
    
    val groupByLines: Enumeratee[Array[Byte], List[String]] = Enumeratee.grouped { 
        println("groupByLines") 
        Iteratee.fold[Array[Byte], (String, List[String])]("", List.empty) { 
        case ((accLast, accLines), chunk) => 
         println("groupByLines chunk size " + chunk.length) 
         new String(chunk) 
         .trim 
         .split("\n") 
         .toList match { 
         case lines @ Cons(h, tail) => 
          val lineBetween2Chunks: String = accLast + h 
    
          val goodLines = 
          isLastChunk(chunk) match { 
           case true => Cons(lineBetween2Chunks, tail) 
           case false => Cons(lineBetween2Chunks, tail).init 
          } 
    
          (lines.last, accLines ++ goodLines) 
         case Nil => ("", accLines) 
         } 
        }.map(_._2) 
    } 
    
    
    val turnIntoLines: Enumeratee[List[String], List[Line]] = Enumeratee.grouped { 
        println("turnIntoLines") 
        Iteratee.fold[List[String], (Int, List[Line])](0, List.empty) { 
        case ((index, accLines), chunk) => 
         println("turnIntoLines chunk size " + chunk.length) 
         val lines = 
         ((Stream from index) zip chunk).map { 
          case (lineNumber, content) => Line(lineNumber, content) 
         }.toList 
         (index + chunk.length, lines ++ accLines) 
        }.map(_._2) 
    } 
    
  • 답변

    0

    문제는, 플레이 Iteratees을 사용하여 파일을 줄을 처리하는 방법에 대해 설명합니다.

    우선, UTF-8 사용하여 파일 읽기 I 사용 :

    object EnumeratorAdditionalOperators { 
        implicit def enumeratorAdditionalOperators(e: Enumerator.type): EnumeratorAdditionalOperators = new EnumeratorAdditionalOperators(e) 
    } 
    
    class EnumeratorAdditionalOperators(e: Enumerator.type) { 
    
        def fromUTF8File(file: File, chunkSize: Int = 1024 * 8): Enumerator[String] = 
        e.fromFile(file, chunkSize) 
         .map(bytes => new String(bytes, Charset.forName("UTF-8"))) 
    
    } 
    

    이어서, ('\n' 절단) 행으로 분할 입력 청크 행 :

    object EnumerateeAdditionalOperators { 
        implicit def enumerateeAdditionalOperators(e: Enumeratee.type): EnumerateeAdditionalOperators = new EnumerateeAdditionalOperators(e) 
    } 
    
    class EnumerateeAdditionalOperators(e: Enumeratee.type) { 
    
        def splitToLines: Enumeratee[String, String] = e.grouped(
        Traversable.splitOnceAt[String,Char](_ != '\n') &>> 
         Iteratee.consume() 
    ) 
    
    } 
    

    셋째 으로 줄 번호을 추가하면 여기에있는 코드 조각을 사용합니다 https://github.com/michaelahlers/michaelahlers-playful/blob/master/src/main/scala/ahlers/michael/playful/iteratee/EnumerateeFactoryOps.scala. 내가 EnumeratorEnumeratee에 방법을 "추가"를 implicits을 정의

    class EnumerateeAdditionalOperators(e: Enumeratee.type) { 
    
        /** 
        * As a complement to [[play.api.libs.iteratee.Enumeratee.heading]] and [[play.api.libs.iteratee.Enumeratee.trailing]], allows for inclusion of arbitrary elements between those from the producer. 
        */ 
        def joining[E](separators: Enumerator[E])(implicit ec: ExecutionContext): Enumeratee[E, E] = 
        zipWithIndex[E] compose Enumeratee.mapInputFlatten[(E, Int)] { 
    
         case Input.Empty => 
         Enumerator.enumInput[E](Input.Empty) 
    
         case Input.El((element, index)) if 0 < index => 
         separators andThen Enumerator(element) 
    
         case Input.El((element, _)) => 
         Enumerator(element) 
    
         case Input.EOF => 
         Enumerator.enumInput[E](Input.EOF) 
    
        } 
    
        /** 
        * Zips elements with an index of the given [[scala.math.Numeric]] type, stepped by the given function. 
        * 
        * (Special thanks to [[https://github.com/eecolor EECOLOR]] for inspiring this factory with his answer to [[https://stackoverflow.com/a/27589990/700420 a question about enumeratees on Stack Overflow]].) 
        */ 
        def zipWithIndex[E, I](first: I, step: I => I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] = 
        e.scanLeft[E](null.asInstanceOf[E] -> ev.minus(first, step(ev.zero))) { 
         case ((_, index), value) => 
         value -> step(index) 
        } 
    
        /** 
        * Zips elements with an incrementing index of the given [[scala.math.Numeric]] type, adding one each time. 
        */ 
        def zipWithIndex[E, I](first: I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] = zipWithIndex(first, ev.plus(_, ev.one)) 
    
        /** 
        * Zips elements with an incrementing index by the same contract [[scala.collection.GenIterableLike#zipWithIndex zipWithIndex]]. 
        */ 
        def zipWithIndex[E]: Enumeratee[E, (E, Int)] = zipWithIndex(0) 
    
        // ... 
    
    } 
    

    참고. 이 트릭을 사용하면 예를 들어 Enumerator.fromUTF8File(file)을 쓸 수 있습니다.

    모두 함께 넣어 :

    case class Line(number: Int, value: String) 
    
    
    Enumerator.fromUTF8File(file) &> 
    Enumeratee.splitToLines ><> 
    Enumeratee.zipWithIndex ><> Enumeratee.map{ 
        case (e, idx) => Line(idx, e) 
    } // then an Iteratee or another Enumeratee 
    

    새로운 코드는 훨씬 더 단순하고 간결한 질문에 주어진 하나보다.