lifecycle: Only fork a thread if we start and stop from cluster state update if the watcher state is expected

Also improved some log statements.

Original commit: elastic/x-pack-elasticsearch@7c72550c63
This commit is contained in:
Martijn van Groningen 2015-06-25 15:16:20 +02:00
parent fcad80b973
commit 521b6e8cf3
4 changed files with 26 additions and 8 deletions

View File

@ -55,6 +55,12 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
}
private synchronized void stop(boolean manual) {
WatcherState watcherState = watcherService.state();
if (watcherState != WatcherState.STARTED) {
logger.debug("not stopping watcher. watcher can only stop if its current state is [{}], but its current state now is [{}]", WatcherState.STARTED, watcherState);
return;
}
manuallyStopped = manual;
watcherService.stop();
}
@ -62,7 +68,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
private synchronized void start(ClusterState state, boolean manual) {
WatcherState watcherState = watcherService.state();
if (watcherState != WatcherState.STOPPED) {
logger.debug("not starting watcher. watcher can only start if its current state is [{}]", WatcherState.STOPPED);
logger.debug("not starting watcher. watcher can only start if its current state is [{}], but its current state now is [{}]", WatcherState.STOPPED, watcherState);
return;
}
@ -78,7 +84,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
return;
}
logger.trace("starting... (based on cluster state version [{}])", state.getVersion());
logger.trace("starting... (based on cluster state version [{}]) (manual [{}])", state.getVersion(), manual);
try {
watcherService.start(state);
} catch (Exception e) {
@ -89,6 +95,11 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
@Override
public void clusterChanged(final ClusterChangedEvent event) {
if (!event.localNodeMaster()) {
if (watcherService.state() != WatcherState.STARTED) {
// to avoid unnecessary forking of threads...
return;
}
// We're no longer the master so we need to stop the watcher.
// Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown,
// so we fork here so that we don't wait too long. Other events may need to be processed and
@ -105,6 +116,10 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
// a .triggered_watches index, but they may not have been restored from the cluster state on disk
return;
}
if (watcherService.state() != WatcherState.STOPPED) {
// to avoid unnecessary forking of threads...
return;
}
final ClusterState state = event.state();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {

View File

@ -260,7 +260,7 @@ public class TriggeredWatchStore extends AbstractComponent {
public Collection<TriggeredWatch> loadTriggeredWatches(ClusterState state) {
IndexMetaData indexMetaData = state.getMetaData().index(INDEX_NAME);
if (indexMetaData == null) {
logger.debug("no .watch_history indices found. skipping loading awaiting triggered watches");
logger.debug("no .triggered_watches indices found. skipping loading awaiting triggered watches");
return Collections.emptySet();
}

View File

@ -97,14 +97,16 @@ public class WatchStore extends AbstractComponent {
public boolean validate(ClusterState state) {
IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX);
if (watchesIndexMetaData == null) {
logger.debug("watches index [{}] doesn't exist, so we can start", INDEX);
logger.debug("index [{}] doesn't exist, so we can start", INDEX);
return true;
}
if (state.routingTable().index(INDEX).allPrimaryShardsActive()) {
logger.debug("watches index [{}] exists and all primary shards are started, so we can start", INDEX);
logger.debug("index [{}] exists and all primary shards are started, so we can start", INDEX);
return true;
} else {
logger.debug("not all primary shards active for index [{}], so we cannot start", INDEX);
return false;
}
return false;
}
public boolean started() {

View File

@ -83,14 +83,15 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase {
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(clusterService.state()).thenReturn(clusterState);
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
when(watcherService.validate(clusterState)).thenReturn(true);
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
lifeCycleService.start();
verify(watcherService, times(1)).start(any(ClusterState.class));
verify(watcherService, never()).stop();
when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.stop();
verify(watcherService, times(1)).start(any(ClusterState.class));
verify(watcherService, times(1)).stop();
@ -110,7 +111,7 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase {
nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id2");
clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watcherService, times(2)).start(any(ClusterState.class));
verify(watcherService, times(2)).stop();