
I have a list of Source[ByteString, NotUsed] paired with the names of files in an S3 bucket. I need to zip these in constant memory and serve the result from Play 2.6, using Akka Streams to build the zip on the fly.
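For context, this is roughly how the finished stream would be served from Play 2.6; the action below is a hypothetical sketch, not code from this question:

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString
import play.api.mvc._

// Hypothetical Play 2.6 action: serve the zip as a chunked response so
// that only small chunks of the archive are ever buffered in memory.
def downloadZip: Action[AnyContent] = Action {
  val zipped: Source[ByteString, NotUsed] = ??? // the zipped stream built below
  Ok.chunked(zipped).as("application/zip")
}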

A somewhat similar question, with relevant Akka Streams code (though it targets Play 2.5; I need Play 2.6+), is here: stream a zip created on the fly with play 2.5 and akka stream with backpressure

This gist is the basis of my experiments so far:

https://gist.github.com/kirked/03c7f111de0e9a1f74377bf95d3f0f60

However, the gist solves a different problem: it streams files from disk by passing an InputStream to the graph stage. Since there is no safe way to convert a Source[ByteString, NotUsed] into an InputStream, I cannot use the snippet as-is.
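Strictly speaking, Akka does ship an adapter in this direction, StreamConverters.asInputStream, but the InputStream it returns blocks the reading thread while it waits for elements, which is what makes it hard to use safely inside a graph stage. A minimal sketch of the adapter, with a made-up payload:

import akka.NotUsed
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
import scala.concurrent.duration._

// Sketch: adapt a Source[ByteString, _] to a blocking InputStream.
// Assumes an implicit ActorMaterializer in scope; each read blocks
// for up to the configured timeout.
val bytes: Source[ByteString, NotUsed] = Source.single(ByteString("abc"))
val inputStream: java.io.InputStream =
  bytes.runWith(StreamConverters.asInputStream(5.seconds))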

So far, I have changed the input type from () => InputStream to () => Source[ByteString, NotUsed] and then consume it with source.runForeach(...).

Most of my changes are here:

override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val src: Source[ByteString, NotUsed] = source()
  val operation = src.runForeach { bytestring =>
    val byteInputStream = new ByteArrayInputStream(bytestring.toArray)
    emitMultiple(out, fileChunks(byteInputStream, buffer))
  }
  operation.onComplete {
    case _ => buffer.endEntry()
  }
  Await.ready(operation, 5.minute)
}

I know this version blocks, but I am not sure whether that is acceptable in this situation.
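For reference, a non-blocking alternative to Await.ready would be to hop back into the stage through an async callback when the Future completes; a minimal sketch, assuming it replaces the last lines of onPush above:

// Sketch: getAsyncCallback is the supported way to re-enter a
// GraphStageLogic from code running outside the stage, such as a
// Future callback, without blocking the stage's thread.
val entryDone = getAsyncCallback[Unit](_ => buffer.endEntry())
operation.onComplete(_ => entryDone.invoke(()))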

How can I do this safely?

EDIT:

I have also tried a version that is much closer to the gist:

override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val stream = source().runWith(StreamConverters.asInputStream(1.minute))
  currentStream = Some(stream)
  emitMultiple(out, fileChunks(stream, buffer), () => buffer.endEntry())
}

However, I get an error with this stack trace:

[ERROR] [11/27/2017 09:26:38.428] [alpakka-akka.actor.default-dispatcher-3] [akka://alpakka/user/StreamSupervisor-0/flow-0-0-headSink] Error in stage [[email protected]]: Reactive stream is terminated, no reads are possible
java.io.IOException: Reactive stream is terminated, no reads are possible
    at akka.stream.impl.io.InputStreamAdapter.subscriberClosedException(InputStreamSinkStage.scala:117)
    at akka.stream.impl.io.InputStreamAdapter.executeIfNotClosed(InputStreamSinkStage.scala:125)
    at akka.stream.impl.io.InputStreamAdapter.read(InputStreamSinkStage.scala:144)
    at com.company.productregistration.services.s3.StreamedZip$$anon$2.result$1(StreamedZip.scala:99)
    at com.company.productregistration.services.s3.StreamedZip$$anon$2.$anonfun$fileChunks$1(StreamedZip.scala:105)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
    at scala.collection.immutable.StreamIterator.$anonfun$next$1(Stream.scala:1058)
    at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:1047)
    at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:1047)
    at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:1052)
    at akka.stream.stage.GraphStageLogic$EmittingIterator.onPull(GraphStage.scala:911)
    at akka.stream.impl.fusing.GraphInterpreter.processPull(GraphInterpreter.scala:506)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:412)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:571)
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:541)
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:659)
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:707)
    at akka.actor.Actor.aroundPreStart(Actor.scala:522)
    at akka.actor.Actor.aroundPreStart$(Actor.scala:522)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:650)
    at akka.actor.ActorCell.create(ActorCell.scala:591)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:484)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

EDIT2: If I do not set currentStream = Some(stream), I do not get the error above. It also actually works for some combinations of files: a file of about 20 megabytes placed as the last source corrupts my zip file, but if I move it anywhere else in the list of sources, everything works correctly. Here is the complete listing of my current graph stage implementation:

import java.io.{ByteArrayInputStream, InputStream, OutputStream}

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.{ByteString, ByteStringBuilder}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal

//scalastyle:off
class StreamedZip(bufferSize: Int = 64 * 1024)(implicit ec: ExecutionContext,
                                               mat: ActorMaterializer)
    extends GraphStage[FlowShape[StreamedZip.ZipSource, ByteString]] {

  import StreamedZip._

  val in: Inlet[ZipSource]    = Inlet("StreamedZip.in")
  val out: Outlet[ByteString] = Outlet("StreamedZip.out")
  override val shape          = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      private val buffer                             = new ZipBuffer(bufferSize)
      private var currentStream: Option[InputStream] = None

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit =
            if (isClosed(in)) {
              if (buffer.isEmpty) completeStage()
              else {
                buffer.close()
                push(out, buffer.toByteString())
              }
            } else pull(in)

          override def onDownstreamFinish(): Unit = {
            closeInput()
            buffer.close()
            super.onDownstreamFinish()
          }
        }
      )

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val (filepath, source: StreamGenerator) = grab(in)
            buffer.startEntry(filepath)
            val stream = source().runWith(StreamConverters.asInputStream(1.minute))
            emitMultiple(out, fileChunks(stream, buffer), () => buffer.endEntry())
          }

          override def onUpstreamFinish(): Unit = {
            println("Upstream finish")
            closeInput()
            if (buffer.isEmpty) completeStage()
            else {
              buffer.close()
              if (isAvailable(out)) {
                push(out, buffer.toByteString())
              }
            }
          }
        }
      )

      private def closeInput(): Unit = {
        currentStream.foreach(_.close())
        currentStream = None
      }

      private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
        // This seems like a good trade-off between single-byte
        // read I/O performance and doubling the ZipBuffer size.
        //
        // And it's still a decent defense against DDOS resource
        // limit attacks.
        val readBuffer = new Array[Byte](1024)
        var done       = false

        def result: Stream[ByteString] =
          if (done) Stream.empty
          else {
            try {
              while (!done && buffer.remaining > 0) {
                val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
                val count       = stream.read(readBuffer, 0, bytesToRead)
                if (count == -1) {
                  stream.close()
                  done = true
                } else buffer.write(readBuffer, count)
              }
              buffer.toByteString() #:: result
            } catch {
              case NonFatal(e) =>
                closeInput()
                throw e
            }
          }

        result.iterator
      }
    }
}

object StreamedZip {
  type ZipFilePath     = String
  type StreamGenerator = () => Source[ByteString, NotUsed]
  type ZipSource       = (ZipFilePath, StreamGenerator)

  def apply()(implicit ec: ExecutionContext, mat: ActorMaterializer) = new StreamedZip()
}
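To make the intended wiring concrete, a minimal sketch of feeding ZipSource pairs through the stage; the file names and payloads are placeholders, and an implicit ExecutionContext and ActorMaterializer are assumed in scope:

// Sketch: zip two small in-memory payloads through the stage.
val sources: List[StreamedZip.ZipSource] = List(
  "a.txt" -> (() => Source.single(ByteString("contents of a"))),
  "b.txt" -> (() => Source.single(ByteString("contents of b")))
)
val zipped: Source[ByteString, NotUsed] = Source(sources).via(StreamedZip())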

class ZipBuffer(val bufferSize: Int = 64 * 1024) {
  import java.util.zip.{ZipEntry, ZipOutputStream}

  private var builder = new ByteStringBuilder()
  private val zip = new ZipOutputStream(builder.asOutputStream) {
    // this MUST ONLY be used after flush()!
    def setOut(newStream: OutputStream): Unit = out = newStream
  }
  private var inEntry = false
  private var closed  = false

  def close(): Unit = {
    endEntry()
    closed = true
    zip.close()
  }

  def remaining(): Int = bufferSize - builder.length

  def isEmpty(): Boolean = builder.isEmpty

  def startEntry(path: String): Unit =
    if (!closed) {
      endEntry()
      zip.putNextEntry(new ZipEntry(path))
      inEntry = true
    }

  def endEntry(): Unit =
    if (!closed && inEntry) {
      inEntry = false
      zip.closeEntry()
    }

  def write(byte: Int): Unit =
    if (!closed && inEntry) zip.write(byte)

  def write(bytes: Array[Byte], length: Int): Unit =
    if (!closed && inEntry) zip.write(bytes, 0, length)

  def toByteString(): ByteString = {
    zip.flush()
    val result = builder.result
    builder = new ByteStringBuilder()
    // set the underlying output for the zip stream to be the buffer
    // directly, so we don't have to copy the zip'd byte array.
    zip.setOut(builder.asOutputStream)
    result
  }
}
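As a standalone illustration of the ZipBuffer lifecycle, a sketch with a made-up entry; note how close() must be followed by one more toByteString() to pick up the zip central directory:

// Sketch: one entry, draining buffered zip output as ByteStrings.
val buf     = new ZipBuffer()
buf.startEntry("hello.txt")
val payload = "hello, world".getBytes("UTF-8")
buf.write(payload, payload.length)
buf.endEntry()
val dataChunk  = buf.toByteString() // local header + compressed payload so far
buf.close()                         // writes the central directory into the buffer
val finalChunk = buf.toByteString() // the remaining bytes of the archive
// dataChunk ++ finalChunk is a complete zip archive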
Comment: You may find this answer useful: https://stackoverflow.com/a/47146187/49630

Answer


I ended up using the ZipBuffer from above and solving the whole problem with the akka streams DSL. My solution looks as follows:

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.scaladsl.Source 
import akka.stream.{ActorMaterializer, SourceShape} 
import akka.util.ByteString 
import com.company.config.AWS 
import org.log4s.getLogger 

case class S3StreamingServiceLike(awsConf: AWS, s3Client: S3ClientAlpakka)(
    implicit sys: ActorSystem, 
    mat: ActorMaterializer) 
    extends S3StreamingService { 

  private implicit class ConcatSyntax[T, U](source: Source[T, U]) {
    def ++[TT >: T, M](that: Source[TT, M]): Source[TT, U] = //scalastyle:ignore
      source.concat(that)
  }

  private val logger = getLogger

  private sealed trait ZipElement
  private case class FileStart(name: String, index: Int, outOf: Int) extends ZipElement
  private case class FileEnd(name: String, index: Int, outOf: Int)   extends ZipElement
  private case class FilePayload(byteString: ByteString)             extends ZipElement
  private case object EndZip                                         extends ZipElement

  private def payloadSource(filename: String) =
    s3Client.download(awsConf.s3BucketName, filename).map(FilePayload.apply)

  private def fileNameToZipElements(filename: String,
                                    index: Int,
                                    outOf: Int): Source[ZipElement, NotUsed] =
    Source.single(FileStart(filename, index, outOf)) ++
      payloadSource(filename) ++
      Source.single(FileEnd(filename, index, outOf))

  def streamFilesAsZip(filenames: List[String])(forUser: String): Source[ByteString, NotUsed] = {

    val zipBuffer = new ZipBuffer()

    val zipElementSource: Source[ZipElement, NotUsed] =
      Source(filenames.zipWithIndex).flatMapConcat {
        case (filename, index) => fileNameToZipElements(filename, index + 1, filenames.length)
      } ++ Source.single(EndZip)

    zipElementSource
      .map {
        case FileStart(name, index, outOf) =>
          logger.info(s"Zipping file #$index of $outOf with name $name for user $forUser")
          zipBuffer.startEntry(name)
          None
        case FilePayload(byteString) =>
          if (byteString.length > zipBuffer.remaining()) {
            throw new Exception(
              s"Bytestring size exceeded buffer size ${byteString.length} > ${zipBuffer.remaining}")
          }
          zipBuffer.write(byteString.toArray, byteString.length)
          Some(zipBuffer.toByteString())
        case FileEnd(name, index, outOf) =>
          logger.info(s"Finished zipping file #$index of $outOf with $name for user $forUser")
          zipBuffer.endEntry()
          Some(zipBuffer.toByteString())
        case EndZip =>
          zipBuffer.close()
          Some(zipBuffer.toByteString())
      }
      .collect {
        case Some(bytes) if bytes.length > 0 => bytes
      }
  }
}
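A quick way to sanity-check the result end-to-end is to write the stream to a local file and open it; the service wiring, bucket contents, and file names below are assumptions:

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.FileIO

// Sketch: materialize the zip stream into a local file for inspection.
implicit val sys: ActorSystem       = ActorSystem("zip-check")
implicit val mat: ActorMaterializer = ActorMaterializer()

val service: S3StreamingServiceLike = ??? // assumed: built with real AWS config and client

val done = service
  .streamFilesAsZip(List("folder/a.csv", "folder/b.csv"))(forUser = "test-user")
  .runWith(FileIO.toPath(Paths.get("/tmp/out.zip")))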