Discovery: Improve the lifecycle management of the join control thread in zen discovery.

Also added:
* Better exception handling in UnicastZenPing#ping and MulticastZenPing#ping
* In the join thread that runs the innerJoinCluster loop, remember the last known exception and throw that when assertions are enabled. We loop until inner join has completed and if continues exceptions are thrown we should fail the test, because the exception shouldn't occur in production (at least not too often).
Applied feedback 3

Closes #8327
This commit is contained in:
Martijn van Groningen 2014-11-03 12:36:13 +01:00
parent 82278bb7bc
commit 4ddb0575b5
4 changed files with 125 additions and 88 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.RestStatus;
@ -184,4 +185,18 @@ public final class ExceptionsHelper {
)
);
}
/**
* Throws the specified exception. If null if specified then <code>true</code> is returned.
*/
public static boolean reThrowIfNotNull(@Nullable Throwable e) {
if (e != null) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(e);
}
}
return true;
}
}

View File

@ -231,8 +231,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Override
protected void doStart() throws ElasticsearchException {
nodesFD.setLocalNode(clusterService.localNode());
joinThreadControl.start();
pingService.start();
// start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
@ -249,7 +249,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
logger.warn("failed to start initial join process", t);
}
});
}
@Override
@ -344,8 +343,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
/**
* returns true if there is a currently a background thread active for (re)joining the cluster
* used for testing.
* returns true if zen discovery is started and there is a currently a background thread active for (re)joining
* the cluster used for testing.
*/
public boolean joiningCluster() {
return joinThreadControl.joinThreadActive();
@ -1278,21 +1277,22 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private class JoinThreadControl {
private final ThreadPool threadPool;
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();
public JoinThreadControl(ThreadPool threadPool) {
this.threadPool = threadPool;
}
/** returns true if there is currently an active join thread */
/** returns true if join thread control is started and there is currently an active join thread */
public boolean joinThreadActive() {
Thread currentThread = currentJoinThread.get();
return currentThread != null && currentThread.isAlive();
return running.get() && currentThread != null && currentThread.isAlive();
}
/** returns true if the supplied thread is the currently active joinThread */
/** returns true if join thread control is started and the supplied thread is the currently active joinThread */
public boolean joinThreadActive(Thread joinThread) {
return joinThread.equals(currentJoinThread.get());
return running.get() && joinThread.equals(currentJoinThread.get());
}
/** cleans any running joining thread and calls {@link #rejoin} */
@ -1302,7 +1302,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(clusterState, reason);
}
/** starts a new joining thread if there is no currently active one */
/** starts a new joining thread if there is no currently active one and join thread controlling is started */
public void startNewThreadIfNotRunning() {
assertClusterStateThread();
if (joinThreadActive()) {
@ -1315,15 +1315,18 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
if (!currentJoinThread.compareAndSet(null, currentThread)) {
return;
}
while (joinThreadActive(currentThread)) {
while (running.get() && joinThreadActive(currentThread)) {
try {
innerJoinCluster();
return;
} catch (Throwable t) {
logger.error("unexpected error while joining cluster, trying again", t);
} catch (Exception e) {
logger.error("unexpected error while joining cluster, trying again", e);
// Because we catch any exception here, we want to know in
// tests if an uncaught exception got to this point and the test infra uncaught exception
// leak detection can catch this. In practise no uncaught exception should leak
assert ExceptionsHelper.reThrowIfNotNull(e);
}
}
// cleaning the current thread from currentJoinThread is done by explicit calls.
}
});
@ -1348,6 +1351,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
public void stop() {
running.set(false);
Thread joinThread = currentJoinThread.getAndSet(null);
if (joinThread != null) {
try {
@ -1355,9 +1359,18 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
} catch (Exception e) {
// ignore
}
try {
joinThread.join(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public void start() {
running.set(true);
}
private void assertClusterStateThread() {
assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME) : "not called from the cluster state update thread";
}

View File

@ -185,51 +185,55 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
return;
}
final int id = pingIdGenerator.incrementAndGet();
receivedResponses.put(id, new PingCollection());
sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
try {
receivedResponses.put(id, new PingCollection());
sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});
@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});
} catch (Exception e) {
logger.warn("failed to ping", e);
finalizePingCycle(id, listener);
}
}
/**

View File

@ -201,44 +201,49 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
@Override
public void ping(final PingListener listener, final TimeValue timeout) throws ElasticsearchException {
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet());
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
try {
sendPings(timeout, null, sendPingsHandler);
} catch (RejectedExecutionException e) {
logger.debug("Ping execution rejected", e);
// The RejectedExecutionException can come from the fact unicastConnectExecutor is at its max down in sendPings
// But don't bail here, we can retry later on after the send ping has been scheduled.
}
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() {
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
try {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
sendPingsHandler.close();
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node);
} catch (RejectedExecutionException e) {
logger.debug("Ping execution rejected", e);
// The RejectedExecutionException can come from the fact unicastConnectExecutor is at its max down in sendPings
// But don't bail here, we can retry later on after the send ping has been scheduled.
}
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
sendPingsHandler.close();
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node);
}
listener.onPing(sendPingsHandler.pingCollection().toArray());
}
listener.onPing(sendPingsHandler.pingCollection().toArray());
}
@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
}
@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
}
@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
} catch (Exception e) {
sendPingsHandler.close();
throw new ElasticsearchException("Ping execution failed", e);
}
}
class SendPingsHandler implements Closeable {