재생 프레임 워크 Iteratee를 사용하여 파일을 읽습니다. 이 파일 청크를 청크 (각 단계마다)로 처리하고 싶습니다.Play Iteratees를 사용하여 프로세스의 각 단계에 대해 청크별로 파일 청크를 읽고 처리하는 방법
I는 다음과 같이 구성 :
groupByLines: Enumeratee[Array[Byte], List[String]]
turnIntoLines: Enumeratee[List[String], List[Line]]
-
(나는
parseChunk: Iteratee[List[Line], Try[List[T]]]
(예 : CSV 구문 분석)
case class Line(number: Int, value: String)
정의)
내가 Iteratee.fold
를 사용해야 groupByLines
을 정의하려면 이전 청크의 마지막 라인을 현재 청크의 첫 번째 라인과 연결합니다.
문제는 이것이 파일의 모든 라인을 포함하는 단일 청크를 생성한다는 것입니다.
하지만 청크 파일을 처리하고 싶습니다. 예를 들어, groupByLines
은 200 줄의 청크를 생성해야합니다.
turnIntoLine
과 동일한 문제가 발생합니다. 또한 fold
을 사용하여 선을 만듭니다. 줄 번호와 줄 내용을 압축하려면 누적 기 (fold
이 제공)를 사용해야합니다.
나는 iteratee를 처음 접하는 자입니다.
여기 내 코드입니다 :
여기val chunkSize = 1024 * 8
val enumerator: Enumerator[Array[Byte]] = Enumerator.fromFile(file, chunkSize)
def isLastChunk(chunk: Array[Byte]): Boolean = {
chunk.length < chunkSize
}
val groupByLines: Enumeratee[Array[Byte], List[String]] = Enumeratee.grouped {
println("groupByLines")
Iteratee.fold[Array[Byte], (String, List[String])]("", List.empty) {
case ((accLast, accLines), chunk) =>
println("groupByLines chunk size " + chunk.length)
new String(chunk)
.trim
.split("\n")
.toList match {
case lines @ Cons(h, tail) =>
val lineBetween2Chunks: String = accLast + h
val goodLines =
isLastChunk(chunk) match {
case true => Cons(lineBetween2Chunks, tail)
case false => Cons(lineBetween2Chunks, tail).init
}
(lines.last, accLines ++ goodLines)
case Nil => ("", accLines)
}
}.map(_._2)
}
val turnIntoLines: Enumeratee[List[String], List[Line]] = Enumeratee.grouped {
println("turnIntoLines")
Iteratee.fold[List[String], (Int, List[Line])](0, List.empty) {
case ((index, accLines), chunk) =>
println("turnIntoLines chunk size " + chunk.length)
val lines =
((Stream from index) zip chunk).map {
case (lineNumber, content) => Line(lineNumber, content)
}.toList
(index + chunk.length, lines ++ accLines)
}.map(_._2)
}