2014-07-15 7 views
1

외부 시스템에서 연속적인 스트림 메시지를 처리하는 액터 시스템이 있습니다. 내 시스템에는 다음과 같은 배우가 있습니다.스프레이 클라이언트를 사용하여 액터 시스템 내에서 REST 웹 서비스 호출하기

  1. SubscribeActor -이 배우가 레디 스 채널을 구독하고 새로운 InferActor를 만들고에 JSON 페이로드를 전달합니다.
  2. InferenceActor -이 배우는 2a를 담당합니다. 페이로드를 파싱하고 JSON 페이로드에서 일부 값 텍스트 값을 추출합니다. 2b. 외부에서 REST service을 호출하여 2a에서 추출한 값을이 서비스로 전달합니다. REST 서비스는 LAN의 다른 노드에 배치되며 계산면에서 상당한 양의 무거운 짐을 처리합니다.

2b의 외부 REST 서비스는 스프레이 클라이언트를 사용하여 호출됩니다. 나는 시스템을 테스트했고 2a까지 제대로 작동합니다. 그러나, 2b를 소개하자마자. OutOfMemory 오류가 발생하기 시작하고 결국 시스템이 중단됩니다.

  1. 디자인 결함 - -

    현재, 나는 두 가지 주요 용의자를했습니다 내 배우 내부 시스템을 스프레이 클라이언트를 사용하고 방법이 정확하지

  2. (I 스프레이에 새로 온) 느린 REST 서비스로 인한 대기 시간으로 인해 성능 문제가 발생합니다.

# 2로 가기 전에 Spray 클라이언트를 올바르게 사용하고 있는지 확인하고 싶습니다. 특히 esp. 내가 다른 배우들로부터 그것을 부를 때. 내 질문은 올바른/부정확/차선책 아래의 사용법 무엇입니까?

다음은 서비스를 호출하는 웹 서비스 REST 클라이언트의 코드입니다. 여기

trait GeoWebClient { 
    def get(url: String, params: Map[String, String]): Future[String] 
} 

class GeoSprayWebClient(implicit system: ActorSystem) extends GeoWebClient { 

    import system.dispatcher 

    // create a function from HttpRequest to a Future of HttpResponse 
    val pipeline: HttpRequest => Future[HttpResponse] = sendReceive 

    // create a function to send a GET request and receive a string response 
    def get(path: String, params: Map[String, String]): Future[String] = { 

    val uri = Uri("http://myhost:9191/infer") withQuery params 
    val request = Get(uri) 
    val futureResponse = pipeline(request) 
    futureResponse.map(_.entity.asString) 
    } 
} 

그리고

위의 서비스를 호출 InferenceActor 코드입니다.

class InferenceActor extends Actor with ActorLogging with ParseUtils { 

    val system = context.system  
    import system.dispatcher  
    val restServiceClient = new GeoSprayWebClient()(system)  

    def receive = { 

    case JsonMsg(s) => { 

     //first parse the message to 
     val text: Option[String] = parseAndExtractText(s) //defined in ParseUtils trait 
     log.info(s"extract text $text") 

     def sendReq(text: String) = { 
     import spray.http._ 
     val params = Map(("text" -> text)) 
     // send GET request with absolute URI 
     val futureResponse = restServiceClient.get("http://myhost:9191/infer", params) 
     futureResponse 
     } 

     val f: Option[Future[String]] = text.map(x => sendReq(x)) 

     // wait for Future to complete NOTE: I commented this code without any change. 
     /* f.foreach { r => r.onComplete { 
     case Success(response) => log.debug("*********************" + response) 
     case Failure(error) => log.info("An error has occurred: " + error.getMessage) 
     } 
     } 
     */ 
     context stop self  

    } 
    }   
} 
+0

어떤 종류의 개체가 OOM 상황을 일으키는 지 확인하기 위해 메모리 덤프를 보았습니까? 서버가 실행 중일 때 빠른 개요를 보려면'jmap -histo : live '을 시도해보십시오. 또는 사실적인 분석을 위해 Eclipse MAT를 사용하십시오. – jrudolph

답변

0

코드의 두 번째 조각처럼 차단하는 경우가 akka 문서, Blocking needs careful management에 명시된 바와 같이 다른 미래에 미래를 포장하려고하고 있다고.

해당 요청의 리소스 양을 제한해야합니다.

다른 액터로 text.map을 전송하는 것이 더 쉬운 것처럼 보입니다.