Skip to content

Commit a2aebd1

Browse files
committed
batch get and write on multiple tables
1 parent b3659b9 commit a2aebd1

17 files changed

+1276
-477
lines changed

feature/dynamodb/entitymanager/README.md

Lines changed: 112 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,26 @@ type Product struct {
3434
}
3535

3636
// default usage
37-
table := entitymanager.NewTable[Product](client)
37+
table, err := entitymanager.NewTable[Product](client)
38+
if err != nil {
39+
// handle error
40+
}
3841

3942
// customized schema options
40-
schema := entitymanager.NewSchema[Product]()
43+
schema, err := entitymanager.NewSchema[Product]()
44+
if err != nil {
45+
// handle error
46+
}
4147
schema = schema.WithProvisionedThroughput(&types.ProvisionedThroughput{
4248
ReadCapacityUnits: 5,
4349
WriteCapacityUnits: 5,
4450
})
4551
schema = schema.WithTableClass(types.TableClassStandardInfrequentAccess)
4652
schema = schema.WithTags([]types.Tag{{Key: aws.String("env"), Value: aws.String("prod")}})
47-
table := entitymanager.NewTable(client, entitymanager.WithSchema(schema))
53+
table, err = entitymanager.NewTable[Product](client, entitymanager.WithSchema(schema))
54+
if err != nil {
55+
// handle error
56+
}
4857
```
4958

5059
The `Table[T]` type provides a high-level, type-safe interface for managing DynamoDB tables. It abstracts away much of the boilerplate required for table lifecycle management, making it easier to work with DynamoDB in Go.
@@ -60,8 +69,6 @@ The `Table[T]` type provides a high-level, type-safe interface for managing Dyna
6069

6170
These features allow you to manage the full lifecycle of your DynamoDB tables in a concise, idiomatic Go style, while leveraging the full power of DynamoDB's management capabilities.
6271

63-
> **Note:** Only table level scans are supported at the moment.
64-
6572
> **Note:** Table schema updates (such as adding or modifying attributes, indexes, or throughput settings after table creation) are not supported at this time. Only the table management functions listed above are available. Support for table updates is planned for a future release.
6673
6774
## Item Operations
@@ -76,35 +83,37 @@ The `Table[T]` type provides a set of strongly-typed methods for common item-lev
7683
- `UpdateItem(ctx, item, ...) (*T, error)`: Update an existing item, using the struct as the source of changes.
7784
- `DeleteItem(ctx, item, ...) error`: Delete an item by providing the struct value.
7885
- `DeleteItemByKey(ctx, key, ...) error`: Delete an item by its key.
79-
- `Scan(ctx, expr, ...) iter.Seq[ItemResult[T]]`: Scan the table with a filter expression, returning an iterator over results.
80-
- `ScanIndex(ctx, indexName, expr, ...) iter.Seq[ItemResult[T]]`: Scan the index with a filter expression, returning an iterator over results.
81-
- `Query(ctx, expr, ...) iter.Seq[ItemResult[T]]`: Query the table or an index using a key condition expression, returning an iterator over results.
82-
- `QueryIndex(ctx, indexName, expr, ...) iter.Seq[ItemResult[T]]`: Query the index or an index using a key condition expression, returning an iterator over results.
86+
- `Scan(ctx, expr, ...) iter.Seq[ItemResult[*T]]`: Scan the table with a filter expression, returning an iterator over results.
87+
- `ScanIndex(ctx, indexName, expr, ...) iter.Seq[ItemResult[*T]]`: Scan the index with a filter expression, returning an iterator over results.
88+
- `Query(ctx, expr, ...) iter.Seq[ItemResult[*T]]`: Query the table or an index using a key condition expression, returning an iterator over results.
89+
- `QueryIndex(ctx, indexName, expr, ...) iter.Seq[ItemResult[*T]]`: Query the index or an index using a key condition expression, returning an iterator over results.
8390

8491
**Batch operations:**
8592

8693
- `CreateBatchWriteOperation() *BatchWriteOperation[T]`: Returns a new batch write operation, allowing you to queue multiple put and delete requests and execute them efficiently in batches. Handles chunking, retries for unprocessed items, and respects DynamoDB's batch size limits.
8794
- Use `AddPut(item *T)` or `AddRawPut(map[string]types.AttributeValue)` to queue items for writing.
8895
- Use `AddDelete(item *T)` or `AddRawDelete(map[string]types.AttributeValue)` to queue items for deletion.
8996
- Call `Execute(ctx, ...)` to perform the batch write.
97+
- Use `Merge(otherBatchers...)` on a `BatchWriteOperation` to create a `BatchWriteExecutor` that can write to multiple tables in a single coordinated workflow.
9098

9199
- `CreateBatchGetOperation() *BatchGetOperation[T]`: Returns a new batch get operation, allowing you to queue multiple keys for retrieval and execute them in a single batch request. Handles chunking, retries for unprocessed keys, and respects DynamoDB's batch size limits.
92100
- Use `AddReadItem(item *T)` or `AddReadItemByMap(map[string]types.AttributeValue)` to queue keys for retrieval.
93101
- Call `Execute(ctx, ...)` to perform the batch get, which yields results as an iterator.
102+
- Use `Merge(otherBatchers...)` on a `BatchGetOperation` to create a `BatchGetExecutor` that can read from multiple tables in a single `BatchGetItem` workflow.
94103

95104
Batch operations are useful for efficiently processing large numbers of items, minimizing network calls, and handling DynamoDB's batch constraints automatically.
96105

97106
These methods are designed to be ergonomic and safe, leveraging Go's type system to reduce boilerplate and runtime errors when working with DynamoDB items.
98107

99108
**Iterators and ItemResult:**
100109

101-
Many methods, such as `Scan`, `Query`, and `BatchGetOperation.Execute`, return an iterator in the form of an `iter.Seq[ItemResult[T]]`, which is a function that accepts a callback. Each callback invocation receives an `ItemResult[T]` containing either a successfully decoded item or an error encountered during retrieval or decoding.
110+
Many methods, such as `Scan`, `Query`, and `BatchGetOperation.Execute`, return an iterator in the form of an `iter.Seq[ItemResult[*T]]`, which is a function that accepts a callback. Each callback invocation receives an `ItemResult[*T]` containing either a successfully decoded item (accessible via `Item()`) or an error encountered during retrieval or decoding (accessible via `Error()`). For batch operations that may span multiple tables, `ItemResult` also exposes a `Table()` method that returns the source table name for the item.
102111

103112
When consuming these iterators, use the callback or range pattern and always check the `Error()` method on each result before using the item:
104113

105114
```go
106115
// Callback-based iteration (idiomatic for iter.Seq):
107-
table.Scan(ctx, expr, ...)(func(result ItemResult[T]) bool {
116+
table.Scan(ctx, expr, ...)(func(result ItemResult[*T]) bool {
108117
if err := result.Error(); err != nil {
109118
// handle error, e.g. log or collect
110119
return true // continue to next result
@@ -126,6 +135,74 @@ for res := range table.Scan(ctx, expr, ...) {
126135

127136
This pattern ensures robust error handling and makes it easy to process large result sets efficiently and safely.
128137

138+
### Advanced: merged batch operations
139+
140+
You can merge batch operations from multiple tables and execute them together. This is useful when you want to minimize network calls and still keep type-safe table APIs.
141+
142+
**Merged BatchGetOperation (multi-table read):**
143+
144+
```go
145+
ordersTable, _ := entitymanager.NewTable[Order](client)
146+
customersTable, _ := entitymanager.NewTable[Customer](client)
147+
148+
ordersBatch := ordersTable.CreateBatchGetOperation()
149+
customersBatch := customersTable.CreateBatchGetOperation()
150+
151+
// queue keys for both tables
152+
for _, key := range orderKeys {
153+
_ = ordersBatch.AddReadItemByMap(key)
154+
}
155+
for _, key := range customerKeys {
156+
_ = customersBatch.AddReadItemByMap(key)
157+
}
158+
159+
// merge into a single executor
160+
executor := ordersBatch.Merge(customersBatch)
161+
162+
for res := range executor.Execute(ctx) { // iter.Seq[ItemResult[any]]
163+
if err := res.Error(); err != nil {
164+
// handle error
165+
continue
166+
}
167+
168+
switch res.Table() {
169+
case "orders":
170+
order := res.Item().(*Order)
171+
// process order
172+
case "customers":
173+
customer := res.Item().(*Customer)
174+
// process customer
175+
}
176+
}
177+
```
178+
179+
**Merged BatchWriteOperation (multi-table write):**
180+
181+
```go
182+
ordersTable, _ := entitymanager.NewTable[Order](client)
183+
customersTable, _ := entitymanager.NewTable[Customer](client)
184+
185+
ordersBatch := ordersTable.CreateBatchWriteOperation()
186+
customersBatch := customersTable.CreateBatchWriteOperation()
187+
188+
// queue writes for both tables
189+
for _, o := range ordersToUpsert {
190+
_ = ordersBatch.AddPut(&o)
191+
}
192+
for _, c := range customersToDelete {
193+
_ = customersBatch.AddDelete(&c)
194+
}
195+
196+
// merge and execute in a single workflow
197+
executor := ordersBatch.Merge(customersBatch)
198+
199+
if err := executor.Execute(ctx); err != nil {
200+
// handle error
201+
}
202+
```
203+
204+
In these advanced scenarios, the merge APIs (`Merge`) let you coordinate multi-table operations while still using the high-level, type-safe table abstractions provided by this package. Due to Go generics limitations, merged executors always return `ItemResult[any]`, so you must type-assert each item, typically by switching on `res.Table()` and then asserting the concrete type of `res.Item()`.
205+
129206
## Extensions
130207

131208
The entity manager supports an extension system that allows you to inject custom logic at key points in the item lifecycle. Extensions can be used for auditing, validation, automatic field population, versioning, atomic counters, and more.
@@ -255,20 +332,29 @@ The default extension registry includes several built-in extensions that provide
255332
- **How it works:**
256333
- Fields with the `version` tag option are checked and incremented on each write. If the version in the database does not match the expected value, the write fails, preventing lost updates.
257334

258-
These extensions are automatically included when you use `DefaultExtensionRegistry`:
335+
These extensions are automatically included by default via `DefaultExtensionRegistry` when you create a new table, but you can still override or customize the registry if needed:
259336

260337
```go
261-
table := entitymanager.NewTable[Product](
338+
table, err := entitymanager.NewTable[Product](client)
339+
if err != nil {
340+
// handle error
341+
}
342+
343+
// or customize the registry explicitly
344+
table, err := entitymanager.NewTable[Product](
262345
client,
263346
entitymanager.WithExtensionRegistry(
264347
entitymanager.DefaultExtensionRegistry[Product](),
265348
),
266349
)
350+
if err != nil {
351+
// handle error
352+
}
267353
```
268354

269355
You can also clone and customize the registry to add or remove extensions as needed for your application.
270356

271-
**Note:** Because extensions can modify how your data is processed, the extension registry is not enabled by default. Enable it explicitly if you want to use these features.
357+
**Note:** Because extensions can modify how your data is processed, they are enabled by default via `DefaultExtensionRegistry`. If you need different behavior, provide a custom registry with `WithExtensionRegistry` when creating the table.
272358

273359
## Custom converters
274360

@@ -312,9 +398,12 @@ my_registry := converters.NewRegistry()
312398
// register converter
313399
my_registry.Add("my_custom_converter", &MyCustomConverter{})
314400

315-
schema := entitymanager.NewSchema[MyStruct]func(options *SchemaOptions) {
401+
schema, err := entitymanager.NewSchema[MyStruct](func(options *entitymanager.SchemaOptions) {
316402
options.ConverterRegistry = my_registry
317403
})
404+
if err != nil {
405+
// handle error
406+
}
318407

319408
// add it to the struct
320409
type MyStruct struct {
@@ -336,6 +425,7 @@ import (
336425
"time"
337426

338427
"github.com/aws/aws-sdk-go-v2/config"
428+
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
339429
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
340430
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/entitymanager"
341431
)
@@ -378,8 +468,13 @@ func main() {
378468
}
379469
log.Printf("Got item: %+v", got)
380470

381-
// Scan all items
382-
for res := range table.Scan(context.Background(), entitymanager.ScanExpression{}) {
471+
// Scan all items (no additional filter in this example)
472+
expr, err := expression.NewBuilder().Build()
473+
if err != nil {
474+
log.Fatalf("failed to build scan expression: %v", err)
475+
}
476+
477+
for res := range table.Scan(context.Background(), expr) {
383478
if err := res.Error(); err != nil {
384479
log.Printf("scan error: %v", err)
385480
continue

0 commit comments

Comments
 (0)