@@ -76,12 +76,14 @@ groups() ->
7676 test_stream_test_utils ,
7777 sac_subscription_with_partition_index_conflict_should_return_error ,
7878 test_metadata_with_advertised_hints ,
79- test_connection_properties_with_advertised_hints
79+ test_connection_properties_with_advertised_hints ,
80+ test_resolve_offset_spec
8081 ]},
8182 % % Run `test_global_counters` on its own so the global metrics are
8283 % % initialised to 0 for each testcase
8384 {single_node_1 , [], [test_global_counters ]},
84- {cluster , [], [test_stream , test_stream_tls , test_metadata , java ]}].
85+ {cluster , [], [test_stream , test_stream_tls , test_metadata , java ,
86+ test_resolve_offset_spec ]}].
8587
8688init_per_suite (Config ) ->
8789 case rabbit_ct_helpers :is_mixed_versions () of
@@ -1236,6 +1238,56 @@ test_connection_properties_with_advertised_hints(Config) ->
12361238
12371239 ok .
12381240
1241+ test_resolve_offset_spec (Config ) ->
1242+ Stream = atom_to_binary (? FUNCTION_NAME , utf8 ),
1243+ Transport = gen_tcp ,
1244+ Port = get_stream_port (Config ),
1245+ Opts = [{active , false }, {mode , binary }],
1246+ {ok , S } = Transport :connect (" localhost" , Port , Opts ),
1247+ C0 = rabbit_stream_core :init (0 ),
1248+ C1 = test_peer_properties (Transport , S , C0 ),
1249+ C2 = test_authenticate (Transport , S , C1 ),
1250+ C3 = test_create_stream (Transport , S , Stream , C2 ),
1251+
1252+ % % Test resolve_offset_spec on empty stream
1253+ C4 = test_resolve_offset_spec (Transport , S , Stream , first , #{},
1254+ ? RESPONSE_CODE_OK , 0 , C3 ),
1255+ C5 = test_resolve_offset_spec (Transport , S , Stream , last , #{},
1256+ ? RESPONSE_CODE_OK , 0 , C4 ),
1257+ C6 = test_resolve_offset_spec (Transport , S , Stream , next , #{},
1258+ ? RESPONSE_CODE_OK , 0 , C5 ),
1259+
1260+ % % Publish some messages
1261+ PublisherId = 1 ,
1262+ C7 = test_declare_publisher (Transport , S , PublisherId , Stream , C6 ),
1263+ Body = <<" hello" >>,
1264+ C8 = test_publish_confirm (Transport , S , PublisherId , 1 , Body , C7 ),
1265+ C9 = test_publish_confirm (Transport , S , PublisherId , 2 , Body , C8 ),
1266+
1267+ % % Test resolve_offset_spec after publishing
1268+ C10 = test_resolve_offset_spec (Transport , S , Stream , first , #{},
1269+ ? RESPONSE_CODE_OK , 0 , C9 ),
1270+ C11 = test_resolve_offset_spec (Transport , S , Stream , last , #{},
1271+ ? RESPONSE_CODE_OK , C10 ),
1272+ C12 = test_resolve_offset_spec (Transport , S , Stream , next , #{},
1273+ ? RESPONSE_CODE_OK , 2 , C11 ),
1274+ C13 = test_resolve_offset_spec (Transport , S , Stream , 0 , #{},
1275+ ? RESPONSE_CODE_OK , 0 , C12 ),
1276+
1277+ % % Test with timestamp (far future should return next offset)
1278+ FutureTimestamp = os :system_time (millisecond ) + 3600000 ,
1279+ C14 = test_resolve_offset_spec (Transport , S , Stream , {timestamp , FutureTimestamp }, #{},
1280+ ? RESPONSE_CODE_OK , 2 , C13 ),
1281+
1282+ % % Test on non-existent stream
1283+ C15 = test_resolve_offset_spec (Transport , S , <<" non_existent_stream" >>, first , #{},
1284+ ? RESPONSE_CODE_STREAM_DOES_NOT_EXIST , 0 , C14 ),
1285+
1286+ C16 = test_delete_stream (Transport , S , Stream , C15 ),
1287+ _C17 = test_close (Transport , S , C16 ),
1288+ closed = wait_for_socket_close (Transport , S , 10 ),
1289+ ok .
1290+
12391291filtered_events (Config , EventType ) ->
12401292 Events = rabbit_ct_broker_helpers :rpc (Config , 0 ,
12411293 gen_event ,
@@ -1729,6 +1781,24 @@ test_stream_stats(Transport, S, Stream, C0) ->
17291781 Cmd ),
17301782 C .
17311783
1784+ test_resolve_offset_spec (Transport , S , Stream , OffsetSpec , Properties ,
1785+ ExpectedResponseCode , C0 ) ->
1786+ Frame = request ({resolve_offset_spec , Stream , OffsetSpec , Properties }),
1787+ ok = Transport :send (S , Frame ),
1788+ {Cmd , C } = receive_commands (Transport , S , C0 ),
1789+ ? assertMatch ({response , 1 , {resolve_offset_spec , ExpectedResponseCode , _ }},
1790+ Cmd ),
1791+ C .
1792+
1793+ test_resolve_offset_spec (Transport , S , Stream , OffsetSpec , Properties ,
1794+ ExpectedResponseCode , ExpectedOffset , C0 ) ->
1795+ Frame = request ({resolve_offset_spec , Stream , OffsetSpec , Properties }),
1796+ ok = Transport :send (S , Frame ),
1797+ {Cmd , C } = receive_commands (Transport , S , C0 ),
1798+ ? assertMatch ({response , 1 , {resolve_offset_spec , ExpectedResponseCode , ExpectedOffset }},
1799+ Cmd ),
1800+ C .
1801+
17321802test_close (Transport , S , C0 ) ->
17331803 CloseReason = <<" OK" >>,
17341804 CloseFrame = request ({close , ? RESPONSE_CODE_OK , CloseReason }),
0 commit comments