내 ES-CQRS 아키텍처에서 읽기 측을 구현하려고합니다. 이제 나는이 같은 지속적인 배우 있다고 가정 해 봅시다 : 이벤트가 지속됩니다 때Akka 지속 쿼리 이벤트 스트림 및 CQRS
object UserWrite {
sealed trait UserEvent
sealed trait State
case object Uninitialized extends State
case class User(username: String, password: String) extends State
case class AddUser(user: User)
case class UserAdded(user: User) extends UserEvent
case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
case class UsersStream(fromSeqNo: Long)
case object GetCurrentUser
def props = Props(new UserWrite)
}
class UserWrite extends PersistentActor {
import UserWrite._
private var currentUser: State = Uninitialized
override def persistenceId: String = "user-write"
override def receiveRecover: Receive = {
case UserAdded(user) => currentUser = user
}
override def receiveCommand: Receive = {
case AddUser(user: User) => persist(UserAdded(user)) {
case UserAdded(`user`) => currentUser = user
}
case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
case GetCurrentUser => sender() ! currentUser
}
def publishUserEvents(fromSeqNo: Long) = {
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val userEvents = readJournal
.eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
.map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
sender() ! UserEvents(userEvents)
}
}
를 지금까지 내가 이해, 때마다, 우리는 Akka Persistence Query
통해 게시 할 수 있습니다. 이제는 이러한 이벤트를 구독하는 적절한 방법이 무엇이 될지 모르겠으므로 읽기 전용 데이터베이스에 보관할 수 있습니까? 아이디어 중 하나는 처음에 UsersStream
메시지를 내 읽기 측 배우에게서 UserWrite
배우에게 보내고 그 읽기 액터의 이벤트를 싱크하는 것입니다. @cmbaxter의 제안에 따라
편집
, 나는 팀에게이 방법을 읽어 구현 된 이벤트 스트림이 느린 것 같다 : 같은 몇 가지 문제가 있습니다
object UserRead {
case object GetUsers
case class GetUserByUsername(username: String)
case class LastProcessedEventOffset(seqNo: Long)
case object StreamCompleted
def props = Props(new UserRead)
}
class UserRead extends PersistentActor {
import UserRead._
var inMemoryUsers = Set.empty[User]
var offset = 0L
override val persistenceId: String = "user-read"
override def receiveRecover: Receive = {
// Recovery from snapshot will always give us last sequence number
case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
case RecoveryCompleted => recoveryCompleted()
}
// After recovery is being completed, events will be projected to UserRead actor
def recoveryCompleted(): Unit = {
implicit val materializer = ActorMaterializer()
PersistenceQuery(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
.eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
.map {
case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
}
.runWith(Sink.actorRef(self, StreamCompleted))
}
override def receiveCommand: Receive = {
case GetUsers => sender() ! inMemoryUsers
case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
// Match projected event and update offset
case (seqNo: Long, UserAdded(user)) =>
saveSnapshot(LastProcessedEventOffset(seqNo))
inMemoryUsers += user
}
}
. 나는. UserRead
액터는 새로 추가 된 사용자가 저장되기 전에 사용자 집합과 함께 응답 할 수 있습니다.
편집 2
는 좀 더 적은 느린 이벤트 스트림으로 문제를 해결 카산드라 쿼리 저널의 새로 고침 간격을 증가했다. Cassandra 이벤트 저널은 기본적으로 3 초마다 폴링됩니다. 내 application.conf
나는 추가 :
cassandra-query-journal {
refresh-interval = 20ms
}
EDIT 사실 3
, 새로 고침 간격을 감소하지 않습니다. 메모리 사용량은 증가하지만 위험하지는 않습니다. CQRS의 일반적인 개념은 쓰기 및 읽기 측이 비동기라는 것입니다. 따라서 쓰기 후에는 데이터를 즉시 읽을 수 없습니다. UI 다루기? 난 그냥 스트림을 열고 서버를 통해 데이터를 밀어 푸는 측면을 읽은 후 그들을 인정했다.
난 그냥에'Source'로 메시지를 보내는 대신 읽기 측 돌기 배우로 읽기 저널 기반 코드를 이동합니다. 그런 다음 해당 읽기 측 영사기에서 스트림을 처리하고 해당 정보를 Elasticsearch에 투영합니다. – cmbaxter
@cmbaxter 나는 그것을했다. 그것은 아주 좋은 생각 인 것 같습니다. 나는 여전히 의문을 가지고 있기 때문에 내 질문을 업데이 트하고 여전히 제안을 수락. –