우리는 akka 지속성을 기반으로 내부 CQRS 프레임 워크를 개발하는 동안이 문제에 직면했습니다. 우리의 솔루션은 Persistence Query (https://doc.akka.io/docs/akka/2.5/scala/persistence-query.html)를 사용하는 것이 었습니다. 사용하지 않은 경우 저널 플러그인이 선택적으로 구현할 수있는 쿼리 인터페이스이며 CQRS 시스템에서 읽기 측면으로 사용할 수 있습니다.
테스트 목적으로, 메서드는 eventsByPersistenceId가되어 액터에 의해 지속 된 모든 이벤트가있는 akka 스트림 소스가 제공됩니다. 당신이 스칼라에 참 못생긴 사용되는, 그래서 만약, 위에서 부풀어 보인다
public CompletableFuture<List<Message<?>>> getEventsForIdAsync(String id, FiniteDuration readTimeout) {
return ((EventsByPersistenceIdQuery)readJournal).eventsByPersistenceId(id, 0L, Long.MAX_VALUE)
.takeWithin(readTimeout)
.map(eventEnvelope -> (Message<?>)eventEnvelope.event())
.<List<Message<?>>>runFold(
new ArrayList<Message<?>>(),
(list, event) -> {
list.add(event);
return list;
}, materializer)
.toCompletableFuture();
}
죄송합니다, 우리가 사용하는 Java : 소스는 같은 이벤트의 목록으로 접힐 수있다. 점점 readJournal는만큼 간단하다
ReadJournal readJournal = PersistenceQuery.lookup().get(actorSystem)
.getReadJournalFor(InMemoryReadJournal.class, InMemoryReadJournal.Identifier())
당신은 그것을 테스트에 가장 적합하기 때문에 우리가 akka.persistence.inmemory 플러그인을 사용하여 볼 수 있지만 지속성 쿼리 API가 작동 할 구현하는 플러그인.
우리는 실제로 우리의 프레임 워크 내부의 BDD 같은 테스트 API를 만든, 그래서 일반적인 테스트는 다음과 같습니다
fixture
.given("ID1", event(new AccountCreated("ID1", "John Smith")))
.when(command(new AddAmount("ID1", 2.0)))
.then("ID1", eventApplied(new AmountAdded("ID1", 2.0)))
.test();
보시다시피, 우리는 또한 주어진 절에 이전 이벤트를 설정하는 경우를 처리 잠재적으로 여러 persistenceIds (우리는 ClusterSharding 사용)를 처리합니다.