2013-08-06 2 views
1

생산자가 소비자에게 메시지를 보내고 소비자가 스칼라 Futures을 사용하여 비동기로 메시지를 처리하는 경우를 예로 들어 봅니다 (예 : future { /* do the processing */ }).스칼라에 선물이있는 프로듀서 - 소비자

이제 프로듀서가 초당 100 개의 메시지를 생성한다고 가정 해 보겠습니다. 소비자는 초당 10 개의 메시지 만 처리합니다. 무슨 일이 일어날 것 ? 나는 메모리 누출이있을 것 같아요. Future 개체가 많이 있고 스레드 풀의 내부 메시지 큐도 커질 것입니다. 그것은 의미가 있습니까?

처리하는 가장 좋은 방법은 무엇입니까?

+0

주 -이 질문은 혼란 스러웠습니다.이 질문은 배우가 아니라 선물에 관한 것입니다. – JasonG

답변

1

최대 대기열 크기를 설정할 수 있습니다. 사실, 나는 라고 생각한다. Akka 액터는 기본적으로 제한된 큐를 가지고 있지만, 나는 여기서 잘못 될 수있다.

실제로 문제를 해결하지는 못하지만, 결국에는 처리를 수행하는 백엔드 배우가 충분하지 않으면 모든 것을 처리 할 수 ​​없습니다.

나는 넷플 릭스가하는 것을 좋아한다 : 모든 요청은 백엔드의 상태를 모니터하는 프록시를 거친다. 백엔드가 너무 오래 걸리면 요청을 삭제하고 오류가있는 기본 메시지 또는 오류 메시지를 전달합니다. 그들은 그들의 아키텍처에 대해 많이 이야기합니다. 예를 들어 this presentation을 참조하십시오.

+0

감사합니다. 이제는 생산자와 소비자 사이에 제한된 대기열이 있다고 가정합니다. 문제가 해결 되었습니까? 소비자 논리를 바꾸지 않겠습니까? – Michael

0

여러 소비자가 있습니다 - 액터 풀을 사용하십시오. 풀의 스트레스에 따라 동적으로 크기를 조정할 수 있습니다. 실행 컨텍스트를 사용, akka에서 http://doc.akka.io/docs/akka/snapshot/scala/routing.html

+0

감사합니다. 액터/스레드를 더 추가 할 수 없다고 가정 해보십시오. 예를 들어, 처리는 CPU 바운드이고 CPU는 1 개뿐입니다. – Michael

+0

