Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,45 @@ protected abstract KeyOrValueIngestData getIngestData(
protected abstract KeyOrValueProcessor getProcessor(
TableDefinition tableDef,
KeyOrValueIngestData data);

private KeyOrValueIngestData ingestDataAndIncrement(
Comment thread
devinrsmith marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

            protected KeyOrValueIngestData getIngestData(
                    KeyOrValue keyOrValue,
                    SchemaRegistryClient schemaRegistryClient,
                    Map<String, ?> configs,
                    int nextColumnIndex,
                    List<ColumnDefinition<?>> columnDefinitionsOut) {
                return getIngestData(keyOrValue, schemaRegistryClient, configs, new MutableInt(nextColumnIndex),
                        columnDefinitionsOut);
            }

            private KeyOrValueIngestData getIngestDataAndIncrementColumnIndex(
                    final KeyOrValue keyOrValue,
                    final SchemaRegistryClient schemaRegistryClient,
                    Map<String, ?> configs,
                    MutableInt nextColumnIndexMut,
                    List<ColumnDefinition<?>> columnDefinitionsOut) {
                final int ixPre = nextColumnIndexMut.get();
                final int sizePre = columnDefinitionsOut.size();
                // It's unfortunate we have to do this; but the getIngestData signature and design of KeyOrValueSpec
                // would need a breaking change to improve in these regards.
                final KeyOrValueIngestData ingestData = getIngestData(keyOrValue, schemaRegistryClient, configs,
                        ixPre, columnDefinitionsOut);
                final int addedColumns = columnDefinitionsOut.size() - sizePre;
                if (ingestData == null) {
                    // ignore case (or, possible a complex impl that chooses to return null when it's empty):
                    // expecting no modifications
                    if (addedColumns != 0) {
                        throw new IllegalStateException(
                                "KeyOrValueSpec getIngestData is modifying out-variables without returning ingest data");
                    }
                } else {
                    nextColumnIndexMut.set(ixPre + addedColumns);
                }
                return ingestData;
            }
        }

KeyOrValue keyOrValue,
SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs,
MutableInt nextColumnIndexMut,
List<ColumnDefinition<?>> columnDefinitionsOut) {
Comment thread
devinrsmith marked this conversation as resolved.
Outdated
final int ixPre = nextColumnIndexMut.get();
final int sizePre = columnDefinitionsOut.size();
// It's unfortunate we have to do this; but the getIngestData signature and design of KeyOrValueSpec
// would need a breaking change to improve in these regards.
final KeyOrValueIngestData ingestData = getIngestData(keyOrValue, schemaRegistryClient, configs,
nextColumnIndexMut, columnDefinitionsOut);
final int addedIx = nextColumnIndexMut.get() - ixPre;
final int addedColumns = columnDefinitionsOut.size() - sizePre;
if (ingestData == null) {
// ignore case (or, possible a complex impl that chooses to return null when it's empty):
Comment thread
devinrsmith marked this conversation as resolved.
Outdated
// expecting no modifications
if (addedIx != 0 || addedColumns != 0) {
throw new IllegalStateException(
"KeyOrValueSpec getIngestData is modifying out-variables without returning ingest data");
}
} else if (ingestData.isSimple()) {
// simple case:
// expecting exactly one increment and one column
if (addedIx != 1 || addedColumns != 1) {
throw new IllegalStateException(
"Simple KeyOrValueSpec getIngestData is not adding exactly one index or one column");
Comment thread
devinrsmith marked this conversation as resolved.
Outdated
}
} else {
// complex case:
// expecting no increments - we'll increase the index based on any columns that were added
if (addedIx != 0) {
throw new IllegalStateException("Complex KeyOrValueSpec getIngestData is mutating the index");
}
nextColumnIndexMut.getAndAdd(addedColumns);
}
return ingestData;
}
}

private static final KeyOrValueSpec FROM_PROPERTIES = new SimpleConsume(null, null);
Expand Down Expand Up @@ -1273,10 +1312,10 @@ private static ConsumeStruct getConsumeStruct(
cc.setColumnIndex.setColumnIndex(publisherParametersBuilder, nextColumnIndex.getAndIncrement());
});

final KeyOrValueIngestData keyIngestData = keySpec.getIngestData(KeyOrValue.KEY,
schemaRegistryClient, configs, nextColumnIndex, columnDefinitions);
final KeyOrValueIngestData valueIngestData = valueSpec.getIngestData(KeyOrValue.VALUE,
schemaRegistryClient, configs, nextColumnIndex, columnDefinitions);
final KeyOrValueIngestData keyIngestData = keySpec.ingestDataAndIncrement(
KeyOrValue.KEY, schemaRegistryClient, configs, nextColumnIndex, columnDefinitions);
final KeyOrValueIngestData valueIngestData = valueSpec.ingestDataAndIncrement(
KeyOrValue.VALUE, schemaRegistryClient, configs, nextColumnIndex, columnDefinitions);
Comment thread
devinrsmith marked this conversation as resolved.
Outdated

final TableDefinition tableDefinition = TableDefinition.of(columnDefinitions);
publisherParametersBuilder.setTableDefinition(tableDefinition);
Expand Down Expand Up @@ -1670,6 +1709,10 @@ public static class KeyOrValueIngestData {
public int simpleColumnIndex = NULL_COLUMN_INDEX;
public Function<Object, Object> toObjectChunkMapper = Function.identity();
public Object extra;

private boolean isSimple() {
return simpleColumnIndex != NULL_COLUMN_INDEX;
}
}

private interface SetColumnIndex {
Expand Down
Loading