Reactive: add Sentinel Reactor adapter module#545
Merged
Conversation
- Initial work for common Mono/Flux Signed-off-by: Eric Zhao <sczyh16@gmail.com>
- Add a `InheritableBaseSubscriber` that derives from the original BaseSubscriber of reactor-core Signed-off-by: Eric Zhao <sczyh16@gmail.com>
Signed-off-by: Eric Zhao <sczyh16@gmail.com>
Signed-off-by: Eric Zhao <sczyh16@gmail.com>
…rmer Signed-off-by: Eric Zhao <sczyh16@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Describe what this PR does / why we need it
Add Sentinel Reactor adapter module and implement reactive transformer.
Does this pull request fix one issue?
Resolves #544
Describe how you did it
API Design:
Internal implementation:
Here I've implemented a
SentinelReactorTransformerthat will augment the original stream with the encapsulated Sentinel operator during the assembly stage. There are two kinds of operators:MonoSentinelOperatorimplementsMonoOperatorFluxSentinelOperatorimplementsFluxOperatorBoth will wrap the actual subscriber with the
SentinelReactorSubscriberduringsubscribe.SentinelReactorSubscriberis derived from the ReactorBaseSubscriber(due to the fact that the originalBaseSubscriberdeclares all event callbacks asfinalmethods, I have to copy the original code and modify it with so that the methods are inheritable).Entry logic of Sentinel is wrapped in each hook function (
hookOnSubscribe,hookOnNext,hookOnError,hookOnComplete). The stage:The
hookOnSubscribewill be called during the subscribe stage. Here Sentinel will executeSphU.asyncEntry(xxx)and propagate the event. If the request is blocked, Sentinel will cancel the upstream and propagate the correspondingBlockExceptionwithonErrorcallback.The
hookOnNextwill be called when receiving data. If the upstream has been canceled, Sentinel will just try to exit the entry.Mono), theonNextshould be called at most once, followed by the terminal signal. There may be such kind of event stream:onSubscribe() -> onNext() -> cancel() -> onComplete(). In that case, theonCompleteevent would be dropped. So forMonowe need totryCompleteEntryin bothhookOnNextandhookOnComplete.Flux, we just callonNextto propagate the event and data.In
hookOnCompletewe justtryCompleteEntryand callactual.onComplete()to propagate the event.In
hookOnError, if the error is biz-exception (not aBlockException), we'll record the exception within the entry. Then we'lltryCompleteEntryand propogate the error.How to carry
Contextof Sentinel to build the invocation chain: via ReactorContext(override the actualContext)The response time: from
onSubscribeto terminal stateDescribe how to verify it
Run the test cases.
Special notes for reviews
The TCK did not work well so here I removed tests with TCK. Please check whether the
SentinelReactorSubscriberobeys the Reactive Streams specification.