Watcher: Clear out WatchStore on watch index deletion (elastic/elasticsearch#2807)

If someone deletes the watch index (e.g. by deleting all indices), the watcher
in-memory store still contains all the watches and keeps trying to execute them,
which results in exceptions because the watches themselves can no longer be updated.

In order to minimize this problem (it cannot be eliminated completely), we should
act accordingly if the watch index goes missing (either deleted or closed) by
clearing out the in-memory representation of watches in the watch store, as well
as trying to finish all the current executions.

Closes elastic/elasticsearch#2794

Original commit: elastic/x-pack-elasticsearch@12d98cd566
This commit is contained in:
Alexander Reelsen 2016-09-07 15:06:03 +02:00 committed by GitHub
parent f53fd02da0
commit bb033f1e00
6 changed files with 163 additions and 119 deletions

View File

@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
@ -22,6 +23,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import java.util.concurrent.CountDownLatch;
@ -119,34 +121,33 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
}
if (!event.localNodeMaster()) {
if (watcherService.state() != WatcherState.STARTED) {
// to avoid unnecessary forking of threads...
return;
if (watcherService.state() == WatcherState.STARTED) {
// We're no longer the master so we need to stop the watcher.
// Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown,
// so we fork here so that we don't wait too long. Other events may need to be processed and
// other cluster state listeners may need to be executed as well for this event.
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> stop(false));
}
// We're no longer the master so we need to stop the watcher.
// Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown,
// so we fork here so that we don't wait too long. Other events may need to be processed and
// other cluster state listeners may need to be executed as well for this event.
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
stop(false);
}
});
} else {
if (watcherService.state() != WatcherState.STOPPED) {
// to avoid unnecessary forking of threads...
return;
}
if (watcherService.state() == WatcherState.STOPPED) {
final ClusterState state = event.state();
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> start(state, false));
} else {
boolean isWatchIndexDeleted = event.indicesDeleted().stream()
.filter(index -> WatchStore.INDEX.equals(index.getName()))
.findAny()
.isPresent();
final ClusterState state = event.state();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
start(state, false);
boolean isWatchIndexOpenInPreviousClusterState = event.previousState().metaData().hasIndex(WatchStore.INDEX) &&
event.previousState().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.OPEN;
boolean isWatchIndexClosedInCurrentClusterState = event.state().metaData().hasIndex(WatchStore.INDEX) &&
event.state().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.CLOSE;
boolean hasWatcherIndexBeenClosed = isWatchIndexOpenInPreviousClusterState && isWatchIndexClosedInCurrentClusterState;
if (isWatchIndexDeleted || hasWatcherIndexBeenClosed) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> watcherService.watchIndexDeletedOrClosed());
}
});
}
}
}

View File

@ -16,9 +16,9 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchLockService;
@ -292,4 +292,13 @@ public class WatcherService extends AbstractComponent {
innerMap.putAll(watchStore.usageStats());
return innerMap;
}
/**
 * Reacts to the {@link WatchStore#INDEX} having been deleted or closed. Clears the in-memory
 * representation of the watches and drains the currently running executions, so that no further
 * watch executions are attempted against an index that can no longer be read or updated.
 */
public void watchIndexDeletedOrClosed() {
// drop all watches held in memory first, so nothing new is triggered from stale state
watchStore.clearWatchesInMemory();
// then wait for in-flight executions to finish and reset the execution bookkeeping
executionService.clearExecutions();
}
}

View File

@ -141,12 +141,7 @@ public class ExecutionService extends AbstractComponent {
currentExecutions.add(watchExecution.createSnapshot());
}
// Lets show the longest running watch first:
Collections.sort(currentExecutions, new Comparator<WatchExecutionSnapshot>() {
@Override
public int compare(WatchExecutionSnapshot e1, WatchExecutionSnapshot e2) {
return e1.executionTime().compareTo(e2.executionTime());
}
});
Collections.sort(currentExecutions, Comparator.comparing(WatchExecutionSnapshot::executionTime));
return currentExecutions;
}
@ -163,12 +158,8 @@ public class ExecutionService extends AbstractComponent {
queuedWatches.add(new QueuedWatch(executionTask.ctx));
}
// Lets show the execution that pending the longest first:
Collections.sort(queuedWatches, new Comparator<QueuedWatch>() {
@Override
public int compare(QueuedWatch e1, QueuedWatch e2) {
return e1.executionTime().compareTo(e2.executionTime());
}
});
Collections.sort(queuedWatches, Comparator.comparing(QueuedWatch::executionTime));
return queuedWatches;
}
@ -438,6 +429,15 @@ public class ExecutionService extends AbstractComponent {
return counters.toMap();
}
/**
 * Clears the current executions: waits (up to {@code maxStopTimeout}) for the in-flight
 * executions to drain, then replaces the tracker with a fresh, empty one.
 * <p>
 * Watcher keeps running while this method is called, so the executions are replaced rather
 * than left sealed — a permanently sealed tracker would reject all future executions.
 */
