Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ jobs:
- uses: pnpm/action-setup@v3
with:
version: 8
- uses: unfor19/install-aws-cli-action@v1
with:
version: "2"
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v4
with:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ yarn-error.log
.turbo/
.turbo/*

# Local developer overrides (see common.env.default)
common.env

3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "references/okite-ai"]
path = references/okite-ai
url = git@github.com:j5ik2o/okite-ai.git
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated private git submodule accidentally committed

Medium Severity

A .gitmodules file referencing references/okite-ai via a private SSH URL (git@github.com:j5ik2o/okite-ai.git) is included in this PR but is entirely unrelated to the Lambda RMU / DynamoDB Streams changes. This will cause git clone --recursive (or git submodule update --init) to fail for anyone without SSH access to that private repository, breaking CI or onboarding flows.

Fix in Cursor Fix in Web

2 changes: 1 addition & 1 deletion README.ja.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
- [x] Read API Server(GraphQL)
- [x] Read Model Updater on Local
- [x] Docker Compose Support
- [ ] Read Model Updater on AWS Lambda
- [x] Read Model Updater on AWS Lambda(LocalStack 向けビルド/デプロイと `common.env`。詳細は [ビルドとテスト](./docs/BUILD_AND_TEST.ja.md))
- [ ] Deployment to AWS

## 概要
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Please refer to [here](https://github.com/j5ik2o/cqrs-es-example) for implementa
- [x] Read API Server(GraphQL)
- [x] Read Model Updater on Local
- [x] Docker Compose Support
- [ ] Read Model Updater on AWS Lambda
- [x] Read Model Updater on AWS Lambda (LocalStack: build/deploy scripts and `common.env`; see [Build and Test](./docs/BUILD_AND_TEST.md))
- [ ] Deployment to AWS

## Component Composition
Expand Down
29 changes: 29 additions & 0 deletions common.env.default
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copy to common.env and adjust. common.env is gitignored.
# Host-side AWS CLI uses this URL (Compose maps LocalStack to host port 34566).
LOCALSTACK_ENDPOINT_URL=http://localhost:34566

AWS_REGION=ap-northeast-1
AWS_DEFAULT_REGION=ap-northeast-1
AWS_ACCESS_KEY_ID=x
AWS_SECRET_ACCESS_KEY=x

STREAM_JOURNAL_TABLE_NAME=journal
STREAM_MAX_ITEM_COUNT=32

# Lambda container reaches MySQL on the Compose network (see docker-compose-databases.yml).
DATABASE_URL_LAMBDA=mysql://ceer:ceer@mysql-local:3306/ceer

FUNCTION_NAME=read-model-updater-rmu
# LocalStack 2.x では nodejs20.x が無い場合があるため既定は nodejs18.x
LAMBDA_RUNTIME=nodejs18.x
COMPOSE_PROJECT=cqrs-es-example-js

# 1 = after deploy, stop container read-model-updater-1 to avoid double stream processing
STOP_LOCAL_RMU_AFTER_LAMBDA_DEPLOY=1

# docker-compose-up.sh: build/deploy Lambda zip when non-zero
DOCKER_COMPOSE_UP_BUILD_LAMBDA=1
DOCKER_COMPOSE_UP_DEPLOY_LAMBDA=1

# Override if your compose project name differs (affects LAMBDA_DOCKER_NETWORK default in compose).
# LAMBDA_DOCKER_NETWORK=cqrs-es-example-js_default
23 changes: 22 additions & 1 deletion docs/BUILD_AND_TEST.ja.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,25 @@ docker composeで起動したアプリケーションに対してE2Eテストを
$ pnpm docker-build
$ pnpm docker-compose-up
$ pnpm verify-group-chat # E2E
```
```

## Read Model Updater Lambda(LocalStack)

Rust 版と同様に、Read Model Updater を LocalStack 上の Lambda(DynamoDB ストリーム → Lambda → MySQL)として動かす手順です。

1. リポジトリルートに `common.env.default` を `common.env` としてコピーするか、`docker-compose-up.sh` / `docker-compose-e2e-test.sh` に任せて自動生成する。`common.env` は `.gitignore` されます。
2. ホストから AWS CLI で LocalStack に接続するときは **`http://localhost:34566`** を使います(Compose がコンテナの `4566` をホストの `34566` に公開)。Compose 内のサービスからは `http://localstack:4566` です。
3. デプロイ用 zip(Docker `linux/amd64`、Prisma の Lambda 用バイナリを含む)のビルド:

```shell
$ pnpm build-read-model-updater-lambda
```

成果物は `dist/lambda/read-model-updater/function.zip` です。

4. `pnpm docker-compose-up` および `pnpm docker-compose-e2e-test` は、必要に応じて zip をビルドし、LocalStack 待機後に `deploy-read-model-updater-localstack.sh` を実行し、既定でコンテナ `read-model-updater-1` を停止します(ストリームの二重処理を防ぐため)。`DOCKER_COMPOSE_UP_DEPLOY_LAMBDA=0` や `DOCKER_COMPOSE_UP_BUILD_LAMBDA=0` でスキップできます。E2E 用コンテナは `profile: e2e` とし、スタック起動と Lambda デプロイの後に `verify-group-chat.sh` が走るようにしています。
5. Compose 起動後に手動デプロイする場合:

```shell
$ pnpm deploy-read-model-updater-localstack
```
21 changes: 21 additions & 0 deletions docs/BUILD_AND_TEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,24 @@ $ pnpm docker-build
$ pnpm docker-compose-up
$ pnpm verify-group-chat # E2E
```

## Read Model Updater Lambda (LocalStack)

This mirrors the Rust example: run the Read Model Updater as an AWS Lambda on LocalStack (DynamoDB stream → Lambda → MySQL).

1. Copy `common.env.default` to `common.env` at the repo root, or let `docker-compose-up.sh` / `docker-compose-e2e-test.sh` create it. `common.env` is gitignored.
Comment thread
cursor[bot] marked this conversation as resolved.
2. From the host, point the AWS CLI at **`http://localhost:34566`** (Compose maps container `4566` to host `34566`). Inside Compose, services use `http://localstack:4566`.
3. Build the deployment zip (Docker `linux/amd64`, Prisma engines for Lambda):

```shell
$ pnpm build-read-model-updater-lambda
```

Output: `dist/lambda/read-model-updater/function.zip`.

4. `pnpm docker-compose-up` and `pnpm docker-compose-e2e-test` can build the zip (unless `DOCKER_COMPOSE_UP_BUILD_LAMBDA=0`), wait for LocalStack, run `deploy-read-model-updater-localstack.sh`, and by default stop the `read-model-updater-1` container to avoid double-processing the stream. Set `DOCKER_COMPOSE_UP_DEPLOY_LAMBDA=0` to skip deploy. The e2e script brings stacks up, deploys Lambda, then runs the `e2e-test` container (`profile: e2e`) so GraphQL is ready before `verify-group-chat.sh` runs.
5. To deploy manually after Compose is up:

```shell
$ pnpm deploy-read-model-updater-localstack
```
5 changes: 5 additions & 0 deletions mise.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[tools]
node = "20"
python = "3"
"npm:@fission-ai/openspec" = "1.2.0"
"npm:happy-coder" = "0.13.0"
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"docker-compose-up": "./tools/scripts/docker-compose-up.sh",
"docker-compose-up-db": "./tools/scripts/docker-compose-up.sh -d",
"docker-compose-down": "./tools/scripts/docker-compose-down.sh",
"build-read-model-updater-lambda": "./tools/scripts/build-read-model-updater-lambda.sh",
"deploy-read-model-updater-localstack": "./tools/scripts/deploy-read-model-updater-localstack.sh",
"create-group-chat": "./tools/scripts/curl-create-group-chat.sh",
"verify-group-chat": "./tools/e2e-test/verify-group-chat.sh",
"build": "turbo build",
Expand Down
3 changes: 2 additions & 1 deletion packages/bootstrap/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-bootstrap",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"start": "ts-node src/index.ts",
"start-write-api": "node dist/index.js writeApi",
Expand Down
29 changes: 29 additions & 0 deletions packages/bootstrap/src/lambda-rmu-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { PrismaClient } from "@prisma/client";
import type { DynamoDBStreamEvent, Handler } from "aws-lambda";
import { GroupChatDao, ReadModelUpdater } from "cqrs-es-example-js-rmu";

let prisma: PrismaClient | undefined;
let readModelUpdater: ReadModelUpdater | undefined;

function getReadModelUpdater(): ReadModelUpdater {
if (readModelUpdater) {
return readModelUpdater;
}
const databaseUrl = process.env.DATABASE_URL;
if (!databaseUrl) {
throw new Error("DATABASE_URL is not set");
}
prisma = new PrismaClient({
datasources: {
db: {
url: databaseUrl,
},
},
});
readModelUpdater = ReadModelUpdater.of(GroupChatDao.of(prisma));
return readModelUpdater;
}

export const handler: Handler<DynamoDBStreamEvent, void> = async (event) => {
await getReadModelUpdater().updateReadModel(event);
};
3 changes: 2 additions & 1 deletion packages/command/domain/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-command-domain",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc && rimraf dist/internal/*.test.d.ts && rimraf dist/internal/*.test.js && rimraf dist/internal/test && rimraf dist/group-chat/*.test.*",
"test": "NODE_OPTIONS=--experimental-vm-modules jest --no-cache",
Expand Down
3 changes: 2 additions & 1 deletion packages/command/interface-adaptor-if/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-command-interface-adaptor-if",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc && rm -f dist/internal/*.test.d.ts && rm -f dist/internal/*.test.js && rm -fr dist/internal/test",
"test": "NODE_OPTIONS=--experimental-vm-modules jest --no-cache",
Expand Down
3 changes: 2 additions & 1 deletion packages/command/interface-adaptor-impl/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-command-interface-adaptor-impl",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc && rimraf dist/internal/*.test.d.ts && rimraf dist/internal/*.test.js && rimraf dist/internal/test && rimraf dist/repository/group-chat/*.test.*",
"test": "NODE_OPTIONS=--experimental-vm-modules jest --no-cache",
Expand Down
16 changes: 8 additions & 8 deletions packages/command/interface-adaptor-impl/src/graphql/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import type { TaskEither } from "fp-ts/TaskEither";
import { pipe } from "fp-ts/function";
import { GraphQLError } from "graphql/error";
import { Arg, Ctx, Mutation, Query, Resolver } from "type-graphql";
import type {
import {
AddMemberInput,
CreateGroupChatInput,
DeleteGroupChatInput,
Expand All @@ -45,7 +45,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async createGroupChat(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: CreateGroupChatInput,
@Arg("input", () => CreateGroupChatInput) input: CreateGroupChatInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatName(input.name),
Expand Down Expand Up @@ -75,7 +75,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async deleteGroupChat(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: DeleteGroupChatInput,
@Arg("input", () => DeleteGroupChatInput) input: DeleteGroupChatInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down Expand Up @@ -105,7 +105,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async renameGroupChat(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: RenameGroupChatInput,
@Arg("input", () => RenameGroupChatInput) input: RenameGroupChatInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down Expand Up @@ -151,7 +151,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async addMember(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: AddMemberInput,
@Arg("input", () => AddMemberInput) input: AddMemberInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down Expand Up @@ -211,7 +211,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async removeMember(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: RemoveMemberInput,
@Arg("input", () => RemoveMemberInput) input: RemoveMemberInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down Expand Up @@ -257,7 +257,7 @@ class GroupChatCommandResolver {
@Mutation(() => MessageOutput)
async postMessage(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: PostMessageInput,
@Arg("input", () => PostMessageInput) input: PostMessageInput,
): Promise<MessageOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down Expand Up @@ -307,7 +307,7 @@ class GroupChatCommandResolver {
@Mutation(() => GroupChatOutput)
async deleteMessage(
@Ctx() { groupChatCommandProcessor }: CommandContext,
@Arg("input") input: DeleteMessageInput,
@Arg("input", () => DeleteMessageInput) input: DeleteMessageInput,
): Promise<GroupChatOutput> {
return pipe(
this.validateGroupChatId(input.groupChatId),
Expand Down
3 changes: 2 additions & 1 deletion packages/command/processor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-command-processor",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc && rimraf dist/internal/*.test.d.ts && rimraf dist/internal/*.test.js && rimraf dist/internal/test && rimraf dist/group-chat/*.test.*",
"test": "NODE_OPTIONS=--experimental-vm-modules jest --no-cache",
Expand Down
3 changes: 2 additions & 1 deletion packages/infrastructure/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-infrastructure",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"start": "ts-node src/index.ts",
"start-write-api": "node dist/index.js writeApi",
Expand Down
3 changes: 2 additions & 1 deletion packages/query/interface-adaptor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-query-interface-adaptor",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"prisma:generate": "prisma generate",
"build": "tsc && rimraf dist/internal/*.test.d.ts && rimraf dist/internal/*.test.js && rimraf dist/internal/test && rimraf dist/*.test.*",
Expand Down
3 changes: 2 additions & 1 deletion packages/rmu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "cqrs-es-example-js-rmu",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"prisma:generate": "prisma generate",
"build": "tsc && rimraf dist/internal/*.test.d.ts && rimraf dist/internal/*.test.js && rimraf dist/internal/test && rimraf dist/*.test.*",
Expand Down
3 changes: 2 additions & 1 deletion packages/rmu/prisma/schema.prisma
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
generator client {
provider = "prisma-client-js"
provider = "prisma-client-js"
binaryTargets = ["native", "rhel-openssl-3.0.x", "rhel-openssl-1.0.x"]
}

datasource db {
Expand Down
19 changes: 13 additions & 6 deletions packages/rmu/src/update-read-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,22 @@ class ReadModelUpdater {
this.logger.warn("No NewImage");
return;
}
const base64EncodedPayload = attributeValues.payload.B;
if (!base64EncodedPayload) {
const rawPayload = attributeValues.payload.B;
if (!rawPayload) {
this.logger.warn("No payload");
return;
}
const payload = this.decoder.decode(
new Uint8Array(base64EncodedPayload.split(",").map(Number)),
);
const payloadJson = JSON.parse(payload);
let payloadJson: unknown;
try {
// LocalStack: B フィールドに生 JSON 文字列が入る
payloadJson = JSON.parse(rawPayload);
} catch {
// AWS Lambda: Base64, ローカル RMU: カンマ区切り数値文字列
const payloadBytes = /^\d+(,\d+)*$/.test(rawPayload)
? new Uint8Array(rawPayload.split(",").map(Number))
: new Uint8Array(Buffer.from(rawPayload, "base64"));
payloadJson = JSON.parse(this.decoder.decode(payloadBytes));
}
const groupChatEvent = convertJSONToGroupChatEvent(payloadJson);
switch (groupChatEvent.symbol) {
case GroupChatCreatedTypeSymbol: {
Expand Down
1 change: 1 addition & 0 deletions references/okite-ai
Submodule okite-ai added at 69fa7c
Loading
Loading