Akka 지속 쿼리를 이해하는 데 문제가 있습니다. 특히 예상대로 작동하지 않으므로 eventsByTag 메서드가 문제가됩니다.Akka Persisence Query Read Journal이 내 이벤트를 검색하지 않는 이유는 무엇입니까?
내 메인 클래스에서 특정 태그로 유지되는 모든 이벤트를 수신하기 시작하는 클래스를 호출합니다.
class CassandraJournal(implicit val system: ActorSystem) {
def engageStreaming = {
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
implicit val mat = ActorMaterializer()
readJournal.eventsByTag("account", Offset.noOffset)
.runForeach { event => println(event) }
}
}
내가 내 서버를 시작할 때마다 내 이벤트 저장소가 비어 내가 (Akka HTTP 내장 HTTP 서비스를 호출하여) 내 첫 번째 이벤트를 지속, 이벤트는 참으로 인쇄됩니다. 그러나 서버를 다시 시작하고 이미 이벤트 저장소에 이벤트가 있으면 새 지속 이벤트가 인쇄되지 않습니다.
설명이 있습니까? 왜 이런 일이 일어나고 있는지 파악하기가 힘듭니다. 내가 사용
편집
이벤트 저장소는 카산드라입니다. 여기 receiveRecover
가 필요한 상태 복구 작업을하고 있지 않은 상태
class Account(id: UUID) extends PersistentActor {
override def receiveRecover: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
println("Creating checkings account")
}
override def receiveCommand: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event =>
val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated]
sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString)
}
}
def updateState(evt: Event): Unit = {
}
override def persistenceId: String = s"account-$id"
}
이벤트 어댑터, 영구 액터 및 지속성을 위해 사용하는 데이터 저장소로 업데이트 질문. –