public void clearExecutions() {
// seal the tracker and block until the currently running executions have finished
currentExecutions.sealAndAwaitEmpty(maxStopTimeout);
// install a fresh tracker so watcher can continue to accept new executions
currentExecutions = new CurrentExecutions();
}
private static final class StartupExecutionContext extends TriggeredExecutionContext {
public StartupExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {

View File

@ -333,6 +333,10 @@ public class WatchStore extends AbstractComponent {
}
}
/**
 * Removes every watch from the in-memory map. Invoked when the watches index has been
 * deleted or closed, so that no stale watch keeps being executed against a missing index.
 */
public void clearWatchesInMemory() {
watches.clear();
}
public class WatchPut {
private final Watch previous;

View File

@ -5,18 +5,22 @@
*/
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -165,4 +169,46 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
verify(watcherService, never()).start(any(ClusterState.class));
verify(watcherService, never()).stop();
}
/**
 * Verifies that removing the watch index from the cluster state triggers the
 * deleted/closed cleanup callback without starting or stopping the watcher itself.
 */
public void testWatchIndexDeletion() throws Exception {
    DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build();
    Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();

    // previous cluster state: the watch index exists
    ClusterState stateWithWatchIndex = ClusterState.builder(new ClusterName("my-cluster"))
            .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX)
                    .settings(settings).numberOfReplicas(0).numberOfShards(1)))
            .nodes(nodes).build();
    // current cluster state: the watch index is gone
    ClusterState stateWithoutWatchIndex = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();

    when(watcherService.state()).thenReturn(WatcherState.STARTED);
    lifeCycleService.clusterChanged(new ClusterChangedEvent("any", stateWithoutWatchIndex, stateWithWatchIndex));

    // only the cleanup hook must fire; the watcher lifecycle must remain untouched
    verify(watcherService, times(1)).watchIndexDeletedOrClosed();
    verify(watcherService, never()).start(any(ClusterState.class));
    verify(watcherService, never()).stop();
}
/**
 * Verifies that closing the watch index (open in the previous cluster state, closed in the
 * current one) triggers the deleted/closed cleanup callback without starting or stopping
 * the watcher itself.
 */
public void testWatchIndexClosing() throws Exception {
    DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build();
    Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();

    // previous cluster state: the watch index is open
    ClusterState stateWithOpenIndex = ClusterState.builder(new ClusterName("my-cluster"))
            .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX)
                    .settings(settings).numberOfReplicas(0).numberOfShards(1)))
            .nodes(nodes).build();
    // current cluster state: the watch index has been closed
    ClusterState stateWithClosedIndex = ClusterState.builder(new ClusterName("my-cluster"))
            .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX).state(IndexMetaData.State.CLOSE)
                    .settings(settings).numberOfReplicas(0).numberOfShards(1)))
            .nodes(nodes).build();

    when(watcherService.state()).thenReturn(WatcherState.STARTED);
    lifeCycleService.clusterChanged(new ClusterChangedEvent("any", stateWithClosedIndex, stateWithOpenIndex));

    // only the cleanup hook must fire; the watcher lifecycle must remain untouched
    verify(watcherService, times(1)).watchIndexDeletedOrClosed();
    verify(watcherService, never()).start(any(ClusterState.class));
    verify(watcherService, never()).stop();
}
}

View File

