2017-12-17 18 views
2

이 스트림을 소비하는 간단한 akka 스트림 나머지 엔드 포인트 및 클라이언트를 작성하려고합니다. 하지만 서버와 클라이언트를 실행하려고하면 클라이언트는 스트림의 일부만을 소비 할 수 있습니다. 실행 중에 예외가 표시되지 않습니다.Akka http-client는 서버의 모든 데이터 스트림을 사용할 수 없습니다.

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.common.{EntityStreamingSupport, JsonEntityStreamingSupport} 
import akka.http.scaladsl.server.Directives._ 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport 
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes, Supervision} 
import akka.stream.scaladsl.{Flow, Source} 
import akka.util.ByteString 
import spray.json.DefaultJsonProtocol 

import scala.io.StdIn 
import scala.util.Random 

object WebServer { 

    object Model { 
    case class Person(id: Int = Random.nextInt(), fName: String = Random.nextString(10), sName: String = Random.nextString(10)) 
    } 

    object JsonProtocol extends SprayJsonSupport with DefaultJsonProtocol { 
    implicit val personFormat = jsonFormat(Model.Person.apply, "id", "firstName", "secondaryName") 
    } 

    def main(args: Array[String]) { 

    implicit val system = ActorSystem("my-system") 
    implicit val materializer = ActorMaterializer() 

    implicit val executionContext = system.dispatcher 

    val start = ByteString.empty 
    val sep = ByteString("\n") 
    val end = ByteString.empty 

    import JsonProtocol._ 
    implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json() 
     .withFramingRenderer(Flow[ByteString].intersperse(start, sep, end)) 
     .withParallelMarshalling(parallelism = 8, unordered = false) 

    val decider: Supervision.Decider = { 
     case ex: Throwable => { 
     println("Exception occurs") 
     ex.printStackTrace() 
     Supervision.Resume 
     } 
    } 

    val persons: Source[Model.Person, NotUsed] = Source.fromIterator(
    () => (0 to 1000000).map(id => Model.Person(id = id)).iterator 
    ) 
     .withAttributes(ActorAttributes.supervisionStrategy(decider)) 
     .map(p => { println(p); p }) 


    val route = 
     path("persons") { 
     get { 
      complete(persons) 
     } 
     } 

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) 

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") 
    StdIn.readLine() 

    bindingFuture 
     .flatMap(_.unbind()) 
     .onComplete(_ => { 
     println("Stopping http server ...") 
     system.terminate() 
     }) 
    } 
} 

와 클라이언트 :

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.{HttpRequest, Uri} 
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision} 

import scala.util.{Failure, Success} 

object WebClient { 
    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext = system.dispatcher 


    val request = HttpRequest(uri = Uri("http://localhost:8080/persons")) 

    val response = Http().singleRequest(request) 

    val attributes = ActorAttributes.withSupervisionStrategy { 
     case ex: Throwable => { 
     println("Exception occurs") 
     ex.printStackTrace 
     Supervision.Resume 
     } 
    } 
    response.map(r => { 
     r.entity.dataBytes.withAttributes(attributes) 
    }).onComplete { 
     case Success(db) => db.map(bs => bs.utf8String).runForeach(println) 
     case Failure(ex) => ex.printStackTrace() 
    } 
    } 
} 

은 100, 1000, 10 000 명 작동하지만 작동하지 않습니다 000 100>에 대한 '여기

내 서버와 클라이언트입니다. 사람 (79101, ⰷ 瑐 劲 죗 醂 竜 :이 스트림에 대한 몇 가지 제한이 있지만 내가 그것을

마지막 기록은 내 로컬 컴퓨터에 서버가 인쇄 된 이 (번호 79101로)입니다

찾을 수 없습니다처럼 같습니다泲 늎 制 䠸, 䮳 硝 沢 并 ⎗ᝨᫌ ꊭ ᐽ 酡)

클라이언트

마지막 기록() 번호 79048 함께 : 그런 일이 왜

{"id":79048,"firstName":"췁頔䚐龫暀࡙頨捜昗㢵","secondaryName":"⏉ݾ袈庩컆◁ꄹ葪䑥Ϻ"} 

아마 누군가가 알아?

답변

1

해결책을 찾았습니다. 클라이언트에 r.entity.withoutSizeLimit()을 명시 적으로 추가해야하고 이후에 모두 예상대로 작동합니다.