Skip to content

Commit a6916c9

Browse files
authored
core: Fix pick_first NPE when accepting resolved addresses and in CONNECTING state (#12814)
This PR resolves #12796. It makes sure that whenever `PickFirstLeafLoadBalancer` transitions into `CONNECTING` the current address in the `addressIndex` has a corresponding subchannel. This prevents an NPE when hitting https://github.com/grpc/grpc-java/blob/86fa86063b9a7f3965a0401506b0f8616463c73d/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L143-L154 in `acceptResolvedAddresses` in some situations. Note that for `READY`, a state from which this path can also be entered, this is already guaranteed.
1 parent 52f2cd5 commit a6916c9

2 files changed

Lines changed: 148 additions & 0 deletions

File tree

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,16 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
333333

334334
case CONNECTING:
335335
rawConnectivityState = CONNECTING;
336+
// If we get a newly resolved address list via acceptResolvedAddresses,
337+
// as we are in CONNECTING, we will try to .updateAddresses the currently
338+
// connecting subchannel if it exists in the new list.
339+
// As such, We need to make sure that with transitioning to CONNECTING the subchannel for
340+
// the current address of a valid index exists.
341+
if ((!enableHappyEyeballs && !addressIndex.isValid())
342+
|| (addressIndex.isValid() && !subchannels.containsKey(
343+
addressIndex.getCurrentAddress()))) {
344+
addressIndex.seekTo(getAddress(subchannelData.subchannel));
345+
}
336346
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
337347
break;
338348

@@ -636,6 +646,11 @@ ConnectivityState getConcludedConnectivityState() {
636646
return this.concludedState;
637647
}
638648

649+
@VisibleForTesting
650+
ConnectivityState getRawConnectivityState() {
651+
return this.rawConnectivityState;
652+
}
653+
639654
/**
640655
* Picker that requests connection during the first pick, and returns noResult.
641656
*/

core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,139 @@ public void healthCheckFlow() {
642642
verifyNoMoreInteractions(mockHelper);
643643
}
644644

645+
// reproduces #12796
646+
@Test
647+
public void healthCheckWithTF_AllowsStateInconsistency() {
648+
assumeTrue(!serializeRetries);
649+
650+
when(mockSubchannel1.getAttributes()).thenReturn(
651+
Attributes.newBuilder().set(HAS_HEALTH_PRODUCER_LISTENER_KEY, true).build());
652+
653+
Attributes petioleAttributes =
654+
Attributes.newBuilder().set(IS_PETIOLE_POLICY, true).build();
655+
656+
loadBalancer.acceptResolvedAddresses(
657+
ResolvedAddresses.newBuilder()
658+
.setAddresses(
659+
Lists.newArrayList(
660+
/* server 1 */servers.get(0),
661+
/* server 3 */servers.get(2)
662+
))
663+
.setAttributes(petioleAttributes)
664+
.build());
665+
666+
// Get the state and health listener for subchannel 1
667+
verify(mockHelper).createSubchannel(createArgsCaptor.capture());
668+
SubchannelStateListener healthListener1 =
669+
createArgsCaptor.getValue().getOption(HEALTH_CONSUMER_LISTENER_ARG_KEY);
670+
verify(mockSubchannel1).start(stateListenerCaptor.capture());
671+
SubchannelStateListener stateListener1 = stateListenerCaptor.getValue();
672+
673+
// As start() was called, we transition subchannel 1 to CONNECTING...
674+
stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
675+
healthListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
676+
677+
// ...which eventually ends up READY.
678+
stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
679+
healthListener1.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
680+
681+
// Let the fun begin: subchannel 1's health turns into TRANSIENT_FAILURE
682+
healthListener1.onSubchannelState(
683+
ConnectivityStateInfo.forTransientFailure(
684+
Status.UNAVAILABLE.withDescription("health failure")));
685+
// HealthListener.onSubchannelState gets called. It updates the LBs balancing
686+
// state/concludedState.
687+
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
688+
assertEquals(READY, loadBalancer.getRawConnectivityState());
689+
690+
// Subchannel 1's transport goes idle
691+
stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
692+
693+
// LB's raw connectivity stays ready as the TRANSIENT_FAILURE health state
694+
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
695+
assertEquals(READY, loadBalancer.getRawConnectivityState());
696+
assertEquals(0, loadBalancer.getIndexLocation());
697+
698+
// LB tries to reconnect subchannel 1.
699+
verify(mockSubchannel1, times(2)).requestConnection();
700+
701+
stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
702+
703+
// LB is waiting for subchannel 1 to report status.
704+
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
705+
assertEquals(READY, loadBalancer.getRawConnectivityState());
706+
assertEquals(0, loadBalancer.getIndexLocation());
707+
708+
// Subchannel 1's new connection attempt fails and reports TRANSIENT_FAILURE.
709+
stateListener1.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
710+
711+
// LB increments the index and tries to connect to server 3.
712+
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
713+
assertEquals(READY, loadBalancer.getRawConnectivityState());
714+
assertEquals(1, loadBalancer.getIndexLocation());
715+
verify(mockSubchannel3).start(stateListenerCaptor.capture());
716+
SubchannelStateListener stateListener3 = stateListenerCaptor.getValue();
717+
verify(mockSubchannel3).requestConnection();
718+
stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
719+
720+
// Subchannel 3 connection did not change the state as we are
721+
// still in TRANSIENT_FAILURE health state.
722+
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
723+
assertEquals(READY, loadBalancer.getRawConnectivityState());
724+
assertEquals(1, loadBalancer.getIndexLocation());
725+
726+
List<EquivalentAddressGroup> newServers =
727+
Lists.newArrayList(
728+
/* server 2 */
729+
servers.get(1),
730+
/* server 1 */
731+
servers.get(0)
732+
);
733+
734+
// The resolver update removes the (current) subchannel 3, keeps server 1, and
735+
// resets addressIndex to server2, which has no subchannel.
736+
loadBalancer.acceptResolvedAddresses(
737+
ResolvedAddresses.newBuilder()
738+
.setAddresses(newServers)
739+
.setAttributes(petioleAttributes)
740+
.build());
741+
742+
verify(mockSubchannel3, times(1)).shutdown();
743+
744+
// LB thinks that there are no subchannels that are trying to connect.
745+
assertEquals(IDLE, loadBalancer.getRawConnectivityState());
746+
assertEquals(IDLE, loadBalancer.getConcludedConnectivityState());
747+
// As mentioned, the LB resets the index to 0 by calling addressIndex.updateGroups.
748+
// Given the new list, it is now pointing to server 2 which does not have a subchannel.
749+
assertEquals(0, loadBalancer.getIndexLocation());
750+
751+
// Subchannel 1 is still in TRANSIENT_FAILURE state. Is backoff expires,
752+
// and now it is retrying to connect. This state listener transitions the LB to CONNECTING.
753+
stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
754+
755+
// As our health state is IDLE now the LB handles the CONNECTING subchannel state change
756+
// by transitioning into CONNECTING itself.
757+
assertEquals(CONNECTING, loadBalancer.getRawConnectivityState());
758+
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());
759+
760+
// Before the fix:
761+
// The index is now pointing to server 2 for which the LB did not create a subchannel yet.
762+
// assertEquals(0, loadBalancer.getIndexLocation());
763+
764+
// The index is now pointing to server 1
765+
assertEquals(1, loadBalancer.getIndexLocation());
766+
767+
// The resolver refreshes and provides the same addresses.
768+
// As the LB is in CONNECTING, acceptResolvedAddresses tries
769+
// to get the subchannel represented from the current index (server 2) and
770+
// update its addresses. As the subchannel still does not exist an NPE is thrown.
771+
assertEquals(Status.OK, loadBalancer.acceptResolvedAddresses(
772+
ResolvedAddresses.newBuilder()
773+
.setAddresses(newServers)
774+
.setAttributes(petioleAttributes)
775+
.build()));
776+
}
777+
645778
@Test
646779
public void pickAfterStateChangeAfterResolution() {
647780
InOrder inOrder =

0 commit comments

Comments
 (0)