Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.invoke.WrongMethodTypeException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -55,13 +51,6 @@ public class ApimlInstanceRegistry extends InstanceRegistry {

private static final String EXCEPTION_MESSAGE = "Implementation of InstanceRegistry changed, please verify fix of order sending events";

private MethodHandle handleRegistrationMethod;
private MethodHandle handlerResolveInstanceLeaseDurationMethod;
private MethodHandle handleCancellationMethod;

private MethodHandle register2ArgsMethodHandle;
private MethodHandle register3ArgsMethodHandle;
private MethodHandle cancelMethodHandle;
private MethodHandle replicateToPeersMethodHandle;

private final ApplicationContext appCntx;
Expand All @@ -70,6 +59,8 @@ public class ApimlInstanceRegistry extends InstanceRegistry {
private ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry;
private Set<String> staticRegistrationIds = Collections.synchronizedSet(new HashSet<>());

private static final ThreadLocal<Integer> RENEW_CORRECTION = new ThreadLocal<>();

public ApimlInstanceRegistry(
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
Expand All @@ -96,79 +87,29 @@ public ApimlInstanceRegistry(
*/
private void init() {
try {
Method registrationMethod =
InstanceRegistry.class.getDeclaredMethod("handleRegistration",
InstanceInfo.class, int.class, boolean.class
);
registrationMethod.setAccessible(true);
handleRegistrationMethod = MethodHandles.lookup().unreflect(registrationMethod);

Method cancelationMethod =
InstanceRegistry.class.getDeclaredMethod("handleCancelation",
String.class, String.class, boolean.class
);
cancelationMethod.setAccessible(true);
handleCancellationMethod = MethodHandles.lookup().unreflect(cancelationMethod);

Method resolveInstanceLeaseDurationMethod =
InstanceRegistry.class.getDeclaredMethod("resolveInstanceLeaseDuration",
InstanceInfo.class
);
resolveInstanceLeaseDurationMethod.setAccessible(true);
handlerResolveInstanceLeaseDurationMethod = MethodHandles.lookup().unreflect(resolveInstanceLeaseDurationMethod);

Constructor<MethodHandles.Lookup> lookupConstructor = MethodHandles.Lookup.class.getDeclaredConstructor(Class.class);
lookupConstructor.setAccessible(true);
MethodHandles.Lookup lookup = lookupConstructor.newInstance(PeerAwareInstanceRegistryImpl.class);

register2ArgsMethodHandle =
lookup.findSpecial(
PeerAwareInstanceRegistryImpl.class,
"register",
MethodType.methodType(void.class, InstanceInfo.class, boolean.class),
PeerAwareInstanceRegistryImpl.class
);

cancelMethodHandle =
lookup.findSpecial(
PeerAwareInstanceRegistryImpl.class,
"cancel",
MethodType.methodType(boolean.class, String.class, String.class, boolean.class),
PeerAwareInstanceRegistryImpl.class
);

lookup = lookupConstructor.newInstance(AbstractInstanceRegistry.class);

register3ArgsMethodHandle =
lookup.findSpecial(
AbstractInstanceRegistry.class,
"register",
MethodType.methodType(void.class, InstanceInfo.class, int.class, boolean.class),
AbstractInstanceRegistry.class
);
Field registryField = AbstractInstanceRegistry.class.getDeclaredField("registry");
registryField.setAccessible(true);
this.registry = (ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>) registryField.get(this);

Method replicateToPeers = PeerAwareInstanceRegistryImpl.class.getDeclaredMethod("replicateToPeers", Action.class, String.class, String.class, InstanceInfo.class, InstanceStatus.class, boolean.class);
replicateToPeers.setAccessible(true);

replicateToPeersMethodHandle = MethodHandles.lookup().unreflect(replicateToPeers);
} catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
} catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, e);
}
}

protected int resolveInstanceLeaseDurationRewritten(final InstanceInfo info) {
try {
return (int) handlerResolveInstanceLeaseDurationMethod.invokeWithArguments(this, info);
} catch (ClassCastException | WrongMethodTypeException e) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, e);
} catch (RuntimeException re) {
throw re;
} catch (Throwable t) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, t);
@Override
protected void updateRenewsPerMinThreshold() {
Integer correction = RENEW_CORRECTION.get();
if (correction != null) {
synchronized (lock) {
this.expectedNumberOfClientsSendingRenews += correction;
}
RENEW_CORRECTION.remove();
}

super.updateRenewsPerMinThreshold();
}

