Raise node disconnected even if the transport is stopped

during the stop process, we raise network disconnect, so it is valid to raise then while we are in stop mode, and actually, we should not miss any events in such a case.
Typically, this is not a problem, since its during the normal shutdown process on the JVM, but when running a reused cluster within the JVM (like in our test infra with the shared cluster), we should properly raise those node disconnects
closes #5918
This commit is contained in:
Shay Banon 2014-04-23 14:06:44 +02:00
parent 4b9f1d261d
commit dedddf3908
2 changed files with 80 additions and 27 deletions

View File

@ -96,7 +96,25 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
@Override
protected void doStop() throws ElasticsearchException {
transport.stop();
try {
transport.stop();
} finally {
// in case the transport is not connected to our local node (thus cleaned on node disconnect)
// make sure to clean any leftover on going handles
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
holderToNotify.handler().handleException(new TransportException("transport stopped, action: " + holderToNotify.action()));
}
});
}
}
}
}
@Override
@ -291,38 +309,34 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
@Override
public void raiseNodeDisconnected(final DiscoveryNode node) {
if (lifecycle.stoppedOrClosed()) {
return;
}
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
try {
for (TransportConnectionListener connectionListener : connectionListeners) {
try {
for (final TransportConnectionListener connectionListener : connectionListeners) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
connectionListener.onNodeDisconnected(node);
}
// node got disconnected, raise disconnection on possible ongoing handlers
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
RequestHolder holder = entry.getValue();
if (holder.node().equals(node)) {
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
holderToNotify.handler().handleException(new NodeDisconnectedException(node, holderToNotify.action()));
}
});
});
}
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
RequestHolder holder = entry.getValue();
if (holder.node().equals(node)) {
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
holderToNotify.handler().handleException(new NodeDisconnectedException(node, holderToNotify.action()));
}
}
});
}
} catch (EsRejectedExecutionException ex) {
logger.debug("Rejected execution on NodeDisconnected", ex);
}
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Rejected execution on NodeDisconnected", ex);
}
}
}

View File

@ -381,6 +381,45 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
assertThat(latch.await(5, TimeUnit.SECONDS), equalTo(true));
}
@Test
public void testNotifyOnShutdown() throws Exception {
final CountDownLatch latch2 = new CountDownLatch(1);
serviceA.registerHandler("foobar", new BaseTransportRequestHandler<StringMessageRequest>() {
@Override
public StringMessageRequest newInstance() {
return new StringMessageRequest();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
try {
latch2.await();
logger.info("Stop ServiceB now");
serviceB.stop();
} catch (Exception e) {
fail(e.getMessage());
}
}
});
TransportFuture<TransportResponse.Empty> foobar = serviceB.submitRequest(nodeA, "foobar",
new StringMessageRequest(""), options(), EmptyTransportResponseHandler.INSTANCE_SAME);
latch2.countDown();
try {
foobar.txGet();
fail("TransportException expected");
} catch (TransportException ex) {
}
serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
}
@Test
public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception {
serviceA.registerHandler("sayHelloTimeoutNoResponse", new BaseTransportRequestHandler<StringMessageRequest>() {