그러면 어느 시점에서 자원을 소진 할 것입니다. 더 많이 생산한다면, 어떤 시점에서 메시지를 버려야 할 것입니다 (Daniel이 제안한 것처럼 크기 제한이있는 제한된 대기열을 사용하십시오). 또는 더 많은 자원을 사용하십시오. 클러스터 된 접근 방법 (http://doc.akka.io/docs/akka/snapshot/common/cluster.html 참조) –

+0

CPU가 하나 뿐인 호스트가 하나만 있다고 가정합니다. 또한 생산자와 소비자가 평균적으로 잘 작동한다고 가정 해보십시오. 문제는 _bursts_를 처리하는 것입니다. 버스트가 계획보다 크면 소비자는 일부 메시지를 버릴 수 있습니다. 스레드로 구현하고 제한된 메시지 큐를 차단하는 방법을 알고 있습니다. 이제'scala.concurrent.future'로 구현하는 방법을 궁금해합니다. – Michael

2

참조 있지만 사서함은없는 것처럼 보인다 -이 소스를 읽을 가치가있을 것이다 그러나 나는 실험에 의해 귀하의 질문에 대답 할 수

미래의이 '사서함이없는 '나는 Akka는 실행 컨텍스트가 실제로 들어있는 후드 또는 무엇에 따라 수행 정확히 100 % 확실하지 않다,하지만 직접 선물을 사용할 때 우리는 akka 메모리가 부족할 것입니다 볼 수 있습니다

scala> import scala.concurrent.Future 
import scala.concurrent.Future 

scala> import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.ExecutionContext.Implicits.global    ^

scala> while(1==1) Future(Thread.sleep(100)) 
java.lang.OutOfMemoryError: Java heap space 

을 우리가하면' 메시지에 대해 이야기하면 액터 메시지 큐의 동작을 설명하는 사서함이 있습니다. 한 번에 하나의 메시지 만 처리됨) - 아래에서이 내용을 설명하겠습니다.

제한된 사서함 (예 : 크기 제한이있는 사서함)을 가정하면 메시지는 어떻게됩니까? 대답은 사서함에 달려 있습니다. -와 예를 들어 한도 치면

bounded-mailbox { 
    mailbox-type = "akka.dispatch.BoundedMailbox" 
    mailbox-capacity = 1000 
    mailbox-push-timeout-time = 10s 
} 

자 할 때, akka 이전 또는 사서함 구성 방법에 따라 새 메시지를 떨어질 것 중 하나 첫째, 한정된 사서함은 크기 제한과 같은 몇 가지 설정을 가지고 있습니다 이 다음 응용 프로그램들이 메모리에 저장 될 때 메시지가 손실됩니다 것을 의미 충돌 할 수 메모리가 부족 같은 다른 자원 문제가 있는지 분명히

# whether to drop older items (instead of newer) when the queue is full 
discard-old-when-full = on 

설정. 묶여 있지 않은 사서함은 오류 조건이 발생할 때까지 메시지를 계속 스택하여 바운드 된 사서함을 사용할 수 있습니다.

오류 조건의 메시지 손실이 바람직하지 않은 경우 다른 옵션이 있습니다. 파일과 같이 더 영구적으로 메시지를 저장하는 영구 메일 상자를 사용할 수 있습니다. 다음은보다 영구적 인 메시지 저장을 위해 파일을 사용하는 사서함 구성의 예입니다.

akka { 
    actor { 
    mailbox { 
     file-based { 
     # directory below which this queue resides 
     directory-path = "./_mb" 

     # attempting to add an item after the queue reaches this size (in items) 
     # will fail. 
     max-items = 2147483647 

     # attempting to add an item after the queue reaches this size (in bytes) 
     # will fail. 
     max-size = 2147483647 bytes 

     # attempting to add an item larger than this size (in bytes) will fail. 
     max-item-size = 2147483647 bytes 

     # maximum expiration time for this queue (seconds). 
     max-age = 0s 

     # maximum journal size before the journal should be rotated. 
     max-journal-size = 16 MiB 

     # maximum size of a queue before it drops into read-behind mode. 
     max-memory-size = 128 MiB 

     # maximum overflow (multiplier) of a journal file before we re-create it. 
     max-journal-overflow = 10 

     # absolute maximum size of a journal file until we rebuild it, 
     # no matter what. 
     max-journal-size-absolute = 9223372036854775807 bytes 

     # whether to drop older items (instead of newer) when the queue is full 
     discard-old-when-full = on 

     # whether to keep a journal file at all 
     keep-journal = on 

     # whether to sync the journal after each transaction 
     sync-journal = off 

     # circuit breaker configuration 
     circuit-breaker { 
      # maximum number of failures before opening breaker 
      max-failures = 3 

      # duration of time beyond which a call is assumed to be timed out and 
      # considered a failure 
      call-timeout = 3 seconds 

      # duration of time to wait until attempting to reset the breaker during 
      # which all calls fail-fast 
      reset-timeout = 30 seconds 
     } 
     } 
    } 
    } 
} 
+0

자세한 설명 주셔서 감사합니다. 제 질문은 '미래'에 관한 것입니다. 나는'future {...}'를 쓸 때 실제로 객체를 만들고 객체 참조를 쓰레드 풀 사서함에 넣는다. 그것은 의미가 있습니까? 만약 그렇다면 사서함이 묶여 있는지, 그리고 사서함이 꽉 찬 경우'future {...} '로 어떻게되는지 물어볼 것입니다. – Michael

+0

아 ~ 예 - 죄송합니다 - 사서함라는 용어를 사용하면 이것이 akka 질문이라고 생각하게되었습니다. – JasonG

+0

다른 답변들이 배우/파견자에 대해 이야기하고 있기 때문에 나는 혼란 스러웠다. 그러나 당신은 Future의 이야기에 대해 특별히 이야기하고있다. – JasonG