
rabbit_stream_coordinator: Make new_stream command idempotent (backport #15706)#15768

Merged
the-mikedavis merged 1 commit into v4.3.x from mergify/bp/v4.3.x/pr-15706
Mar 18, 2026

mergify bot commented Mar 18, 2026

This is similar to #14884 but for new_stream.

Although it's unlikely given the calling code, the stream coordinator can end up with two {new_stream, StreamId, #{}} commands in its log for the same StreamId. When handling the second new_stream, it logs a warning that the stream can't be updated, like so:

2026-03-10 17:31:38.897507+00:00 [warning] <0.14468.0> rabbit_stream_coordinator failed to update stream:
2026-03-10 17:31:38.897507+00:00 [warning] <0.14468.0> function_clause
2026-03-10 17:31:38.897507+00:00 [warning] <0.14468.0> [{rabbit_stream_coordinator,update_stream0,[#{index => 51,system_time => 1773163898892,reply_mode => await_consensus,machine_version => 6,...},{new_stream,[95|...],#{}},{stream,[...],...}],[{file,[114|...]},{line,1397}]},{rabbit_stream_coordinator,update_stream,3,[{file,[...]},{line,...}]},{rabbit_stream_coordinator,apply,3,[{file,...},{...}]},{ra_server,apply_with,2,[{...}|...]},{ra_log,fold,5,[...]},{ra_server,apply_to,5,...},{ra_server,evaluate_commit_index_follower,...},{ra_server,...}]

update_stream0/3 only has a new_stream function clause for when there is no stream (i.e. the third argument is undefined):

    update_stream0(#{system_time := _} = Meta,
                   {new_stream, StreamId, #{leader_node := LeaderNode,
                                            queue := Q}}, undefined) ->
        #{nodes := Nodes} = Conf = amqqueue:get_type_state(Q),
        %% this jumps straight to the state where all members
        %% have been stopped and a new writer has been chosen
        E = 1,
        QueueRef = amqqueue:get_name(Q),
        Members = maps:from_list(
                    [{N, #member{role = case LeaderNode of
                                            N -> {writer, E};
                                            _ -> {replica, E}
                                        end,
                                 state = {ready, E},
                                 %% no members are running actions
                                 current = undefined}
                     } || N <- Nodes]),
        #stream{id = StreamId,
                epoch = E,
                nodes = Nodes,
                queue_ref = QueueRef,
                conf = Conf,
                members = Members,
                reply_to = maps:get(from, Meta, undefined)};

When the second new_stream is applied, the stream already exists, so no clause matches and the call fails with the function_clause error shown in the log. I think it's reasonable to have the new_stream command become idempotent and effectively skip it if it is in the log twice.
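
The actual change from #15706 is not shown in this conversation, so the following is only a minimal sketch of what "idempotent" could look like here, not the merged implementation. It assumes a simplified stand-in #stream{} record and a hypothetical module name; the second clause returns the existing stream unchanged when a new_stream command arrives for a StreamId that already exists.

```erlang
-module(new_stream_idempotent_sketch).
-export([update_stream0/3, demo/0]).

%% Hypothetical, simplified stand-in for the coordinator's #stream{} record.
-record(stream, {id, epoch, nodes}).

%% First new_stream for this StreamId: no stream exists yet, so build one.
update_stream0(_Meta, {new_stream, StreamId, #{nodes := Nodes}}, undefined) ->
    #stream{id = StreamId, epoch = 1, nodes = Nodes};
%% Duplicate new_stream for a stream that already exists: return the
%% existing stream unchanged instead of failing with function_clause.
%% StreamId appears twice in the head, so both occurrences must be equal.
update_stream0(_Meta, {new_stream, StreamId, _Args},
               #stream{id = StreamId} = Stream) ->
    Stream.

%% Applies the same command twice and checks the state is unchanged.
demo() ->
    Meta = #{system_time => 0},
    First = {new_stream, "stream_1", #{nodes => [node()]}},
    Duplicate = {new_stream, "stream_1", #{}},
    S1 = update_stream0(Meta, First, undefined),
    S1 = update_stream0(Meta, Duplicate, S1),
    ok.
```

Calling new_stream_idempotent_sketch:demo() applies the command twice; the second application matches the new clause and leaves the stream state as it was. Whether the real fix also logs or replies to the caller in this case is not stated in the source.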

I haven't been able to reproduce the scenario where this happened. Next time I will dump Khepri and the coordinator's logs to see what sequence of commands led to this.


This is an automatic backport of pull request #15706 done by Mergify.

the-mikedavis added this to the 4.3.0 milestone Mar 18, 2026
the-mikedavis merged commit 5c17d3e into v4.3.x Mar 18, 2026
184 checks passed
the-mikedavis deleted the mergify/bp/v4.3.x/pr-15706 branch March 18, 2026 18:57