diff --git a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/common/http/HttpRequest.java b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/common/http/HttpRequest.java index 3545a40fe38..ceb46a813f8 100644 --- a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/common/http/HttpRequest.java +++ b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/common/http/HttpRequest.java @@ -30,6 +30,7 @@ import java.net.URLDecoder; import java.net.URLEncoder; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -166,7 +167,7 @@ public class HttpRequest implements ToXContent { HttpRequest.Field.READ_TIMEOUT_HUMAN.getPreferredName(), readTimeout); } if (proxy != null) { - builder.field(Field.PROXY.getPreferredName(), proxy); + proxy.toXContent(builder, params); } return builder.endObject(); } @@ -195,19 +196,7 @@ public class HttpRequest implements ToXContent { @Override public int hashCode() { - int result = host.hashCode(); - result = 31 * result + port; - result = 31 * result + scheme.hashCode(); - result = 31 * result + method.hashCode(); - result = 31 * result + (path != null ? path.hashCode() : 0); - result = 31 * result + params.hashCode(); - result = 31 * result + headers.hashCode(); - result = 31 * result + (auth != null ? auth.hashCode() : 0); - result = 31 * result + (connectionTimeout != null ? connectionTimeout.hashCode() : 0); - result = 31 * result + (readTimeout != null ? readTimeout.hashCode() : 0); - result = 31 * result + (body != null ? body.hashCode() : 0); - result = 31 * result + (proxy != null ? proxy.hashCode() : 0); - return result; + return Objects.hash(host, port, scheme, method, path, params, headers, auth, connectionTimeout, readTimeout, body, proxy); } @Override diff --git a/elasticsearch/x-pack/src/test/java/org/elasticsearch/xpack/common/http/HttpRequestTests.java b/elasticsearch/x-pack/src/test/java/org/elasticsearch/xpack/common/http/HttpRequestTests.java index 9b2e7802a4a..3be8dd14c1e 100644 --- a/elasticsearch/x-pack/src/test/java/org/elasticsearch/xpack/common/http/HttpRequestTests.java +++ b/elasticsearch/x-pack/src/test/java/org/elasticsearch/xpack/common/http/HttpRequestTests.java @@ -5,7 +5,9 @@ */ package org.elasticsearch.xpack.common.http; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -14,6 +16,10 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.common.http.HttpRequest; import org.elasticsearch.xpack.common.http.Scheme; import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry; +import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth; + +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; @@ -65,6 +71,54 @@ public class HttpRequestTests extends ESTestCase { } } + public void testXContentSerialization() throws Exception { + final HttpRequest.Builder builder; + if (randomBoolean()) { + builder = HttpRequest.builder(); + builder.fromUrl("http://localhost:9200/generic/createevent"); + } else { + builder = HttpRequest.builder("localhost", 9200); + if (randomBoolean()) { + builder.scheme(randomFrom(Scheme.values())); + if (usually()) { + builder.path(randomAsciiOfLength(50)); + } + } + } + if (usually()) { + builder.method(randomFrom(HttpMethod.values())); + } + if (randomBoolean()) { + builder.setParam(randomAsciiOfLength(10), randomAsciiOfLength(10)); + if (randomBoolean()) { + builder.setParam(randomAsciiOfLength(10), randomAsciiOfLength(10)); + } + } + if (randomBoolean()) { + builder.setHeader(randomAsciiOfLength(10), randomAsciiOfLength(10)); + if (randomBoolean()) { + builder.setHeader(randomAsciiOfLength(10), randomAsciiOfLength(10)); + } + } + if (randomBoolean()) { + builder.auth(new BasicAuth(randomAsciiOfLength(10), randomAsciiOfLength(20).toCharArray())); + } + if (randomBoolean()) { + builder.body(randomAsciiOfLength(200)); + } + if (randomBoolean()) { + builder.connectionTimeout(TimeValue.parseTimeValue(randomTimeValue(), "my.setting")); + } + if (randomBoolean()) { + builder.readTimeout(TimeValue.parseTimeValue(randomTimeValue(), "my.setting")); + } + if (randomBoolean()) { + builder.proxy(new HttpProxy(randomAsciiOfLength(10), randomIntBetween(1024, 65000))); + } + + builder.build().toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS); + } + private void assertThatManualBuilderEqualsParsingFromUrl(String url, HttpRequest.Builder builder) throws Exception { XContentBuilder urlContentBuilder = jsonBuilder().startObject().field("url", url).endObject(); XContentParser urlContentParser = JsonXContent.jsonXContent.createParser(urlContentBuilder.bytes()); diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index daa21ac5d89..b96fed9f746 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ack.AckedRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; @@ -22,6 +23,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.watcher.watch.WatchStore; import java.util.concurrent.CountDownLatch; @@ -119,34 +121,33 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste } if (!event.localNodeMaster()) { - if (watcherService.state() != WatcherState.STARTED) { - // to avoid unnecessary forking of threads... - return; + if (watcherService.state() == WatcherState.STARTED) { + // We're no longer the master so we need to stop the watcher. + // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, + // so we fork here so that we don't wait too long. Other events may need to be processed and + // other cluster state listeners may need to be executed as well for this event. + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> stop(false)); } - - // We're no longer the master so we need to stop the watcher. - // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, - // so we fork here so that we don't wait too long. Other events may need to be processed and - // other cluster state listeners may need to be executed as well for this event. - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - stop(false); - } - }); } else { - if (watcherService.state() != WatcherState.STOPPED) { - // to avoid unnecessary forking of threads... - return; - } + if (watcherService.state() == WatcherState.STOPPED) { + final ClusterState state = event.state(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> start(state, false)); + } else { + boolean isWatchIndexDeleted = event.indicesDeleted().stream() + .filter(index -> WatchStore.INDEX.equals(index.getName())) + .findAny() + .isPresent(); - final ClusterState state = event.state(); - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - start(state, false); + boolean isWatchIndexOpenInPreviousClusterState = event.previousState().metaData().hasIndex(WatchStore.INDEX) && + event.previousState().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.OPEN; + boolean isWatchIndexClosedInCurrentClusterState = event.state().metaData().hasIndex(WatchStore.INDEX) && + event.state().metaData().index(WatchStore.INDEX).getState() == IndexMetaData.State.CLOSE; + boolean hasWatcherIndexBeenClosed = isWatchIndexOpenInPreviousClusterState && isWatchIndexClosedInCurrentClusterState; + + if (isWatchIndexDeleted || hasWatcherIndexBeenClosed) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> watcherService.watchIndexDeletedOrClosed()); } - }); + } } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index fb66c4d5062..174eb77fa2f 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -16,9 +16,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; -import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.WatchLockService; @@ -292,4 +292,13 @@ public class WatcherService extends AbstractComponent { innerMap.putAll(watchStore.usageStats()); return innerMap; } + + /** + * Something deleted or closed the {@link WatchStore#INDEX} and thus we need to do some cleanup to prevent further execution of watches + * as those watches cannot be updated anymore + */ + public void watchIndexDeletedOrClosed() { + watchStore.clearWatchesInMemory(); + executionService.clearExecutions(); + } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 35ad6a9f9da..4091f291652 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.common.stats.Counters; import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.watcher.Watcher; @@ -60,13 +61,14 @@ public class ExecutionService extends AbstractComponent { private final Clock clock; private final TimeValue defaultThrottlePeriod; private final TimeValue maxStopTimeout; + private final ThreadPool threadPool; private volatile CurrentExecutions currentExecutions = null; private final AtomicBoolean started = new AtomicBoolean(false); @Inject public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor, - WatchStore watchStore, WatchLockService watchLockService, Clock clock) { + WatchStore watchStore, WatchLockService watchLockService, Clock clock, ThreadPool threadPool) { super(settings); this.historyStore = historyStore; this.triggeredWatchStore = triggeredWatchStore; @@ -76,6 +78,7 @@ public class ExecutionService extends AbstractComponent { this.clock = clock; this.defaultThrottlePeriod = DEFAULT_THROTTLE_PERIOD_SETTING.get(settings); this.maxStopTimeout = Watcher.MAX_STOP_TIMEOUT_SETTING.get(settings); + this.threadPool = threadPool; } public void start(ClusterState state) throws Exception { @@ -141,12 +144,7 @@ public class ExecutionService extends AbstractComponent { currentExecutions.add(watchExecution.createSnapshot()); } // Lets show the longest running watch first: - Collections.sort(currentExecutions, new Comparator() { - @Override - public int compare(WatchExecutionSnapshot e1, WatchExecutionSnapshot e2) { - return e1.executionTime().compareTo(e2.executionTime()); - } - }); + Collections.sort(currentExecutions, Comparator.comparing(WatchExecutionSnapshot::executionTime)); return currentExecutions; } @@ -163,12 +161,8 @@ public class ExecutionService extends AbstractComponent { queuedWatches.add(new QueuedWatch(executionTask.ctx)); } // Lets show the execution that pending the longest first: - Collections.sort(queuedWatches, new Comparator() { - @Override - public int compare(QueuedWatch e1, QueuedWatch e2) { - return e1.executionTime().compareTo(e2.executionTime()); - } - }); + + Collections.sort(queuedWatches, Comparator.comparing(QueuedWatch::executionTime)); return queuedWatches; } @@ -332,20 +326,36 @@ public class ExecutionService extends AbstractComponent { thread pool that executes the watches is completely busy, we don't lose the fact that the watch was triggered (it'll have its history record) */ - - private void executeAsync(WatchExecutionContext ctx, TriggeredWatch triggeredWatch) throws Exception { + private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch triggeredWatch) { try { executor.execute(new WatchExecutionTask(ctx)); } catch (EsRejectedExecutionException e) { - String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; - logger.debug("{}", message); - WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message); - if (ctx.overrideRecordOnConflict()) { - historyStore.forcePut(record); - } else { - historyStore.put(record); - } - triggeredWatchStore.delete(triggeredWatch.id()); + // we are still in the transport thread here most likely, so we cannot run heavy operations + // this means some offloading needs to be done for indexing into the history and delete the triggered watches entry + threadPool.generic().execute(() -> { + String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; + logger.debug("{}", message); + WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message); + try { + if (ctx.overrideRecordOnConflict()) { + historyStore.forcePut(record); + } else { + historyStore.put(record); + } + } catch (Exception exc) { + logger.error((Supplier) () -> + new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection", + triggeredWatch.id()), exc); + } + + try { + triggeredWatchStore.delete(triggeredWatch.id()); + } catch (Exception exc) { + logger.error((Supplier) () -> + new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " + + "rejection", triggeredWatch.id()), exc); + } + }); } } @@ -438,6 +448,15 @@ public class ExecutionService extends AbstractComponent { return counters.toMap(); } + /** + * This clears out the current executions and sets new empty current executions + * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea + */ + public void clearExecutions() { + currentExecutions.sealAndAwaitEmpty(maxStopTimeout); + currentExecutions = new CurrentExecutions(); + } + private static final class StartupExecutionContext extends TriggeredExecutionContext { public StartupExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java index c1b64a9a630..b964d0b74b2 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java @@ -222,7 +222,7 @@ public class TriggeredWatchStore extends AbstractComponent { } } - public void delete(Wid wid) throws Exception { + public void delete(Wid wid) { ensureStarted(); accessLock.lock(); try { diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java index 4975baaa470..43305267267 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java @@ -333,6 +333,10 @@ public class WatchStore extends AbstractComponent { } } + public void clearWatchesInMemory() { + watches.clear(); + } + public class WatchPut { private final Watch previous; diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 193e927cf43..feff0267558 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -5,18 +5,22 @@ */ package org.elasticsearch.xpack.watcher; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.watcher.watch.WatchStore; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -165,4 +169,46 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { verify(watcherService, never()).start(any(ClusterState.class)); verify(watcherService, never()).stop(); } + + public void testWatchIndexDeletion() throws Exception { + DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build(); + // old cluster state that contains watcher index + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + ClusterState oldClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + + // new cluster state that does not contain watcher index + ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster")).nodes(discoveryNodes).build(); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState)); + verify(watcherService, never()).start(any(ClusterState.class)); + verify(watcherService, never()).stop(); + verify(watcherService, times(1)).watchIndexDeletedOrClosed(); + } + + public void testWatchIndexClosing() throws Exception { + DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1").build(); + // old cluster state that contains watcher index + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + ClusterState oldClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + + // new cluster state with a closed watcher index + ClusterState newClusterState = ClusterState.builder(new ClusterName("my-cluster")) + .metaData(new MetaData.Builder().put(IndexMetaData.builder(WatchStore.INDEX).state(IndexMetaData.State.CLOSE) + .settings(indexSettings).numberOfReplicas(0).numberOfShards(1))) + .nodes(discoveryNodes).build(); + when(watcherService.state()).thenReturn(WatcherState.STARTED); + + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", newClusterState, oldClusterState)); + verify(watcherService, never()).start(any(ClusterState.class)); + verify(watcherService, never()).stop(); + verify(watcherService, times(1)).watchIndexDeletedOrClosed(); + } + } diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 97846e9ba02..4ff96fe3597 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -5,10 +5,13 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.support.clock.ClockMock; import org.elasticsearch.xpack.watcher.actions.Action; @@ -41,7 +44,9 @@ import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; @@ -51,7 +56,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.joda.time.DateTime.now; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -68,15 +75,16 @@ public class ExecutionServiceTests extends ESTestCase { private Input.Result inputResult; private WatchStore watchStore; + private TriggeredWatchStore triggeredWatchStore; + private WatchExecutor executor; private HistoryStore historyStore; private WatchLockService watchLockService; private ExecutionService executionService; private Clock clock; + private ThreadPool threadPool; @Before public void init() throws Exception { - TriggeredWatchStore triggeredWatchStore; - payload = mock(Payload.class); input = mock(ExecutableInput.class); inputResult = mock(Input.Result.class); @@ -88,13 +96,14 @@ public class ExecutionServiceTests extends ESTestCase { triggeredWatchStore = mock(TriggeredWatchStore.class); historyStore = mock(HistoryStore.class); - WatchExecutor executor = mock(WatchExecutor.class); + executor = mock(WatchExecutor.class); when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1)); watchLockService = mock(WatchLockService.class); clock = new ClockMock(); + threadPool = mock(ThreadPool.class); executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, watchStore, - watchLockService, clock); + watchLockService, clock, threadPool); ClusterState clusterState = mock(ClusterState.class); when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<>()); @@ -483,7 +492,7 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteInner() throws Exception { - DateTime now = DateTime.now(DateTimeZone.UTC); + DateTime now = now(DateTimeZone.UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); @@ -560,7 +569,7 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteInnerThrottled() throws Exception { - DateTime now = DateTime.now(DateTimeZone.UTC); + DateTime now = now(DateTimeZone.UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); @@ -613,7 +622,7 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteInnerConditionNotMet() throws Exception { - DateTime now = DateTime.now(DateTimeZone.UTC); + DateTime now = now(DateTimeZone.UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); @@ -774,6 +783,28 @@ public class ExecutionServiceTests extends ESTestCase { verify(action, never()).execute("_action", context, payload); } + public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exception { + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("foo"); + when(watch.nonce()).thenReturn(1L); + when(watchStore.get(any())).thenReturn(watch); + + // execute needs to fail as well as storing the history + doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); + doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any()); + + Wid wid = new Wid(watch.id(), watch.nonce(), now()); + + Executor currentThreadExecutor = command -> command.run(); + when(threadPool.generic()).thenReturn(currentThreadExecutor); + + TriggeredWatch triggeredWatch = new TriggeredWatch(wid, new ScheduleTriggerEvent(now() ,now())); + executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); + + verify(triggeredWatchStore, times(1)).delete(wid); + verify(historyStore, times(1)).forcePut(any(WatchRecord.class)); + } + private Tuple whenCondition(final WatchExecutionContext context) { Condition.Result conditionResult = mock(Condition.Result.class); when(conditionResult.met()).thenReturn(true); @@ -791,5 +822,4 @@ public class ExecutionServiceTests extends ESTestCase { return new Tuple<>(transform, transformResult); } - } diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherUtilsTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherUtilsTests.java index f8498fe99d0..04611dcea70 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherUtilsTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherUtilsTests.java @@ -108,7 +108,7 @@ public class WatcherUtilsTests extends ESTestCase { } String text = randomAsciiOfLengthBetween(1, 5); ScriptService.ScriptType scriptType = randomFrom(ScriptService.ScriptType.values()); - expectedTemplate = new Script(text, scriptType, randomBoolean() ? null : "mustache", params); + expectedTemplate = new Script(text, scriptType, "mustache", params); request = new WatcherSearchTemplateRequest(expectedIndices, expectedTypes, expectedSearchType, expectedIndicesOptions, expectedTemplate); } else { diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java index dc5c0838c91..44221316b23 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java @@ -54,6 +54,7 @@ import java.util.List; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsEqual.equalTo; @@ -123,22 +124,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartRefreshFailed() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 0); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -158,22 +144,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartSearchFailed() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -197,22 +168,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartNoWatchStored() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -234,22 +190,7 @@ public class WatchStoreTests extends ESTestCase { public void testStartWatchStored() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -300,22 +241,7 @@ public class WatchStoreTests extends ESTestCase { public void testUsageStats() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) - .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) - .build()); - indexRoutingTableBuilder.addReplica(); - routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); - csBuilder.routingTable(routingTableBuilder.build()); + createWatchIndexMetaData(csBuilder); RefreshResponse refreshResponse = mockRefreshResponse(1, 1); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); @@ -419,6 +345,65 @@ public class WatchStoreTests extends ESTestCase { assertThat(stats.getValue("watch.transform.TYPE.active"), is(greaterThan(0))); } + public void testThatCleaningWatchesWorks() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + createWatchIndexMetaData(csBuilder); + + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); + + BytesReference source = new BytesArray("{}"); + InternalSearchHit hit = new InternalSearchHit(0, "_id1", new Text("type"), Collections.emptyMap()); + hit.sourceRef(source); + + SearchResponse searchResponse = mockSearchResponse(1, 1, 1, hit); + when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse); + + SearchResponse finalSearchResponse = mockSearchResponse(1, 1, 0); + when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(finalSearchResponse); + + Watch watch = mock(Watch.class); + WatchStatus status = mock(WatchStatus.class); + when(watch.status()).thenReturn(status); + when(parser.parse("_id1", true, source)).thenReturn(watch); + + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); + + ClusterState cs = csBuilder.build(); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); + assertThat(watchStore.started(), is(true)); + assertThat(watchStore.watches(), hasSize(1)); + + watchStore.clearWatchesInMemory(); + assertThat(watchStore.started(), is(true)); + assertThat(watchStore.watches(), hasSize(0)); + assertThat(watchStore.activeWatches(), hasSize(0)); + } + + /* + * Creates the standard cluster state metadata for the watches index + * with shards/replicas being marked as started + */ + private void createWatchIndexMetaData(ClusterState.Builder builder) { + MetaData.Builder metaDateBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + builder.metaData(metaDateBuilder); + builder.routingTable(routingTableBuilder.build()); + } + private RefreshResponse mockRefreshResponse(int total, int successful) { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getTotalShards()).thenReturn(total); @@ -432,7 +417,6 @@ public class WatchStoreTests extends ESTestCase { when(searchResponse.getTotalShards()).thenReturn(total); when(searchResponse.getSuccessfulShards()).thenReturn(successful); when(searchResponse.getHits()).thenReturn(internalSearchHits); - when(searchResponse.getHits()).thenReturn(internalSearchHits); return searchResponse; }