2017-04-02 3 views
2

Akka 지속 쿼리를 이해하는 데 문제가 있습니다. 특히 예상대로 작동하지 않으므로 eventsByTag 메서드가 문제가됩니다.Akka Persisence Query Read Journal이 내 이벤트를 검색하지 않는 이유는 무엇입니까?

내 메인 클래스에서 특정 태그로 유지되는 모든 이벤트를 수신하기 시작하는 클래스를 호출합니다.

class CassandraJournal(implicit val system: ActorSystem) { 

def engageStreaming = { 
    val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
    implicit val mat = ActorMaterializer() 

    readJournal.eventsByTag("account", Offset.noOffset) 
    .runForeach { event => println(event) } 
    } 
} 

내가 내 서버를 시작할 때마다 내 이벤트 저장소가 비어 내가 (Akka HTTP 내장 HTTP 서비스를 호출하여) 내 첫 번째 이벤트를 지속, 이벤트는 참으로 인쇄됩니다. 그러나 서버를 다시 시작하고 이미 이벤트 저장소에 이벤트가 있으면 새 지속 이벤트가 인쇄되지 않습니다.

설명이 있습니까? 왜 이런 일이 일어나고 있는지 파악하기가 힘듭니다. 내가 사용

편집

이벤트 저장소는 카산드라입니다. 여기 receiveRecover가 필요한 상태 복구 작업을하고 있지 않은 상태

class Account(id: UUID) extends PersistentActor { 

    override def receiveRecover: Receive = { 
    case createCheckingsAccount: CreateCheckingsAccount => 
     println("Creating checkings account") 
    } 

    override def receiveCommand: Receive = { 
    case createCheckingsAccount: CreateCheckingsAccount => 
     persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event => 
     val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated] 
     sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString) 
     } 

    } 

    def updateState(evt: Event): Unit = { 
    } 

    override def persistenceId: String = s"account-$id" 
} 
+0

이벤트 어댑터, 영구 액터 및 지속성을 위해 사용하는 데이터 저장소로 업데이트 질문. –

답변

2

(난 그냥 태그() 주위를 감싸, 이벤트에 태그를 이벤트 어댑터를 사용하고 있지 않다)이 PersistentActor이다, 지속성이 제대로 작동하지 않을 것입니다. 기본 상태 복구 로직을 receiveRecover에 두는 것이 좋습니다. 또한 updateState 메소드가 태그가있는 이벤트 사례를 처리하도록하십시오.

다음과 유사한 상태 복구 논리가있는 응용 프로그램에서 eventsByTag을 사용했는데 처음 시작과 복구시 모두 정상적으로 작동했습니다.

def updateState(e: Any): Unit = e match { 
    case evt: Event => 
    state = state.updated(evt) 
    case Tagged(evt: Event, _) => 
    state = state.updated(evt) 
} 

... 

override def receiveRecover: Receive = { 
    case evt: Event => updateState(evt) 
    case taggedEvt: Tagged => updateState(taggedEvt) 
} 
+0

실제로 적절한 복구 작업을 설정하여 작동시킬 수있었습니다. 회복이 새로운 출발을 할 것이라는 사실을 알지 못했습니다. 당신의 도움을 주셔서 감사합니다! –