From 1de2a925ce8bf30d53ad71dfcd6e33ebbf7827c6 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 21 Sep 2018 14:22:34 +0200 Subject: [PATCH] Watcher: Ensure that execution triggers properly on initial setup (#33360) This commit reverts most of #33157 as it introduces another race condition and breaks a common case of watcher, when the first watch is added to the system and the index does not exist yet. This means, that the index will be created, which triggers a reload, but during this time the put watch operation that triggered this is not yet indexed, so that both processes finish roughly add the same time and should not overwrite each other but act complementary. This commit reverts the logic of cleaning out the ticker engine watches on start up, as this is done already when the execution is paused - which also gets paused on the cluster state listener again, as we can be sure here, that the watches index has not yet been created. This also adds a new test, that starts a one node cluster and emulates the case of a non existing watches index and a watch being added, which should result in proper execution. Closes #33320 --- .../test/rest/ESRestTestCase.java | 2 +- .../xpack/watcher/WatcherService.java | 23 +++--- .../engine/TickerScheduleTriggerEngine.java | 14 +++- .../xpack/watcher/WatcherServiceTests.java | 72 +++++++++++-------- .../test/integration/SingleNodeTests.java | 66 +++++++++++++++++ .../engine/TickerScheduleEngineTests.java | 36 ---------- .../SmokeTestWatcherWithSecurityIT.java | 36 +++++----- 7 files changed, 146 insertions(+), 103 deletions(-) create mode 100644 x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 1b29a9112c2..a5f23104dea 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -635,7 +635,7 @@ public abstract class ESRestTestCase extends ESTestCase { if (name.startsWith(".monitoring-")) { return true; } - if (name.startsWith(".watch-history-")) { + if (name.startsWith(".watch") || name.startsWith(".triggered_watches")) { return true; } if (name.startsWith(".ml-")) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 599287bb50a..75fd13915de 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -27,12 +27,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.upgrade.UpgradeField; import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.watch.Watch; @@ -63,7 +63,6 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalState; import static org.elasticsearch.xpack.core.watcher.watch.Watch.INDEX; @@ -92,7 +91,7 @@ public class WatcherService extends AbstractComponent { this.scrollSize = settings.getAsInt("xpack.watcher.watch.scroll.size", 100); this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30)); this.parser = parser; - this.client = client; + this.client = ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN); this.executor = executor; } @@ -184,6 +183,10 @@ public class WatcherService extends AbstractComponent { // changes processedClusterStateVersion.set(state.getVersion()); + triggerService.pauseExecution(); + int cancelledTaskCount = executionService.clearExecutionsAndQueue(); + logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); + executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), e -> logger.error("error reloading watcher", e))); } @@ -232,10 +235,6 @@ public class WatcherService extends AbstractComponent { // also this is the place where we pause the trigger service execution and clear the current execution service, so that we make sure // that existing executions finish, but no new ones are executed if (processedClusterStateVersion.get() == state.getVersion()) { - triggerService.pauseExecution(); - int cancelledTaskCount = executionService.clearExecutionsAndQueue(); - logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); - executionService.unPause(); triggerService.start(watches); if (triggeredWatches.isEmpty() == false) { @@ -273,7 +272,7 @@ public class WatcherService extends AbstractComponent { SearchResponse response = null; List watches = new ArrayList<>(); - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { + try { RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(INDEX)) .actionGet(TimeValue.timeValueSeconds(5)); if (refreshResponse.getSuccessfulShards() < indexMetaData.getNumberOfShards()) { @@ -357,11 +356,9 @@ public class WatcherService extends AbstractComponent { } } finally { if (response != null) { - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(response.getScrollId()); - client.clearScroll(clearScrollRequest).actionGet(scrollTimeout); - } + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(response.getScrollId()); + client.clearScroll(clearScrollRequest).actionGet(scrollTimeout); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java index 4c10f794880..bd0204766af 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java @@ -22,6 +22,7 @@ import org.joda.time.DateTime; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -49,14 +50,23 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine { @Override public synchronized void start(Collection jobs) { long startTime = clock.millis(); - Map schedules = new ConcurrentHashMap<>(); + Map schedules = new HashMap<>(jobs.size()); for (Watch job : jobs) { if (job.trigger() instanceof ScheduleTrigger) { ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime)); } } - this.schedules = schedules; + // why are we calling putAll() here instead of assigning a brand + // new concurrent hash map you may ask yourself over here + // This requires some explanation how TriggerEngine.start() is + // invoked, when a reload due to the cluster state listener is done + // If the watches index does not exist, and new document is stored, + // then the creation of that index will trigger a reload which calls + // this method. The index operation however will run at the same time + // as the reload, so if we clean out the old data structure here, + // that can lead to that one watch not being triggered + this.schedules.putAll(schedules); } @Override diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 73f9271e3ef..f1c711ae00a 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -6,19 +6,21 @@ package org.elasticsearch.xpack.watcher; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.search.ClearScrollAction; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponseSections; +import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; -import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -42,7 +44,6 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.watcher.trigger.Trigger; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; @@ -55,6 +56,7 @@ import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.junit.Before; import org.mockito.ArgumentCaptor; import java.util.Collections; @@ -67,6 +69,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -76,6 +79,16 @@ public class WatcherServiceTests extends ESTestCase { private final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + private final Client client = mock(Client.class); + + @Before + public void configureMockClient() { + when(client.settings()).thenReturn(Settings.EMPTY); + ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + } + public void testValidateStartWithClosedIndex() { TriggerService triggerService = mock(TriggerService.class); TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class); @@ -83,7 +96,7 @@ public class WatcherServiceTests extends ESTestCase { WatchParser parser = mock(WatchParser.class); WatcherService service = new WatcherService(Settings.EMPTY, triggerService, triggeredWatchStore, - executionService, parser, mock(Client.class), executorService) { + executionService, parser, client, executorService) { @Override void stopExecutor() { } @@ -102,18 +115,11 @@ public class WatcherServiceTests extends ESTestCase { } public void testLoadOnlyActiveWatches() throws Exception { - // this is just, so we dont have to add any mocking to the threadpool - Settings settings = Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), false).build(); - TriggerService triggerService = mock(TriggerService.class); TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class); ExecutionService executionService = mock(ExecutionService.class); WatchParser parser = mock(WatchParser.class); - Client client = mock(Client.class); - ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - WatcherService service = new WatcherService(settings, triggerService, triggeredWatchStore, + WatcherService service = new WatcherService(Settings.EMPTY, triggerService, triggeredWatchStore, executionService, parser, client, executorService) { @Override void stopExecutor() { @@ -150,21 +156,21 @@ public class WatcherServiceTests extends ESTestCase { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getSuccessfulShards()) .thenReturn(clusterState.getMetaData().getIndices().get(Watch.INDEX).getNumberOfShards()); - AdminClient adminClient = mock(AdminClient.class); - IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); - when(client.admin()).thenReturn(adminClient); - when(adminClient.indices()).thenReturn(indicesAdminClient); - PlainActionFuture refreshFuture = new PlainActionFuture<>(); - when(indicesAdminClient.refresh(any(RefreshRequest.class))).thenReturn(refreshFuture); - refreshFuture.onResponse(refreshResponse); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(refreshResponse); + return null; + }).when(client).execute(eq(RefreshAction.INSTANCE), any(RefreshRequest.class), any(ActionListener.class)); // empty scroll response, no further scrolling needed SearchResponseSections scrollSearchSections = new SearchResponseSections(SearchHits.empty(), null, null, false, false, null, 1); SearchResponse scrollSearchResponse = new SearchResponse(scrollSearchSections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); - PlainActionFuture searchScrollResponseFuture = new PlainActionFuture<>(); - when(client.searchScroll(any(SearchScrollRequest.class))).thenReturn(searchScrollResponseFuture); - searchScrollResponseFuture.onResponse(scrollSearchResponse); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(scrollSearchResponse); + return null; + }).when(client).execute(eq(SearchScrollAction.INSTANCE), any(SearchScrollRequest.class), any(ActionListener.class)); // one search response containing active and inactive watches int count = randomIntBetween(2, 200); @@ -192,13 +198,17 @@ public class WatcherServiceTests extends ESTestCase { SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1); SearchResponse searchResponse = new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); - PlainActionFuture searchResponseFuture = new PlainActionFuture<>(); - when(client.search(any(SearchRequest.class))).thenReturn(searchResponseFuture); - searchResponseFuture.onResponse(searchResponse); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(searchResponse); + return null; + }).when(client).execute(eq(SearchAction.INSTANCE), any(SearchRequest.class), any(ActionListener.class)); - PlainActionFuture clearScrollFuture = new PlainActionFuture<>(); - when(client.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollFuture); - clearScrollFuture.onResponse(new ClearScrollResponse(true, 1)); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(new ClearScrollResponse(true, 1)); + return null; + }).when(client).execute(eq(ClearScrollAction.INSTANCE), any(ClearScrollRequest.class), any(ActionListener.class)); service.start(clusterState, () -> {}); @@ -228,7 +238,7 @@ public class WatcherServiceTests extends ESTestCase { assertThat(triggerService.count(), is(1L)); WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class), - mock(ExecutionService.class), mock(WatchParser.class), mock(Client.class), executorService) { + mock(ExecutionService.class), mock(WatchParser.class), client, executorService) { @Override void stopExecutor() { } @@ -245,7 +255,7 @@ public class WatcherServiceTests extends ESTestCase { ExecutionService executionService = mock(ExecutionService.class); TriggerService triggerService = mock(TriggerService.class); WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class), - executionService, mock(WatchParser.class), mock(Client.class), executorService) { + executionService, mock(WatchParser.class), client, executorService) { @Override void stopExecutor() { } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java new file mode 100644 index 00000000000..2109f2a2d95 --- /dev/null +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.test.integration; + +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; +import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; +import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput; +import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; + +@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 1, supportsDedicatedMasters = false) +public class SingleNodeTests extends AbstractWatcherIntegrationTestCase { + + @Override + protected boolean timeWarped() { + return false; + } + + // this is the standard setup when starting watcher in a regular cluster + // the index does not exist, a watch gets added + // the watch should be executed properly, despite the index being created and the cluster state listener being reloaded + public void testThatLoadingWithNonExistingIndexWorks() throws Exception { + stopWatcher(); + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); + IndexMetaData metaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, clusterStateResponse.getState().metaData()); + String watchIndexName = metaData.getIndex().getName(); + assertAcked(client().admin().indices().prepareDelete(watchIndexName)); + startWatcher(); + + String watchId = randomAlphaOfLength(20); + // now we start with an empty set up, store a watch and expected it to be executed + PutWatchResponse putWatchResponse = watcherClient().preparePutWatch(watchId) + .setSource(watchBuilder() + .trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.SECONDS))) + .input(simpleInput()) + .addAction("_logger", loggingAction("logging of watch _name"))) + .get(); + assertThat(putWatchResponse.isCreated(), is(true)); + + assertBusy(() -> { + client().admin().indices().prepareRefresh(".watcher-history*"); + SearchResponse searchResponse = client().prepareSearch(".watcher-history*").setSize(0).get(); + assertThat(searchResponse.getHits().getTotalHits(), is(greaterThanOrEqualTo(1L))); + }, 5, TimeUnit.SECONDS); + } + +} diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java index 6680b38ab94..db1d3767b59 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java @@ -35,9 +35,7 @@ import java.util.function.Consumer; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly; -import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.startsWith; import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Mockito.mock; @@ -110,40 +108,6 @@ public class TickerScheduleEngineTests extends ESTestCase { assertThat(bits.cardinality(), is(count)); } - public void testStartClearsExistingSchedules() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - List firedWatchIds = new ArrayList<>(); - engine.register(new Consumer>() { - @Override - public void accept(Iterable events) { - for (TriggerEvent event : events) { - firedWatchIds.add(event.jobName()); - } - latch.countDown(); - } - }); - - int count = randomIntBetween(2, 5); - List watches = new ArrayList<>(); - for (int i = 0; i < count; i++) { - watches.add(createWatch(String.valueOf(i), interval("1s"))); - } - engine.start(watches); - - watches.clear(); - for (int i = 0; i < count; i++) { - watches.add(createWatch("another_id" + i, interval("1s"))); - } - engine.start(watches); - - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!latch.await(3 * count, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - - assertThat(firedWatchIds, everyItem(startsWith("another_id"))); - } - public void testAddHourly() throws Exception { final String name = "job_name"; final CountDownLatch latch = new CountDownLatch(1); diff --git a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java index 17fbf0769fd..25b19aeea3b 100644 --- a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java +++ b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java @@ -59,20 +59,20 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase { String state = objectPath.evaluate("stats.0.watcher_state"); switch (state) { - case "stopped": - Response startResponse = adminClient().performRequest(new Request("POST", "/_xpack/watcher/_start")); - String body = EntityUtils.toString(startResponse.getEntity()); - assertThat(body, containsString("\"acknowledged\":true")); - break; - case "stopping": - throw new AssertionError("waiting until stopping state reached stopped state to start again"); - case "starting": - throw new AssertionError("waiting until starting state reached started state"); - case "started": - // all good here, we are done - break; - default: - throw new AssertionError("unknown state[" + state + "]"); + case "stopped": + Response startResponse = adminClient().performRequest(new Request("POST", "/_xpack/watcher/_start")); + Map responseMap = entityAsMap(startResponse); + assertThat(responseMap, hasEntry("acknowledged", true)); + break; + case "stopping": + throw new AssertionError("waiting until stopping state reached stopped state to start again"); + case "starting": + throw new AssertionError("waiting until starting state reached started state"); + case "started": + // all good here, we are done + break; + default: + throw new AssertionError("unknown state[" + state + "]"); } } catch (IOException e) { throw new AssertionError(e); @@ -135,7 +135,6 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase { } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33320") public void testSearchInputHasPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -159,7 +158,6 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase { assertThat(conditionMet, is(true)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29893") public void testSearchInputWithInsufficientPrivileges() throws Exception { String indexName = "index_not_allowed_to_read"; try (XContentBuilder builder = jsonBuilder()) { @@ -186,7 +184,6 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase { assertThat(conditionMet, is(false)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33320") public void testSearchTransformHasPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -216,7 +213,6 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase { assertThat(value, is("15")); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33291") public void testSearchTransformInsufficientPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -244,7 +240,6 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase { assertThat(response.getStatusLine().getStatusCode(), is(404)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30777") public void testIndexActionHasPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -269,7 +264,6 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase { assertThat(spam, is("eggs")); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33320") public void testIndexActionInsufficientPrivileges() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -299,6 +293,8 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase { Response response = client().performRequest(request); Map responseMap = entityAsMap(response); assertThat(responseMap, hasEntry("_id", watchId)); + assertThat(responseMap, hasEntry("created", true)); + assertThat(responseMap, hasEntry("_version", 1)); } private ObjectPath getWatchHistoryEntry(String watchId) throws Exception {