@ -54,6 +54,7 @@ import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsEqual.equalTo;
@ -123,22 +124,7 @@ public class WatchStoreTests extends ESTestCase {
public void testStartRefreshFailed() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
createWatchIndexMetaData(csBuilder);
RefreshResponse refreshResponse = mockRefreshResponse(1, 0);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
@ -158,22 +144,7 @@ public class WatchStoreTests extends ESTestCase {
public void testStartSearchFailed() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
createWatchIndexMetaData(csBuilder);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
@ -197,22 +168,7 @@ public class WatchStoreTests extends ESTestCase {
public void testStartNoWatchStored() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
createWatchIndexMetaData(csBuilder);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
@ -234,22 +190,7 @@ public class WatchStoreTests extends ESTestCase {
public void testStartWatchStored() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
createWatchIndexMetaData(csBuilder);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
@ -300,22 +241,7 @@ public class WatchStoreTests extends ESTestCase {
public void testUsageStats() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder.build());
createWatchIndexMetaData(csBuilder);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
@ -419,6 +345,65 @@ public class WatchStoreTests extends ESTestCase {
assertThat(stats.getValue("watch.transform.TYPE.active"), is(greaterThan(0)));
}
/**
 * Starts the watch store with a single stored watch, then verifies that
 * {@code clearWatchesInMemory()} empties the in-memory watches (and active watches)
 * while leaving the store itself in the started state.
 */
public void testThatCleaningWatchesWorks() throws Exception {
    ClusterState.Builder stateBuilder = new ClusterState.Builder(new ClusterName("_name"));
    createWatchIndexMetaData(stateBuilder);

    // refresh succeeds on the single shard
    when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(mockRefreshResponse(1, 1));

    // the initial search returns exactly one watch document
    BytesReference watchSource = new BytesArray("{}");
    InternalSearchHit searchHit = new InternalSearchHit(0, "_id1", new Text("type"), Collections.emptyMap());
    searchHit.sourceRef(watchSource);
    when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class)))
            .thenReturn(mockSearchResponse(1, 1, 1, searchHit));
    // the scroll continuation is empty, which terminates the loading loop
    when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(mockSearchResponse(1, 1, 0));
    when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));

    // parsing the stored document yields a mocked watch with a mocked status
    Watch mockedWatch = mock(Watch.class);
    when(mockedWatch.status()).thenReturn(mock(WatchStatus.class));
    when(parser.parse("_id1", true, watchSource)).thenReturn(mockedWatch);

    ClusterState clusterState = stateBuilder.build();
    assertThat(watchStore.validate(clusterState), is(true));
    watchStore.start(clusterState);
    assertThat(watchStore.started(), is(true));
    assertThat(watchStore.watches(), hasSize(1));

    // clearing must drop the watches but keep the store running
    watchStore.clearWatchesInMemory();
    assertThat(watchStore.started(), is(true));
    assertThat(watchStore.watches(), hasSize(0));
    assertThat(watchStore.activeWatches(), hasSize(0));
}
/*
* Creates the standard cluster state metadata for the watches index
* with shards/replicas being marked as started
*/
/*
 * Fills the given cluster state builder with metadata and a routing table for the
 * watches index: one shard, one replica, with the primary marked as started.
 */
private void createWatchIndexMetaData(ClusterState.Builder builder) {
    Settings indexSettings = settings(Version.CURRENT)
            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
            .build();
    MetaData.Builder metaDataBuilder = MetaData.builder();
    metaDataBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(indexSettings).numberOfShards(1).numberOfReplicas(1));
    final Index watchIndex = metaDataBuilder.get(WatchStore.INDEX).getIndex();

    // started primary on "_node_id"; addReplica() adds the replica shard entry
    IndexShardRoutingTable shardRoutingTable = new IndexShardRoutingTable.Builder(new ShardId(watchIndex, 0))
            .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
            .build();
    IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(watchIndex);
    indexRoutingTableBuilder.addIndexShard(shardRoutingTable);
    indexRoutingTableBuilder.addReplica();

    builder.metaData(metaDataBuilder);
    builder.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build());
}
private RefreshResponse mockRefreshResponse(int total, int successful) {
RefreshResponse refreshResponse = mock(RefreshResponse.class);
when(refreshResponse.getTotalShards()).thenReturn(total);
@ -432,7 +417,6 @@ public class WatchStoreTests extends ESTestCase {
when(searchResponse.getTotalShards()).thenReturn(total);
when(searchResponse.getSuccessfulShards()).thenReturn(successful);
when(searchResponse.getHits()).thenReturn(internalSearchHits);
when(searchResponse.getHits()).thenReturn(internalSearchHits);
return searchResponse;
}