public void peerAwareHeartbeat(InstanceInfo instanceInfo) {
Expand All @@ -191,19 +132,17 @@ public void registerStatically(InstanceInfo instanceInfo, boolean isReplication,
// the maximum lease duration time (Eureka bug: overflow of int during conversion to ms)
int leaseDuration = Integer.MAX_VALUE / 1000;

// temporary register (do not increase count of service to avoid threshold)
synchronized (lock) {
int backup = expectedNumberOfClientsSendingRenews;
try {
register(instanceInfo, leaseDuration, isReplication);
if (peerReplicate) {
replicateToPeersMethodHandle.invokeWithArguments(this, Action.Register, instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null, isReplication);
}
} catch (Throwable e) {
throw new IllegalStateException(EXCEPTION_MESSAGE, e);
} finally {
expectedNumberOfClientsSendingRenews = backup;
try {
// temporary register (do not increase count of service to avoid threshold)
RENEW_CORRECTION.set(-1);
register(instanceInfo, leaseDuration, isReplication);
if (peerReplicate) {
replicateToPeersMethodHandle.invokeWithArguments(this, Action.Register, instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null, isReplication);
}
} catch (Throwable e) {
throw new IllegalStateException(EXCEPTION_MESSAGE, e);
} finally {
RENEW_CORRECTION.remove();
}

// register lease plan to never expired
Expand All @@ -229,34 +168,18 @@ public boolean isExpired(long additionalLeaseMs) {
*/
@Override
public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
validateInstanceInfo(info);
info = changeServiceId(info);
try {
register3ArgsMethodHandle.invokeWithArguments(this, info, leaseDuration, isReplication);
handleRegistrationMethod.invokeWithArguments(this, info, leaseDuration, isReplication);
} catch (ClassCastException | WrongMethodTypeException e) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, e);
} catch (RuntimeException re) {
throw re;
} catch (Throwable t) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, t);
}
validateInstanceInfo(info);
info = changeServiceId(info);

super.register(info, leaseDuration, isReplication);
}

@Override
public void register(InstanceInfo info, final boolean isReplication) {
validateInstanceInfo(info);
info = changeServiceId(info);
try {
register2ArgsMethodHandle.invokeWithArguments(this, info, isReplication);
handleRegistrationMethod.invokeWithArguments(this, info, resolveInstanceLeaseDurationRewritten(info), isReplication);
} catch (ClassCastException | WrongMethodTypeException e) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, e);
} catch (RuntimeException re) {
throw re;
} catch (Throwable t) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, t);
}
validateInstanceInfo(info);
info = changeServiceId(info);

super.register(info, isReplication);
}

/**
Expand Down Expand Up @@ -309,25 +232,16 @@ public boolean isRegisterable(InstanceInfo instanceInfo) {

@Override
public boolean cancel(String appName, String serverId, boolean isReplication) {
synchronized (lock) {
int backup = expectedNumberOfClientsSendingRenews;
try {
String[] updatedValues = replaceValues(appName, serverId);
final boolean out = (boolean) cancelMethodHandle.invokeWithArguments(this, updatedValues[0], updatedValues[1], isReplication);
handleCancellationMethod.invokeWithArguments(this, updatedValues[0], updatedValues[1], isReplication);
return out;
} catch (ClassCastException | WrongMethodTypeException e) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, e);
} catch (RuntimeException re) {
throw re;
} catch (Throwable t) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, t);
} finally {
if (staticRegistrationIds.removeAll(Optional.ofNullable(registry.get(appName)).orElse(Collections.emptyMap()).keySet())) {
// do not change count of instances if it was registered statically
expectedNumberOfClientsSendingRenews = backup;
}
try {
String[] updatedValues = replaceValues(appName, serverId);

if (staticRegistrationIds.removeAll(Optional.ofNullable(registry.get(appName)).orElse(Collections.emptyMap()).keySet())) {
// do not change count of instances if it was registered statically
RENEW_CORRECTION.set(1);
}
return super.cancel(updatedValues[0], updatedValues[1], isReplication);
} finally {
RENEW_CORRECTION.remove();
}
}

Expand Down
Loading
Loading