2016-07-07 9 views
7

내 ES-CQRS 아키텍처에서 읽기 측을 구현하려고합니다. 이제 나는이 같은 지속적인 배우 있다고 가정 해 봅시다 : 이벤트가 지속됩니다 때Akka 지속 쿼리 이벤트 스트림 및 CQRS

object UserWrite { 

    sealed trait UserEvent 
    sealed trait State 
    case object Uninitialized extends State 
    case class User(username: String, password: String) extends State 
    case class AddUser(user: User) 
    case class UserAdded(user: User) extends UserEvent 
    case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed]) 
    case class UsersStream(fromSeqNo: Long) 
    case object GetCurrentUser 

    def props = Props(new UserWrite) 
} 

class UserWrite extends PersistentActor { 

    import UserWrite._ 

    private var currentUser: State = Uninitialized 

    override def persistenceId: String = "user-write" 

    override def receiveRecover: Receive = { 
    case UserAdded(user) => currentUser = user 
    } 

    override def receiveCommand: Receive = { 
    case AddUser(user: User) => persist(UserAdded(user)) { 
     case UserAdded(`user`) => currentUser = user 
    } 
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo) 
    case GetCurrentUser => sender() ! currentUser 
    } 

    def publishUserEvents(fromSeqNo: Long) = { 
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
    val userEvents = readJournal 
     .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue) 
     .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event } 
    sender() ! UserEvents(userEvents) 
    } 
} 

를 지금까지 내가 이해, 때마다, 우리는 Akka Persistence Query 통해 게시 할 수 있습니다. 이제는 이러한 이벤트를 구독하는 적절한 방법이 무엇이 될지 모르겠으므로 읽기 전용 데이터베이스에 보관할 수 있습니까? 아이디어 중 하나는 처음에 UsersStream 메시지를 내 읽기 측 배우에게서 UserWrite 배우에게 보내고 그 읽기 액터의 이벤트를 싱크하는 것입니다. @cmbaxter의 제안에 따라

편집

, 나는 팀에게이 방법을 읽어 구현 된 이벤트 스트림이 느린 것 같다 : 같은 몇 가지 문제가 있습니다

object UserRead { 

    case object GetUsers 
    case class GetUserByUsername(username: String) 
    case class LastProcessedEventOffset(seqNo: Long) 
    case object StreamCompleted 

    def props = Props(new UserRead) 
} 

class UserRead extends PersistentActor { 
    import UserRead._ 

    var inMemoryUsers = Set.empty[User] 
    var offset  = 0L 

    override val persistenceId: String = "user-read" 

    override def receiveRecover: Receive = { 
    // Recovery from snapshot will always give us last sequence number 
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo 
    case RecoveryCompleted         => recoveryCompleted() 
    } 

    // After recovery is being completed, events will be projected to UserRead actor 
    def recoveryCompleted(): Unit = { 
    implicit val materializer = ActorMaterializer() 
    PersistenceQuery(context.system) 
     .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
     .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue) 
     .map { 
     case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event 
     } 
     .runWith(Sink.actorRef(self, StreamCompleted)) 
    } 

    override def receiveCommand: Receive = { 
    case GetUsers     => sender() ! inMemoryUsers 
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username) 
    // Match projected event and update offset 
    case (seqNo: Long, UserAdded(user)) => 
     saveSnapshot(LastProcessedEventOffset(seqNo)) 
     inMemoryUsers += user 
    } 
} 

. 나는. UserRead 액터는 새로 추가 된 사용자가 저장되기 전에 사용자 집합과 함께 응답 할 수 있습니다.

편집 2

는 좀 더 적은 느린 이벤트 스트림으로 문제를 해결 카산드라 쿼리 저널의 새로 고침 간격을 증가했다. Cassandra 이벤트 저널은 기본적으로 3 초마다 폴링됩니다. 내 application.conf 나는 추가 :

cassandra-query-journal { 
    refresh-interval = 20ms 
} 

EDIT 사실 3

, 새로 고침 간격을 감소하지 않습니다. 메모리 사용량은 증가하지만 위험하지는 않습니다. CQRS의 일반적인 개념은 쓰기 및 읽기 측이 비동기라는 것입니다. 따라서 쓰기 후에는 데이터를 즉시 읽을 수 없습니다. UI 다루기? 난 그냥 스트림을 열고 서버를 통해 데이터를 밀어 푸는 측면을 읽은 후 그들을 인정했다.

+2

난 그냥에'Source'로 메시지를 보내는 대신 읽기 측 돌기 배우로 읽기 저널 기반 코드를 이동합니다. 그런 다음 해당 읽기 측 영사기에서 스트림을 처리하고 해당 정보를 Elasticsearch에 투영합니다. – cmbaxter

+0

@cmbaxter 나는 그것을했다. 그것은 아주 좋은 생각 인 것 같습니다. 나는 여전히 의문을 가지고 있기 때문에 내 질문을 업데이 트하고 여전히 제안을 수락. –

답변

4

몇 가지 방법이 있습니다. 예를 들어, 내 응용 프로그램에서 일관되게 변경 사항을 찾고있는 PersistenceQuery 내 쿼리 측면에서 배우가 있지만 동일한 쿼리를 너무 스레드를 가질 수 있습니다. 문제는 그 대신이의

