Apache Pekkoを使用したCQRS(Command Query Responsibility Segregation)とEvent Sourcingの実践的なサンプル実装です。
- 完全なCQRS/ESアーキテクチャ: コマンド側とクエリ側を完全に分離
- イベント駆動: DynamoDB Streamsを使用した非同期イベント処理
- ローカル開発環境: LocalStackを使用したAWSサービスのローカルエミュレーション
- GraphQL API: コマンド側・クエリ側の両方でGraphQL APIを提供
- 自動化テスト: E2Eテストスクリプトによる完全なフロー検証
graph TB
subgraph "Command Side"
CommandAPI["Command API<br/>(GraphQL)<br/>Port: 50501"]
PekkoActors["Pekko Actors<br/>(Event Sourced)"]
DynamoDB["DynamoDB<br/>(Event Store)<br/>(LocalStack)"]
end
subgraph "Query Side"
QueryAPI["Query API<br/>(GraphQL)<br/>Port: 50502"]
SlickDAOs["Slick DAOs<br/>(Read Model)"]
PostgreSQL["PostgreSQL<br/>(Read Model)"]
end
subgraph "Event Processing"
Lambda["Lambda<br/>(Read Model Updater)"]
Streams["DynamoDB Streams"]
end
CommandAPI -->|Mutation| PekkoActors
PekkoActors -->|Events| DynamoDB
DynamoDB -->|Stream| Streams
Streams -->|Trigger| Lambda
Lambda -->|Update| PostgreSQL
QueryAPI -->|Query| SlickDAOs
SlickDAOs -->|Read| PostgreSQL
style CommandAPI fill:#e1f5ff
style QueryAPI fill:#e1f5ff
style DynamoDB fill:#fff4e1
style PostgreSQL fill:#fff4e1
style Lambda fill:#f0e1ff
- コマンド受付: GraphQL Mutationでコマンドを受け取る(例:
createUserAccount) - イベント生成: Pekkoアクターがドメインロジックを実行し、イベントを生成
- イベント永続化: イベントをDynamoDB(Event Store)に保存
- イベント配信: DynamoDB Streamsがイベントを検知
- Read Model更新: Lambda関数がイベントを処理し、PostgreSQLを更新
- クエリ実行: GraphQL Queryでデータを取得(例:
getUserAccounts)
- 言語: Scala 3.6.2
- ビルドツール: SBT 1.10.6
- アクターフレームワーク: Apache Pekko 1.1.2 (型付きアクター、永続化、クラスター)
- イベントストア: DynamoDB (Pekko Persistence + LocalStack)
- Read Model: PostgreSQL 16 + Slick 3.5.2
- API: GraphQL (Sangria 4.2.4)
- シリアライゼーション: Protocol Buffers (ScalaPB + pekko-protobuf-v3)
- イベント処理基盤: AWS Lambda (LocalStack)
- 非同期処理: DynamoDB Streams
- ローカルAWS環境: LocalStack 3.x
- コンテナ: Docker & Docker Compose
- Java: OpenJDK 17以降
- Docker & Docker Compose: LocalStack、PostgreSQL、DynamoDBの実行に必要
- Java: OpenJDK 17以降(推奨: OpenJDK 21)
- SBT: 1.8以降
- awslocal CLI: LocalStackとの対話に使用(オプション)
# awslocal のインストール(オプション)
pip install awscli-localgit clone https://github.com/j5ik2o/pekko-cqrs-es-example.git
cd pekko-cqrs-es-example# SBTでプロジェクトをビルド
sbt compile
# Dockerイメージをビルド(Command API、Query API、Read Model Updater)
sbt dockerBuildAll# LocalStack、PostgreSQL、DynamoDBを起動し、初期設定を実行
./scripts/run-single.sh up
# 内部で以下が実行されます:
# - docker-compose でインフラ起動
# - DynamoDBテーブル作成
# - PostgreSQLマイグレーション実行
# - Lambda関数のデプロイ
# - Command API、Query APIの起動起動完了後、以下のサービスが利用可能になります:
- Command API (GraphQL): http://localhost:50501/api/graphql
- Command API Playground: http://localhost:50501/api/playground
- Query API (GraphQL): http://localhost:50502/api/graphql
- Query API Playground: http://localhost:50502/api/playground
- LocalStack: http://localhost:4566
- PostgreSQL: localhost:5432
# 完全なCQRS/ESフローをテスト
./scripts/test-e2e.sh
# 出力例:
# === Health Check ===
# ✓ Command API is healthy
# ✓ Query API is healthy
# === Step 1: Create UserAccount via GraphQL Mutation ===
# ✓ UserAccount created successfully!
# === Step 2: Wait for Event Processing ===
# ✓ Event processing time elapsed
# === Step 3: Query UserAccount via GraphQL ===
# ✓ UserAccount found via GraphQL!
# ✓ End-to-End test completed successfully!# アプリケーションとインフラを停止
./scripts/run-single.sh down
# 全てのデータを削除(ボリューム含む)
./scripts/run-single.sh down --volumesブラウザでGraphQL Playgroundを開きます:
Playgroundで以下のMutationを実行:
mutation CreateUserAccount($input: CreateUserAccountInput!) {
createUserAccount(input: $input) {
id
}
}Variables:
{
"input": {
"firstName": "太郎",
"lastName": "山田",
"emailAddress": "yamada@example.com"
}
}レスポンス例:
{
"data": {
"createUserAccount": {
"id": "01KAAM3Q5PVKKWW1ZSEH6A68FT"
}
}
}数秒待ってから、Query API Playgroundで実行:
# 全ユーザーの取得
{
getUserAccounts {
id
firstName
lastName
fullName
createdAt
updatedAt
}
}
# 特定ユーザーの取得
query GetUserAccount($id: String!) {
getUserAccount(userAccountId: $id) {
id
firstName
lastName
fullName
createdAt
updatedAt
}
}
# ユーザー検索
query SearchUsers($term: String!) {
searchUserAccounts(searchTerm: $term) {
id
firstName
lastName
fullName
}
}mutation RenameUserAccount($input: RenameUserAccountInput!) {
renameUserAccount(input: $input) {
id
}
}Variables:
{
"input": {
"userAccountId": "01KAAM3Q5PVKKWW1ZSEH6A68FT",
"firstName": "次郎",
"lastName": "田中"
}
}mutation DeleteUserAccount($input: DeleteUserAccountInput!) {
deleteUserAccount(input: $input) {
id
}
}Variables:
{
"input": {
"userAccountId": "01KAAM3Q5PVKKWW1ZSEH6A68FT"
}
}curl -X POST http://localhost:50501/api/graphql \
-H "Content-Type: application/json" \
-d '{
"query": "mutation CreateUserAccount($input: CreateUserAccountInput!) { createUserAccount(input: $input) { id } }",
"variables": {
"input": {
"firstName": "太郎",
"lastName": "山田",
"emailAddress": "yamada@example.com"
}
}
}'curl -X POST http://localhost:50502/api/graphql \
-H "Content-Type: application/json" \
-d '{
"query": "{ getUserAccounts { id firstName lastName fullName createdAt updatedAt } }"
}'curl -X POST http://localhost:50502/api/graphql \
-H "Content-Type: application/json" \
-d '{
"query": "query GetUserAccount($id: String!) { getUserAccount(userAccountId: $id) { id firstName lastName fullName } }",
"variables": {
"id": "01KAAM3Q5PVKKWW1ZSEH6A68FT"
}
}'pekko-cqrs-es-example/
├── apps/
│ ├── command-api/ # コマンド側HTTP/GraphQLサーバー
│ │ └── src/main/
│ │ ├── resources/
│ │ │ └── application.conf # Command API設定
│ │ └── scala/
│ │ └── CommandApiMain.scala
│ ├── query-api/ # クエリ側GraphQLサーバー
│ │ └── src/main/
│ │ ├── resources/
│ │ │ └── application.conf # Query API設定
│ │ └── scala/
│ │ └── QueryApiMain.scala
│ └── read-model-updater/ # Lambda関数(イベント→Read Model更新)
│ └── src/main/
│ ├── resources/
│ │ └── application.conf # Lambda設定
│ └── scala/
│ └── LambdaHandler.scala
├── modules/
│ ├── command/ # コマンド側モジュール
│ │ ├── domain/ # ドメインエンティティ、値オブジェクト、イベント
│ │ │ └── src/main/scala/
│ │ │ └── users/
│ │ │ ├── UserAccount.scala # 集約ルート
│ │ │ ├── UserAccountEvent.scala # ドメインイベント
│ │ │ └── UserAccountId.scala # 値オブジェクト
│ │ ├── use-case/ # アプリケーションサービス
│ │ └── interface-adapter/ # Pekkoアクター、永続化、GraphQLエンドポイント
│ │ └── src/main/
│ │ ├── protobuf/ # Protocol Buffer定義
│ │ └── scala/
│ │ └── aggregate/
│ │ └── UserAccountAggregateActor.scala
│ ├── query/ # クエリ側モジュール
│ │ ├── interface-adapter/ # GraphQL API、Slick DAO
│ │ │ └── src/main/scala/
│ │ │ ├── dao/ # Slick DAOs(自動生成)
│ │ │ └── graphql/ # GraphQL Schema & Resolvers
│ │ └── flyway-migration/ # データベースマイグレーション
│ │ └── src/main/resources/db/migration/
│ └── infrastructure/ # 共有インフラコード
│ └── src/main/scala/
│ └── serialization/ # カスタムシリアライザ
├── scripts/
│ ├── run-single.sh # シングルノードモード起動スクリプト
│ ├── test-e2e.sh # E2Eテストスクリプト
│ └── test-graphql.sh # GraphQLテストスクリプト
├── tools/
│ └── dynamodb-setup/ # DynamoDBテーブル定義とセットアップ
│ ├── Makefile
│ └── tables.tf
├── docker-compose-common.yml # 共通インフラ定義
├── docker-compose-single.yml # シングルノードモード定義
├── build.sbt # SBTビルド定義
└── CLAUDE.md # Claude Code向けプロジェクトガイド
modules/command/domain/src/main/scala/users/UserAccountEvent.scalaにイベントを追加modules/command/interface-adapter-contract/src/main/protobuf/にProtobuf定義を追加sbt compileでProtobufコードを生成- シリアライザを更新(必要に応じて)
コマンド側(Mutation):
modules/command/interface-adapter/src/main/scala/graphql/にスキーマとリゾルバを追加
クエリ側(Query):
modules/query/flyway-migration/src/main/resources/db/migration/にマイグレーションを追加sbt migrateQueryでマイグレーション実行sbt "queryInterfaceAdapter/generateAllWithDb"でDAOを再生成modules/query/interface-adapter/src/main/scala/graphql/にスキーマとリゾルバを追加
# コードフォーマット
sbt fmt
# フォーマットとリントのチェック
sbt lint
# コンパイル
sbt compile
# テスト実行
sbt test
# カバレッジ付きテスト
sbt testCoverage# マイグレーション実行
sbt migrateQuery
# マイグレーション情報表示
sbt infoQuery
# マイグレーション検証
sbt validateQuery
# クリーン後マイグレーション
sbt cleanMigrateQuery
# DAO生成(テーブル定義から自動生成)
sbt "queryInterfaceAdapter/generateAllWithDb"# コマンドドメインのテスト
sbt "commandDomain/test"
# クエリインターフェースアダプターのテスト
sbt "queryInterfaceAdapter/test"
# 特定のテストクラスのみ実行
sbt "testOnly io.github.j5ik2o.pcqrses.domain.users.UserAccountSpec"# 全テスト実行
sbt test
# カバレッジレポート生成
sbt testCoverage# 完全なCQRS/ESフローをテスト
./scripts/test-e2e.shE2Eテストスクリプトは以下を自動実行します:
- ヘルスチェック: Command APIとQuery APIの稼働確認
- ユーザー作成: GraphQL Mutationでユーザーアカウント作成
- イベント処理待機: Lambda関数がイベントを処理するまで待機
- データ取得: GraphQL Queryでデータ取得を試行(リトライ機能付き)
- 整合性検証: 作成したデータが正しく取得できることを確認
環境変数でテストの動作をカスタマイズできます:
# リトライ回数とタイムアウトのカスタマイズ
E2E_MAX_RETRIES=15 \
E2E_RETRY_DELAY=5 \
E2E_WAIT_AFTER_CREATE=10 \
./scripts/test-e2e.sh
# 別ホストでテスト
COMMAND_API_HOST=192.168.1.100 \
QUERY_API_HOST=192.168.1.100 \
./scripts/test-e2e.sh# GraphQL APIの基本動作テスト
./scripts/test-graphql.sh# LocalStackのログを確認
docker logs localstack
# LocalStackを再起動
docker-compose -f docker-compose-common.yml restart localstack
# ヘルスチェック
curl http://localhost:4566/_localstack/health# Lambda関数のログを確認(CloudWatch Logs)
awslocal logs tail /aws/lambda/read-model-updater --follow
# DynamoDB Streamsの設定を確認
awslocal dynamodbstreams list-streams
# Lambda関数のイベントソースマッピングを確認
awslocal lambda list-event-source-mappings# PostgreSQLコンテナのログを確認
docker logs postgres
# 接続テスト
psql -h localhost -p 5432 -U postgres -d postgres
# マイグレーション状態を確認
sbt infoQuery# テーブルの存在確認
awslocal dynamodb list-tables
# テーブルの内容確認
awslocal dynamodb scan --table-name Journal
# テーブル定義の確認
awslocal dynamodb describe-table --table-name Journal# クリーンビルド
sbt clean compile
# 依存関係の更新
sbt update
# Protobufコードの再生成
sbt clean compile
# Dockerイメージの再ビルド
sbt clean dockerBuildAll# 詳細ログでテストを実行
bash -x ./scripts/test-e2e.sh
# 待機時間を延長してテスト
E2E_WAIT_AFTER_CREATE=15 E2E_MAX_RETRIES=20 ./scripts/test-e2e.sh
# 各サービスの状態を確認
curl http://localhost:50501/api/graphql # Command API
curl http://localhost:50502/api/graphql # Query API
docker ps # 全コンテナの状態# ポートを使用しているプロセスを確認
lsof -i :50501 # Command API
lsof -i :50502 # Query API
lsof -i :4566 # LocalStack
lsof -i :5432 # PostgreSQL
# プロセスを終了
kill -9 <PID>
# または全て停止してクリーンスタート
./scripts/run-single.sh down
./scripts/run-single.sh upこのプロジェクトでは、全ての状態変更をイベントとして記録します:
- コマンド受信:
CreateUserAccount - ドメインロジック実行:
UserAccount集約でビジネスルールを検証 - イベント生成:
UserAccountEvent.Created_V1 - イベント永続化: DynamoDBに保存(Pekko Persistenceが
PersistentReprでラップ) - 状態復元: 過去のイベントを再生して現在の状態を復元可能
コマンド側(書き込み):
- Pekko型付きアクターで集約を実装
- イベントソーシングで状態を管理
- DynamoDBをイベントストアとして使用
- GraphQL Mutationでコマンドを受け付け
クエリ側(読み取り):
- PostgreSQLに非正規化されたRead Modelを構築
- Slick DAOで高速なクエリを実現
- GraphQL Queryでデータを提供
- Lambda関数でイベントからRead Modelを非同期更新
- イベント検知: DynamoDB StreamsがJournalテーブルの変更を検知
- Lambda起動: read-model-updaterが起動
- デシリアライズ:
PersistentReprをデシリアライズ(Pekkoの内部構造)- ペイロードから実際のイベント(
UserAccountEvent)を取り出し
- Read Model更新: PostgreSQLのuser_accountsテーブルを更新
- クエリ可能: 更新されたデータをQuery APIで取得可能
イベントとスナップショットはProtocol Buffersでシリアライズされます:
- 定義:
modules/command/interface-adapter-contract/src/main/protobuf/ - 生成:
sbt compileでScalaコードを自動生成 - 使用: Pekko Persistenceのカスタムシリアライザで使用
- バージョニング: イベントスキーマの進化に対応(例:
Created_V1)
- Pekko Cluster: 複数ノードでアクターを分散(
run-cluster.shで実行) - Cluster Sharding: エンティティIDでアクターを自動分散
- イベント処理: Lambda関数は自動的にスケール
- Read Model: クエリ用に最適化されたスキーマ設計
- イベントスナップショット: 大量イベントからの状態復元を高速化
- 接続プール: Slick/HikariCPで効率的なDB接続管理
- 非同期処理: イベント処理は完全に非同期
このサンプルプロジェクトには以下のセキュリティ機能は含まれていません:
- 認証・認可
- API レート制限
- 入力バリデーション(最小限のみ)
- 暗号化(転送時・保管時)
本番環境では、これらのセキュリティ対策を必ず実装してください。
LICENSEファイルを参照してください。
プルリクエストを歓迎します。大きな変更の場合は、まずIssueを開いて変更内容を議論してください。
問題が発生した場合は、GitHubのIssueを作成してください。