|
23 | 23 | from typing import Callable, Generator, cast |
24 | 24 |
|
25 | 25 | from twisted.internet import defer, reactor as _reactor |
| 26 | +from twisted.internet.defer import CancelledError, Deferred |
26 | 27 |
|
27 | 28 | from synapse.logging.context import ( |
28 | 29 | SENTINEL_CONTEXT, |
@@ -456,6 +457,105 @@ async def competing_callback() -> None: |
456 | 457 | # Back to the sentinel context |
457 | 458 | self._check_test_key("sentinel") |
458 | 459 |
|
| 460 | + @logcontext_clean |
| 461 | + async def test_deferred_callback_fire_and_forget_with_current_context_asdf( |
| 462 | + self, |
| 463 | + ) -> None: |
| 464 | + """ |
| 465 | + Test that it's possible to call the deferred callback with the current context |
| 466 | + while fire-and-forgetting the callback (no adverse effects like leaking the |
| 467 | + logcontext into the reactor or restarting an already finished logcontext). |
| 468 | +
|
| 469 | + Demonstrates one pattern that we can use fix the naive case where we just call |
| 470 | + `d.callback(None)` without anything else. See the *Deferred callbacks* section |
| 471 | + of docs/log_contexts.md for more details. |
| 472 | + """ |
| 473 | + # Ignore linter error since we are creating a `Clock` for testing purposes. |
| 474 | + clock = Clock(reactor, server_name="test_server") # type: ignore[multiple-internal-clocks] |
| 475 | + |
| 476 | + # Sanity check that we start in the sentinel context |
| 477 | + self._check_test_key("sentinel") |
| 478 | + |
| 479 | + callback_finished = False |
| 480 | + |
| 481 | + async def competing_callback() -> None: |
| 482 | + nonlocal callback_finished |
| 483 | + try: |
| 484 | + # The deferred callback should have the same logcontext as the caller |
| 485 | + self._check_test_key("foo") |
| 486 | + |
| 487 | + with LoggingContext(name="competing", server_name="test_server"): |
| 488 | + await clock.sleep(0) |
| 489 | + self._check_test_key("competing") |
| 490 | + |
| 491 | + self._check_test_key("foo") |
| 492 | + finally: |
| 493 | + # When exceptions happen, we still want to mark the callback as finished |
| 494 | + # so that the test can complete and we see the underlying error. |
| 495 | + callback_finished = True |
| 496 | + |
| 497 | + def competing_callback2() -> None: |
| 498 | + nonlocal callback_finished |
| 499 | + try: |
| 500 | + with LoggingContext(name="competing2", server_name="test_server"): |
| 501 | + # clock.call_later( |
| 502 | + # 0, lambda: defer.ensureDeferred(competing_callback()) |
| 503 | + # ) |
| 504 | + pass |
| 505 | + finally: |
| 506 | + # When exceptions happen, we still want to mark the callback as finished |
| 507 | + # so that the test can complete and we see the underlying error. |
| 508 | + callback_finished = True |
| 509 | + |
| 510 | + # Part of fix for the naive case is here (i.e. things don't work correctly if we |
| 511 | + # don't `PreserveLoggingContext(...)` here). |
| 512 | + # |
| 513 | + # We can extend the lifetime of the "foo" logcontext is to avoid calling the |
| 514 | + # context manager lifetime methods of `LoggingContext` (`__enter__`/`__exit__`). |
| 515 | + # And we can still set the current logcontext by using `PreserveLoggingContext` |
| 516 | + # and passing in the "foo" logcontext. |
| 517 | + with PreserveLoggingContext( |
| 518 | + LoggingContext(name="foo", server_name="test_server") |
| 519 | + ): |
| 520 | + d: defer.Deferred[None] = defer.Deferred() |
| 521 | + # d.addErrback(lambda _: defer.ensureDeferred(competing_callback())) |
| 522 | + d.addErrback(lambda _: competing_callback2()) |
| 523 | + # make_deferred_yieldable(d) |
| 524 | + self._check_test_key("foo") |
| 525 | + # Other part of fix for the naive case is here (i.e. things don't work |
| 526 | + # correctly if we don't `run_in_background(...)` here). |
| 527 | + # |
| 528 | + # `run_in_background(...)` will run the whole lambda in the current |
| 529 | + # logcontext and it handles the magic behind the scenes of a) restoring the |
| 530 | + # calling logcontext before returning to the caller and b) resetting the |
| 531 | + # logcontext to the sentinel after the deferred completes and we yield |
| 532 | + # control back to the reactor to avoid leaking the logcontext into the |
| 533 | + # reactor. |
| 534 | + # |
| 535 | + # We're using a lambda here as a little trick so we can still get everything |
| 536 | + # to run in the "foo" logcontext, but return the deferred `d` itself so that |
| 537 | + # `run_in_background` will wait on that to complete before resetting the |
| 538 | + # logcontext to the sentinel. |
| 539 | + # |
| 540 | + # type-ignore[call-overload]: This appears like a mypy type inference bug. A |
| 541 | + # function that returns a deferred is exactly what `run_in_background` |
| 542 | + # expects. |
| 543 | + # |
| 544 | + # type-ignore[func-returns-value]: This appears like a mypy type inference |
| 545 | + # bug. We're always returning the deferred `d`. |
| 546 | + run_in_background(lambda: (d.cancel(), d)[1]) # type: ignore[call-overload, func-returns-value] |
| 547 | + self._check_test_key("foo") |
| 548 | + |
| 549 | + await clock.sleep(0) |
| 550 | + |
| 551 | + self.assertTrue( |
| 552 | + callback_finished, |
| 553 | + "Callback never finished which means the test probably didn't wait long enough", |
| 554 | + ) |
| 555 | + |
| 556 | + # Back to the sentinel context |
| 557 | + self._check_test_key("sentinel") |
| 558 | + |
459 | 559 | async def _test_run_in_background(self, function: Callable[[], object]) -> None: |
460 | 560 | # Ignore linter error since we are creating a `Clock` for testing purposes. |
461 | 561 | clock = Clock(reactor, server_name="test_server") # type: ignore[multiple-internal-clocks] |
@@ -688,6 +788,77 @@ def test_make_deferred_yieldable_with_chained_deferreds( |
688 | 788 | # now it should be restored |
689 | 789 | self._check_test_key("foo") |
690 | 790 |
|
| 791 | + # @logcontext_clean |
| 792 | + async def test_make_deferred_yieldable_asdf(self) -> None: |
| 793 | + # Ignore linter error since we are creating a `Clock` for testing purposes. |
| 794 | + clock = Clock(reactor, server_name="test_server") # type: ignore[multiple-internal-clocks] |
| 795 | + |
| 796 | + # Sanity check that we start in the sentinel context |
| 797 | + self.assertEqual(current_context(), SENTINEL_CONTEXT) |
| 798 | + |
| 799 | + # Create a deferred which we will never complete |
| 800 | + incomplete_d: Deferred = Deferred() |
| 801 | + |
| 802 | + async def competing_task() -> None: |
| 803 | + with LoggingContext( |
| 804 | + name="competing", server_name="test_server" |
| 805 | + ) as context_competing: |
| 806 | + self.assertNoResult(incomplete_d) |
| 807 | + # We should still be in the logcontext we started in |
| 808 | + self.assertIs(current_context(), context_competing) |
| 809 | + |
| 810 | + # Mimic the normal use case to wait for the work to complete or timeout. |
| 811 | + # |
| 812 | + # In this specific test, we expect the deferred to timeout and raise an |
| 813 | + # exception at this point. |
| 814 | + logger.info("asdf awaiting make_deferred_yieldable") |
| 815 | + await make_deferred_yieldable(incomplete_d) |
| 816 | + |
| 817 | + # We're still in the same logcontext |
| 818 | + self.assertIs(current_context(), context_competing) |
| 819 | + |
| 820 | + # with LoggingContext(name="foo", server_name="test_server"): |
| 821 | + with PreserveLoggingContext( |
| 822 | + LoggingContext(name="foo", server_name="test_server") |
| 823 | + ): |
| 824 | + # d = defer.ensureDeferred(competing_task()) |
| 825 | + d = run_in_background(competing_task) |
| 826 | + # reactor.callLater(0, lambda: defer.ensureDeferred(competing_task())) |
| 827 | + await clock.sleep(0) |
| 828 | + |
| 829 | + logger.info("asdf 1") |
| 830 | + |
| 831 | + self._check_test_key("foo") |
| 832 | + |
| 833 | + # Pump the competing task a bit |
| 834 | + # await clock.sleep(0) |
| 835 | + |
| 836 | + # Cancel the deferred |
| 837 | + logger.info("asdf going to cancel incomplete_d") |
| 838 | + # incomplete_d.cancel() |
| 839 | + # with PreserveLoggingContext(): |
| 840 | + # incomplete_d.cancel() |
| 841 | + # await run_in_background(lambda: (incomplete_d.cancel(), incomplete_d)[1]) |
| 842 | + |
| 843 | + def _cancel_d() -> None: |
| 844 | + logger.info("asdf before cancel") |
| 845 | + incomplete_d.cancel() |
| 846 | + logger.info("asdf after cancel") |
| 847 | + |
| 848 | + await run_in_background(lambda: (_cancel_d(), incomplete_d)[1]) |
| 849 | + |
| 850 | + logger.info("asdf 2") |
| 851 | + |
| 852 | + self._check_test_key("foo") |
| 853 | + |
| 854 | + # We expect a failure due to the timeout |
| 855 | + self.failureResultOf(d, defer.CancelledError) |
| 856 | + |
| 857 | + self._check_test_key("foo") |
| 858 | + |
| 859 | + # Back to the sentinel context at the end of the day |
| 860 | + self.assertEqual(current_context(), SENTINEL_CONTEXT) |
| 861 | + |
691 | 862 | @logcontext_clean |
692 | 863 | def test_nested_logging_context(self) -> None: |
693 | 864 | with LoggingContext(name="foo", server_name="test_server"): |
|
0 commit comments