val readJournal = 
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
    CassandraReadJournal.Identifier) 

// issue query to journal 
val source: Source[EventEnvelope, NotUsed] = 
    readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue) 

// materialize stream, consuming events 
implicit val mat = ActorMaterializer() 
source.map(_.event).runForeach{ 
    case userEvent: UserEvent => { 
    doSomething(userEvent) 
    } 
} 

, 당신은 PersistenceQuery 저장 새 이벤트를 제기 타이머를 가질 수 발생 즉시 지속 된 이벤트를 읽을 수 있도록 열려있는 스트림을 유지하는 것입니다,하지만 난 생각 스트림이 열려있는 것은 최선의 방법 PersistenceQuery와 솔루션은 승인했지만

2

, 그것은 다음과 같은 문제가 포함되어 있습니다 : 그것은 부분입니다

  1. 제시 EventEnvelopes을 읽을 수있는 유일한 방법이있다.
  2. 상태 스냅 샷에서는 작동하지 않으므로 CQRS 판독기 부분은 이상의 모든 지속 된 이벤트가 유지되어야합니다.

첫 번째 솔루션은 더 나은이지만, 다음과 같은 문제가 있습니다

  1. 그것은 너무 복잡하다. 사용자가 시퀀스 번호를 불필요하게 처리합니다.
  2. 이 코드는 액터 구현과 결합 된 상태 (쿼리/업데이트)를 다룹니다. 이

하나의 단순한 존재합니다

import akka.NotUsed 
import akka.actor.{Actor, ActorLogging} 
import akka.persistence.query.{EventEnvelope, PersistenceQuery} 
import akka.persistence.query.javadsl.{EventsByPersistenceIdQuery, ReadJournal} 
import akka.persistence._ 
import akka.stream.ActorMaterializer 
import akka.stream.javadsl.Source 

/** 
    * Created by alexv on 4/26/2017. 
    */ 
class CQRSTest { 

    // User Command, will be transformed to User Event 
    sealed trait UserCommand 
    // User Event 
    // let's assume some conversion from Command to event here 
    case class PersistedEvent(command: UserCommand) extends Serializable 
    // User State, for simplicity assumed that all State will be snapshotted 
    sealed trait State extends Serializable{ 
    def clear(): Unit 
    def updateState(event: PersistedEvent): Unit 
    def validateCommand(command:UserCommand): Boolean 
    def applyShapshot(newState: State): Unit 
    def getShapshot() : State 
    } 
    case class SaveSnapshot() 

    /** 
    * Common code for Both reader and writer 
    * @param state - State 
    */ 
    abstract class CQRSCore(state: State) extends PersistentActor with ActorLogging { 
    override def persistenceId: String = "CQRSPersistenceId" 

    override def preStart(): Unit = { 
     // Since the state is external and not depends to Actor's failure or restarts it should be cleared. 
     state.clear() 
    } 

    override def receiveRecover: Receive = { 
     case event : PersistedEvent => state.updateState(event) 
     case SnapshotOffer(_, snapshot: State) => state.applyShapshot(snapshot) 
     case RecoveryCompleted => onRecoveryCompleted(super.lastSequenceNr) 
    } 

    abstract def onRecoveryCompleted(lastSequenceNr:Long) 
    } 

    class CQRSWriter(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSWriter Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed") 
    } 

    override def receiveCommand: Receive = { 
     case command: UserCommand => 
     if(state.validateCommand(command)) { 
      // Persist events and call state.updateState with each persisted event 
      persistAll(List(PersistedEvent(command)))(state.updateState) 
     } 
     else { 
      log.error("Validation Failed for Command: {}", command) 
     } 
     case SaveSnapshot => saveSnapshot(state.getShapshot()) 
     case SaveSnapshotSuccess(metadata) => log.debug("Saved snapshot successfully: {}", metadata) 
     case SaveSnapshotFailure(metadata, reason) => log.error("Failed to Save snapshot: {} . Reason: {}", metadata, reason) 
    } 
    } 

    class CQRSReader(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSReader Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed, Starting QueryStream") 

     // ReadJournal type not specified here, so may be used with Cassandra or In-memory Journal (for Tests) 
     val readJournal = PersistenceQuery(context.system).readJournalFor(
     context.system.settings.config.getString("akka.persistence.query.my-read-journal")) 
     .asInstanceOf[ReadJournal 
     with EventsByPersistenceIdQuery] 
     val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(
     OrgPersistentActor.orgPersistenceId, lastSequenceNr + 1, Long.MaxValue) 
     source.runForeach({ envelope => state.updateState(envelope.event.asInstanceOf[PersistedEvent]) },ActorMaterializer()) 

    } 

    // Nothing received since it is Reader only 
    override def receiveCommand: Receive = Actor.emptyBehavior 
    } 
} 
+0

가정 된 CQRSRead 파트는 상태를 통해 직접 쿼리해야합니다. CQRSReader는 상태가 CQRSWriter와 비슷하다는 것을 확인합니다. 여기서는 Concrete state를 구현하지 않았지만 간단한 해시 맵에서 인 메모리 그래프 DB까지는 아무 것도 될 수 없다. –