2
이 스트림을 소비하는 간단한 akka 스트림 나머지 엔드 포인트 및 클라이언트를 작성하려고합니다. 하지만 서버와 클라이언트를 실행하려고하면 클라이언트는 스트림의 일부만을 소비 할 수 있습니다. 실행 중에 예외가 표시되지 않습니다.Akka http-client는 서버의 모든 데이터 스트림을 사용할 수 없습니다.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.{EntityStreamingSupport, JsonEntityStreamingSupport}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes, Supervision}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import spray.json.DefaultJsonProtocol
import scala.io.StdIn
import scala.util.Random
object WebServer {
object Model {
case class Person(id: Int = Random.nextInt(), fName: String = Random.nextString(10), sName: String = Random.nextString(10))
}
object JsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
implicit val personFormat = jsonFormat(Model.Person.apply, "id", "firstName", "secondaryName")
}
def main(args: Array[String]) {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val start = ByteString.empty
val sep = ByteString("\n")
val end = ByteString.empty
import JsonProtocol._
implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
.withFramingRenderer(Flow[ByteString].intersperse(start, sep, end))
.withParallelMarshalling(parallelism = 8, unordered = false)
val decider: Supervision.Decider = {
case ex: Throwable => {
println("Exception occurs")
ex.printStackTrace()
Supervision.Resume
}
}
val persons: Source[Model.Person, NotUsed] = Source.fromIterator(
() => (0 to 1000000).map(id => Model.Person(id = id)).iterator
)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.map(p => { println(p); p })
val route =
path("persons") {
get {
complete(persons)
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => {
println("Stopping http server ...")
system.terminate()
})
}
}
와 클라이언트 :
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import scala.util.{Failure, Success}
object WebClient {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val request = HttpRequest(uri = Uri("http://localhost:8080/persons"))
val response = Http().singleRequest(request)
val attributes = ActorAttributes.withSupervisionStrategy {
case ex: Throwable => {
println("Exception occurs")
ex.printStackTrace
Supervision.Resume
}
}
response.map(r => {
r.entity.dataBytes.withAttributes(attributes)
}).onComplete {
case Success(db) => db.map(bs => bs.utf8String).runForeach(println)
case Failure(ex) => ex.printStackTrace()
}
}
}
은 100, 1000, 10 000 명 작동하지만 작동하지 않습니다 000 100>에 대한 '여기
내 서버와 클라이언트입니다. 사람 (79101, ⰷ 瑐 劲 죗 醂 竜 :이 스트림에 대한 몇 가지 제한이 있지만 내가 그것을마지막 기록은 내 로컬 컴퓨터에 서버가 인쇄 된 이 (번호 79101로)입니다
찾을 수 없습니다처럼 같습니다泲 늎 制 䠸, 䮳 硝 沢 并 ⎗ᝨᫌ ꊭ ᐽ 酡) 클라이언트마지막 기록() 번호 79048 함께 : 그런 일이 왜
{"id":79048,"firstName":"췁頔䚐龫暀࡙頨捜昗㢵","secondaryName":"⏉ݾ袈庩컆◁ꄹ葪䑥Ϻ"}
아마 누군가가 알아?