このライブラリ(j5ik2o/event-store-adapter-javaのScalaラッパー)は、DynamoDBをCQRS/Event Sourcing用のEvent Storeにするためのものです。
以下をbuild.sbtに追加してください。
val version = "..."
libraryDependencies += Seq(
"com.github.j5ik2o" %% "event-store-adapter-scala" % version,
)EventStoreを使えば、Event Sourcing対応リポジトリを簡単に実装できます。
class UserAccountRepositoryAsync(
eventStoreAsync: EventStoreAsync[UserAccountId, UserAccount, UserAccountEvent]
) {
// イベントを追記するだけならこちら。versionにはスナップショットテーブルの現在のversionを指定してください。
def store(userAccountEvent: UserAccountEvent, version: Long)
(implicit ec: ExecutionContext): Future[Unit] =
eventStoreAsync.persistEvent(userAccountEvent, version)
// スナップショットとイベントを同時に書き込む場合はこちら。
def store(userAccountEvent: UserAccountEvent, userAccount: UserAccount)
(implicit ec: ExecutionContext): Future[Unit] =
eventStoreAsync.persistEventAndSnapshot(userAccountEvent, userAccount)
// 最新のスナップショット+差分イベントを読み込み、最新の集約を取得します。
def findById(id: UserAccountId)
(implicit ec: ExecutionContext): Future[Option[UserAccount]] = {
eventStoreAsync.getLatestSnapshotById(classOf[UserAccount], id).flatMap {
case Some(userAccount) =>
eventStoreAsync
.getEventsByIdSinceSequenceNumber(
classOf[UserAccountEvent], id, userAccount.sequenceNumber + 1).map { events =>
Some(UserAccount.replay(events, userAccount))
}
case None =>
Future.successful(None)
}
}
}以下はリポジトリの使用例です。
val eventStore = EventStoreAsync.ofDynamoDB[UserAccountId, UserAccount, UserAccountEvent](
dynamodbClient,
journalTableName,
snapshotTableName,
journalAidIndexName,
snapshotAidIndexName,
32
)
val repository = new UserAccountRepositoryAsync(eventStore)
val id = UserAccountId(UUID.randomUUID().toString)
val (aggregate, event) = UserAccount.create(id, "test-1")
val result = for {
_ <- repository.store(event, aggregate)
aggregate <- repository.findById(id)
} yield aggregatedocs/DATABASE_SCHEMA.ja.mdを参照してください。
MITライセンスです。詳細はLICENSEを参照してください。