From ad5606d1f704378625cb00b8936acea26b232d72 Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 6 Sep 2016 23:00:56 +0200 Subject: [PATCH 1/4] [TEST] don't use null script lang in WatcherUtilsTests same as elastic/x-pack@9e1e0988c114f6ef3c76b6f1aef1ab746fdfba34 but for testSerializeSearchRequest this time. Original commit: elastic/x-pack-elasticsearch@aa7e3814b687c66904a9353c41ff2d16812b0d27 --- .../elasticsearch/xpack/watcher/support/WatcherUtilsTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherUtilsTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherUtilsTests.java index f8498fe99d0..04611dcea70 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherUtilsTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherUtilsTests.java @@ -108,7 +108,7 @@ public class WatcherUtilsTests extends ESTestCase { } String text = randomAsciiOfLengthBetween(1, 5); ScriptService.ScriptType scriptType = randomFrom(ScriptService.ScriptType.values()); - expectedTemplate = new Script(text, scriptType, randomBoolean() ? null : "mustache", params); + expectedTemplate = new Script(text, scriptType, "mustache", params); request = new WatcherSearchTemplateRequest(expectedIndices, expectedTypes, expectedSearchType, expectedIndicesOptions, expectedTemplate); } else { From bb033f1e0085454134d4da1cd9d9fd4238b2b529 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 7 Sep 2016 15:06:03 +0200 Subject: [PATCH 2/4] Watcher: Clear out WatchStore on watch index deletion (elastic/elasticsearch#2807) If someone deletes the watch index (i.e. by deleting all indices), the watcher in memory store still contains all the watches and tries to execute watches - which results in exceptions as the watch itself cannot be updated anymore. In order to minimize this problem (it cant be get rid of completely), we should act accordingly if the watch index goes missing (either deleted or closed) and clear out the memory representation of watches in the watchstore as well as trying to finish all the current executions. Closes elastic/elasticsearch#2794 Original commit: elastic/x-pack-elasticsearch@12d98cd566ee4d0d37fc5eecf0ac6a314722c8b9 --- .../watcher/WatcherLifeCycleService.java | 49 +++--- .../xpack/watcher/WatcherService.java | 11 +- .../watcher/execution/ExecutionService.java | 24 +-- .../xpack/watcher/watch/WatchStore.java | 4 + .../watcher/WatcherLifeCycleServiceTests.java | 48 +++++- .../xpack/watcher/watch/WatchStoreTests.java | 146 ++++++++---------- 6 files changed, 163 insertions(+), 119 deletions(-) diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index daa21ac5d89..b96fed9f746 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ack.AckedRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; @@ -22,6 +23,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.watcher.watch.WatchStore; import java.util.concurrent.CountDownLatch; @@ -119,34 +121,33 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste } if (!event.localNodeMaster()) { - if (watcherService.state() != WatcherState.STARTED) { - // to avoid unnecessary forking of threads... - return; + if (watcherService.state() == WatcherState.STARTED) { + // We're no longer the master so we need to stop the watcher. + // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, + // so we fork here so that we don't wait too long. Other events may need to be processed and + // other cluster state listeners may need to be executed as well for this event. + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> stop(false)); } - - // We're no longer the master so we need to stop the watcher. - // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, - // so we fork here so that we don't wait too long. Other events may need to be processed and - // other cluster state listeners may need to be executed as well for this event. - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - stop(false); - } - }); } else { - if (watcherService.state() != WatcherState.STOPPED) { - // to avoid unnecessary forking of threads... - return; - } + if (watcherService.state() == WatcherState.STOPPED) { + final ClusterState state = event.state(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> start(state, false)); + } else { + boolean isWatchIndexDeleted = event.indicesDeleted().stream() + .filter(index -> WatchStore.INDEX.equals(index.getName())) + .findAny() + .isPresent(); - final ClusterState state = event.state(); - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - start(state, false); + boolean isWatchIndexOpenInPreviousClusterState = event.previousState().metaData().hasIndex(WatchStore.INDEX) && + event.previousState().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.OPEN; + boolean isWatchIndexClosedInCurrentClusterState = event.state().metaData().hasIndex(WatchStore.INDEX) && + event.state().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.CLOSE; + boolean hasWatcherIndexBeenClosed = isWatchIndexOpenInPreviousClusterState && isWatchIndexClosedInCurrentClusterState; + + if (isWatchIndexDeleted || hasWatcherIndexBeenClosed) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> watcherService.watchIndexDeletedOrClosed()); } - }); + } } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index fb66c4d5062..174eb77fa2f 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -16,9 +16,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; -import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.WatchLockService; @@ -292,4 +292,13 @@ public class WatcherService extends AbstractComponent { innerMap.putAll(watchStore.usageStats()); return innerMap; } + + /** + * Something deleted or closed the {@link WatchStore#INDEX} and thus we need to do some cleanup to prevent further execution of watches + * as those watches cannot be updated anymore + */ + public void watchIndexDeletedOrClosed() { + watchStore.clearWatchesInMemory(); + executionService.clearExecutions(); + } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 35ad6a9f9da..5e4f97e4102 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -141,12 +141,7 @@ public class ExecutionService extends AbstractComponent { currentExecutions.add(watchExecution.createSnapshot()); } // Lets show the longest running watch first: - Collections.sort(currentExecutions, new Comparator() { - @Override - public int compare(WatchExecutionSnapshot e1, WatchExecutionSnapshot e2) { - return e1.executionTime().compareTo(e2.executionTime()); - } - }); + Collections.sort(currentExecutions, Comparator.comparing(WatchExecutionSnapshot::executionTime)); return currentExecutions; } @@ -163,12 +158,8 @@ public class ExecutionService extends AbstractComponent { queuedWatches.add(new QueuedWatch(executionTask.ctx)); } // Lets show the execution that pending the longest first: - Collections.sort(queuedWatches, new Comparator() { - @Override - public int compare(QueuedWatch e1, QueuedWatch e2) { - return e1.executionTime().compareTo(e2.executionTime()); - } - }); + + Collections.sort(queuedWatches, Comparator.comparing(QueuedWatch::executionTime)); return queuedWatches; } @@ -438,6 +429,15 @@ public class ExecutionService extends AbstractComponent { return counters.toMap(); } + /** + * This clears out the current executions and sets new empty current executions + * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea + */ + public void clearExecutions() { + currentExecutions.sealAndAwaitEmpty(maxStopTimeout); + currentExecutions = new CurrentExecutions(); + } + private static final class StartupExecutionContext extends TriggeredExecutionContext { public StartupExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java index 4975baaa470..43305267267 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java @@ -333,6 +333,10 @@ public class WatchStore extends AbstractComponent { } } + public void clearWatchesInMemory() { + watches.clear(); + } + public class WatchPut { private final Watch previous; diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 193e927cf43..feff0267558 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -5,18 +5,22 @@ */ package org.elasticsearch.xpack.watcher; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.watcher.watch.WatchStore; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -165,4 +169,46 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { verify(watcherService, never()).start(any(ClusterState.class)); verify(watcherService, never()).stop(); } + + public void testWatchIndexDeletion() throws Exception { + DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build(); + // old cluster state that contains watcher index + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + ClusterState oldClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + + // new cluster state that does not contain watcher index + ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster")).nodes(discoveryNodes).build(); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState)); + verify(watcherService, never()).start(any(ClusterState.class)); + verify(watcherService, never()).stop(); + verify(watcherService, times(1)).watchIndexDeletedOrClosed(); + } + + public void testWatchIndexClosing() throws Exception { + DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build(); + // old cluster state that contains watcher index + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + ClusterState oldClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + + // new cluster state with a closed watcher index + ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX).state(IndexMetaData.State.CLOSE) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState)); + verify(watcherService, never()).start(any(ClusterState.class)); + verify(watcherService, never()).stop(); + verify(watcherService, times(1)).watchIndexDeletedOrClosed(); + } + } diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java index dc5c0838c91..44221316b23 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java @@ -54,6 +54,7 @@ import java.util.List; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsEqual.equalTo; @@ -123,22 +124,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartRefreshFailed() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 0); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -158,22 +144,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartSearchFailed() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -197,22 +168,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartNoWatchStored() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -234,22 +190,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartWatchStored() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -300,22 +241,7 @@ public class WatchStoreTests extends ESTestCase { public void testUsageStats() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -419,6 +345,65 @@ public class WatchStoreTests extends ESTestCase { assertThat(stats.getValue("watch.transform.TYPE.active"), is(greaterThan(0))); } + public void testThatCleaningWatchesWorks() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + createWatchIndexMetaData(csBuilder); + + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); + + BytesReference source = new BytesArray("{}"); + InternalSearchHit hit = new InternalSearchHit(0, "_id1", new Text("type"), Collections.emptyMap()); + hit.sourceRef(source); + + SearchResponse searchResponse = mockSearchResponse(1, 1, 1, hit); + when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse); + + SearchResponse finalSearchResponse = mockSearchResponse(1, 1, 0); + when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(finalSearchResponse); + + Watch watch = mock(Watch.class); + WatchStatus status = mock(WatchStatus.class); + when(watch.status()).thenReturn(status); + when(parser.parse("_id1", true, source)).thenReturn(watch); + + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); + + ClusterState cs = csBuilder.build(); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); + assertThat(watchStore.started(), is(true)); + assertThat(watchStore.watches(), hasSize(1)); + + watchStore.clearWatchesInMemory(); + assertThat(watchStore.started(), is(true)); + assertThat(watchStore.watches(), hasSize(0)); + assertThat(watchStore.activeWatches(), hasSize(0)); + } + + /* + * Creates the standard cluster state metadata for the watches index + * with shards/replicas being marked as started + */ + private void createWatchIndexMetaData(ClusterState.Builder builder) { + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + builder.metaData(metaDateBuilder); + builder.routingTable(routingTableBuilder.build()); + } + private RefreshResponse mockRefreshResponse(int total, int successful) { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getTotalShards()).thenReturn(total); @@ -432,7 +417,6 @@ public class WatchStoreTests extends ESTestCase { when(searchResponse.getTotalShards()).thenReturn(total); when(searchResponse.getSuccessfulShards()).thenReturn(successful); when(searchResponse.getHits()).thenReturn(internalSearchHits); - when(searchResponse.getHits()).thenReturn(internalSearchHits); return searchResponse; } From a296e31a7c5274ccf1bcec108e83b8ae5faf1c0f Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 7 Sep 2016 15:55:33 +0200 Subject: [PATCH 3/4] Watcher: Ensure triggered watch is deleted on thread pool rejection (elastic/elasticsearch#3049) This fixes a bug I found with a customer when he updated from 1.x to 2.x. Due to an BWC incompatible change in the watch history mapping and a thread pool rejection during execution a watch was not removed from the triggered watches and tried to be executed again. While trying to fix it it turned out that the execution of the failure test case was still done in the transport thread and thus required some offloading to another thread pool. Original commit: elastic/x-pack-elasticsearch@df04ce31f23aa6ba1800e4f25ac3768e829e884f --- .../watcher/execution/ExecutionService.java | 43 ++++++++++++----- .../execution/TriggeredWatchStore.java | 2 +- .../execution/ExecutionServiceTests.java | 46 +++++++++++++++---- 3 files changed, 70 insertions(+), 21 deletions(-) diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 5e4f97e4102..4091f291652 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.common.stats.Counters; import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.Watcher; @@ -60,13 +61,14 @@ public class ExecutionService extends AbstractComponent { private final Clock clock; private final TimeValue defaultThrottlePeriod; private final TimeValue maxStopTimeout; + private final ThreadPool threadPool; private volatile CurrentExecutions currentExecutions = null; private final AtomicBoolean started = new AtomicBoolean(false); @Inject public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor, - WatchStore watchStore, WatchLockService watchLockService, Clock clock) { + WatchStore watchStore, WatchLockService watchLockService, Clock clock, ThreadPool threadPool) { super(settings); this.historyStore = historyStore; this.triggeredWatchStore = triggeredWatchStore; @@ -76,6 +78,7 @@ public class ExecutionService extends AbstractComponent { this.clock = clock; this.defaultThrottlePeriod = DEFAULT_THROTTLE_PERIOD_SETTING.get(settings); this.maxStopTimeout = Watcher.MAX_STOP_TIMEOUT_SETTING.get(settings); + this.threadPool = threadPool; } public void start(ClusterState state) throws Exception { @@ -323,20 +326,36 @@ public class ExecutionService extends AbstractComponent { thread pool that executes the watches is completely busy, we don't lose the fact that the watch was triggered (it'll have its history record) */ - - private void executeAsync(WatchExecutionContext ctx, TriggeredWatch triggeredWatch) throws Exception { + private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch triggeredWatch) { try { executor.execute(new WatchExecutionTask(ctx)); } catch (EsRejectedExecutionException e) { - String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; - logger.debug("{}", message); - WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message); - if (ctx.overrideRecordOnConflict()) { - historyStore.forcePut(record); - } else { - historyStore.put(record); - } - triggeredWatchStore.delete(triggeredWatch.id()); + // we are still in the transport thread here most likely, so we cannot run heavy operations + // this means some offloading needs to be done for indexing into the history and delete the triggered watches entry + threadPool.generic().execute(() -> { + String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; + logger.debug("{}", message); + WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message); + try { + if (ctx.overrideRecordOnConflict()) { + historyStore.forcePut(record); + } else { + historyStore.put(record); + } + } catch (Exception exc) { + logger.error((Supplier) () -> + new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection", + triggeredWatch.id()), exc); + } + + try { + triggeredWatchStore.delete(triggeredWatch.id()); + } catch (Exception exc) { + logger.error((Supplier) () -> + new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " + + "rejection", triggeredWatch.id()), exc); + } + }); } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java index c1b64a9a630..b964d0b74b2 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java @@ -222,7 +222,7 @@ public class TriggeredWatchStore extends AbstractComponent { } } - public void delete(Wid wid) throws Exception { + public void delete(Wid wid) { ensureStarted(); accessLock.lock(); try { diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 97846e9ba02..4ff96fe3597 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -5,10 +5,13 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.support.clock.ClockMock; import org.elasticsearch.xpack.watcher.actions.Action; @@ -41,7 +44,9 @@ import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; @@ -51,7 +56,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.joda.time.DateTime.now; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -68,15 +75,16 @@ public class ExecutionServiceTests extends ESTestCase { private Input.Result inputResult; private WatchStore watchStore; + private TriggeredWatchStore triggeredWatchStore; + private WatchExecutor executor; private HistoryStore historyStore; private WatchLockService watchLockService; private ExecutionService executionService; private Clock clock; + private ThreadPool threadPool; @Before public void init() throws Exception { - TriggeredWatchStore triggeredWatchStore; - payload = mock(Payload.class); input = mock(ExecutableInput.class); inputResult = mock(Input.Result.class); @@ -88,13 +96,14 @@ public class ExecutionServiceTests extends ESTestCase { triggeredWatchStore = mock(TriggeredWatchStore.class); historyStore = mock(HistoryStore.class); - WatchExecutor executor = mock(WatchExecutor.class); + executor = mock(WatchExecutor.class); when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1)); watchLockService = mock(WatchLockService.class); clock = new ClockMock(); + threadPool = mock(ThreadPool.class); executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, watchStore, - watchLockService, clock); + watchLockService, clock, threadPool); ClusterState clusterState = mock(ClusterState.class); when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<>()); @@ -483,7 +492,7 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteInner() throws Exception { - DateTime now = DateTime.now(DateTimeZone.UTC); + DateTime now = now(DateTimeZone.UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); @@ -560,7 +569,7 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteInnerThrottled() throws Exception { - DateTime now = DateTime.now(DateTimeZone.UTC); + DateTime now = now(DateTimeZone.UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); @@ -613,7 +622,7 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteInnerConditionNotMet() throws Exception { - DateTime now = DateTime.now(DateTimeZone.UTC); + DateTime now = now(DateTimeZone.UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); @@ -774,6 +783,28 @@ public class ExecutionServiceTests extends ESTestCase { verify(action, never()).execute("_action", context, payload); } + public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exception { + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("foo"); + when(watch.nonce()).thenReturn(1L); + when(watchStore.get(any())).thenReturn(watch); + + // execute needs to fail as well as storing the history + doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); + doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any()); + + Wid wid = new Wid(watch.id(), watch.nonce(), now()); + + Executor currentThreadExecutor = command -> command.run(); + when(threadPool.generic()).thenReturn(currentThreadExecutor); + + TriggeredWatch triggeredWatch = new TriggeredWatch(wid, new ScheduleTriggerEvent(now() ,now())); + executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); + + verify(triggeredWatchStore, times(1)).delete(wid); + verify(historyStore, times(1)).forcePut(any(WatchRecord.class)); + } + private Tuple whenCondition(final WatchExecutionContext context) { Condition.Result conditionResult = mock(Condition.Result.class); when(conditionResult.met()).thenReturn(true); @@ -791,5 +822,4 @@ public class ExecutionServiceTests extends ESTestCase { return new Tuple<>(transform, transformResult); } - } From 0f571685b9f0560b8ac579b18348dfdec4a361d5 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 7 Sep 2016 16:29:15 +0200 Subject: [PATCH 4/4] Watcher: Fix proxy xcontent serialization (elastic/elasticsearch#3364) Calling to xcontent in a HttpRequest, with a proxy enabled, lead to serialization exceptions, resulting in failing to write the watch history. Closes elastic/elasticsearch#3334 Original commit: elastic/x-pack-elasticsearch@a04dff686c86dd2222f2a34595f3cf3ccd40c82f --- .../xpack/common/http/HttpRequest.java | 17 ++---- .../xpack/common/http/HttpRequestTests.java | 54 +++++++++++++++++++ 2 files changed, 57 insertions(+), 14 deletions(-) diff --git a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/common/http/HttpRequest.java b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/common/http/HttpRequest.java index 3545a40fe38..ceb46a813f8 100644 --- a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/common/http/HttpRequest.java +++ b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/common/http/HttpRequest.java @@ -30,6 +30,7 @@ import java.net.URLDecoder; import java.net.URLEncoder; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -166,7 +167,7 @@ public class HttpRequest implements ToXContent { HttpRequest.Field.READ_TIMEOUT_HUMAN.getPreferredName(), readTimeout); } if (proxy != null) { - builder.field(Field.PROXY.getPreferredName(), proxy); + proxy.toXContent(builder, params); } return builder.endObject(); } @@ -195,19 +196,7 @@ public class HttpRequest implements ToXContent { @Override public int hashCode() { - int result = host.hashCode(); - result = 31 * result + port; - result = 31 * result + scheme.hashCode(); - result = 31 * result + method.hashCode(); - result = 31 * result + (path != null ? path.hashCode() : 0); - result = 31 * result + params.hashCode(); - result = 31 * result + headers.hashCode(); - result = 31 * result + (auth != null ? auth.hashCode() : 0); - result = 31 * result + (connectionTimeout != null ? connectionTimeout.hashCode() : 0); - result = 31 * result + (readTimeout != null ? readTimeout.hashCode() : 0); - result = 31 * result + (body != null ? body.hashCode() : 0); - result = 31 * result + (proxy != null ? proxy.hashCode() : 0); - return result; + return Objects.hash(host, port, scheme, method, path, params, headers, auth, connectionTimeout, readTimeout, body, proxy); } @Override diff --git a/elasticsearch/x-pack/src/test/java/org/elasticsearch/xpack/common/http/HttpRequestTests.java b/elasticsearch/x-pack/src/test/java/org/elasticsearch/xpack/common/http/HttpRequestTests.java index 9b2e7802a4a..3be8dd14c1e 100644 --- a/elasticsearch/x-pack/src/test/java/org/elasticsearch/xpack/common/http/HttpRequestTests.java +++ b/elasticsearch/x-pack/src/test/java/org/elasticsearch/xpack/common/http/HttpRequestTests.java @@ -5,7 +5,9 @@ */ package org.elasticsearch.xpack.common.http; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -14,6 +16,10 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.common.http.HttpRequest; import org.elasticsearch.xpack.common.http.Scheme; import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry; +import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth; + +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; @@ -65,6 +71,54 @@ public class HttpRequestTests extends ESTestCase { } } + public void testXContentSerialization() throws Exception { + final HttpRequest.Builder builder; + if (randomBoolean()) { + builder = HttpRequest.builder(); + builder.fromUrl("http://localhost:9200/generic/createevent"); + } else { + builder = HttpRequest.builder("localhost", 9200); + if (randomBoolean()) { + builder.scheme(randomFrom(Scheme.values())); + if (usually()) { + builder.path(randomAsciiOfLength(50)); + } + } + } + if (usually()) { + builder.method(randomFrom(HttpMethod.values())); + } + if (randomBoolean()) { + builder.setParam(randomAsciiOfLength(10), randomAsciiOfLength(10)); + if (randomBoolean()) { + builder.setParam(randomAsciiOfLength(10), randomAsciiOfLength(10)); + } + } + if (randomBoolean()) { + builder.setHeader(randomAsciiOfLength(10), randomAsciiOfLength(10)); + if (randomBoolean()) { + builder.setHeader(randomAsciiOfLength(10), randomAsciiOfLength(10)); + } + } + if (randomBoolean()) { + builder.auth(new BasicAuth(randomAsciiOfLength(10), randomAsciiOfLength(20).toCharArray())); + } + if (randomBoolean()) { + builder.body(randomAsciiOfLength(200)); + } + if (randomBoolean()) { + builder.connectionTimeout(TimeValue.parseTimeValue(randomTimeValue(), "my.setting")); + } + if (randomBoolean()) { + builder.readTimeout(TimeValue.parseTimeValue(randomTimeValue(), "my.setting")); + } + if (randomBoolean()) { + builder.proxy(new HttpProxy(randomAsciiOfLength(10), randomIntBetween(1024, 65000))); + } + + builder.build().toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS); + } + private void assertThatManualBuilderEqualsParsingFromUrl(String url, HttpRequest.Builder builder) throws Exception { XContentBuilder urlContentBuilder = jsonBuilder().startObject().field("url", url).endObject(); XContentParser urlContentParser = JsonXContent.jsonXContent.createParser(urlContentBuilder.bytes());