
Golang Kinesis Consumer


Kinesis consumer applications written in Go. This library is intended to be a lightweight wrapper around the Kinesis API to read records, save checkpoints (with swappable backends), and gracefully recover from service timeouts/errors.


Installation

Get the package source:

$ go get github.com/harlow/kinesis-consumer

Note: This repo requires the AWS SDK V2 package. If you are still using AWS SDK V1, use: https://github.com/harlow/kinesis-consumer/releases/tag/v0.3.5

Overview

The consumer leverages a handler func that accepts a Kinesis record. The Scan method will consume all shards concurrently and call the callback func as it receives records from the stream.

Important 1: The Scan func also polls the stream for new shards and automatically starts consuming any shards added to the stream.

Important 2: The default Log, Counter, and Checkpoint are no-op which means no logs, counts, or checkpoints will be emitted when scanning the stream. See the options below to override these defaults.

import (
	"context"
	"flag"
	"fmt"
	"log"

	consumer "github.com/harlow/kinesis-consumer"
)

func main() {
	var stream = flag.String("stream", "", "Stream name")
	flag.Parse()

	// consumer
	c, err := consumer.New(*stream)
	if err != nil {
		log.Fatalf("consumer error: %v", err)
	}

	// start scan
	err = c.Scan(context.TODO(), func(r *consumer.Record) error {
		fmt.Println(string(r.Data))
		return nil // continue scanning
	})
	if err != nil {
		log.Fatalf("scan error: %v", err)
	}

	// Note: If you need to aggregate based on a specific shard
	// the `ScanShard` function should be used instead.
}

ScanFunc

ScanFunc is the type of the function called for each message read from the stream. The record argument contains the original record returned from the AWS Kinesis library.

type ScanFunc func(r *Record) error

If an error is returned, scanning stops. The sole exception is when the function returns the special value SkipCheckpoint.

// continue scanning
return nil

// continue scanning, skip checkpoint
return consumer.SkipCheckpoint

// stop scanning, return error
return errors.New("my error, exit all scans")

ScanBatch (experimental)

For interval/size-based batch processing, use ScanBatch:

err := c.ScanBatch(ctx, func(batch []*consumer.Record) error {
	// process records in this batch
	return nil
},
	consumer.WithBatchMaxSize(500),
	consumer.WithBatchFlushInterval(2*time.Second),
)

Checkpoint behavior in batch mode:

  • checkpoint advances only after a batch callback succeeds
  • on callback error, scan stops and that batch is not checkpointed

Aggregated records

WithAggregation(true) enables KPL deaggregation before records reach your callback.

Checkpoint persistence in this library is still sequence-number based. That means a persisted checkpoint can resume only at the Kinesis record boundary, not at a sub-record position inside an aggregated KPL record.

Practical implication:

  • if a process stops after checkpointing one logical record from an aggregated Kinesis record but before finishing the rest of that same aggregate, a restart will resume after that Kinesis sequence number
  • remaining logical records from that aggregate may be skipped on restart

If you need exact restart semantics for aggregated records, avoid persisted checkpoints for that workload until the library adds sub-sequence checkpoint support.

Use context cancellation to signal the scan to exit without error; for example, to gracefully stop the scan on an interrupt signal:

// trap SIGINT, wait to trigger shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

// context with cancel
ctx, cancel := context.WithCancel(context.Background())

go func() {
	<-signals
	cancel() // call cancellation
}()

err := c.Scan(ctx, func(r *consumer.Record) error {
	fmt.Println(string(r.Data))
	return nil // continue scanning
})

Consumer Groups (DynamoDB Leases, Opt-In)

By default, consumer.New(...).Scan(...) consumes all shards in a single process. For multi-process shard coordination, use the opt-in consumer-group package.

Note: Consumer-group support is experimental and may evolve.

Consumer-group shard assignment preserves parent/child ordering for resharded streams: child shards are not assigned until their parent shards have been fully completed.

import (
	consumer "github.com/harlow/kinesis-consumer"
	groupddb "github.com/harlow/kinesis-consumer/group/consumergroup/ddb"
	checkpointddb "github.com/harlow/kinesis-consumer/store/ddb"
)

// checkpoint store (existing API)
ck, err := checkpointddb.New(appName, checkpointTable)
if err != nil {
	log.Fatalf("checkpoint store error: %v", err)
}

// group (new opt-in API)
group, err := groupddb.NewGroup(groupddb.GroupConfig{
	GroupName:   groupName, // preferred
	AppName:     appName,   // deprecated alias
	StreamName:  streamName,
	WorkerID:    workerID, // optional; auto-generated if empty
	KinesisClient: kinesisClient,
	Repository: groupddb.Config{
		Client:    dynamoClient,
		TableName: leaseTable,
	},
	CheckpointStore: ck,
})
if err != nil {
	log.Fatalf("group error: %v", err)
}

c, err := consumer.New(
	streamName,
	consumer.WithGroup(group),
	consumer.WithStore(ck), // keep checkpoints consistent with group
)
if err != nil {
	log.Fatalf("consumer error: %v", err)
}

If WorkerID is omitted, the library generates a unique worker ID per process. If both GroupName and AppName are set, GroupName is used.

The lease table schema is:

Partition key: namespace
Sort key: shard_id

For worker row cleanup, enable DynamoDB TTL on attribute ttl. Worker heartbeat rows write this field automatically.

Integration tests for this path are available and opt-in:

RUN_DDB_INTEGRATION=1 DDB_ENDPOINT=http://localhost:8000 go test ./group/consumergroup/... -run DynamoDB -v

Consumer-group shard management

Consumer-group rebalancing uses a cooperative handoff flow:

  • an under-target worker requests handoff for a shard owned by an overloaded worker
  • the current owner sees the handoff request, stops the local shard scan, flushes checkpoints, and releases the lease
  • the requesting worker claims the shard after release

This avoids the older release-first behavior where excess shards could become immediately unowned and sit idle waiting for the next assignment cycle.

There is also an opt-in example integration test that shows the handoff timing during a late join on a 10-shard local stream:

RUN_EXAMPLE_INTEGRATION=1 go test -v ./integration -run TestGroupExample_LateJoinLogsHandoffTimeline

Or run the full local end-to-end example suite:

bash scripts/run-e2e-integration.sh

The test logs a small timeline including:

  • when worker A was already consuming
  • when worker B joined
  • when worker B first consumed a post-join record
  • the elapsed time from join to B's first post-join record

Example go test -v output:

=== RUN   TestGroupExample_LateJoinLogsHandoffTimeline
    group_example_integration_test.go:152: ownership: t=2026-03-17T14:22:39.081234-07:00 worker-a=10 worker-b=0
    group_example_integration_test.go:163: ownership: t=2026-03-17T14:22:41.312345-07:00 worker-b joined
    group_example_integration_test.go:173: ownership: t=2026-03-17T14:22:41.9-07:00 worker-a=9 worker-b=1 elapsed_since_join=600ms
    group_example_integration_test.go:173: ownership: t=2026-03-17T14:22:42.5-07:00 worker-a=7 worker-b=3 elapsed_since_join=1.2s
    group_example_integration_test.go:173: ownership: t=2026-03-17T14:22:43.1-07:00 worker-a=5 worker-b=5 elapsed_since_join=1.8s
    group_example_integration_test.go:182: ownership: rebalance reached 5/5 in 1.8s
    group_example_integration_test.go:194: handoff timeline: join_at=2026-03-17T14:22:41.312345-07:00 worker_a_first_before=2026-03-17T14:22:39-07:00 shard=shardId-000000000003 seq=49613890083656679142130284297101740673125758020959617026 worker_b_first_after=2026-03-17T14:22:43-07:00 shard=shardId-000000000007 seq=49613890083656679142130284297101740673125758020959617104 join_to_b_first_after=2s
    group_example_integration_test.go:203: post-join message counts: worker-a before=40 after=43 worker-b before=0 after=37 total_after=80
--- PASS: TestGroupExample_LateJoinLogsHandoffTimeline (8.41s)

The worker processes also emit per-record logs such as:

