Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,20 @@ protected abstract KeyOrValueIngestData getIngestData(
protected abstract KeyOrValueProcessor getProcessor(
TableDefinition tableDef,
KeyOrValueIngestData data);

private KeyOrValueIngestData ingestDataAndIncrement(
Comment thread
devinrsmith marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

            protected KeyOrValueIngestData getIngestData(
                    KeyOrValue keyOrValue,
                    SchemaRegistryClient schemaRegistryClient,
                    Map<String, ?> configs,
                    int nextColumnIndex,
                    List<ColumnDefinition<?>> columnDefinitionsOut) {
                return getIngestData(keyOrValue, schemaRegistryClient, configs, new MutableInt(nextColumnIndex),
                        columnDefinitionsOut);
            }

            private KeyOrValueIngestData getIngestDataAndIncrementColumnIndex(
                    final KeyOrValue keyOrValue,
                    final SchemaRegistryClient schemaRegistryClient,
                    Map<String, ?> configs,
                    MutableInt nextColumnIndexMut,
                    List<ColumnDefinition<?>> columnDefinitionsOut) {
                final int ixPre = nextColumnIndexMut.get();
                final int sizePre = columnDefinitionsOut.size();
                // It's unfortunate we have to do this; but the getIngestData signature and design of KeyOrValueSpec
                // would need a breaking change to improve in these regards.
                final KeyOrValueIngestData ingestData = getIngestData(keyOrValue, schemaRegistryClient, configs,
                        ixPre, columnDefinitionsOut);
                final int addedColumns = columnDefinitionsOut.size() - sizePre;
                if (ingestData == null) {
                    // ignore case (or, possible a complex impl that chooses to return null when it's empty):
                    // expecting no modifications
                    if (addedColumns != 0) {
                        throw new IllegalStateException(
                                "KeyOrValueSpec getIngestData is modifying out-variables without returning ingest data");
                    }
                } else {
                    nextColumnIndexMut.set(ixPre + addedColumns);
                }
                return ingestData;
            }
        }

KeyOrValue keyOrValue,
SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs,
MutableInt nextColumnIndexMut,
List<ColumnDefinition<?>> columnDefinitionsOut) {
Comment thread
devinrsmith marked this conversation as resolved.
Outdated
final KeyOrValueIngestData ingestData = getIngestData(keyOrValue, schemaRegistryClient, configs,
nextColumnIndexMut, columnDefinitionsOut);
if (ingestData != null && !ingestData.isSimple()) {
nextColumnIndexMut.getAndAdd(ingestData.fieldPathToColumnName.size());
Comment thread
devinrsmith marked this conversation as resolved.
Outdated
}
return ingestData;
}
}

private static final KeyOrValueSpec FROM_PROPERTIES = new SimpleConsume(null, null);
Expand Down Expand Up @@ -1273,9 +1287,9 @@ private static ConsumeStruct getConsumeStruct(
cc.setColumnIndex.setColumnIndex(publisherParametersBuilder, nextColumnIndex.getAndIncrement());
});

final KeyOrValueIngestData keyIngestData = keySpec.getIngestData(KeyOrValue.KEY,
final KeyOrValueIngestData keyIngestData = keySpec.ingestDataAndIncrement(KeyOrValue.KEY,
schemaRegistryClient, configs, nextColumnIndex, columnDefinitions);
final KeyOrValueIngestData valueIngestData = valueSpec.getIngestData(KeyOrValue.VALUE,
final KeyOrValueIngestData valueIngestData = valueSpec.ingestDataAndIncrement(KeyOrValue.VALUE,
schemaRegistryClient, configs, nextColumnIndex, columnDefinitions);
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change fixes subtle column-indexing behavior that only shows up with a complex key spec + simple value spec (per DH-22265 / #7884), but there doesn’t appear to be regression coverage exercising getConsumeStruct/consumeToTable with that spec combination. Adding a unit/integration test that builds a TableDefinition via getTableDefinition (or runs a minimal ingest path) for complex-key + simple-value and asserts the resulting indices/column ordering would help prevent future regressions.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing will be in a follow-up PR.


final TableDefinition tableDefinition = TableDefinition.of(columnDefinitions);
Expand Down Expand Up @@ -1670,6 +1684,10 @@ public static class KeyOrValueIngestData {
public int simpleColumnIndex = NULL_COLUMN_INDEX;
public Function<Object, Object> toObjectChunkMapper = Function.identity();
public Object extra;

private boolean isSimple() {
return simpleColumnIndex != NULL_COLUMN_INDEX;
}
}

private interface SetColumnIndex {
Expand Down
Loading