- Load balancing of subscribers: HIGH PRIORITY
- Currently the biggest limitation of Meteor, particularly when compared with Kafka and Kinesis.
- Currently there's no notion of parallelism within a stream or across multiple streams. Under the current model, messages would have to be mapped to multiple streams by publishers; however, due to the absence of load balancing, subscribers may congregate on relatively few nodes (due to the first-come-first-served stream lease assignment), leaving the cluster underutilised.
- Record versioning and backward compatibility with rolling upgrades: HIGH PRIORITY
- Any changes to the record structure will break older clients when doing a rolling update. This means that the only way of upgrading a grid is to either bring it offline, or to terminate subscribers (which requires bespoke code on the application end).
- Add a version field to the head of a batch. A publisher always writes to the ring buffer in the latest (from its perspective) version.
- Forward compatibility: if a subscriber sees an unsupported message, it will halt processing. Presumably, at some point in the near future the subscriber's JVM is restarted and a new version of Meteor is loaded.
- We could even go as far as automatically unsubscribing the subscriber when it sees an unsupported version, causing a rebalancing of subscriber assignments. The lease might bounce around among old subscribers until eventually an upgraded subscriber is elected. (Over the upgrade window, the number of old subscribers should diminish rapidly.)
- Backward compatibility: if a subscriber observes a message of an older version, it will apply a series of transforms to stage-wise upgrade the message to the current schema on the fly. As we can't rewrite the messages in situ, the Meteor codebase must retain all schemas and migration rules up to the current version. We could use separate project modules to store version-specific schemas and transforms; the apps can include only those dependencies they need.
- If persistence is available, then upgrades could be made on data in situ with write-back, thus avoiding the need to retain all schemas and transforms in the codebase. If the subscriber pulls a ring buffer cell with a version that is less than N - 1, then it can fetch from the data store instead. The write-back upgrade could be done with older clients still connected to the grid; they would need to halt processing if they encounter a newer schema.
- Alternatively, we could apply further versioning at the topic level. An upgrade would pump messages from one topic to the next, transforming the messages en route. This could be quite complicated in the presence of older publishers, who will continue to publish to the old stream, unless we atomically cut over all publishers (old and new) to the new stream, and support schema N - 1 in the new stream. Also, this approach would interfere with message offsets (although this could be corrected via `RingbufferStore`). We might have to bite the bullet and go with stage-wise upgrades.
- Versioning only applies to the records' on-wire representation; not to their payload schema. Payload versioning is the application's concern.
- Thought: we might make backward compatibility exclusions/dispensations for versions 0.x.x, as schema evolution will be particularly liberal in the beginning and the understanding is that the 0.x.x library hasn't reached a milestone that permits its use in systems with unbounded data retention requirements.
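
The stage-wise upgrade path described above might look roughly like the following. This is a minimal sketch, not Meteor code: `StagewiseUpgrader`, `VersionedBatch` and the string body are hypothetical names chosen for illustration, and the rule that each transform advances exactly one version is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical sketch of stage-wise record upgrades; none of these names are Meteor APIs. */
final class StagewiseUpgrader {
  /** Illustrative batch: a version read from the head of the batch, plus an opaque body. */
  static final class VersionedBatch {
    final int version;
    final String body;

    VersionedBatch(int version, String body) {
      this.version = version;
      this.body = body;
    }
  }

  private final int currentVersion;
  // Key N holds the transform that upgrades a version-N batch to version N + 1.
  private final Map<Integer, UnaryOperator<VersionedBatch>> transforms = new HashMap<>();

  StagewiseUpgrader(int currentVersion) {
    this.currentVersion = currentVersion;
  }

  StagewiseUpgrader register(int fromVersion, UnaryOperator<VersionedBatch> transform) {
    transforms.put(fromVersion, transform);
    return this;
  }

  /** Upgrades an older batch one version at a time; halts on a newer (unsupported) version. */
  VersionedBatch upgrade(VersionedBatch batch) {
    if (batch.version > currentVersion) {
      // Forward compatibility: an old subscriber stops rather than misreading the batch.
      throw new IllegalStateException("Unsupported version " + batch.version);
    }
    VersionedBatch upgraded = batch;
    while (upgraded.version < currentVersion) {
      UnaryOperator<VersionedBatch> transform = transforms.get(upgraded.version);
      if (transform == null) {
        throw new IllegalStateException("No transform from version " + upgraded.version);
      }
      VersionedBatch next = transform.apply(upgraded);
      if (next.version != upgraded.version + 1) {
        // Assumption: each stage advances exactly one version, which also rules out endless loops.
        throw new IllegalStateException("Transform must advance exactly one version");
      }
      upgraded = next;
    }
    return upgraded;
  }
}
```

Keeping one transform per version step is what would let version-specific schemas live in separate modules, with each application including only the steps it needs.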
- Micro-batching and LZ4 data compression:
- The built-in batching offered by Hazelcast is limited to 1,000 records and isn't tunable; also no opportunity to pass the batch through a compression filter or perform any other pre-processing.
- Proposal is to create an independent micro-batching layer with customisable stream filters; LZ4 support with configurable (possibly even self-tunable) block sizes should be out of the box.
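
A micro-batching layer with pluggable stream filters could be sketched as below. All names (`MicroBatcher`, the length-prefixed framing, the `deflate` filter) are assumptions made for illustration. The JDK has no LZ4 codec, so the sketch swaps in DEFLATE from `java.util.zip` as a stand-in; an LZ4 filter would plug into the same `UnaryOperator<byte[]>` slot.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.zip.DeflaterOutputStream;

/** Hypothetical micro-batching layer; not part of Meteor or Hazelcast. */
final class MicroBatcher {
  private final int maxRecords;
  private final List<UnaryOperator<byte[]>> filters;
  private final Consumer<byte[]> sink;
  private final List<byte[]> pending = new ArrayList<>();

  MicroBatcher(int maxRecords, List<UnaryOperator<byte[]>> filters, Consumer<byte[]> sink) {
    this.maxRecords = maxRecords;
    this.filters = filters;
    this.sink = sink;
  }

  /** Buffers a record and flushes once the configured batch size is reached. */
  synchronized void add(byte[] record) {
    pending.add(record);
    if (pending.size() >= maxRecords) {
      flush();
    }
  }

  /** Frames the pending records, runs the filters in order and hands the result to the sink. */
  synchronized void flush() {
    if (pending.isEmpty()) {
      return;
    }
    byte[] batch = frame(pending);
    pending.clear();
    for (UnaryOperator<byte[]> filter : filters) {
      batch = filter.apply(batch);
    }
    sink.accept(batch);
  }

  // Assumed framing: record count, then (length, bytes) per record.
  private static byte[] frame(List<byte[]> records) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(records.size());
      for (byte[] record : records) {
        out.writeInt(record.length);
        out.write(record);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Stand-in compression filter (DEFLATE, not LZ4). */
  static byte[] deflate(byte[] input) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DeflaterOutputStream out = new DeflaterOutputStream(bytes)) {
      out.write(input);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }
}
```

The sink would be whatever writes the finished batch to the ring buffer; a time-based flush is omitted here for brevity.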
- Keys and key-based sharding
- Client-level message serialization support:
- Currently the API expects you to work directly with byte arrays, which is arbitrarily flexible but assumes experienced coders.
- Manual byte-pushing minimises opportunities for pipelining within the client API. (The application becomes responsible for pipelining.)
- Support for message serialization will be baked into the client APIs.
- OOTB support for Jackson, Gson, Kryo and `java.io.Serializable`, as well as custom serializers.
- Serialization will apply to both keys and values; different serializers may be used.
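
One possible shape for the serializer abstraction is sketched here. `MessageSerializer`, `Utf8Serializer`, `JavaSerializer` and `KeyValueCodec` are hypothetical names, not Meteor APIs; only the JDK's own `java.io.Serializable` mechanism is used, and Jackson, Gson or Kryo adapters would implement the same assumed interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical serializer contract; custom serializers would implement this. */
interface MessageSerializer<T> {
  byte[] serialize(T value);

  T deserialize(byte[] bytes);
}

/** Encodes strings as UTF-8, e.g. for message keys. */
final class Utf8Serializer implements MessageSerializer<String> {
  public byte[] serialize(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  public String deserialize(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

/** Uses built-in Java serialization for any Serializable type. */
final class JavaSerializer<T extends Serializable> implements MessageSerializer<T> {
  public byte[] serialize(T value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  @SuppressWarnings("unchecked")
  public T deserialize(byte[] bytes) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (T) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}

/** Pairs independent key and value serializers, since the two may differ. */
final class KeyValueCodec<K, V> {
  final MessageSerializer<K> keys;
  final MessageSerializer<V> values;

  KeyValueCodec(MessageSerializer<K> keys, MessageSerializer<V> values) {
    this.keys = keys;
    this.values = values;
  }

  /** Returns the encoded key at index 0 and the encoded value at index 1. */
  byte[][] encode(K key, V value) {
    return new byte[][] {keys.serialize(key), values.serialize(value)};
  }
}
```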
- Pipelining of client Hazelcast API calls separately from message (de)serialization (using separate threads for I/O and serialization):
- Approach is similar to how Jackdaw pipelines Kafka I/O and serialization; only the pipelines will be integrated into the Meteor client API (because we can) and thus made completely transparent to the application.
- JMX metrics
- Auto-confirming of subscriber offsets. Currently this is a manual call to `Subscriber.confirm()`.
- Metadata server:
- Currently all publishers and subscribers to a stream must agree on all of the stream's parameters: capacity, number of sync/async replicas, storage implementation, etc. There is no way to discover this information. The present design, however restrictive, ensures that any cohort can auto-create the stream if one doesn't exist. (In other words, streams are always created lazily, upon first use.) In practice, this is acceptable for long-lived streams and where the stream configuration is static and can be agreed upon and disseminated out-of-band.
- Ideally, publishers and subscribers should refer to a stream solely by its name, without concerning themselves with its underlying configuration. Create a stream metadata service that holds a serialized `StreamConfig` (e.g. JSON with YConf mappings) for a given stream name. (A distributed hash map should do.)
- The act of looking up the stream's metadata should be separate from the act of connecting to the stream for pub-sub. The lookup operation is done via a separate `MetadataService` API and may take an optional `Supplier<StreamConfig>`, in case the stream doesn't exist.
- There would ideally be one application responsible for 'mastering' the stream; that application would house the stream config and pass it as the default value. Typically, that application would be one of the publishers. Other applications would perform the lookup without knowledge of the default value; if metadata is missing then the application would either back off or fail (or more pragmatically, fail after some number of back-offs). Perhaps the lookup API could take a timeout value, backing off and retrying behind the scenes.
- Being a distributed hash map, the metadata map might itself be created lazily. For this reason, all cohorts must agree on the metadata map configuration. Sensible defaults should be provided by Meteor, with the option to override.
- Metadata persistence: this wouldn't be an issue for transient (non-persisted) topics; however, persisted topics might survive their own metadata if the grid is reformed. The only problem is that there isn't a sensible default persistence configuration for the metadata hash map. The options are to either agree on a global configuration which is dispersed out-of-band, or to apply the `Supplier<MetadataConfig>` pattern and make one 'pilot' process responsible for metadata 'bootstrapping'. If all metadata replicas are lost, the other procs would have to wait for the pilot proc to join the grid. The same pilot proc could also be used to 'master' the streams, acting as a central repository of configuration, which it immediately transfers to the grid. For as long as the grid is intact, the pilot proc is dormant.
- The pilot is a simple process attached to the grid that can be remotely configured using a Hazelcast topic.
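
The two lookup styles discussed for the metadata service (a 'mastering' application that supplies a default, and other applications that back off until metadata appears) could be sketched as follows. This is an assumption-laden illustration: the class shape and both method names are hypothetical, the serialized `StreamConfig` is represented as a plain JSON string, and a local `ConcurrentMap` stands in for the distributed hash map.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical metadata service sketch; not an existing Meteor API. */
final class MetadataService {
  // Stream name -> serialized stream config. A distributed map would be used on a real grid.
  private final ConcurrentMap<String, String> configs;

  MetadataService(ConcurrentMap<String, String> configs) {
    this.configs = configs;
  }

  /** Used by the mastering application: publishes the default config if none exists yet. */
  String lookupOrCreate(String streamName, Supplier<String> defaultConfig) {
    String existing = configs.get(streamName);
    if (existing != null) {
      return existing;
    }
    String candidate = defaultConfig.get();
    String raced = configs.putIfAbsent(streamName, candidate);
    // If another cohort won the race, its config is authoritative.
    return raced != null ? raced : candidate;
  }

  /** Used by other applications: retries with a fixed back-off until the timeout elapses. */
  Optional<String> lookup(String streamName, long timeoutMillis, long backoffMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (true) {
      String config = configs.get(streamName);
      if (config != null) {
        return Optional.of(config);
      }
      if (System.nanoTime() - deadline >= 0) {
        return Optional.empty();
      }
      try {
        Thread.sleep(backoffMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return Optional.empty();
      }
    }
  }
}
```

Returning an empty `Optional` on timeout leaves the fail-or-retry decision with the caller; throwing instead would be an equally reasonable choice.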
- Parallel persistence engine:
- The traditional challenge with persistence of ordered messages is that writing a message blocks all other writers, which wait in a write queue. However, it's a simple model involving one large (albeit blocking) write per message. Reading a record is also done in one operation.
- Proposed approach: publisher persists batch and obtains a unique (DB-assigned) store ID (slow operation, but done in parallel across publishers) before putting the compressed batch and the store ID on the ring buffer. `RingbufferStore` completes the loop by associating the store ID with the message offset (fast operation that is blocking within the shard leader), indexed by ring buffer offset, before acknowledging the write.
- By the time the batch is observed by subscribers, the batch would have been persisted and linked back to the ring buffer offset.
- If the ring buffer cell has lapsed, `RingbufferStore` looks up the store ID for the given ring buffer offset. Then the store ID is resolved to the batch data. (Two discrete operations are required for the read, which may be issued as one composite operation depending on the persistence stack and query language semantics.)
- Persistence must also apply to subscriber offsets. Offsets may be persisted lazily; there's no need to fsync the offset before returning.
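
The two-step write and two-step read described above can be illustrated with in-memory maps standing in for the database. Everything here is a hypothetical sketch: the class and method names are invented, and the `AtomicLong` merely imitates a DB-assigned store ID.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the proposed persistence flow; maps stand in for a real data store. */
final class ParallelPersistenceSketch {
  private final AtomicLong storeIds = new AtomicLong();
  private final ConcurrentMap<Long, byte[]> batchByStoreId = new ConcurrentHashMap<>();
  private final ConcurrentMap<Long, Long> storeIdByOffset = new ConcurrentHashMap<>();

  /** Slow step, run by each publisher in parallel: persist the batch and obtain its store ID. */
  long persistBatch(byte[] batch) {
    long storeId = storeIds.incrementAndGet();
    batchByStoreId.put(storeId, batch.clone());
    return storeId;
  }

  /** Fast step, run before the write is acknowledged: index the store ID by ring buffer offset. */
  void linkOffset(long offset, long storeId) {
    storeIdByOffset.put(offset, storeId);
  }

  /** Read path for a lapsed cell: resolve offset to store ID, then store ID to batch data. */
  Optional<byte[]> readLapsed(long offset) {
    Long storeId = storeIdByOffset.get(offset);
    if (storeId == null) {
      return Optional.empty();
    }
    return Optional.ofNullable(batchByStoreId.get(storeId));
  }
}
```

Because store IDs are assigned before offsets, the order in which publishers finish persisting need not match the final ring buffer order; only the short linking step is serialized.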
- Background (semi-)compaction:
- Persisted messages are obsoleted in the background based on key, and are thereby excluded from the batch, leaving a hole which may in theory be squashed (as long as the intra-batch message numbering is preserved). If the last message in a batch is obsoleted, then the batch is fed as a special void batch to the subscriber, with a pointer to the next non-void batch. This way, if there is a large void in the message log (several contiguous void batches), the subscriber can rapidly skip over those ring buffer cells.
- This might be called semi-compaction as it works at a batch level; it doesn't shuffle messages between batches or try to splice buddying batches to avoid external fragmentation. The algorithm reaches peak efficiency when entire batches can be reclaimed and the subscribers can begin to fast-forward their offsets, skipping over the ring buffer cells. Even if there are still lots of old non-void batches left due to sparsely distributed relevant/un-compacted messages, we can still realise performance gains by allowing subscribers to silently skip over the compacted messages.
- We're ultimately limited by a ring buffer's rigid structure, which isn't naturally amenable to compaction (as opposed to chained log nodes, which can easily be buddied and spliced). At best, we can reduce a ring buffer to a skip list.
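
How a subscriber might fast-forward over void batches can be sketched as below. The `Cell` type, its factory methods and the `consume` loop are hypothetical, and the sketch assumes a void batch carries the offset of the next non-void batch, as described above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of skipping void batches; not Meteor code. */
final class VoidBatchSkipper {
  /** A ring buffer cell: either a batch of messages or a void batch with a forward pointer. */
  static final class Cell {
    final List<String> messages;
    final long nextNonVoidOffset; // only meaningful for void batches

    private Cell(List<String> messages, long nextNonVoidOffset) {
      this.messages = messages;
      this.nextNonVoidOffset = nextNonVoidOffset;
    }

    static Cell batch(String... messages) {
      return new Cell(Arrays.asList(messages), -1);
    }

    static Cell voidBatch(long nextNonVoidOffset) {
      return new Cell(Collections.<String>emptyList(), nextNonVoidOffset);
    }

    boolean isVoid() {
      return messages.isEmpty();
    }
  }

  /** Consumes the log from startOffset and returns how many cells were actually read. */
  static int consume(List<Cell> log, long startOffset, Consumer<String> handler) {
    int cellsRead = 0;
    long offset = startOffset;
    while (offset < log.size()) {
      Cell cell = log.get((int) offset);
      cellsRead++;
      if (cell.isVoid()) {
        // Jump straight to the next non-void batch; always advance by at least one cell.
        offset = Math.max(cell.nextNonVoidOffset, offset + 1);
      } else {
        cell.messages.forEach(handler);
        offset++;
      }
    }
    return cellsRead;
  }
}
```

With three contiguous void batches between two live ones, the subscriber reads only the first void cell and then jumps, which is the skip-list behaviour alluded to above.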