consumer-group: 2026/03/17 14:22:39 worker=worker-a shard=shardId-000000000003 seq=49613890083656679142130284297101740673125758020959617026 data={"run":"before","i":12}
consumer-group: 2026/03/17 14:22:43 worker=worker-b shard=shardId-000000000007 seq=49613890083656679142130284297101740673125758020959617104 data={"run":"after","i":4}

That combination shows the expected handoff shape: worker A starts at 10 shards, worker B joins, the lease table converges to 5/5 over time, and the post-join batch is split across both workers without duplicates.

Options

The consumer allows the following optional overrides.

Store

To record the consumer's progress in the stream (a checkpoint), the consumer uses a storage layer to persist the last sequence number it has read from each shard. Returning consumer.SkipCheckpoint from the record-processing callback tells the consumer to continue scanning without checkpointing that record.

This will allow consumers to re-launch and pick up at the position in the stream where they left off.

The unique identifier for a consumer is [appName, streamName, shardID].


Note: The default storage is in-memory (no-op), which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is restarted.

The consumer accepts a WithStore option to set the storage layer:

c, err := consumer.New(*stream, consumer.WithStore(db))
if err != nil {
	log.Fatalf("consumer error: %v", err)
}

Checkpoint durability depends on the store implementation:

  • store/redis writes checkpoints immediately.
  • store/ddb, store/postgres, and store/mysql buffer checkpoints in memory and flush them periodically.
  • consumer.Scan(...), consumer.ScanShard(...), and consumer.ScanBatch(...) flush buffered checkpoints automatically before they return.
  • If you manage a buffered store outside the consumer lifecycle, call Flush() to persist pending checkpoints or Shutdown() to flush and stop the store.

To persist scan progress choose one of the following storage layers:

Redis

The Redis checkpoint requires an app name:

import store "github.com/harlow/kinesis-consumer/store/redis"

