Watcher: Allow to defer startup for tests (elastic/x-pack-elasticsearch#3903)
This commit introduces a new watcher setting to defer starting watcher until it has been called with the API for the first time. This is primarily useful in testing environments, as this ensures that watcher does not try to reload itself because of starting first and then creating watcher indices. In addition the undocumented and unused option xpack.watcher.start_immediately has been removed. Relates elastic/x-pack-elasticsearch#3854 Original commit: elastic/x-pack-elasticsearch@2b55aec4ad
This commit is contained in:
parent
48f6a752cb
commit
54cb890eb7
|
@ -415,7 +415,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
|
||||||
settings.add(Setting.simpleString("xpack.watcher.input.search.default_timeout", Setting.Property.NodeScope));
|
settings.add(Setting.simpleString("xpack.watcher.input.search.default_timeout", Setting.Property.NodeScope));
|
||||||
settings.add(Setting.simpleString("xpack.watcher.transform.search.default_timeout", Setting.Property.NodeScope));
|
settings.add(Setting.simpleString("xpack.watcher.transform.search.default_timeout", Setting.Property.NodeScope));
|
||||||
settings.add(Setting.simpleString("xpack.watcher.execution.scroll.timeout", Setting.Property.NodeScope));
|
settings.add(Setting.simpleString("xpack.watcher.execution.scroll.timeout", Setting.Property.NodeScope));
|
||||||
settings.add(Setting.simpleString("xpack.watcher.start_immediately", Setting.Property.NodeScope));
|
settings.add(WatcherLifeCycleService.SETTING_REQUIRE_MANUAL_START);
|
||||||
|
|
||||||
// notification services
|
// notification services
|
||||||
settings.addAll(SlackService.getSettings());
|
settings.addAll(SlackService.getSettings());
|
||||||
|
|
|
@ -18,6 +18,8 @@ import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.component.LifecycleListener;
|
import org.elasticsearch.common.component.LifecycleListener;
|
||||||
|
import org.elasticsearch.common.settings.Setting;
|
||||||
|
import org.elasticsearch.common.settings.Setting.Property;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.gateway.GatewayService;
|
import org.elasticsearch.gateway.GatewayService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
@ -40,17 +42,25 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
||||||
|
|
||||||
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
|
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
|
||||||
|
|
||||||
|
// this option configures watcher not to start, unless the cluster state contains information to start watcher
|
||||||
|
// if you start with an empty cluster, you can delay starting watcher until you call the API manually
|
||||||
|
// if you start with a cluster containing data, this setting might have no effect, once you called the API yourself
|
||||||
|
// this is merely for testing, to make sure that watcher only starts when manually called
|
||||||
|
public static final Setting<Boolean> SETTING_REQUIRE_MANUAL_START =
|
||||||
|
Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope);
|
||||||
|
|
||||||
private final WatcherService watcherService;
|
private final WatcherService watcherService;
|
||||||
private final ExecutorService executor;
|
private final ExecutorService executor;
|
||||||
private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
|
private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
|
||||||
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
|
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
|
||||||
|
private final boolean requireManualStart;
|
||||||
|
|
||||||
WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||||
WatcherService watcherService) {
|
WatcherService watcherService) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.executor = threadPool.executor(ThreadPool.Names.GENERIC);
|
this.executor = threadPool.executor(ThreadPool.Names.GENERIC);
|
||||||
this.watcherService = watcherService;
|
this.watcherService = watcherService;
|
||||||
|
this.requireManualStart = SETTING_REQUIRE_MANUAL_START.get(settings);
|
||||||
clusterService.addListener(this);
|
clusterService.addListener(this);
|
||||||
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
|
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
|
||||||
// happen because we're shutting down and an watch is scheduled.
|
// happen because we're shutting down and an watch is scheduled.
|
||||||
|
@ -127,6 +137,13 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if watcher should not be started immediately unless it is has been manually configured to do so
|
||||||
|
WatcherMetaData watcherMetaData = event.state().getMetaData().custom(WatcherMetaData.TYPE);
|
||||||
|
if (watcherMetaData == null && requireManualStart) {
|
||||||
|
clearAllocationIds();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId())) {
|
if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId())) {
|
||||||
clearAllocationIds();
|
clearAllocationIds();
|
||||||
executor.execute(() -> this.stop("no master node"));
|
executor.execute(() -> this.stop("no master node"));
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
|
||||||
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
|
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
|
||||||
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
|
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
|
||||||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||||
|
import org.elasticsearch.xpack.watcher.WatcherLifeCycleService;
|
||||||
import org.elasticsearch.xpack.watcher.history.HistoryStore;
|
import org.elasticsearch.xpack.watcher.history.HistoryStore;
|
||||||
import org.elasticsearch.xpack.watcher.notification.email.Authentication;
|
import org.elasticsearch.xpack.watcher.notification.email.Authentication;
|
||||||
import org.elasticsearch.xpack.watcher.notification.email.Email;
|
import org.elasticsearch.xpack.watcher.notification.email.Email;
|
||||||
|
@ -112,8 +113,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
||||||
// watcher settings that should work despite randomization
|
// watcher settings that should work despite randomization
|
||||||
.put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100))
|
.put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100))
|
||||||
.put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100))
|
.put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100))
|
||||||
//.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4)
|
.put(WatcherLifeCycleService.SETTING_REQUIRE_MANUAL_START.getKey(), true)
|
||||||
//.put(NetworkModule.HTTP_TYPE_KEY, SecurityField.NAME4)
|
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -611,4 +611,4 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
||||||
return "time frozen";
|
return "time frozen";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue