Watcher: Allow to defer startup for tests (elastic/x-pack-elasticsearch#3903)

This commit introduces a new watcher setting to defer starting watcher
until it has been called with the API for the first time. This is
primarily useful in testing environments, as this ensures that watcher
does not try to reload itself because of starting first and then
creating watcher indices.

In addition the undocumented and unused option
xpack.watcher.start_immediately has been removed.

Relates elastic/x-pack-elasticsearch#3854

Original commit: elastic/x-pack-elasticsearch@2b55aec4ad
This commit is contained in:
Alexander Reelsen 2018-02-12 14:24:05 +01:00 committed by GitHub
parent 48f6a752cb
commit 54cb890eb7
3 changed files with 22 additions and 5 deletions

View File

@ -415,7 +415,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
settings.add(Setting.simpleString("xpack.watcher.input.search.default_timeout", Setting.Property.NodeScope)); settings.add(Setting.simpleString("xpack.watcher.input.search.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.transform.search.default_timeout", Setting.Property.NodeScope)); settings.add(Setting.simpleString("xpack.watcher.transform.search.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.execution.scroll.timeout", Setting.Property.NodeScope)); settings.add(Setting.simpleString("xpack.watcher.execution.scroll.timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.start_immediately", Setting.Property.NodeScope)); settings.add(WatcherLifeCycleService.SETTING_REQUIRE_MANUAL_START);
// notification services // notification services
settings.addAll(SlackService.getSettings()); settings.addAll(SlackService.getSettings());

View File

@ -18,6 +18,8 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -40,17 +42,25 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener { public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
// this option configures watcher not to start, unless the cluster state contains information to start watcher
// if you start with an empty cluster, you can delay starting watcher until you call the API manually
// if you start with a cluster containing data, this setting might have no effect, once you called the API yourself
// this is merely for testing, to make sure that watcher only starts when manually called
public static final Setting<Boolean> SETTING_REQUIRE_MANUAL_START =
Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope);
private final WatcherService watcherService; private final WatcherService watcherService;
private final ExecutorService executor; private final ExecutorService executor;
private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList()); private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this. private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
private final boolean requireManualStart;
WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService, WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
WatcherService watcherService) { WatcherService watcherService) {
super(settings); super(settings);
this.executor = threadPool.executor(ThreadPool.Names.GENERIC); this.executor = threadPool.executor(ThreadPool.Names.GENERIC);
this.watcherService = watcherService; this.watcherService = watcherService;
this.requireManualStart = SETTING_REQUIRE_MANUAL_START.get(settings);
clusterService.addListener(this); clusterService.addListener(this);
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will // Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an watch is scheduled. // happen because we're shutting down and an watch is scheduled.
@ -127,6 +137,13 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
return; return;
} }
// if watcher should not be started immediately unless it is has been manually configured to do so
WatcherMetaData watcherMetaData = event.state().getMetaData().custom(WatcherMetaData.TYPE);
if (watcherMetaData == null && requireManualStart) {
clearAllocationIds();
return;
}
if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId())) { if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId())) {
clearAllocationIds(); clearAllocationIds();
executor.execute(() -> this.stop("no master node")); executor.execute(() -> this.stop("no master node"));

View File

@ -52,6 +52,7 @@ import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.WatcherLifeCycleService;
import org.elasticsearch.xpack.watcher.history.HistoryStore; import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.notification.email.Authentication; import org.elasticsearch.xpack.watcher.notification.email.Authentication;
import org.elasticsearch.xpack.watcher.notification.email.Email; import org.elasticsearch.xpack.watcher.notification.email.Email;
@ -112,8 +113,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
// watcher settings that should work despite randomization // watcher settings that should work despite randomization
.put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100)) .put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100))
.put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100)) .put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100))
//.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4) .put(WatcherLifeCycleService.SETTING_REQUIRE_MANUAL_START.getKey(), true)
//.put(NetworkModule.HTTP_TYPE_KEY, SecurityField.NAME4)
.build(); .build();
} }
@ -611,4 +611,4 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
return "time frozen"; return "time frozen";
} }
} }
} }