lifecycle: Verify whether Watcher was stopped manually under a lock

Before if timing permitted it a start and stop could be executed very close to each other time wise. This could make the manual stopped check seem to be valid, because the stop hadn't yet been performed.

Original commit: elastic/x-pack-elasticsearch@c1865c1069
This commit is contained in:
Martijn van Groningen 2015-06-18 13:45:55 +02:00
parent 8af461daaf
commit c4cb85104e
3 changed files with 98 additions and 48 deletions

View File

@ -24,7 +24,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
private final WatcherService watcherService; private final WatcherService watcherService;
private final ClusterService clusterService; private final ClusterService clusterService;
// Maybe this should be a setting in the cluster settings? // TODO: If Watcher was stopped via api and the master is changed then Watcher will start regardless of the previous
// stop command, so at some point this needs to be a cluster setting
private volatile boolean manuallyStopped; private volatile boolean manuallyStopped;
@Inject @Inject
@ -46,55 +47,42 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
} }
public void start() { public void start() {
start(clusterService.state()); start(clusterService.state(), true);
} }
public void stop() { public void stop() {
stop(true); stop(true);
} }
private synchronized void start(ClusterState state) {
watcherService.start(state);
}
private synchronized void stop(boolean manual) { private synchronized void stop(boolean manual) {
manuallyStopped = manual; manuallyStopped = manual;
watcherService.stop(); watcherService.stop();
} }
@Override private synchronized void start(ClusterState state, boolean manual) {
public void clusterChanged(ClusterChangedEvent event) { WatcherState watcherState = watcherService.state();
if (!event.localNodeMaster()) { if (watcherState != WatcherState.STOPPED) {
// We're no longer the master so we need to stop the watcher. logger.debug("Not starting, because state [{}] while [{}] is expected", watcherState, WatcherState.STOPPED);
// Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, return;
// so we fork here so that we don't wait too long. Other events may need to be processed and }
// other cluster state listeners may need to be executed as well for this event.
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { // If we start from a cluster state update we need to check if previously we stopped manually
@Override // otherwise Watcher would start upon the next cluster state update while the user instructed Watcher to not run
public void run() { if (!manual && manuallyStopped) {
stop(false); logger.debug("Not starting, because watcher has been stopped manually, so watcher can't be started automatically");
}
});
} else {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .watchs and
// a .watch_history index, but they may not have been restored from the cluster state on disk
return; return;
} }
final ClusterState state = event.state();
if (!watcherService.validate(state)) { if (!watcherService.validate(state)) {
logger.debug("Not starting, because the cluster state isn't valid");
return; return;
} }
if (watcherService.state() == WatcherState.STOPPED && !manuallyStopped) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
int attempts = 0; int attempts = 0;
while(true) { while(true) {
try { try {
start(state); logger.debug("Start attempt [{}], based on cluster state version [{}]", attempts, state.getVersion());
watcherService.start(state);
return; return;
} catch (Exception e) { } catch (Exception e) {
if (++attempts < 3) { if (++attempts < 3) {
@ -115,8 +103,34 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
} }
} }
} }
@Override
public void clusterChanged(final ClusterChangedEvent event) {
if (!event.localNodeMaster()) {
// We're no longer the master so we need to stop the watcher.
// Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown,
// so we fork here so that we don't wait too long. Other events may need to be processed and
// other cluster state listeners may need to be executed as well for this event.
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
stop(false);
}
});
} else {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .watches and
// a .triggered_watches index, but they may not have been restored from the cluster state on disk
return;
}
final ClusterState state = event.state();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
start(state, false);
}
}); });
} }
} }
}
} }

View File

@ -26,6 +26,7 @@ import static org.mockito.Mockito.*;
*/ */
public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase { public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase {
private ClusterService clusterService;
private WatcherService watcherService; private WatcherService watcherService;
private WatcherLifeCycleService lifeCycleService; private WatcherLifeCycleService lifeCycleService;
@ -34,7 +35,7 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase {
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService()); when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService());
watcherService = mock(WatcherService.class); watcherService = mock(WatcherService.class);
ClusterService clusterService = mock(ClusterService.class); clusterService = mock(ClusterService.class);
lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, clusterService, threadPool, watcherService); lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, clusterService, threadPool, watcherService);
} }
@ -78,6 +79,14 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase {
@Test @Test
public void testManualStartStop() { public void testManualStartStop() {
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(clusterService.state()).thenReturn(clusterState);
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
when(watcherService.validate(clusterState)).thenReturn(true);
lifeCycleService.start(); lifeCycleService.start();
verify(watcherService, times(1)).start(any(ClusterState.class)); verify(watcherService, times(1)).start(any(ClusterState.class));
verify(watcherService, never()).stop(); verify(watcherService, never()).stop();
@ -87,9 +96,6 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase {
verify(watcherService, times(1)).stop(); verify(watcherService, times(1)).stop();
// Starting via cluster state update, we shouldn't start because we have been stopped manually. // Starting via cluster state update, we shouldn't start because we have been stopped manually.
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(watcherService.state()).thenReturn(WatcherState.STOPPED); when(watcherService.state()).thenReturn(WatcherState.STOPPED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watcherService, times(1)).start(any(ClusterState.class)); verify(watcherService, times(1)).start(any(ClusterState.class));
@ -120,4 +126,34 @@ public class WatcherLifeCycleServiceTests extends ElasticsearchTestCase {
verify(watcherService, times(2)).stop(); verify(watcherService, times(2)).stop();
} }
@Test
public void testManualStartStop_clusterStateNotValid() {
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(clusterService.state()).thenReturn(clusterState);
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
when(watcherService.validate(clusterState)).thenReturn(false);
lifeCycleService.start();
verify(watcherService, never()).start(any(ClusterState.class));
verify(watcherService, never()).stop();
}
@Test
public void testManualStartStop_watcherNotStopped() {
DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(clusterService.state()).thenReturn(clusterState);
when(watcherService.state()).thenReturn(WatcherState.STOPPING);
lifeCycleService.start();
verify(watcherService, never()).validate(any(ClusterState.class));
verify(watcherService, never()).start(any(ClusterState.class));
verify(watcherService, never()).stop();
}
} }

View File

@ -20,7 +20,7 @@ import static org.hamcrest.core.Is.is;
/** /**
*/ */
@TestLogging("cluster:DEBUG,action.admin.cluster.settings:DEBUG") @TestLogging("cluster:DEBUG,action.admin.cluster.settings:DEBUG,watcher:DEBUG")
@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) @ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests { public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests {