113113import org .immutables .value .Value .Parameter ;
114114import org .jetbrains .annotations .NotNull ;
115115import org .jetbrains .annotations .Nullable ;
116+ import org .jetbrains .annotations .VisibleForTesting ;
116117
117118import java .util .ArrayList ;
118119import java .util .Arrays ;
@@ -1034,7 +1035,20 @@ public static Table consumeToTable(
10341035 @ NotNull final Consume .KeyOrValueSpec keySpec ,
10351036 @ NotNull final Consume .KeyOrValueSpec valueSpec ,
10361037 @ NotNull final TableType tableType ) {
1037- final MutableObject <Table > resultHolder = new MutableObject <>();
1038+ return consumeToTableAndAdapter (kafkaProperties , topic , partitionFilter , partitionToInitialOffset , keySpec ,
1039+ valueSpec , tableType ).table ();
1040+ }
1041+
1042+ @ VisibleForTesting
1043+ static TableAndAdapter consumeToTableAndAdapter (
1044+ @ NotNull final Properties kafkaProperties ,
1045+ @ NotNull final String topic ,
1046+ @ NotNull final IntPredicate partitionFilter ,
1047+ @ NotNull final IntToLongFunction partitionToInitialOffset ,
1048+ @ NotNull final Consume .KeyOrValueSpec keySpec ,
1049+ @ NotNull final Consume .KeyOrValueSpec valueSpec ,
1050+ @ NotNull final TableType tableType ) {
1051+ final MutableObject <TableAndAdapter > resultHolder = new MutableObject <>();
10381052 final ExecutionContext enclosingExecutionContext = ExecutionContext .getContext ();
10391053 final LivenessManager enclosingLivenessManager = LivenessScopeStack .peek ();
10401054
@@ -1052,14 +1066,35 @@ public static Table consumeToTable(
10521066 final Table blinkTable = streamToBlinkTableAdapter .table ();
10531067 final Table result = tableType .walk (new BlinkTableOperation (blinkTable ));
10541068 enclosingLivenessManager .manage (result );
1055- resultHolder .setValue (result );
1069+ // Note: not adding streamToBlinkTableAdapter to liveness manager; liveness is expected to be
1070+ // retained through the result table. This is only relevant for testing.
1071+ resultHolder .setValue (new TableAndAdapter (result , streamToBlinkTableAdapter ));
10561072 }
10571073 };
10581074
10591075 consume (kafkaProperties , topic , partitionFilter ,
10601076 InitialOffsetLookup .adapt (partitionToInitialOffset ), keySpec , valueSpec ,
10611077 StreamConsumerRegistrarProvider .single (registrar ), null );
1062- return resultHolder .getValue ();
1078+ return resultHolder .get ();
1079+ }
1080+
1081+ @ VisibleForTesting
1082+ static class TableAndAdapter {
1083+ private final Table table ;
1084+ private final StreamToBlinkTableAdapter adapter ;
1085+
1086+ private TableAndAdapter (Table table , StreamToBlinkTableAdapter adapter ) {
1087+ this .table = Objects .requireNonNull (table );
1088+ this .adapter = Objects .requireNonNull (adapter );
1089+ }
1090+
1091+ public Table table () {
1092+ return table ;
1093+ }
1094+
1095+ public StreamToBlinkTableAdapter adapter () {
1096+ return adapter ;
1097+ }
10631098 }
10641099
10651100 /**
0 commit comments