2016-08-03 1 views
0

에 의해 버전/Akka 메시지를 고유하게 나는 다음과 같은 사용 사례가 있습니다일부 속성

배우가 key:value 쌍으로 구성 외부 소스에서 데이터를 소비합니다. 그런 다음 그들은 이전의 것들을 건너 뛰고 key과 최신 value에서 몇 가지 작업을 수행해야하는 배우에게 전달됩니다. 기본적으로 사서함을 1의 용량을 가진 "하위 주제"로 분할하려고합니다. Akka에서는 이러한 일이 가능합니까? 그렇지 않다면 어떤 대안을 고려해야합니까?

+0

왜 사전 필터링을 사용하지 않고 최신 값만 전달합니까? 예를 들어 유스 케이스를 이해하는 데 도움이 될 수 있습니다. 부목의 이유를 모르겠다. – sascha10000

+0

응용 프로그램은 HFT 도메인에 있고 입력은 시장 시세표의 스트림입니다. 가장 최근의 시세 표시기를 처리 한 후에는 오래된 시장 데이터를 처리 할 필요가 없습니다. 따라서 "주제"또는 "키"는 시세 기호가되고 값은 가격이됩니다. – sdfdfjndfsd

+0

좋아요, 따라서 akka-streams을 통해 스트림을 전처리 한 다음 key : value 쌍에 대한 처리가 동일하면 BalancingPool에 "실제"tickinfos 만 보냅니다. 그렇지 않으면 둘 이상의 액터가 구현 된 경우 먼저 스트림을 필터링 한 다음 처리해야하는 액터를 결정할 수 있습니다. 기본적으로 필자는 RootActor가 필터링을 처리하는 Actor 시스템을 구축 한 다음 더 복잡한 작업을 위해 자식에게 보낼 것입니다. "하나"의 일과 아주 좋은 일을해야합니다. – sascha10000

답변

0

당신이 필요로하는 것은 가장 우선 순위가 높은 메시지입니다. PriorityMailbox의 기본 구현을 살펴보십시오.

그것은 (워드 프로세서의 예에 따라) 같은 것을 볼 수 있습니다

여기 versionToInt
import akka.dispatch.PriorityGenerator 
import akka.dispatch.UnboundedStablePriorityMailbox 
import com.typesafe.config.Config 

type Version = // Long, Date, Timestamp, smth else - must be ordered 
case class MyMessage(key: String, value: String, version: Version) 

class MyPrioMailbox(settings: ActorSystem.Settings, config: Config) 
    extends UnboundedStablePriorityMailbox(
    // Create a new PriorityGenerator, lower prio means more important 
    PriorityGenerator { 
     case MyMessage(_, _, version) => versionToInt(version) 

     // PoisonPill when no other left 
     case PoisonPill => 1 // or Int.MaxValue - 2 

     // We default to 1, which is in between high and low 
     case otherwise  => 0 // or Int.MaxValue - 1 
    }) 

메시지 (높은 우선 순위)의 새 버전 낮은 일부 Int 값을 반환해야합니다을 2 - Inf에 말 범위.

이후에 배우에서 처리 된 가장 높은 버전을 추적하고 해당 버전 이전의 다른 모든 메시지를 삭제할 수 있습니다. 배우를 위해 become 또는 var을 사용하십시오.

보낸 메시지의 순서는 Akka에서 보장되지만 결국 메시지를 보내는 순서와 중간에있는 지연에 따라 달라 지므로 메시지 처리에 영향을 미칩니다.

+0

이 방법을 생각했지만 느리지 않습니까? 모든 액터 인스턴스에서 공유 변수가 필요합니다. 그러면 동기화/원자 값을 사용해야합니다. 나는 Akka를 처음 보았고 모든 것을 완벽하게 이해하지 못합니다. 또한 나는 전체 스트림에서가 아니라 일부 세트 (질문에 대한 설명에서 더 많은 설명) 사이에서만 "고유"한 메시지가 필요합니다. 버전의 공유 된 맵을 사용하여이 접근 방식을 수정할 수는 있지만, 모든 것을 느리게 할 수 있습니다. b) 이전 키를 제거하는 메커니즘이 필요합니다. – sdfdfjndfsd

+0

공유 변수가 없어야합니다. 키마다 액터를 만들어야합니다. 무시하는 메시지는 저렴합니다. 최신 메시지 버전을 나타내는 변경 가능 상태는 액터 내에 안전하게 캡슐화됩니다. JVM 당 수백만 명의 액터에서 잘 작동해야합니다. –

+0

하지만 향후 메시지가 동일한 대상 액터로 라우팅되도록하려면 어떻게해야합니까? 맞춤 라우터를 작성 하시겠습니까? – sdfdfjndfsd