Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ jobs:
- uses: pnpm/action-setup@v3
with:
version: 8
- uses: unfor19/install-aws-cli-action@v1
with:
version: "2"
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v4
with:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ yarn-error.log
.turbo/
.turbo/*

# Local developer overrides (see common.env.default)
common.env

3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "references/okite-ai"]
path = references/okite-ai
url = git@github.com:j5ik2o/okite-ai.git
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated private git submodule accidentally committed

Medium Severity

A .gitmodules file referencing references/okite-ai via a private SSH URL (git@github.com:j5ik2o/okite-ai.git) is included in this PR but is entirely unrelated to the Lambda RMU / DynamoDB Streams changes. This will cause git clone --recursive (or git submodule update --init) to fail for anyone without SSH access to that private repository, breaking CI or onboarding flows.

Fix in Cursor Fix in Web

2 changes: 1 addition & 1 deletion README.ja.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
- [x] Read API Server(GraphQL)
- [x] Read Model Updater on Local
- [x] Docker Compose Support
- [ ] Read Model Updater on AWS Lambda
- [x] Read Model Updater on AWS Lambda(LocalStack 向けビルド/デプロイと `common.env`。詳細は [ビルドとテスト](./docs/BUILD_AND_TEST.ja.md))
- [ ] Deployment to AWS

## 概要
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Please refer to [here](https://github.com/j5ik2o/cqrs-es-example) for implementa
- [x] Read API Server(GraphQL)
- [x] Read Model Updater on Local
- [x] Docker Compose Support
- [ ] Read Model Updater on AWS Lambda
- [x] Read Model Updater on AWS Lambda (LocalStack: build/deploy scripts and `common.env`; see [Build and Test](./docs/BUILD_AND_TEST.md))
- [ ] Deployment to AWS

## Component Composition
Expand Down
23 changes: 22 additions & 1 deletion docs/BUILD_AND_TEST.ja.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,25 @@ docker composeで起動したアプリケーションに対してE2Eテストを
$ pnpm docker-build
$ pnpm docker-compose-up
$ pnpm verify-group-chat # E2E
```
```

## Read Model Updater Lambda(LocalStack)

Rust 版と同様に、Read Model Updater を LocalStack 上の Lambda(DynamoDB ストリーム → Lambda → MySQL)として動かす手順です。

1. リポジトリルートに `common.env.default` を `common.env` としてコピーするか、`docker-compose-up.sh` / `docker-compose-e2e-test.sh` に任せて自動生成する。`common.env` は `.gitignore` されます。
2. ホストから AWS CLI で LocalStack に接続するときは **`http://localhost:34566`** を使います(Compose がコンテナの `4566` をホストの `34566` に公開)。Compose 内のサービスからは `http://localstack:4566` です。
3. デプロイ用 zip(Docker `linux/amd64`、Prisma の Lambda 用バイナリを含む)のビルド:

```shell
$ pnpm build-read-model-updater-lambda
```

成果物は `dist/lambda/read-model-updater/function.zip` です。

4. `pnpm docker-compose-up` および `pnpm docker-compose-e2e-test` は、必要に応じて zip をビルドし、LocalStack 待機後に `deploy-read-model-updater-localstack.sh` を実行し、既定でコンテナ `read-model-updater-1` を停止します(ストリームの二重処理を防ぐため)。`DOCKER_COMPOSE_UP_DEPLOY_LAMBDA=0` や `DOCKER_COMPOSE_UP_BUILD_LAMBDA=0` でスキップできます。E2E 用コンテナは `profile: e2e` とし、スタック起動と Lambda デプロイの後に `verify-group-chat.sh` が走るようにしています。
5. Compose 起動後に手動デプロイする場合:

```shell
$ pnpm deploy-read-model-updater-localstack
```
21 changes: 21 additions & 0 deletions docs/BUILD_AND_TEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,24 @@ $ pnpm docker-build
$ pnpm docker-compose-up
$ pnpm verify-group-chat # E2E
```

## Read Model Updater Lambda (LocalStack)

This mirrors the Rust example: run the Read Model Updater as an AWS Lambda on LocalStack (DynamoDB stream → Lambda → MySQL).

1. Copy `common.env.default` to `common.env` at the repo root, or let `docker-compose-up.sh` / `docker-compose-e2e-test.sh` create it. `common.env` is gitignored.
Comment thread
cursor[bot] marked this conversation as resolved.
2. From the host, point the AWS CLI at **`http://localhost:34566`** (Compose maps container `4566` to host `34566`). Inside Compose, services use `http://localstack:4566`.
3. Build the deployment zip (Docker `linux/amd64`, Prisma engines for Lambda):

```shell
$ pnpm build-read-model-updater-lambda
```

Output: `dist/lambda/read-model-updater/function.zip`.

4. `pnpm docker-compose-up` and `pnpm docker-compose-e2e-test` can build the zip (unless `DOCKER_COMPOSE_UP_BUILD_LAMBDA=0`), wait for LocalStack, run `deploy-read-model-updater-localstack.sh`, and by default stop the `read-model-updater-1` container to avoid double-processing the stream. Set `DOCKER_COMPOSE_UP_DEPLOY_LAMBDA=0` to skip deploy. The e2e script brings stacks up, deploys Lambda, then runs the `e2e-test` container (`profile: e2e`) so GraphQL is ready before `verify-group-chat.sh` runs.
5. To deploy manually after Compose is up:

```shell
$ pnpm deploy-read-model-updater-localstack
```
5 changes: 5 additions & 0 deletions mise.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[tools]
node = "20"
python = "3"
"npm:@fission-ai/openspec" = "1.2.0"
"npm:happy-coder" = "0.13.0"
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"docker-compose-up": "./tools/scripts/docker-compose-up.sh",
"docker-compose-up-db": "./tools/scripts/docker-compose-up.sh -d",
"docker-compose-down": "./tools/scripts/docker-compose-down.sh",
"build-read-model-updater-lambda": "./tools/scripts/build-read-model-updater-lambda.sh",
"deploy-read-model-updater-localstack": "./tools/scripts/deploy-read-model-updater-localstack.sh",
"create-group-chat": "./tools/scripts/curl-create-group-chat.sh",
"verify-group-chat": "./tools/e2e-test/verify-group-chat.sh",
"build": "turbo build",
Expand Down
3 changes: 2 additions & 1 deletion packages/bootstrap/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-bootstrap",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"start": "ts-node src/index.ts",
"start-write-api": "node dist/index.js writeApi",
Expand Down
29 changes: 29 additions & 0 deletions packages/bootstrap/src/lambda-rmu-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { PrismaClient } from "@prisma/client";
import type { DynamoDBStreamEvent, Handler } from "aws-lambda";
import { GroupChatDao, ReadModelUpdater } from "cqrs-es-example-js-rmu";

let prisma: PrismaClient | undefined;
let readModelUpdater: ReadModelUpdater | undefined;

function getReadModelUpdater(): ReadModelUpdater {
if (readModelUpdater) {
return readModelUpdater;
}
const databaseUrl = process.env.DATABASE_URL;
if (!databaseUrl) {
throw new Error("DATABASE_URL is not set");
}
prisma = new PrismaClient({
datasources: {
db: {
url: databaseUrl,
},
},
});
readModelUpdater = ReadModelUpdater.of(GroupChatDao.of(prisma));
return readModelUpdater;
}

export const handler: Handler<DynamoDBStreamEvent, void> = async (event) => {
await getReadModelUpdater().updateReadModel(event);
};
3 changes: 2 additions & 1 deletion packages/command/domain/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-command-domain",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc && rimraf dist/internal/*.test.d.ts && rimraf dist/internal/*.test.js && rimraf dist/internal/test && rimraf dist/group-chat/*.test.*",
"test": "NODE_OPTIONS=--experimental-vm-modules jest --no-cache",
Expand Down
3 changes: 2 additions & 1 deletion packages/command/interface-adaptor-if/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-command-interface-adaptor-if",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc && rm -f dist/internal/*.test.d.ts && rm -f dist/internal/*.test.js && rm -fr dist/internal/test",
"test": "NODE_OPTIONS=--experimental-vm-modules jest --no-cache",
Expand Down
3 changes: 2 additions & 1 deletion packages/command/interface-adaptor-impl/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-command-interface-adaptor-impl",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc && rimraf dist/internal/*.test.d.ts && rimraf dist/internal/*.test.js && rimraf dist/internal/test && rimraf dist/repository/group-chat/*.test.*",
"test": "NODE_OPTIONS=--experimental-vm-modules jest --no-cache",
Expand Down
16 changes: 8 additions & 8 deletions packages/command/interface-adaptor-impl/src/graphql/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import type { TaskEither } from "fp-ts/TaskEither";
import { pipe } from "fp-ts/function";
import { GraphQLError } from "graphql/error";
import { Arg, Ctx, Mutation, Query, Resolver } from "type-graphql";
import type {
import {
AddMemberInput,
CreateGroupChatInput,
DeleteGroupChatInput,
Expand All @@ -45,7 +45,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async createGroupChat(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: CreateGroupChatInput,
@Arg("input", () => CreateGroupChatInput) input: CreateGroupChatInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatName(input.name),
Expand Down Expand Up @@ -75,7 +75,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async deleteGroupChat(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: DeleteGroupChatInput,
@Arg("input", () => DeleteGroupChatInput) input: DeleteGroupChatInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down Expand Up @@ -105,7 +105,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async renameGroupChat(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: RenameGroupChatInput,
@Arg("input", () => RenameGroupChatInput) input: RenameGroupChatInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down Expand Up @@ -151,7 +151,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async addMember(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: AddMemberInput,
@Arg("input", () => AddMemberInput) input: AddMemberInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down Expand Up @@ -211,7 +211,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async removeMember(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: RemoveMemberInput,
@Arg("input", () => RemoveMemberInput) input: RemoveMemberInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down Expand Up @@ -257,7 +257,7 @@ class GroupChatCommandResolver {
@Mutation(() => MessageOutput)
async postMessage(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: PostMessageInput,
@Arg("input", () => PostMessageInput) input: PostMessageInput,
): Promise<MessageOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down Expand Up @@ -307,7 +307,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async deleteMessage(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: DeleteMessageInput,
@Arg("input", () => DeleteMessageInput) input: DeleteMessageInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down
3 changes: 2 additions & 1 deletion packages/command/processor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-command-processor",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc && rimraf dist/internal/*.test.d.ts && rimraf dist/internal/*.test.js && rimraf dist/internal/test && rimraf dist/group-chat/*.test.*",
"test": "NODE_OPTIONS=--experimental-vm-modules jest --no-cache",
Expand Down
3 changes: 2 additions & 1 deletion packages/infrastructure/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-infrastructure",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"start": "ts-node src/index.ts",
"start-write-api": "node dist/index.js writeApi",
Expand Down
3 changes: 2 additions & 1 deletion packages/query/interface-adaptor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-query-interface-adaptor",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"prisma:generate": "prisma generate",
"build": "tsc && rimraf dist/internal/*.test.d.ts && rimraf dist/internal/*.test.js && rimraf dist/internal/test && rimraf dist/*.test.*",
Expand Down
3 changes: 2 additions & 1 deletion packages/rmu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-rmu",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"prisma:generate": "prisma generate",
"build": "tsc && rimraf dist/internal/*.test.d.ts && rimraf dist/internal/*.test.js && rimraf dist/internal/test && rimraf dist/*.test.*",
Expand Down
3 changes: 2 additions & 1 deletion packages/rmu/prisma/schema.prisma
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
generator client {
provider = "prisma-client-js"
provider = "prisma-client-js"
binaryTargets = ["native", "rhel-openssl-3.0.x", "rhel-openssl-1.0.x"]
}

datasource db {
Expand Down
19 changes: 13 additions & 6 deletions packages/rmu/src/update-read-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,22 @@ class ReadModelUpdater {
this.logger.warn("No NewImage");
return;
}
const base64EncodedPayload = attributeValues.payload.B;
if (!base64EncodedPayload) {
const rawPayload = attributeValues.payload.B;
if (!rawPayload) {
this.logger.warn("No payload");
return;
}
const payload = this.decoder.decode(
new Uint8Array(base64EncodedPayload.split(",").map(Number)),
);
const payloadJson = JSON.parse(payload);
let payloadJson: unknown;
try {
// LocalStack: B フィールドに生 JSON 文字列が入る
payloadJson = JSON.parse(rawPayload);
} catch {
// AWS Lambda: Base64, ローカル RMU: カンマ区切り数値文字列
const payloadBytes = /^\d+(,\d+)*$/.test(rawPayload)
? new Uint8Array(rawPayload.split(",").map(Number))
: new Uint8Array(Buffer.from(rawPayload, "base64"));
payloadJson = JSON.parse(this.decoder.decode(payloadBytes));
}
const groupChatEvent = convertJSONToGroupChatEvent(payloadJson);
switch (groupChatEvent.symbol) {
case GroupChatCreatedTypeSymbol: {
Expand Down
1 change: 1 addition & 0 deletions references/okite-ai
Submodule okite-ai added at 69fa7c
22 changes: 8 additions & 14 deletions tools/docker-compose/docker-compose-databases.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,25 @@ services:
- mysql
localstack:
image: localstack/localstack:2.3.2
container_name: localstack-main
hostname: localstack-local
ports:
- "34566:4566"
- "34571:4571"
- "${PORT_WEB_UI-38083}:${PORT_WEB_UI-8080}"
environment:
DEBUG: 1
LOCALSTACK_API_KEY: ${LOCALSTACK_API_KEY- }
PORT_WEB_UI: ${PORT_WEB_UI- }
PARITY_AWS_ACCESS_KEY_ID: 1
EAGER_SERVICE_LOADING: 1
SERVICES: cloudwatch,dynamodb,dynamodbstreams
# SERVICES: cloudwatch,dynamodb,dynamodbstreams,lambda
SERVICES: cloudwatch,dynamodb,dynamodbstreams,lambda,iam
LAMBDA_DOCKER_NETWORK: ${LAMBDA_DOCKER_NETWORK:-cqrs-es-example-js_default}
MAIN_CONTAINER_NAME: localstack-main
HOSTNAME_EXTERNAL: localstack-local
DEFAULT_REGION: ap-northeast-1
DYNAMODB_SHARE_DB: 1
DYNAMODB_IN_MEMORY: 1
# LAMBDA_PREBUILD_IMAGES: 1
# LAMBDA_EXECUTOR: docker
# LAMBDA_RUNTIME_EXECUTOR: docker
# LAMBDA_REMOTE_DOCKER: true
# LAMBDA_REMOVE_CONTAINERS: true
# DATA_DIR: /tmp/localstack/data
# DOCKER_HOST: unix:///var/run/docker.sock
# LAMBDA_DOCKER_FLAGS: --platform linux/amd64
# privileged: true
LAMBDA_EXECUTOR: docker
LAMBDA_REMOVE_CONTAINERS: 1
LAMBDA_DOCKER_FLAGS: --platform linux/amd64
volumes:
- "${TMPDIR:-/tmp/localstack}:/tmp/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"
Expand All @@ -81,6 +74,7 @@ services:
environment:
AWS_ACCESS_KEY_ID: x
AWS_SECRET_ACCESS_KEY: x
AWS_REGION: ap-northeast-1
AWS_DEFAULT_REGION: ap-northeast-1
DYNAMODB_ENDPOINT: localstack:4566
JOURNAL_TABLE_NAME: journal
Expand Down
6 changes: 6 additions & 0 deletions tools/docker-compose/docker-compose-e2e-test.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
version: '3.6'
services:
e2e-test:
profiles:
- e2e
image: e2e-test-js:latest
# イメージ内の焼き込みではなく、常にリポジトリ上のスクリプトを使う(待機ロジック等の変更が即反映される)
volumes:
- ../e2e-test/verify-group-chat.sh:/verify-group-chat.sh:ro
entrypoint: ["bash", "/verify-group-chat.sh"]
environment:
WRITE_API_SERVER_BASE_URL: http://write-api-server-1:3000
READ_API_SERVER_BASE_URL: http://read-api-server-1:3000
Expand Down
Loading
Loading