Skip to content

Commit 9a20e1b

Browse files
Merge pull request #185 from benjchristensen/request-context
BugFix: Observable.observeOn Scheduler Lost RequestContext
2 parents 7220a9c + 5b900da commit 9a20e1b

4 files changed

Lines changed: 177 additions & 3 deletions

File tree

hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
7474
import com.netflix.hystrix.strategy.concurrency.HystrixContextCallable;
7575
import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable;
76+
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
7677
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
7778
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
7879
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
@@ -848,7 +849,12 @@ public Observable<R> call(Throwable e) {
848849
});
849850

850851
// we want to hand off work to a different scheduler so we don't tie up the Hystrix thread
851-
o = o.observeOn(observeOn);
852+
if (!Schedulers.immediate().equals(observeOn)) {
853+
// don't waste overhead if it's the 'immediate' scheduler
854+
// otherwise we'll 'observeOn' and wrap with the HystrixContextScheduler
855+
// to copy state across threads (if threads are involved)
856+
o = o.observeOn(new HystrixContextScheduler(observeOn));
857+
}
852858

853859
o = o.finallyDo(new Action0() {
854860

hystrix-core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategy.java

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2012 Netflix, Inc.
2+
* Copyright 2013 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,8 +24,17 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.atomic.AtomicInteger;
2626

27-
import com.netflix.hystrix.HystrixCollapser;
27+
import org.junit.After;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
31+
import rx.util.functions.Action1;
32+
import rx.util.functions.Func1;
33+
34+
import com.netflix.config.ConfigurationManager;
2835
import com.netflix.hystrix.HystrixCommand;
36+
import com.netflix.hystrix.HystrixCommandGroupKey;
37+
import com.netflix.hystrix.HystrixRequestLog;
2938
import com.netflix.hystrix.HystrixThreadPool;
3039
import com.netflix.hystrix.HystrixThreadPoolKey;
3140
import com.netflix.hystrix.strategy.HystrixPlugins;
@@ -151,5 +160,64 @@ public void shutdown(T value) {
151160
};
152161
};
153162
}
163+
164+
165+
public static class UnitTest {
166+
167+
@Before
168+
public void prepareForTest() {
169+
/* we must call this to simulate a new request lifecycle running and clearing caches */
170+
HystrixRequestContext.initializeContext();
171+
}
172+
173+
@After
174+
public void cleanup() {
175+
// instead of storing the reference from initialize we'll just get the current state and shutdown
176+
if (HystrixRequestContext.getContextForCurrentThread() != null) {
177+
// it could have been set NULL by the test
178+
HystrixRequestContext.getContextForCurrentThread().shutdown();
179+
}
180+
181+
// force properties to be clean as well
182+
ConfigurationManager.getConfigInstance().clear();
183+
}
184+
185+
/**
186+
* If the RequestContext does not get transferred across threads correctly this blows up.
187+
* No specific assertions are necessary.
188+
*/
189+
@Test
190+
public void testRequestContextPropagatesAcrossObserveOnPool() {
191+
new SimpleCommand().execute();
192+
new SimpleCommand().observe().map(new Func1<String, String>() {
193+
194+
@Override
195+
public String call(String s) {
196+
System.out.println("Map => Commands: " + HystrixRequestLog.getCurrentRequest().getExecutedCommands());
197+
return s;
198+
}
199+
}).toBlockingObservable().forEach(new Action1<String>() {
200+
201+
@Override
202+
public void call(String s) {
203+
System.out.println("Result [" + s + "] => Commands: " + HystrixRequestLog.getCurrentRequest().getExecutedCommands());
204+
}
205+
});
206+
}
207+
208+
private static class SimpleCommand extends HystrixCommand<String> {
209+
210+
public SimpleCommand() {
211+
super(HystrixCommandGroupKey.Factory.asKey("SimpleCommand"));
212+
}
213+
214+
@Override
215+
protected String run() throws Exception {
216+
System.out.println("Executing => Commands: " + HystrixRequestLog.getCurrentRequest().getExecutedCommands());
217+
return "Hello";
218+
}
219+
220+
}
221+
}
154222

155223
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Copyright 2012 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.netflix.hystrix.strategy.concurrency;
17+
18+
import rx.Scheduler;
19+
import rx.Subscription;
20+
import rx.util.functions.Func2;
21+
22+
/**
23+
* Wrapper around {@link Func2} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Func2}
24+
*
25+
* @param <T>
26+
* Return type of {@link Func2}
27+
*
28+
* @ExcludeFromJavadoc
29+
*/
30+
public class HystrixContextFunc2<T> implements Func2<Scheduler, T, Subscription> {
31+
32+
private final Func2<? super Scheduler, ? super T, ? extends Subscription> actual;
33+
private final HystrixRequestContext parentThreadState;
34+
35+
public HystrixContextFunc2(Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
36+
this.actual = action;
37+
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
38+
}
39+
40+
@Override
41+
public Subscription call(Scheduler t1, T t2) {
42+
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
43+
try {
44+
// set the state of this thread to that of its parent
45+
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
46+
// execute actual Func2 with the state of the parent
47+
return actual.call(t1, t2);
48+
} finally {
49+
// restore this thread back to its original state
50+
HystrixRequestContext.setContextOnCurrentThread(existingState);
51+
}
52+
}
53+
54+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.netflix.hystrix.strategy.concurrency;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import rx.Scheduler;
21+
import rx.Subscription;
22+
import rx.util.functions.Func2;
23+
24+
/**
25+
* Wrap a {@link Scheduler} so that scheduled actions are wrapped with {@link HystrixContextFunc2} so that
26+
* the {@link HystrixRequestContext} is properly copied across threads (if they are used by the {@link Scheduler}).
27+
*/
28+
public class HystrixContextScheduler extends Scheduler {
29+
30+
private final Scheduler actualScheduler;
31+
32+
public HystrixContextScheduler(Scheduler scheduler) {
33+
this.actualScheduler = scheduler;
34+
}
35+
36+
@Override
37+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
38+
return actualScheduler.schedule(state, new HystrixContextFunc2<T>(action));
39+
}
40+
41+
@Override
42+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
43+
return actualScheduler.schedule(state, new HystrixContextFunc2<T>(action), delayTime, unit);
44+
}
45+
46+
}

0 commit comments

Comments
 (0)