@@ -336,6 +336,71 @@ protected abstract KeyOrValueIngestData getIngestData(
336336 protected abstract KeyOrValueProcessor getProcessor (
337337 TableDefinition tableDef ,
338338 KeyOrValueIngestData data );
339+
340+ private KeyOrValueIngestData getIngestDataAndIncrementColumnIndex (
341+ final KeyOrValue keyOrValue ,
342+ final SchemaRegistryClient schemaRegistryClient ,
343+ final Map <String , ?> configs ,
344+ final MutableInt nextColumnIndexMut ,
345+ final List <ColumnDefinition <?>> columnDefinitionsOut ) {
346+ final int ixPre = nextColumnIndexMut .get ();
347+ final int sizePre = columnDefinitionsOut .size ();
348+ // It's unfortunate we have to do this; but the getIngestData signature and design of KeyOrValueSpec
349+ // would need a breaking change to improve in these regards. We are trying to be "lenient" for impls
350+ // that improperly update nextColumnIndexMut, or setting the wrong index on simpleColumnIndex, and
351+ // warn them that it appears they are doing something wrong. If an implementation is improperly adding
352+ // to columnsDefinitionOut though, this is a more serious error.
353+ final KeyOrValueIngestData ingestData = getIngestData (keyOrValue , schemaRegistryClient , configs ,
354+ nextColumnIndexMut , columnDefinitionsOut );
355+ // Note: choosing to _not_ log warning when caller is mutating the index "incorrectly", but keeping code
356+ // commented out here for future reference.
357+ // final int addedIx = nextColumnIndexMut.get() - ixPre;
358+ final int addedColumns = columnDefinitionsOut .size () - sizePre ;
359+ if (ingestData == null ) {
360+ // ignore case (or, possibly a complex impl that chooses to return null when it's empty):
361+ // expecting no modifications
362+ if (addedColumns != 0 ) {
363+ throw new IllegalStateException (
364+ "KeyOrValueSpec getIngestData is modifying columnDefinitionsOut without returning ingest data" );
365+ }
366+ // if (addedIx != 0) {
367+ // log.warn().append(
368+ // "Ignore KeyOrValueSpec getIngestData is improperly mutating the index; manually correcting...")
369+ // .endl();
370+ // }
371+ } else if (ingestData .isSimple ()) {
372+ // simple case:
373+ // expecting exactly one increment and one column
374+ if (addedColumns != 1 ) {
375+ throw new IllegalStateException (
376+ "Simple KeyOrValueSpec getIngestData is not adding exactly one column" );
377+ }
378+ // if (addedIx != 1) {
379+ // log.warn().append(
380+ // "Simple KeyOrValueSpec did not properly mutate the index by 1; manually correcting...")
381+ // .endl();
382+ // }
383+ // if (ingestData.simpleColumnIndex != ixPre) {
384+ // log.warn().append("Simple KeyOrValueSpec set a bad simpleColumnIndex; manually correcting...")
385+ // .endl();
386+ // }
387+ // The fact that we are setting this here means that this would ideally not be the responsibility of
388+ // the implementation...
389+ ingestData .simpleColumnIndex = ixPre ;
390+ } else {
391+ // complex case:
392+ // expecting no increments - we'll increase the index based on any columns that were added
393+ // if (addedIx != 0) {
394+ // log.warn().append(
395+ // "Complex KeyOrValueSpec getIngestData is improperly mutating the index; manually correcting...")
396+ // .endl();
397+ // }
398+ }
399+ // The fact that we are setting this here means that this would ideally not be the responsibility of the
400+ // implementation...
401+ nextColumnIndexMut .set (ixPre + addedColumns );
402+ return ingestData ;
403+ }
339404 }
340405
341406 private static final KeyOrValueSpec FROM_PROPERTIES = new SimpleConsume (null , null );
@@ -1273,10 +1338,10 @@ private static ConsumeStruct getConsumeStruct(
12731338 cc .setColumnIndex .setColumnIndex (publisherParametersBuilder , nextColumnIndex .getAndIncrement ());
12741339 });
12751340
1276- final KeyOrValueIngestData keyIngestData = keySpec .getIngestData ( KeyOrValue . KEY ,
1277- schemaRegistryClient , configs , nextColumnIndex , columnDefinitions );
1278- final KeyOrValueIngestData valueIngestData = valueSpec .getIngestData ( KeyOrValue . VALUE ,
1279- schemaRegistryClient , configs , nextColumnIndex , columnDefinitions );
1341+ final KeyOrValueIngestData keyIngestData = keySpec .getIngestDataAndIncrementColumnIndex (
1342+ KeyOrValue . KEY , schemaRegistryClient , configs , nextColumnIndex , columnDefinitions );
1343+ final KeyOrValueIngestData valueIngestData = valueSpec .getIngestDataAndIncrementColumnIndex (
1344+ KeyOrValue . VALUE , schemaRegistryClient , configs , nextColumnIndex , columnDefinitions );
12801345
12811346 final TableDefinition tableDefinition = TableDefinition .of (columnDefinitions );
12821347 publisherParametersBuilder .setTableDefinition (tableDefinition );
@@ -1670,6 +1735,10 @@ public static class KeyOrValueIngestData {
16701735 public int simpleColumnIndex = NULL_COLUMN_INDEX ;
16711736 public Function <Object , Object > toObjectChunkMapper = Function .identity ();
16721737 public Object extra ;
1738+
1739+ private boolean isSimple () {
1740+ return simpleColumnIndex != NULL_COLUMN_INDEX ;
1741+ }
16731742 }
16741743
16751744 private interface SetColumnIndex {
0 commit comments