Skip to content

Commit 41b18d9

Browse files
committed
test updates; more docs; small changes to table options
1 parent 6c279cd commit 41b18d9

File tree

8 files changed

+350
-100
lines changed

8 files changed

+350
-100
lines changed

feature/dynamodb/enhancedclient/batch_read.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type BatchGetOperation[T any] struct {
3232
func (b *BatchGetOperation[T]) AddReadItem(item *T) error {
3333
m, err := b.schema.createKeyMap(item)
3434
if err != nil {
35-
return fmt.Errorf(`error calling schema.createKeyMap: %v`, err)
35+
return fmt.Errorf("error calling schema.createKeyMap: %w", err)
3636
}
3737

3838
b.queue = append(b.queue, batchReadQueueItem{
@@ -62,6 +62,12 @@ func (b *BatchGetOperation[T]) AddReadItemByMap(m Map) error {
6262
// seq := op.Execute(ctx)
6363
// for res := range iter.Chan(seq) { ... }
6464
func (b *BatchGetOperation[T]) Execute(ctx context.Context, optFns ...func(options *dynamodb.Options)) iter.Seq[ItemResult[T]] {
65+
var consecutiveErrors uint = 0
66+
var maxConsecutiveErrors = b.table.options.MaxConsecutiveErrors
67+
if maxConsecutiveErrors == 0 {
68+
maxConsecutiveErrors = DefaultMaxConsecutiveErrors
69+
}
70+
6571
return func(yield func(ItemResult[T]) bool) {
6672
tableName := b.schema.TableName()
6773
if tableName == nil {
@@ -87,8 +93,17 @@ func (b *BatchGetOperation[T]) Execute(ctx context.Context, optFns ...func(optio
8793

8894
res, err := b.client.BatchGetItem(ctx, bgii, optFns...)
8995
if err != nil {
90-
yield(ItemResult[T]{err: err})
96+
if !yield(ItemResult[T]{err: err}) {
97+
return
98+
}
99+
100+
if consecutiveErrors >= maxConsecutiveErrors {
101+
return
102+
} else {
103+
continue
104+
}
91105
}
106+
consecutiveErrors = 0
92107

93108
if res != nil && res.Responses != nil {
94109
for _, item := range res.Responses[*tableName] {

feature/dynamodb/enhancedclient/batch_write.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@ type BatchWriteOperation[T any] struct {
3636
// The item is encoded using the table's schema and extensions are applied before writing.
3737
func (b *BatchWriteOperation[T]) AddPut(item *T) error {
3838
if err := b.table.applyBeforeWriteExtensions(item); err != nil {
39-
return fmt.Errorf(`error calling table.applyBeforeWriteExtensions(): %v`, err)
39+
return fmt.Errorf("error calling table.applyBeforeWriteExtensions(): %w", err)
4040
}
4141

4242
m, err := b.schema.Encode(item)
4343
if err != nil {
44-
return fmt.Errorf(`error calling schema.Encode(): %v`, err)
44+
return fmt.Errorf("error calling schema.Encode(): %w", err)
4545
}
4646

4747
b.queue = append(b.queue, batchWriteQueueItem{
@@ -72,7 +72,7 @@ func (b *BatchWriteOperation[T]) AddRawPut(i map[string]types.AttributeValue) er
7272
func (b *BatchWriteOperation[T]) AddDelete(item *T) error {
7373
m, err := b.schema.createKeyMap(item)
7474
if err != nil {
75-
return fmt.Errorf(`error calling schema.createKeyMap(): %v`, err)
75+
return fmt.Errorf("error calling schema.createKeyMap(): %w", err)
7676
}
7777

7878
b.queue = append(b.queue, batchWriteQueueItem{
@@ -102,6 +102,12 @@ func (b *BatchWriteOperation[T]) AddRawDelete(i map[string]types.AttributeValue)
102102
// It sends requests in batches of up to 25 items, and retries unprocessed items until all are written.
103103
// Returns an error if the table name is not set or if a request fails.
104104
func (b *BatchWriteOperation[T]) Execute(ctx context.Context, optFns ...func(options *dynamodb.Options)) error {
105+
var consecutiveErrors uint = 0
106+
var maxConsecutiveErrors = b.table.options.MaxConsecutiveErrors
107+
if maxConsecutiveErrors == 0 {
108+
maxConsecutiveErrors = DefaultMaxConsecutiveErrors
109+
}
110+
105111
tableName := b.schema.TableName()
106112
if tableName == nil {
107113
return fmt.Errorf("empty table name, did you forget Schema[T].WithName()?")
@@ -129,7 +135,7 @@ func (b *BatchWriteOperation[T]) Execute(ctx context.Context, optFns ...func(opt
129135
},
130136
})
131137
default:
132-
return fmt.Errorf("uknown batchWriteOpType: %d", op.typ)
138+
return fmt.Errorf("unknown batchWriteOpType: %d", op.typ)
133139
}
134140
}
135141

@@ -141,22 +147,31 @@ func (b *BatchWriteOperation[T]) Execute(ctx context.Context, optFns ...func(opt
141147

142148
res, err := b.client.BatchWriteItem(ctx, bwii, optFns...)
143149
if err != nil {
144-
return err
150+
consecutiveErrors++
151+
152+
if consecutiveErrors >= maxConsecutiveErrors {
153+
return err
154+
}
145155
}
146156

157+
var unprocessedItems []types.WriteRequest
147158
if res != nil && res.UnprocessedItems != nil {
148-
for _, ui := range res.UnprocessedItems[*tableName] {
149-
if ui.PutRequest != nil {
150-
b.queue = append(b.queue, batchWriteQueueItem{
151-
typ: batchWriteOpPut,
152-
item: ui.PutRequest.Item,
153-
})
154-
} else if ui.DeleteRequest != nil {
155-
b.queue = append(b.queue, batchWriteQueueItem{
156-
typ: batchWriteOpDelete,
157-
item: ui.DeleteRequest.Key,
158-
})
159-
}
159+
unprocessedItems = res.UnprocessedItems[*tableName]
160+
} else if err != nil {
161+
unprocessedItems = bwii.RequestItems[*tableName]
162+
}
163+
164+
for _, ui := range unprocessedItems {
165+
if ui.PutRequest != nil {
166+
b.queue = append(b.queue, batchWriteQueueItem{
167+
typ: batchWriteOpPut,
168+
item: ui.PutRequest.Item,
169+
})
170+
} else if ui.DeleteRequest != nil {
171+
b.queue = append(b.queue, batchWriteQueueItem{
172+
typ: batchWriteOpDelete,
173+
item: ui.DeleteRequest.Key,
174+
})
160175
}
161176
}
162177
}

feature/dynamodb/enhancedclient/schema_test.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ func TestSchema(t *testing.T) {
1919
t.Fatalf("NewSchema error: %v", err)
2020
}
2121

22-
if len(actual.cachedFields.fields) != 17 {
23-
t.Fatalf("expected %d CachedFields, found %d", 17, len(actual.cachedFields.fields))
22+
if len(actual.cachedFields.fields) != 18 {
23+
t.Fatalf("expected %d CachedFields, found %d", 18, len(actual.cachedFields.fields))
2424
}
2525

2626
// | Index Name | Partition Key | Sort Key | Type | Notes |
@@ -327,6 +327,15 @@ func TestSchema(t *testing.T) {
327327
Name: "last_name",
328328
},
329329
},
330+
{
331+
Name: "nick_name",
332+
NameFromTag: true,
333+
Index: []int{16},
334+
Type: reflect.TypeFor[string](),
335+
Tag: Tag{
336+
Name: "nick_name",
337+
},
338+
},
330339
},
331340
fieldsByName: map[string]int{
332341
"order_id": 0,
@@ -346,6 +355,7 @@ func TestSchema(t *testing.T) {
346355
"note": 14,
347356
"first_name": 15,
348357
"last_name": 16,
358+
"nick_name": 17,
349359
},
350360
},
351361
enc: NewEncoder[order](),
@@ -474,6 +484,9 @@ func TestSchemaEncodeDecode(t *testing.T) {
474484
"last_name": &types.AttributeValueMemberS{
475485
Value: "1e73f306-3362-49da-af74-41e1befff588",
476486
},
487+
"nick_name": &types.AttributeValueMemberNULL{
488+
Value: true,
489+
},
477490
}
478491

479492
s, _ := NewSchema[order]()

feature/dynamodb/enhancedclient/structs_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type order struct {
2222
customerNote string `dynamodbav:"note" dynamodbgetter:"Note" dynamodbsetter:"SetNote" dynamodbindex:"NoteIndex,global,partition"`
2323
CustomerFirstName string `dynamodbav:"first_name"`
2424
CustomerLastName string `dynamodbav:"last_name"`
25+
CustomerNickName *string `dynamodbav:"nick_name"`
2526
}
2627

2728
// Getter method for customerNote

feature/dynamodb/enhancedclient/table.go

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,44 @@ import (
77
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
88
)
99

10+
// TableOptions provides configuration options for a DynamoDB Table.
11+
//
12+
// T is the type of the items stored in the table.
1013
type TableOptions[T any] struct {
11-
Schema *Schema[T]
12-
DynamoDBOptions []func(*dynamodb.Options)
14+
// Schema defines the schema for the table, including attribute mapping and validation.
15+
Schema *Schema[T]
16+
17+
// DynamoDBOptions is a list of functions to customize the underlying DynamoDB client options.
18+
DynamoDBOptions []func(*dynamodb.Options)
19+
20+
// ExtensionRegistry holds the registry of extensions to be used with the table.
1321
ExtensionRegistry *ExtensionRegistry[T]
22+
23+
// MaxConsecutiveErrors sets the maximum number of consecutive errors allowed during batch, query, or scan operations.
24+
// If this threshold is exceeded, the operation will stop and return.
25+
// If set to 0, the default value of DefaultMaxConsecutiveErrors will be used.
26+
MaxConsecutiveErrors uint
1427
}
1528

29+
// DefaultMaxConsecutiveErrors is the fallback value used for MaxConsecutiveErrors when it is set to 0.
30+
// A value of 1 means the operation will stop after the first error.
31+
const DefaultMaxConsecutiveErrors uint = 1
32+
33+
// Table represents a strongly-typed DynamoDB table for items of type T.
34+
//
35+
// It provides methods for interacting with DynamoDB using the provided client and options.
1636
type Table[T any] struct {
17-
client Client
37+
// client is the DynamoDB client used to perform operations on the table.
38+
client Client
39+
40+
// options holds the configuration options for the table.
1841
options TableOptions[T]
1942
}
2043

44+
// NewTable creates a new Table for items of type T using the provided client and configuration functions.
45+
//
46+
// The configuration functions can be used to customize the TableOptions before the table is created.
47+
// Returns an error if T is not a struct type or if required options cannot be resolved.
2148
func NewTable[T any](client Client, fns ...func(options *TableOptions[T])) (*Table[T], error) {
2249
if reflect.TypeFor[T]().Kind() != reflect.Struct {
2350
return nil, fmt.Errorf("NewClient() can only be created from structs, %T given", *new(T))
@@ -32,6 +59,7 @@ func NewTable[T any](client Client, fns ...func(options *TableOptions[T])) (*Tab
3259
defaultResolvers := []resolverFn[T]{
3360
resolveDefaultSchema[T],
3461
resolveDefaultExtensionRegistry[T],
62+
resolveDefaultMaxConsecutiveErrors[T],
3563
}
3664

3765
for _, fn := range defaultResolvers {
@@ -46,20 +74,44 @@ func NewTable[T any](client Client, fns ...func(options *TableOptions[T])) (*Tab
4674
}, nil
4775
}
4876

77+
// WithSchema returns a configuration function that sets the Schema for TableOptions.
78+
//
79+
// Use this to specify a custom schema when creating a Table.
4980
func WithSchema[T any](schema *Schema[T]) func(options *TableOptions[T]) {
5081
return func(options *TableOptions[T]) {
5182
options.Schema = schema
5283
}
5384
}
5485

86+
// WithExtensionRegistry returns a configuration function that sets the ExtensionRegistry for TableOptions.
87+
//
88+
// Use this to specify a custom extension registry when creating a Table.
5589
func WithExtensionRegistry[T any](registry *ExtensionRegistry[T]) func(options *TableOptions[T]) {
5690
return func(options *TableOptions[T]) {
5791
options.ExtensionRegistry = registry
5892
}
5993
}
6094

95+
// WithMaxConsecutiveErrors returns a configuration function that sets the MaxConsecutiveErrors option for TableOptions.
96+
//
97+
// Use this to specify the maximum number of consecutive errors allowed during batch, query, or scan operations.
98+
// A value of 0 means no limit is enforced.
99+
// WithMaxConsecutiveErrors returns a configuration function that sets the MaxConsecutiveErrors option for TableOptions.
100+
//
101+
// Use this to specify the maximum number of consecutive errors allowed during batch, query, or scan operations.
102+
// If set to 0, the default value of DefaultMaxConsecutiveErrors will be used.
103+
func WithMaxConsecutiveErrors[T any](maxConsecutiveErrors uint) func(options *TableOptions[T]) {
104+
return func(options *TableOptions[T]) {
105+
options.MaxConsecutiveErrors = maxConsecutiveErrors
106+
}
107+
}
108+
109+
// resolverFn defines a function type for resolving or setting default options on TableOptions.
61110
type resolverFn[T any] func(opts *TableOptions[T]) error
62111

112+
// resolveDefaultSchema sets a default schema on TableOptions if none is provided.
113+
//
114+
// Returns an error if the schema cannot be created.
63115
func resolveDefaultSchema[T any](opts *TableOptions[T]) error {
64116
if opts.Schema == nil {
65117
var err error
@@ -72,10 +124,21 @@ func resolveDefaultSchema[T any](opts *TableOptions[T]) error {
72124
return nil
73125
}
74126

127+
// resolveDefaultExtensionRegistry sets a default extension registry on TableOptions if none is provided.
75128
func resolveDefaultExtensionRegistry[T any](opts *TableOptions[T]) error {
76129
if opts.ExtensionRegistry == nil {
77130
opts.ExtensionRegistry = DefaultExtensionRegistry[T]()
78131
}
79132

80133
return nil
81134
}
135+
136+
// resolveDefaultMaxConsecutiveErrors sets MaxConsecutiveErrors to DefaultMaxConsecutiveErrors
137+
// if it is not explicitly set (i.e., if the value is 0).
138+
// This ensures a sensible default for error handling in batch, query, or scan operations.
139+
func resolveDefaultMaxConsecutiveErrors[T any](opts *TableOptions[T]) error {
140+
if opts.MaxConsecutiveErrors == 0 {
141+
opts.MaxConsecutiveErrors = DefaultMaxConsecutiveErrors
142+
}
143+
return nil
144+
}

0 commit comments

Comments
 (0)