|
39 | 39 | -define(MAX_PERMISSION_CACHE_SIZE, 12). |
40 | 40 | -define(CONSUMER_TAG, <<"mqtt">>). |
41 | 41 | -define(QUEUE_TTL_KEY, <<"x-expires">>). |
| 42 | +-define(WILL_QUEUE_EXPIRY_MARGIN_MS, 1_000). |
42 | 43 | -define(DEFAULT_EXCHANGE_NAME, <<>>). |
43 | 44 | -define(FENCE_TIMEOUT, 30_000). |
44 | 45 |
|
@@ -1812,7 +1813,15 @@ maybe_send_will( |
1812 | 1813 | vhost = Vhost |
1813 | 1814 | }} = State) |
1814 | 1815 | when is_integer(Delay) andalso Delay > 0 andalso SessionExpiry > 0 -> |
1815 | | - QArgs0 = queue_ttl_args(SessionExpiry), |
| 1816 | + %% Extend x-expires by a small margin above the Session Expiry Interval |
| 1817 | + %% so that the queue-expiry timer cannot race with the message-TTL + |
| 1818 | + %% dead-letter path and silently drop the Will Message. |
| 1819 | + QArgs0 = case queue_ttl_args(SessionExpiry) of |
| 1820 | + [] -> |
| 1821 | + []; |
| 1822 | + [{Key, Type, TtlMs}] -> |
| 1823 | + [{Key, Type, TtlMs + ?WILL_QUEUE_EXPIRY_MARGIN_MS}] |
| 1824 | + end, |
1816 | 1825 | QArgs = QArgs0 ++ [{<<"x-dead-letter-exchange">>, longstr, XName}, |
1817 | 1826 | {<<"x-dead-letter-routing-key">>, longstr, mqtt_to_amqp(Topic)}], |
1818 | 1827 | T = erlang:monotonic_time(millisecond), |
|
0 commit comments