Notify onConnectionClosed rather than onNodeDisconnect to prune transport handlers (#24639)

Today we prune transport handlers in TransportService when a node is disconnected.
This can cause connections to starve in the TransportService if the connection is
opened as a short-lived connection, i.e. without sharing the connection to a node
by registering it in the transport itself. This change moves to pruning based
on the connection's cache key, to ensure we notify handlers as soon as the connection
is closed, for all connections and not just for registered connections.

Relates to #24632
Relates to #24575
Relates to #24557
This commit is contained in:
Simon Willnauer 2017-05-12 15:40:40 +02:00 committed by GitHub
parent 04e08f5e49
commit be2a6ce80b
10 changed files with 114 additions and 28 deletions

View File

@ -101,6 +101,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -357,8 +358,9 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
private final DiscoveryNode node; private final DiscoveryNode node;
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
private final Version version; private final Version version;
private final Consumer<Connection> onClose;
public NodeChannels(DiscoveryNode node, Channel[] channels, ConnectionProfile connectionProfile) { public NodeChannels(DiscoveryNode node, Channel[] channels, ConnectionProfile connectionProfile, Consumer<Connection> onClose) {
this.node = node; this.node = node;
this.channels = channels; this.channels = channels;
assert channels.length == connectionProfile.getNumConnections() : "expected channels size to be == " assert channels.length == connectionProfile.getNumConnections() : "expected channels size to be == "
@ -369,6 +371,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
typeMapping.put(type, handle); typeMapping.put(type, handle);
} }
version = node.getVersion(); version = node.getVersion();
this.onClose = onClose;
} }
NodeChannels(NodeChannels channels, Version handshakeVersion) { NodeChannels(NodeChannels channels, Version handshakeVersion) {
@ -376,6 +379,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
this.channels = channels.channels; this.channels = channels.channels;
this.typeMapping = channels.typeMapping; this.typeMapping = channels.typeMapping;
this.version = handshakeVersion; this.version = handshakeVersion;
this.onClose = channels.onClose;
} }
@Override @Override
@ -408,6 +412,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList())); closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()));
onClose.accept(this);
} }
} }
@ -519,8 +524,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ? final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
connectTimeout : connectionProfile.getHandshakeTimeout(); connectTimeout : connectionProfile.getHandshakeTimeout();
final Version version = executeHandshake(node, channel, handshakeTimeout); final Version version = executeHandshake(node, channel, handshakeTimeout);
transportServiceAdapter.onConnectionOpened(node); transportServiceAdapter.onConnectionOpened(nodeChannels);
nodeChannels = new NodeChannels(nodeChannels, version);// clone the channels - we now have the correct version nodeChannels = new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
success = true; success = true;
return nodeChannels; return nodeChannels;
} catch (ConnectTransportException e) { } catch (ConnectTransportException e) {

View File

@ -132,5 +132,13 @@ public interface Transport extends LifecycleComponent {
default Version getVersion() { default Version getVersion() {
return getNode().getVersion(); return getNode().getVersion();
} }
/**
 * Provides the object under which this connection can be cached.
 * <p>
 * By default the connection instance itself serves as the key. A connection that merely wraps another
 * connection must override this method and forward the call to the connection it wraps.
 *
 * @return the cache key of this connection
 */
default Object getCacheKey() {
    // the connection is its own key unless a delegating implementation overrides this method
    return this;
}
} }
} }

View File

@ -33,8 +33,14 @@ public interface TransportConnectionListener {
*/ */
default void onNodeDisconnected(DiscoveryNode node) {} default void onNodeDisconnected(DiscoveryNode node) {}
/**
 * Invoked once a connection to a node has been closed. The closed connection is not necessarily one that
 * was registered in the transport as the shared connection to a specific node.
 *
 * @param connection the connection that was closed
 */
default void onConnectionClosed(Transport.Connection connection) {
    // no-op by default; listeners override this to react to closed connections
}
/** /**
* Called once a node connection is opened. * Called once a node connection is opened.
*/ */
default void onConnectionOpened(DiscoveryNode node) {} default void onConnectionOpened(Transport.Connection connection) {}
} }

View File

