fix: DH-22265: increment Kafka column index for complex spec #7887

Merged
devinrsmith merged 5 commits into deephaven:main from devinrsmith:dh-22265-index-chunks-correctly
Apr 13, 2026

Conversation

@devinrsmith
Member

This fixes a bug where the column indexing is incorrectly calculated; it only manifests when the key spec is "complex" and the value spec is "simple".

Fixes #7884
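
To illustrate the failure mode, here is a simplified model of the index bookkeeping, not the actual KafkaTools code. The class `ColumnIndexSketch`, the method `valueIndex`, and the column names are all hypothetical. The sketch assumes that a complex key spec appends several columns, while a simple value spec records a single index taken from a shared counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the bookkeeping; none of these names come from KafkaTools.
final class ColumnIndexSketch {

    /**
     * Appends keyColumnCount key columns and then one simple value column.
     * Returns the index that would be recorded for the value column.
     */
    static int valueIndex(List<String> columns, int keyColumnCount, boolean incrementAfterKey) {
        int nextColumnIndex = columns.size();
        for (int i = 0; i < keyColumnCount; i++) {
            columns.add("Key" + i);
        }
        if (incrementAfterKey) {
            // The step this sketch assumes was missing for complex key specs.
            nextColumnIndex += keyColumnCount;
        }
        final int recorded = nextColumnIndex;
        columns.add("Value");
        return recorded;
    }

    public static void main(String[] args) {
        List<String> buggy = new ArrayList<>(List.of("Partition", "Offset", "Timestamp"));
        int buggyIndex = valueIndex(buggy, 2, false);
        // prints "without increment: index 3 -> Key0"
        System.out.println("without increment: index " + buggyIndex + " -> " + buggy.get(buggyIndex));

        List<String> fixed = new ArrayList<>(List.of("Partition", "Offset", "Timestamp"));
        int fixedIndex = valueIndex(fixed, 2, true);
        // prints "with increment: index 5 -> Value"
        System.out.println("with increment: index " + fixedIndex + " -> " + fixed.get(fixedIndex));
    }
}
```

Without the increment, the index recorded for the value points at the first key column, so two writers would target the same column. A collision of that kind would be consistent with the "Chunk sizes don't match" error in the linked issue.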

@devinrsmith devinrsmith self-assigned this Apr 10, 2026
@devinrsmith devinrsmith requested a review from Copilot April 10, 2026 01:16
@github-actions
Contributor

github-actions Bot commented Apr 10, 2026

No docs changes detected for 5dc6e69

Contributor

Copilot AI left a comment


Pull request overview

Fixes Kafka consume column-index calculation when combining a complex key spec with a simple value spec, addressing the “Chunk sizes don’t match” ingestion error reported in #7884.

Changes:

  • Add KeyOrValueSpec.ingestDataAndIncrement(...) to adjust nextColumnIndex after complex ingest specs.
  • Switch getConsumeStruct to use the new helper for both key and value ingest-data creation.
  • Add a small helper (KeyOrValueIngestData.isSimple()) to distinguish simple vs non-simple ingest specs.


Comment thread extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java Outdated
Comment on lines 1288 to 1293
 });

-final KeyOrValueIngestData keyIngestData = keySpec.getIngestData(KeyOrValue.KEY,
+final KeyOrValueIngestData keyIngestData = keySpec.ingestDataAndIncrement(KeyOrValue.KEY,
         schemaRegistryClient, configs, nextColumnIndex, columnDefinitions);
-final KeyOrValueIngestData valueIngestData = valueSpec.getIngestData(KeyOrValue.VALUE,
+final KeyOrValueIngestData valueIngestData = valueSpec.ingestDataAndIncrement(KeyOrValue.VALUE,
         schemaRegistryClient, configs, nextColumnIndex, columnDefinitions);

Copilot AI Apr 10, 2026


This change fixes subtle column-indexing behavior that only shows up with a complex key spec + simple value spec (per DH-22265 / #7884), but there doesn’t appear to be regression coverage exercising getConsumeStruct/consumeToTable with that spec combination. Adding a unit/integration test that builds a TableDefinition via getTableDefinition (or runs a minimal ingest path) for complex-key + simple-value and asserts the resulting indices/column ordering would help prevent future regressions.

Member Author


Testing will be in a follow-up PR.

@devinrsmith devinrsmith requested a review from rcaudy April 10, 2026 02:12
devinrsmith added a commit to devinrsmith/deephaven-core that referenced this pull request Apr 10, 2026
This adds KafkaTools integration testing via TestContainers.

There are four different Kafka images we are testing against: `cp-kafka`, `kafka`, `kafka-native`, and `redpanda`.

This includes a test case for DH-22265 / deephaven#7887
Member

@rcaudy rcaudy left a comment


I think we're being a bit brittle.

Comment thread extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java Outdated
TableDefinition tableDef,
KeyOrValueIngestData data);

private KeyOrValueIngestData ingestDataAndIncrement(
Member


Maybe:

            protected KeyOrValueIngestData getIngestData(
                    KeyOrValue keyOrValue,
                    SchemaRegistryClient schemaRegistryClient,
                    Map<String, ?> configs,
                    int nextColumnIndex,
                    List<ColumnDefinition<?>> columnDefinitionsOut) {
                return getIngestData(keyOrValue, schemaRegistryClient, configs, new MutableInt(nextColumnIndex),
                        columnDefinitionsOut);
            }

            private KeyOrValueIngestData getIngestDataAndIncrementColumnIndex(
                    final KeyOrValue keyOrValue,
                    final SchemaRegistryClient schemaRegistryClient,
                    Map<String, ?> configs,
                    MutableInt nextColumnIndexMut,
                    List<ColumnDefinition<?>> columnDefinitionsOut) {
                final int ixPre = nextColumnIndexMut.get();
                final int sizePre = columnDefinitionsOut.size();
                // It's unfortunate we have to do this; but the getIngestData signature and design of KeyOrValueSpec
                // would need a breaking change to improve in these regards.
                final KeyOrValueIngestData ingestData = getIngestData(keyOrValue, schemaRegistryClient, configs,
                        ixPre, columnDefinitionsOut);
                final int addedColumns = columnDefinitionsOut.size() - sizePre;
                if (ingestData == null) {
                    // ignore case (or, possibly a complex impl that chooses to return null when it's empty):
                    // expecting no modifications
                    if (addedColumns != 0) {
                        throw new IllegalStateException(
                                "KeyOrValueSpec getIngestData is modifying out-variables without returning ingest data");
                    }
                } else {
                    nextColumnIndexMut.set(ixPre + addedColumns);
                }
                return ingestData;
            }
        }
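
For readers who want to run the idea in the suggestion above, here is a self-contained sketch of the same size-delta technique: measure how many columns the delegate appended and advance the counter by that amount. It is not the merged implementation. `IncrementBySizeDelta`, `IngestStep`, and `Counter` are hypothetical stand-ins for `KeyOrValueSpec.getIngestData` and `MutableInt`, and plain strings stand in for column definitions and ingest data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the reviewer's suggestion; not Deephaven code.
final class IncrementBySizeDelta {

    /** Stand-in for getIngestData: may append to columnsOut; returns ingest data or null. */
    interface IngestStep {
        Object apply(int nextColumnIndex, List<String> columnsOut);
    }

    /** Stand-in for a mutable int holder. */
    static final class Counter {
        int value;
    }

    static Object ingestAndIncrement(IngestStep step, Counter nextColumnIndex, List<String> columnsOut) {
        final int ixPre = nextColumnIndex.value;
        final int sizePre = columnsOut.size();
        final Object ingestData = step.apply(ixPre, columnsOut);
        final int addedColumns = columnsOut.size() - sizePre;
        if (ingestData == null) {
            // A step that returns no ingest data is expected to leave the column list untouched.
            if (addedColumns != 0) {
                throw new IllegalStateException("step added columns without returning ingest data");
            }
        } else {
            // The wrapper owns the index; the step never has to mutate it.
            nextColumnIndex.value = ixPre + addedColumns;
        }
        return ingestData;
    }

    public static void main(String[] args) {
        final List<String> columns = new ArrayList<>();
        final Counter next = new Counter();
        // A "complex" key step that adds two columns, then a "simple" value step that adds one.
        ingestAndIncrement((ix, out) -> {
            out.add("K1");
            out.add("K2");
            return "key";
        }, next, columns);
        ingestAndIncrement((ix, out) -> {
            out.add("V");
            return "value";
        }, next, columns);
        // prints "3 [K1, K2, V]"
        System.out.println(next.value + " " + columns);
    }
}
```

The design choice in this sketch is that the wrapper derives the increment from the observed change in list size, so individual steps cannot forget to advance the index.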

@devinrsmith devinrsmith requested a review from rcaudy April 10, 2026 23:08
rcaudy previously approved these changes Apr 10, 2026
Member

@rcaudy rcaudy left a comment


Approving, but I liked my simpler version that absolved the implementations of any responsibility for mutating the index quite a lot more. This version has an awful lot of validation and warning trace that only benefits anyone if the current nonsense implementation is desirable long-term.

Member

@rcaudy rcaudy left a comment


I think the commented-out warnings are ugly, but I'm willing to compromise here.

@devinrsmith devinrsmith enabled auto-merge (squash) April 13, 2026 16:39
@devinrsmith devinrsmith merged commit eefe6d6 into deephaven:main Apr 13, 2026
23 checks passed
@devinrsmith devinrsmith deleted the dh-22265-index-chunks-correctly branch April 13, 2026 17:03
@github-actions github-actions Bot locked and limited conversation to collaborators Apr 13, 2026

Development

Successfully merging this pull request may close these issues.

Kafka: Chunk sizes don't match error when combining jsonSpec and simpleSpec

3 participants