// redis checkpoint
db, err := store.New(appName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

DynamoDB

The DynamoDB checkpoint requires an app name and a table name:

import store "github.com/harlow/kinesis-consumer/store/ddb"

// ddb checkpoint
db, err := store.New(appName, tableName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

// Override the DynamoDB client if any special configuration is needed
// (e.g. a specific region or an assumed role):
cfg, err := config.LoadDefaultConfig(context.TODO(),
	config.WithRegion("us-west-2"),
)
if err != nil {
	log.Fatalf("config error: %v", err)
}
myDynamoDbClient := dynamodb.NewFromConfig(cfg)

db, err := store.New(*app, *table, store.WithDynamoClient(myDynamoDbClient))
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

// A custom Retryer can also be provided to control what triggers a retry
// inside the checkpoint. See the code in examples:
// db, err := store.New(*app, *table, store.WithDynamoClient(myDynamoDbClient), store.WithRetryer(&MyRetryer{}))

To leverage the DDB checkpoint we'll also need to create a table:

Partition key: namespace
Sort key: shard_id


Postgres

The Postgres checkpoint requires an app name, a table name, and a connection string:

import store "github.com/harlow/kinesis-consumer/store/postgres"

// postgres checkpoint
db, err := store.New(app, table, connStr)
if err != nil {
  log.Fatalf("new checkpoint error: %v", err)
}

To leverage the Postgres checkpoint we'll also need to create a table:

CREATE TABLE kinesis_consumer (
	namespace text NOT NULL,
	shard_id text NOT NULL,
	sequence_number numeric NOT NULL,
	CONSTRAINT kinesis_consumer_pk PRIMARY KEY (namespace, shard_id)
);

The table name must match the one you specify when creating the checkpoint. The composite primary key on namespace and shard_id is mandatory for the checkpoint to run correctly and to ensure data integrity.

MySQL

The MySQL checkpoint requires an app name, a table name, and a connection string (just like the Postgres checkpoint!):

import store "github.com/harlow/kinesis-consumer/store/mysql"

// mysql checkpoint
db, err := store.New(app, table, connStr)
if err != nil {
  log.Fatalf("new checkpoint error: %v", err)
}

To leverage the Mysql checkpoint we'll also need to create a table:

CREATE TABLE kinesis_consumer (
	namespace varchar(255) NOT NULL,
	shard_id varchar(255) NOT NULL,
	sequence_number numeric(65,0) NOT NULL,
	CONSTRAINT kinesis_consumer_pk PRIMARY KEY (namespace, shard_id)
);

The table name must match the one you specify when creating the checkpoint. The composite primary key on namespace and shard_id is mandatory for the checkpoint to run correctly and to ensure data integrity.

Kinesis Client

Override the Kinesis client if there is any special config needed:

// client (AWS SDK V2)
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
	log.Fatalf("config error: %v", err)
}
client := kinesis.NewFromConfig(cfg)

// consumer
c, err := consumer.New(streamName, consumer.WithClient(client))

Metrics

Add optional counter for exposing counts for checkpoints and records processed:

// counter
counter := expvar.NewMap("counters")

// consumer
c, err := consumer.New(streamName, consumer.WithCounter(counter))

The expvar package will display consumer counts:

"counters": {
  "checkpoints": 3,
  "records": 13005
},

Consumer starting point

Kinesis allows consumers to specify where on the stream they'd like to start consuming from. The default in this library is LATEST (Start reading just after the most recent record in the shard).

This can be adjusted by using the WithShardIteratorType option in the library:

// override starting place on stream to use TRIM_HORIZON
c, err := consumer.New(
	*stream,
	consumer.WithShardIteratorType("TRIM_HORIZON"),
)

See AWS Docs for more options.

Aggregation

Use WithAggregation(true) when records were produced with KPL aggregation and you want the consumer to deaggregate them before invoking your callback.

This affects callback delivery only. Checkpoint stores still persist a single sequence number per shard, so a persisted restart cannot resume partway through an aggregated Kinesis record.

Logging

Logging supports the standard library logger or any third-party logger, as long as it implements the Logger interface.

For example, to use the built-in log package, wrap it in a myLogger struct:

// A myLogger provides a minimalistic logger satisfying the Logger interface.
type myLogger struct {
	logger *log.Logger
}

// Log logs the parameters to the stdlib logger. See log.Println.
func (l *myLogger) Log(args ...interface{}) {
	l.logger.Println(args...)
}

The package defaults to ioutil.Discard, so all logs are swallowed. This can be customized with the preferred logging strategy:

// logger
logger := &myLogger{
	logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags),
}

// consumer
c, err := consumer.New(streamName, consumer.WithLogger(logger))

To use a more full-featured logging library, e.g. apex/log:

import (
	alog "github.com/apex/log"
	"github.com/apex/log/handlers/text"
)

type myLogger struct {
	logger *alog.Logger
}

func (l *myLogger) Log(args ...interface{}) {
	l.logger.Info(fmt.Sprint(args...))
}

func main() {
	logger := &myLogger{
		logger: &alog.Logger{
			Handler: text.New(os.Stderr),
			Level:   alog.DebugLevel,
		},
	}

	// consumer
	c, err := consumer.New(streamName, consumer.WithLogger(logger))
	// ...
}

Examples

There are examples of a producer and a consumer in the /examples directory. These give end-to-end examples of setting up consumers with different checkpoint strategies.

The examples run locally against Kinesalite:

$ kinesalite &

Produce data to the stream:

$ cat examples/producer/users.txt | go run examples/producer/main.go --stream myStream

Consume data from the stream:

$ go run examples/consumer/main.go --stream myStream

Community Contributions

Thanks to everyone who has helped improve this project over time by reporting issues, fixing bugs, and contributing code.

That includes contributors from companies such as Uber, GoDaddy, Signal, Splunk, LaunchDarkly, Qualtrics, Swift Navigation, Datadog, Disney, Globant, Nubank, and others.

Contributing

Please see CONTRIBUTING.md for more information. Thank you, contributors!

License

Copyright (c) 2015 Harlow Ward. It is free software, and may be redistributed under the terms specified in the LICENSE file.

www.hward.com  ·  GitHub @harlow  ·  Twitter @harlow_ward
