Skip to content

Rooster212/event-sourced-rules-engine

Repository files navigation

Event Sourced Rules Engine

A storage-agnostic event-sourced rules engine for TypeScript.

The core package contains the aggregate orchestration, rule processor, event types, index definitions, and storage contract. DynamoDB is the first production storage adapter, but the core API is designed so a SQLite or SQL adapter can implement the same EventStore contract later.

Install

npm install @rooster212/event-sourced-rules-engine

The root import has no AWS runtime imports:

import { Aggregate, Processor, createEvent, defineIndex } from "@rooster212/event-sourced-rules-engine";

DynamoDB, EventBridge, and CDK integrations are available through subpath exports:

import { DynamoEventStore } from "@rooster212/event-sourced-rules-engine/dynamodb";
import { LocalStorageEventStore } from "@rooster212/event-sourced-rules-engine/browser";
import { EventBridgeOutboxPublisher } from "@rooster212/event-sourced-rules-engine/eventbridge";
import { DynamoDBOutboxEventBridgeHandler } from "@rooster212/event-sourced-rules-engine/cdk";

The AWS packages are optional peer dependencies. Install the ones needed by the subpaths you use. The ./browser export has no AWS or Node runtime dependencies.

Core Concepts

  • Processor applies event rules to state and records outbound events published by rules.
  • Aggregate loads state/history, invokes the processor, builds event records, writes indexes, and commits the change through an EventStore.
  • EventStore is the storage boundary. It atomically persists inbound events, the latest snapshot, index entries, and generic outbox records.
  • OutboxRecords are persisted by the store. Publishing them is handled by integrations such as DynamoDB Streams to EventBridge.

Example

import { Aggregate, Processor, StateUpdater, createEvent, defineIndex } from "@rooster212/event-sourced-rules-engine";
import { DynamoEventStore } from "@rooster212/event-sourced-rules-engine/dynamodb";

interface Account {
  email: string;
  balance: number;
}

type Inbound = AccountOpened | BalanceChanged;
type Outbound = BalanceChangedNotification;

interface AccountOpened {
  email: string;
}

interface BalanceChanged {
  amount: number;
}

interface BalanceChangedNotification {
  balance: number;
}

const rules = new Map<string, StateUpdater<Account, Inbound, Outbound, Inbound>>();

rules.set("ACCOUNT_OPENED", (input) => {
  input.state.email = (input.current as AccountOpened).email;
  return input.state;
});

rules.set("BALANCE_CHANGED", (input) => {
  input.state.balance += (input.current as BalanceChanged).amount;
  input.publish("BALANCE_CHANGED", { balance: input.state.balance });
  return input.state;
});

const store = new DynamoEventStore<Account, Inbound, Outbound>({
  client: dynamoDocumentClient,
  tableName: "events",
});

const accounts = new Aggregate<Account, Inbound, Outbound>({
  type: "ACCOUNT",
  store,
  processor: new Processor(rules, () => ({ email: "", balance: 0 })),
  indexes: [defineIndex("byEmail", (snapshot) => snapshot.state.email || null)],
});

await accounts.append(
  "account-1",
  createEvent("ACCOUNT_OPENED", { email: "owner@example.com" }),
  createEvent("BALANCE_CHANGED", { amount: 50 }),
);

const snapshot = await accounts.get("account-1");
const byEmail = await accounts.query("byEmail", "owner@example.com");

Browser Storage

Frontend apps can use the core rules engine with the browser adapter:

import { Aggregate, Processor } from "@rooster212/event-sourced-rules-engine";
import { LocalStorageEventStore } from "@rooster212/event-sourced-rules-engine/browser";

const store = new LocalStorageEventStore<Account, Inbound, Outbound>({
  namespace: "my-app",
});

const accounts = new Aggregate<Account, Inbound, Outbound>({
  type: "ACCOUNT",
  store,
  processor,
});

WebStorageEventStore works with any Storage-like object, so it can back localStorage, sessionStorage, or tests. It serializes dates explicitly, stores outbox records with aggregate state, checks expected sequence numbers, and can notify subscribers about local commits and browser storage events:

const unsubscribe = store.subscribe((change) => {
  console.log(change.ref, change.source, change.snapshot);
});

Web Storage is useful for lightweight local state, demos, and offline queues. It is not a strict multi-tab transaction system: concurrent writes from separate tabs can still race because browsers do not expose compare-and-swap for localStorage. For larger data, higher write volume, or stronger transaction guarantees, an IndexedDB adapter should be added against the same EventStore contract.

DynamoDB Table

The DynamoDB adapter uses a single-table layout with these keys:

  • partition key: _pk string
  • sort key: _sk string

Each aggregate stores a snapshot, inbound event records, outbox records, and duplicate snapshot records for configured indexes. Writes use DynamoDB transactions with optimistic concurrency.

For local integration tests, start DynamoDB Local first:

npm run dynamodb
npm test

Outbox Publishing

Core code only writes outbox records. The EventBridge integration publishes outbox records from DynamoDB stream images:

import { createDynamoDBStreamToEventBridgeHandler } from "@rooster212/event-sourced-rules-engine/eventbridge";

export const handler = createDynamoDBStreamToEventBridgeHandler({
  source: "account-service",
  eventBusName: "domain-events",
});

For CDK, use DynamoDBOutboxEventBridgeHandler from the ./cdk export and attach it to the DynamoDB table stream.

About

An evolution of https://github.com/Rooster212/event-driven-dynamodb-rules-engine to be more agnostic of storage and platform

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors