Improve Watcher test framework resiliency (#40658)

It is possible for the watches tracked by ScheduleTriggerEngineMock to
get out of sync with the Watches in the ScheduleTriggerEngine
production code, which can lead to watches failing to run.

This commit:

1. Changes TimeWarp to try to run the watch on all schedulers, rather than stopping after one which claims to have the watch registered. This reduces the impact of desynchronization between the mocking code and the backing production code.
2. Makes ScheduleTriggerEngineMock respect pauses of execution again. This is necessary to prevent duplicate watch invocations due to the above change.
3. Tweaks how watches are registered in ScheduleTriggerEngineMock to prevent race conditions due to concurrent modification.
4. Tweaks WatcherConcreteIndexTests to use TimeWarp instead of waiting for watches to be triggered, as TimeWarp is more reliable and accomplishes the same goal.
This commit is contained in:
Gordon Brown 2019-04-12 16:53:50 -06:00 committed by GitHub
parent 47ba45732d
commit c8bc4ab003
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 39 additions and 31 deletions

View File

@ -23,11 +23,6 @@ import static org.hamcrest.Matchers.greaterThan;
public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase {
@Override
protected boolean timeWarped() {
return false;
}
public void testCanUseAnyConcreteIndexName() throws Exception {
String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT);
@ -35,6 +30,7 @@ public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCas
stopWatcher();
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName);
ensureGreen(newWatcherIndexName);
startWatcher();
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()
@ -45,6 +41,9 @@ public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCas
.get();
assertTrue(putWatchResponse.isCreated());
refresh();
timeWarp().trigger("mywatch");
assertBusy(() -> {
SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get();

View File

@ -15,10 +15,10 @@ import org.elasticsearch.xpack.core.ssl.TestsSSLService;
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.Scheme;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
@ -67,7 +67,6 @@ public class WebhookHttpsIntegrationTests extends AbstractWatcherIntegrationTest
webServer.close();
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
public void testHttps() throws Exception {
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("localhost", webServer.getPort())

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.actions.webhook;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.transport.TransportAddress;
@ -18,9 +17,9 @@ import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
@ -44,7 +43,6 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase {
private MockWebServer webServer = new MockWebServer();

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.watcher.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -580,6 +582,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
}
protected static class TimeWarp {
private static final Logger logger = LogManager.getLogger(TimeWarp.class);
private final List<ScheduleTriggerEngineMock> schedulers;
private final ClockMock clock;
@ -598,9 +601,14 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
}
public void trigger(String watchId, int times, TimeValue timeValue) {
boolean isTriggered = schedulers.stream().anyMatch(scheduler -> scheduler.trigger(watchId, times, timeValue));
String msg = String.format(Locale.ROOT, "could not find watch [%s] to trigger", watchId);
assertThat(msg, isTriggered, is(true));
long triggeredCount = schedulers.stream()
.filter(scheduler -> scheduler.trigger(watchId, times, timeValue))
.count();
String msg = String.format(Locale.ROOT, "watch was triggered on [%d] schedulers, expected [1]", triggeredCount);
if (triggeredCount > 1) {
logger.warn(msg);
}
assertThat(msg, triggeredCount, greaterThanOrEqualTo(1L));
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.test.integration;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
@ -63,7 +62,6 @@ import static org.hamcrest.Matchers.notNullValue;
@TestLogging("org.elasticsearch.xpack.watcher:DEBUG," +
"org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE")
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
public void testIndexWatch() throws Exception {

View File

@ -87,7 +87,6 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTestC
return super.nodeSettings(nodeOrdinal);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40587")
public void testHttpInput() throws Exception {
WatcherClient watcherClient = watcherClient();
watcherClient.preparePutWatch("_id")

View File

@ -122,7 +122,6 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
assertThat(throttledCount, greaterThan(0L));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35506")
public void testAckAllActions() throws Exception {
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch()
.setId("_id")

View File

@ -38,7 +38,6 @@ import static org.hamcrest.Matchers.greaterThan;
public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase {
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40631")
public void testWatchMetadata() throws Exception {
Map<String, Object> metadata = new HashMap<>();
metadata.put("foo", "bar");

View File

@ -21,8 +21,10 @@ import java.time.Clock;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* A mock scheduler to help with unit testing. Provide {@link ScheduleTriggerEngineMock#trigger} method to manually trigger
@ -31,7 +33,8 @@ import java.util.concurrent.ConcurrentMap;
public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
private static final Logger logger = LogManager.getLogger(ScheduleTriggerEngineMock.class);
private final ConcurrentMap<String, Watch> watches = new ConcurrentHashMap<>();
private final AtomicReference<Map<String, Watch>> watches = new AtomicReference<>(new ConcurrentHashMap<>());
private final AtomicBoolean paused = new AtomicBoolean(false);
public ScheduleTriggerEngineMock(ScheduleRegistry scheduleRegistry, Clock clock) {
super(scheduleRegistry, clock);
@ -49,30 +52,32 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
}
@Override
public void start(Collection<Watch> jobs) {
jobs.forEach(this::add);
public synchronized void start(Collection<Watch> jobs) {
Map<String, Watch> newWatches = new ConcurrentHashMap<>();
jobs.forEach((watch) -> newWatches.put(watch.id(), watch));
watches.set(newWatches);
paused.set(false);
}
@Override
public void stop() {
watches.clear();
watches.set(new ConcurrentHashMap<>());
}
@Override
public void add(Watch watch) {
public synchronized void add(Watch watch) {
logger.debug("adding watch [{}]", watch.id());
watches.put(watch.id(), watch);
watches.get().put(watch.id(), watch);
}
@Override
public void pauseExecution() {
// No action is needed because this engine does not trigger watches on a schedule (instead
// they must be triggered manually).
paused.set(true);
}
@Override
public boolean remove(String jobId) {
return watches.remove(jobId) != null;
public synchronized boolean remove(String jobId) {
return watches.get().remove(jobId) != null;
}
public boolean trigger(String jobName) {
@ -80,7 +85,11 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
}
public boolean trigger(String jobName, int times, TimeValue interval) {
if (watches.containsKey(jobName) == false) {
if (watches.get().containsKey(jobName) == false) {
return false;
}
if (paused.get()) {
logger.info("not executing watch [{}] on this scheduler because it is paused", jobName);
return false;
}
@ -89,7 +98,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
logger.debug("firing watch [{}] at [{}]", jobName, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now);
consumers.forEach(consumer -> consumer.accept(Collections.singletonList(event)));
if (interval != null) {
if (interval != null) {
if (clock instanceof ClockMock) {
((ClockMock) clock).fastForward(interval);
} else {