@ -569,7 +569,7 @@ public class TransportService extends AbstractLifecycleComponent {
} }
Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true); Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
TransportResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler); TransportResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
clientHandlers.put(requestId, new RequestHolder<>(responseHandler, connection.getNode(), action, timeoutHandler)); clientHandlers.put(requestId, new RequestHolder<>(responseHandler, connection, action, timeoutHandler));
if (lifecycle.stoppedOrClosed()) { if (lifecycle.stoppedOrClosed()) {
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify // if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify
// the caller. It will only notify if the toStop code hasn't done the work yet. // the caller. It will only notify if the toStop code hasn't done the work yet.
@ -810,7 +810,7 @@ public class TransportService extends AbstractLifecycleComponent {
} }
holder.cancelTimeout(); holder.cancelTimeout();
if (traceEnabled() && shouldTraceAction(holder.action())) { if (traceEnabled() && shouldTraceAction(holder.action())) {
traceReceivedResponse(requestId, holder.node(), holder.action()); traceReceivedResponse(requestId, holder.connection().getNode(), holder.action());
} }
return holder.handler(); return holder.handler();
} }
@ -855,12 +855,12 @@ public class TransportService extends AbstractLifecycleComponent {
} }
@Override @Override
public void onConnectionOpened(DiscoveryNode node) { public void onConnectionOpened(Transport.Connection connection) {
// capture listeners before spawning the background callback so the following pattern won't trigger a call // capture listeners before spawning the background callback so the following pattern won't trigger a call
// connectToNode(); connection is completed successfully // connectToNode(); connection is completed successfully
// addConnectionListener(); this listener shouldn't be called // addConnectionListener(); this listener shouldn't be called
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream(); final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(node))); threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(connection)));
} }
@Override @Override
@ -871,20 +871,28 @@ public class TransportService extends AbstractLifecycleComponent {
connectionListener.onNodeDisconnected(node); connectionListener.onNodeDisconnected(node);
} }
}); });
} catch (EsRejectedExecutionException ex) {
logger.debug("Rejected execution on NodeDisconnected", ex);
}
}
@Override
public void onConnectionClosed(Transport.Connection connection) {
try {
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) { for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
RequestHolder holder = entry.getValue(); RequestHolder holder = entry.getValue();
if (holder.node().equals(node)) { if (holder.connection().getCacheKey().equals(connection.getCacheKey())) {
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey()); final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
if (holderToNotify != null) { if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't // callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows // want handlers to worry about stack overflows
threadPool.generic().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException(node, threadPool.generic().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException(
holderToNotify.action()))); connection.getNode(), holderToNotify.action())));
} }
} }
} }
} catch (EsRejectedExecutionException ex) { } catch (EsRejectedExecutionException ex) {
logger.debug("Rejected execution on NodeDisconnected", ex); logger.debug("Rejected execution on onConnectionClosed", ex);
} }
} }
@ -929,13 +937,14 @@ public class TransportService extends AbstractLifecycleComponent {
if (holder != null) { if (holder != null) {
// add it to the timeout information holder, in case we are going to get a response later // add it to the timeout information holder, in case we are going to get a response later
long timeoutTime = System.currentTimeMillis(); long timeoutTime = System.currentTimeMillis();
timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(holder.node(), holder.action(), sentTime, timeoutTime)); timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(holder.connection().getNode(), holder.action(), sentTime,
timeoutTime));
// now that we have the information visible via timeoutInfoHandlers, we try to remove the request id // now that we have the information visible via timeoutInfoHandlers, we try to remove the request id
final RequestHolder removedHolder = clientHandlers.remove(requestId); final RequestHolder removedHolder = clientHandlers.remove(requestId);
if (removedHolder != null) { if (removedHolder != null) {
assert removedHolder == holder : "two different holder instances for request [" + requestId + "]"; assert removedHolder == holder : "two different holder instances for request [" + requestId + "]";
removedHolder.handler().handleException( removedHolder.handler().handleException(
new ReceiveTimeoutTransportException(holder.node(), holder.action(), new ReceiveTimeoutTransportException(holder.connection().getNode(), holder.action(),
"request_id [" + requestId + "] timed out after [" + (timeoutTime - sentTime) + "ms]")); "request_id [" + requestId + "] timed out after [" + (timeoutTime - sentTime) + "ms]"));
} else { } else {
// response was processed, remove timeout info. // response was processed, remove timeout info.
@ -990,15 +999,15 @@ public class TransportService extends AbstractLifecycleComponent {
private final TransportResponseHandler<T> handler; private final TransportResponseHandler<T> handler;
private final DiscoveryNode node; private final Transport.Connection connection;
private final String action; private final String action;
private final TimeoutHandler timeoutHandler; private final TimeoutHandler timeoutHandler;
RequestHolder(TransportResponseHandler<T> handler, DiscoveryNode node, String action, TimeoutHandler timeoutHandler) { RequestHolder(TransportResponseHandler<T> handler, Transport.Connection connection, String action, TimeoutHandler timeoutHandler) {
this.handler = handler; this.handler = handler;
this.node = node; this.connection = connection;
this.action = action; this.action = action;
this.timeoutHandler = timeoutHandler; this.timeoutHandler = timeoutHandler;
} }
@ -1007,8 +1016,8 @@ public class TransportService extends AbstractLifecycleComponent {
return handler; return handler;
} }
public DiscoveryNode node() { public Transport.Connection connection() {
return this.node; return this.connection;
} }
public String action() { public String action() {

View File

@ -604,8 +604,8 @@ public class UnicastZenPingTests extends ESTestCase {
// install a listener to check that no new connections are made // install a listener to check that no new connections are made
handleA.transportService.addConnectionListener(new TransportConnectionListener() { handleA.transportService.addConnectionListener(new TransportConnectionListener() {
@Override @Override
public void onConnectionOpened(DiscoveryNode node) { public void onConnectionOpened(Transport.Connection connection) {
fail("should not open any connections. got [" + node + "]"); fail("should not open any connections. got [" + connection.getNode() + "]");
} }
}); });

View File

@ -204,7 +204,7 @@ public class TCPTransportTests extends ESTestCase {
@Override @Override
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException { protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return new NodeChannels(node, new Object[profile.getNumConnections()], profile); return new NodeChannels(node, new Object[profile.getNumConnections()], profile, c -> {});
} }
@Override @Override
@ -220,7 +220,7 @@ public class TCPTransportTests extends ESTestCase {
@Override @Override
public NodeChannels getConnection(DiscoveryNode node) { public NodeChannels getConnection(DiscoveryNode node) {
return new NodeChannels(node, new Object[MockTcpTransport.LIGHT_PROFILE.getNumConnections()], return new NodeChannels(node, new Object[MockTcpTransport.LIGHT_PROFILE.getNumConnections()],
MockTcpTransport.LIGHT_PROFILE); MockTcpTransport.LIGHT_PROFILE, c -> {});
} }
}; };
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);

View File

@ -320,7 +320,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
@Override @Override
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) { protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) {
final Channel[] channels = new Channel[profile.getNumConnections()]; final Channel[] channels = new Channel[profile.getNumConnections()];
final NodeChannels nodeChannels = new NodeChannels(node, channels, profile); final NodeChannels nodeChannels = new NodeChannels(node, channels, profile, transportServiceAdapter::onConnectionClosed);
boolean success = false; boolean success = false;
try { try {
final TimeValue connectTimeout; final TimeValue connectTimeout;

View File

@ -777,6 +777,11 @@ public final class MockTransportService extends TransportService {
public void close() throws IOException { public void close() throws IOException {
connection.close(); connection.close();
} }
/**
 * Forwards to the wrapped connection so that this wrapper reports the same cache key as the
 * connection it delegates to.
 */
@Override
public Object getCacheKey() {
    final Object delegateKey = connection.getCacheKey();
    return delegateKey;
}
} }
public Transport getOriginalTransport() { public Transport getOriginalTransport() {

View File

@ -2099,9 +2099,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
@Override @Override
public String executor() { public String executor() {
if (1 == 1)
return "same";
return randomFrom(executors); return randomFrom(executors);
} }
}; };
@ -2111,4 +2108,59 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
latch.await(); latch.await();
} }
/**
 * Checks that a response handler which is still waiting for an answer is completed with a
 * {@link NodeDisconnectedException} once the connection its request was sent on is closed.
 * <p>
 * The connection is opened explicitly via {@code openConnection} and closed by the test itself; the
 * remote request handler never replies, so the only way the latch can be released is through the
 * handler being notified.
 * <p>
 * The additional transport service and the opened connection are released in {@code finally} blocks so
 * that a failure anywhere in the test does not leave them open.
 */
public void testHandlerIsInvokedOnConnectionClose() throws IOException, InterruptedException {
    List<String> executors = new ArrayList<>(ThreadPool.THREAD_POOL_TYPES.keySet());
    CollectionUtil.timSort(executors); // makes sure it's reproducible
    TransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
    try {
        serviceC.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
            (request, channel) -> {
                // do nothing
            });
        serviceC.start();
        serviceC.acceptIncomingRequests();
        CountDownLatch latch = new CountDownLatch(1);
        TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
            @Override
            public TransportResponse newInstance() {
                return TransportResponse.Empty.INSTANCE;
            }

            @Override
            public void handleResponse(TransportResponse response) {
                // the remote handler never answers, so any response is a failure
                try {
                    fail("no response expected");
                } finally {
                    latch.countDown();
                }
            }

            @Override
            public void handleException(TransportException exp) {
                try {
                    assertTrue(exp.getClass().toString(), exp instanceof NodeDisconnectedException);
                } finally {
                    latch.countDown();
                }
            }

            @Override
            public String executor() {
                return randomFrom(executors);
            }
        };

        ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
        builder.addConnections(1,
            TransportRequestOptions.Type.BULK,
            TransportRequestOptions.Type.PING,
            TransportRequestOptions.Type.RECOVERY,
            TransportRequestOptions.Type.REG,
            TransportRequestOptions.Type.STATE);
        Transport.Connection connection = serviceB.openConnection(serviceC.getLocalNode(), builder.build());
        try {
            serviceB.sendRequest(connection, "action", new TestRequest(randomFrom("fail", "pass")), TransportRequestOptions.EMPTY,
                transportResponseHandler);
        } finally {
            // closing the connection is what is expected to trigger the handler notification
            connection.close();
        }
        latch.await();
    } finally {
        // always release the extra service, even when the test fails part way through
        serviceC.close();
    }
}
} }

View File

@ -180,7 +180,8 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
@Override @Override
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException { protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException {
final MockChannel[] mockChannels = new MockChannel[1]; final MockChannel[] mockChannels = new MockChannel[1];
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE); // we always use light here final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE,
transportServiceAdapter::onConnectionClosed); // we always use light here
boolean success = false; boolean success = false;
final MockSocket socket = new MockSocket(); final MockSocket socket = new MockSocket();
try { try {