2017-12-09 27 views
1

를 사용합니다.어떻게 <a href="https://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html" rel="nofollow noreferrer">host-level API with a queue</a>을 사용하고 Source.Queue 배압

private val (queueSource, connectionPool) = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure).async 
    .viaMat(poolFlow)(Keep.both) 
    .toMat(
     Sink.foreach({ 
     case ((Success(resp), p)) => 
      p.success(resp) 
     case ((Failure(e), p)) => p.failure(e) 
     }) 
    )(Keep.left) 
    .run() 

내가 연결 풀에서 연결 요청 경주의 여지가 있지만 다음과 같은 오류 얻을 :. 내가 .async를 추가하는 시도하지만, 다시 압력이 여전히 걷어차하지 않습니다

java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request 
    at akka.stream.impl.QueueSource$$anon$1.akka$stream$impl$QueueSource$$anon$$bufferElem(QueueSource.scala:84) 
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:94) 
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:91) 
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447) 
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:464) 
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:559) 
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:741) 
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:756) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:666) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:496) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

을 어떻게합니까 위의 오류는 무엇이며 문제를 조사하는 방법은 무엇입니까?

답변

0

Source.queue 개체 메서드를 사용하여 Source을 이미 구성 했으므로 queue.offer을 호출하는 기능에 직접 역기전력을 적용 할 수는 없다고 생각합니다. 그러나 문제는 다른 방식으로 해결 될 수 있습니다. OverflowStrategy

다른

당신은 OverflowStrategy.dropHead 또는 OverflowStrategy.dropTail 같은 것으로 전략을 변경할 수 있습니다. queueSizequeue.offer 호출의 비율과 비교해 충분히 큰 경우에는 이것이 아마도 사용자의 필요를 충족시킬 것입니다.

+0

메시지를 잃을 여유가 없습니다. 그것이 작동하지 않으면 배압 전략의 요점은 무엇입니까? – Rabzu

+0

@Rabzu Source.queue에서 역 압력 전략을 사용하는 예를 알지 못합니다. 배압이 그다지 중요하지 않다면 왜 처음에는'Source.queue'를 사용해야합니까? –

+0

대체 뭐야? – Rabzu