Source[ByteString, NotUsed]
의 목록과 S3 버킷의 파일 이름이 짝을 이루고 있습니다. 이를 상수 메모리에 압축하여 Play 2.6에서 제공해야합니다.Akka 스트림을 사용하여 [Source [ByteString, NotUsed]]를 즉시 압축합니다.
: (재생 2.6+ 필요) Akka 스트림에 대한 관련 코드와 stream a zip created on the fly with play 2.5 and akka stream with backpressure
여기 :
https://gist.github.com/kirked/03c7f111de0e9a1f74377bf95d3f0f60가 내 실험 지금까지 기반이 그러나 위의 요지는 요점은 다른 문제를 해결합니다 - 그래프 스테이지를 InputStream
에 전달하여 디스크에서 파일을 스트리밍합니다. 그러나 Source[ByteString, NotUsed]
을 InputStream
으로 변환하는 안전한 방법이 없으므로 스 니펫을있는 그대로 사용할 수는 없습니다.
지금까지 입력 유형을 () => InputStream
에서 () => Source[ByteString, NotUsed]
으로 변경 한 다음 source.runForeach(...)
을 사용하여이를 소비했습니다.
내 변화의 대부분은 여기에 있습니다 :
override def onPush(): Unit = {
val (filepath, source: StreamGenerator) = grab(in)
buffer.startEntry(filepath)
val src: Source[ByteString, NotUsed] = source()
val operation = src.runForeach(bytestring => {
val byteInputStream = new ByteArrayInputStream(bytestring.toArray)
emitMultiple(out, fileChunks(byteInputStream, buffer))
})
operation.onComplete {
case _ => buffer.endEntry()
}
Await.ready(operation, 5.minute)
}
나는이 차단되는 것을 알고 있지만, 나는이 상황에서 허용되는지 여부를 확신입니다.
어떻게 안전하게이 작업을 수행 할 수 있습니까?
제가 또한 요지에 훨씬 더 가깝다 버전 시도
EDIT :
[ERROR : 그러나
override def onPush(): Unit = { val (filepath, source: StreamGenerator) = grab(in) buffer.startEntry(filepath) val stream = source().runWith(StreamConverters.asInputStream(1.minute)) currentStream = Some(stream) emitMultiple(out, fileChunks(stream, buffer),() => buffer.endEntry()) }
는,이 스택 트레이스에 오류를 얻을 ] [11/27/2017 09 : 26 : 38.428] [alpakka-akka.actor.default-dispatcher-3] [akka : // alpakka/user/StreamSupervisor-0/flow-0] -0-headSink] 단계의 오류 [[email protected]] : 비활성 스트림이 종료되고 읽을 수 없음 java.io.IOException : 비활성 스트림이 종료되고 읽음이 없습니다. akka.stream.impl.io.InputStreamAdapter.executeIfNotClosed에서 (InputStreamSinkStage.scala 125) akka.stream에서 : akka.stream.impl.io.InputStreamAdapter.subscriberClosedException (InputStreamSinkStage.scala 117)에 가능 . impl.io.InputStreamAdapter.read (InputStreamSinkStage.scala : 144) 에 com.company.productregistration.services.s3.StreamedZip $$ anon $ 2.result $ 1 (StreamedZip.scala : 99) 에서 com.company.productregistration.services.s3.StreamedZip $$ anon $ 2. $ anonfun $ fileChunks $ 1 (StreamedZip.scala : 105) at scala.collection.immutable.Stream $ Cons.tail (Stream.scala : 1169) at scala.collection.immutable.Stream $ Cons.tail (Stream.scala : 1159) at scala.collection.immutable.StreamIterator. $ anonfun $ next $ 1 (Stream.scala : 1058) at scala.collection.immutable. StreamIterator $ LazyCell.v $ lzycompute (Stream.scala : 1047) at scala.collection.immutable.StreamIterator $ LazyCell.v (Stream.스칼라 : 1047) scala.collection.immutable.StreamIterator.hasNext에서 (Stream.scala : 1052) akka.stream.stage.GraphStageLogic $ EmittingIterator.onPull (GraphStage.scala에서 : 911) akka.stream에서 akka.stream.impl.fusing.GraphInterpreterShell.runBatch에서 : akka.stream.impl.fusing.GraphInterpreter.execute (412 GraphInterpreter.scala)에서 : .impl.fusing.GraphInterpreter.processPull (506 GraphInterpreter.scala) (ActorGraphInterpreter.scala : 571) at akka.stream.impl.fusing.GraphInterpreterShell.init (ActorGraphInterpreter.scala : 541) at akka.stream.impl.fusing.ActorGrap akka에서 : akka.actor.Actor.aroundPreStart (522 Actor.scala)에서 : akka.stream.impl.fusing.ActorGraphInterpreter.preStart (707 ActorGraphInterpreter.scala)에서 : hInterpreter.tryInit (659 ActorGraphInterpreter.scala) .actor.Actor.aroundPreStart $ (Actor.scala : 522) at akka.actor.ActorCell.create (ActorCell.scala : 591)에서 akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart (ActorGraphInterpreter.scala : 650) 에 있습니다. akka.actor.ActorCell.systemInvoke (ActorCell.scala : 484)의 일 : akka.dispatch.Mailbox.processAllSystemMessages (282 Mailbox.scala)에서 : akka.actor.ActorCell.invokeAll $ 1 (462 ActorCell.scala)에서 akka.dispatch.Mailbox.run (Mailbox.scala : 223) at akka.d ispatch.Mailbox.exec (Mailbox.scala : 234) at akka.dispatch.forkjoin.ForkJoinTask.doExec (ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool $ WorkQueue.runTask (ForkJoinPool.java:1339) akka.dispatch.forkjoin.ForkJoinWorkerThread.run (ForkJoinWorkerThread.java:107)에서 akka.dispatch.forkjoin.ForkJoinPool.runWorker (ForkJoinPool.java:1979) 에서
EDIT2 나는 경우, currentStream = Some(stream)
을 설정하지 마십시오. 위의 오류가 발생하지 않습니다. 또한, 그것은 실제로 파일의 일부 조합에 대해 작동합니다. 내가 약 20 메가 바이트의 파일, 만약 내가 마지막 소스로 넣어, 내 zip 파일을 손상시킵니다. 소스 목록에서 다른 곳으로 옮기면 모든 것이 올바르게 작동합니다.
import java.io.{ByteArrayInputStream, InputStream, OutputStream}
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.{ByteString, ByteStringBuilder}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal
//scalastyle:off
class StreamedZip(bufferSize: Int = 64 * 1024)(implicit ec: ExecutionContext,
mat: ActorMaterializer)
extends GraphStage[FlowShape[StreamedZip.ZipSource, ByteString]] {
import StreamedZip._
val in: Inlet[ZipSource] = Inlet("StreamedZip.in")
val out: Outlet[ByteString] = Outlet("StreamedZip.out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with StageLogging {
private val buffer = new ZipBuffer(bufferSize)
private var currentStream: Option[InputStream] = None
setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (isClosed(in)) {
if (buffer.isEmpty) completeStage()
else {
buffer.close
push(out, buffer.toByteString)
}
} else pull(in)
override def onDownstreamFinish(): Unit = {
closeInput()
buffer.close
super.onDownstreamFinish()
}
}
)
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
val (filepath, source: StreamGenerator) = grab(in)
buffer.startEntry(filepath)
val stream = source().runWith(StreamConverters.asInputStream(1.minute))
emitMultiple(out, fileChunks(stream, buffer),() => { buffer.endEntry() })
}
override def onUpstreamFinish(): Unit = {
println("Updstream finish")
closeInput()
if (buffer.isEmpty) completeStage()
else {
buffer.close()
if (isAvailable(out)) {
push(out, buffer.toByteString)
}
}
}
}
)
private def closeInput(): Unit = {
currentStream.foreach(_.close)
currentStream = None
}
private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
// This seems like a good trade-off between single-byte
// read I/O performance and doubling the ZipBuffer size.
//
// And it's still a decent defense against DDOS resource
// limit attacks.
val readBuffer = new Array[Byte](1024)
var done = false
def result: Stream[ByteString] =
if (done) Stream.empty
else {
try {
while (!done && buffer.remaining > 0) {
val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
val count = stream.read(readBuffer, 0, bytesToRead)
if (count == -1) {
stream.close
done = true
} else buffer.write(readBuffer, count)
}
buffer.toByteString #:: result
} catch {
case NonFatal(e) =>
closeInput()
throw e
}
}
result.iterator
}
}
}
object StreamedZip {
type ZipFilePath = String
type StreamGenerator =() => Source[ByteString, NotUsed]
type ZipSource = (ZipFilePath, StreamGenerator)
def apply()(implicit ec: ExecutionContext, mat: ActorMaterializer) = new StreamedZip()
}
class ZipBuffer(val bufferSize: Int = 64 * 1024) {
import java.util.zip.{ZipEntry, ZipOutputStream}
private var builder = new ByteStringBuilder()
private val zip = new ZipOutputStream(builder.asOutputStream) {
// this MUST ONLY be used after flush()!
def setOut(newStream: OutputStream): Unit = out = newStream
}
private var inEntry = false
private var closed = false
def close(): Unit = {
endEntry()
closed = true
zip.close()
}
def remaining(): Int = bufferSize - builder.length
def isEmpty(): Boolean = builder.isEmpty
def startEntry(path: String): Unit =
if (!closed) {
endEntry()
zip.putNextEntry(new ZipEntry(path))
inEntry = true
}
def endEntry(): Unit =
if (!closed && inEntry) {
inEntry = false
zip.closeEntry()
}
def write(byte: Int): Unit =
if (!closed && inEntry) zip.write(byte)
def write(bytes: Array[Byte], length: Int): Unit =
if (!closed && inEntry) zip.write(bytes, 0, length)
def toByteString(): ByteString = {
zip.flush()
val result = builder.result
builder = new ByteStringBuilder()
// set the underlying output for the zip stream to be the buffer
// directly, so we don't have to copy the zip'd byte array.
zip.setOut(builder.asOutputStream)
result
}
}
것은 현재 유용이 답을 찾을 수 있습니다 : 다음과 같이
내 솔루션 보이는 https://stackoverflow.com/a/47146187/49630 –