diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 963428f5c63..67fc35d1d72 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -336,6 +336,71 @@ protected abstract KeyOrValueIngestData getIngestData( protected abstract KeyOrValueProcessor getProcessor( TableDefinition tableDef, KeyOrValueIngestData data); + + private KeyOrValueIngestData getIngestDataAndIncrementColumnIndex( + final KeyOrValue keyOrValue, + final SchemaRegistryClient schemaRegistryClient, + final Map configs, + final MutableInt nextColumnIndexMut, + final List> columnDefinitionsOut) { + final int ixPre = nextColumnIndexMut.get(); + final int sizePre = columnDefinitionsOut.size(); + // It's unfortunate we have to do this; but the getIngestData signature and design of KeyOrValueSpec + // would need a breaking change to improve in these regards. We are trying to be "lenient" for impls + // that improperly update nextColumnIndexMut, or setting the wrong index on simpleColumnIndex, and + // warn them that it appears they are doing something wrong. If an implementation is improperly adding + // to columnsDefinitionOut though, this is a more serious error. + final KeyOrValueIngestData ingestData = getIngestData(keyOrValue, schemaRegistryClient, configs, + nextColumnIndexMut, columnDefinitionsOut); + // Note: choosing to _not_ log warning when caller is mutating the index "incorrectly", but keeping code + // commented out here for future reference. + // final int addedIx = nextColumnIndexMut.get() - ixPre; + final int addedColumns = columnDefinitionsOut.size() - sizePre; + if (ingestData == null) { + // ignore case (or, possibly a complex impl that chooses to return null when it's empty): + // expecting no modifications + if (addedColumns != 0) { + throw new IllegalStateException( + "KeyOrValueSpec getIngestData is modifying columnDefinitionsOut without returning ingest data"); + } + // if (addedIx != 0) { + // log.warn().append( + // "Ignore KeyOrValueSpec getIngestData is improperly mutating the index; manually correcting...") + // .endl(); + // } + } else if (ingestData.isSimple()) { + // simple case: + // expecting exactly one increment and one column + if (addedColumns != 1) { + throw new IllegalStateException( + "Simple KeyOrValueSpec getIngestData is not adding exactly one column"); + } + // if (addedIx != 1) { + // log.warn().append( + // "Simple KeyOrValueSpec did not properly mutate the index by 1; manually correcting...") + // .endl(); + // } + // if (ingestData.simpleColumnIndex != ixPre) { + // log.warn().append("Simple KeyOrValueSpec set a bad simpleColumnIndex; manually correcting...") + // .endl(); + // } + // The fact that we are setting this here means that this would ideally not be the responsibility of + // the implementation... + ingestData.simpleColumnIndex = ixPre; + } else { + // complex case: + // expecting no increments - we'll increase the index based on any columns that were added + // if (addedIx != 0) { + // log.warn().append( + // "Complex KeyOrValueSpec getIngestData is improperly mutating the index; manually correcting...") + // .endl(); + // } + } + // The fact that we are setting this here means that this would ideally not be the responsibility of the + // implementation... + nextColumnIndexMut.set(ixPre + addedColumns); + return ingestData; + } } private static final KeyOrValueSpec FROM_PROPERTIES = new SimpleConsume(null, null); @@ -1273,10 +1338,10 @@ private static ConsumeStruct getConsumeStruct( cc.setColumnIndex.setColumnIndex(publisherParametersBuilder, nextColumnIndex.getAndIncrement()); }); - final KeyOrValueIngestData keyIngestData = keySpec.getIngestData(KeyOrValue.KEY, - schemaRegistryClient, configs, nextColumnIndex, columnDefinitions); - final KeyOrValueIngestData valueIngestData = valueSpec.getIngestData(KeyOrValue.VALUE, - schemaRegistryClient, configs, nextColumnIndex, columnDefinitions); + final KeyOrValueIngestData keyIngestData = keySpec.getIngestDataAndIncrementColumnIndex( + KeyOrValue.KEY, schemaRegistryClient, configs, nextColumnIndex, columnDefinitions); + final KeyOrValueIngestData valueIngestData = valueSpec.getIngestDataAndIncrementColumnIndex( + KeyOrValue.VALUE, schemaRegistryClient, configs, nextColumnIndex, columnDefinitions); final TableDefinition tableDefinition = TableDefinition.of(columnDefinitions); publisherParametersBuilder.setTableDefinition(tableDefinition); @@ -1670,6 +1735,10 @@ public static class KeyOrValueIngestData { public int simpleColumnIndex = NULL_COLUMN_INDEX; public Function toObjectChunkMapper = Function.identity(); public Object extra; + + private boolean isSimple() { + return simpleColumnIndex != NULL_COLUMN_INDEX; + } } private interface SetColumnIndex {