당신이 필요로하는 것은 가장 우선 순위가 높은 메시지입니다. PriorityMailbox의 기본 구현을 살펴보십시오.
그것은 (워드 프로세서의 예에 따라) 같은 것을 볼 수 있습니다
여기
versionToInt
import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedStablePriorityMailbox
import com.typesafe.config.Config
type Version = // Long, Date, Timestamp, smth else - must be ordered
case class MyMessage(key: String, value: String, version: Version)
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
extends UnboundedStablePriorityMailbox(
// Create a new PriorityGenerator, lower prio means more important
PriorityGenerator {
case MyMessage(_, _, version) => versionToInt(version)
// PoisonPill when no other left
case PoisonPill => 1 // or Int.MaxValue - 2
// We default to 1, which is in between high and low
case otherwise => 0 // or Int.MaxValue - 1
})
메시지 (높은 우선 순위)의 새 버전 낮은 일부 Int
값을 반환해야합니다을 2 - Inf
에 말 범위.
이후에 배우에서 처리 된 가장 높은 버전을 추적하고 해당 버전 이전의 다른 모든 메시지를 삭제할 수 있습니다. 배우를 위해 become
또는 var
을 사용하십시오.
보낸 메시지의 순서는 Akka에서 보장되지만 결국 메시지를 보내는 순서와 중간에있는 지연에 따라 달라 지므로 메시지 처리에 영향을 미칩니다.
왜 사전 필터링을 사용하지 않고 최신 값만 전달합니까? 예를 들어 유스 케이스를 이해하는 데 도움이 될 수 있습니다. 부목의 이유를 모르겠다. – sascha10000
응용 프로그램은 HFT 도메인에 있고 입력은 시장 시세표의 스트림입니다. 가장 최근의 시세 표시기를 처리 한 후에는 오래된 시장 데이터를 처리 할 필요가 없습니다. 따라서 "주제"또는 "키"는 시세 기호가되고 값은 가격이됩니다. – sdfdfjndfsd
좋아요, 따라서 akka-streams을 통해 스트림을 전처리 한 다음 key : value 쌍에 대한 처리가 동일하면 BalancingPool에 "실제"tickinfos 만 보냅니다. 그렇지 않으면 둘 이상의 액터가 구현 된 경우 먼저 스트림을 필터링 한 다음 처리해야하는 액터를 결정할 수 있습니다. 기본적으로 필자는 RootActor가 필터링을 처리하는 Actor 시스템을 구축 한 다음 더 복잡한 작업을 위해 자식에게 보낼 것입니다. "하나"의 일과 아주 좋은 일을해야합니다. – sascha10000