Watcher: Remove guice modules from codebase (elastic/elasticsearch#4030)

This removes all guice module classes from the watcher codebase, so that guice is
only used for the transport and rest actions, but nowhere else in the codebase.

Also it ensures, that only ticker/schedule are valid trigger engine options.

Original commit: elastic/x-pack-elasticsearch@400ba24c33
This commit is contained in:
Alexander Reelsen 2016-11-10 10:46:20 +01:00 committed by GitHub
parent 7f5216a112
commit 2081399738
37 changed files with 215 additions and 460 deletions

View File

@ -203,8 +203,8 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
ArrayList<Module> modules = new ArrayList<>();
modules.add(b -> b.bind(Clock.class).toInstance(getClock()));
modules.addAll(security.nodeModules());
modules.addAll(watcher.nodeModules());
modules.addAll(monitoring.nodeModules());
modules.addAll(watcher.nodeModules());
modules.addAll(graph.createGuiceModules());
if (transportClientMode) {
@ -232,7 +232,6 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
extensionsService.getExtensions()));
components.addAll(monitoring.createComponents(internalClient, threadPool, clusterService, licenseService, sslService));
// watcher http stuff
Map<String, HttpAuthFactory> httpAuthFactories = new HashMap<>();
httpAuthFactories.put(BasicAuth.TYPE, new BasicAuthFactory(security.getCryptoService()));
@ -248,7 +247,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
components.addAll(notificationComponents);
components.addAll(watcher.createComponents(getClock(), scriptService, internalClient, searchRequestParsers, licenseState,
httpClient, httpTemplateParser, components));
httpClient, httpTemplateParser, threadPool, clusterService, security.getCryptoService(), components));
// just create the reloader as it will pull all of the loaded ssl configurations and start watching them

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.graph;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;

View File

@ -10,9 +10,11 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
@ -30,6 +32,7 @@ import org.elasticsearch.script.ScriptSettings;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.common.http.HttpClient;
@ -41,6 +44,7 @@ import org.elasticsearch.xpack.notification.hipchat.HipChatService;
import org.elasticsearch.xpack.notification.pagerduty.PagerDutyService;
import org.elasticsearch.xpack.notification.slack.SlackService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.crypto.CryptoService;
import org.elasticsearch.xpack.watcher.actions.ActionFactory;
import org.elasticsearch.xpack.watcher.actions.ActionRegistry;
import org.elasticsearch.xpack.watcher.actions.email.EmailAction;
@ -65,10 +69,12 @@ import org.elasticsearch.xpack.watcher.condition.ConditionFactory;
import org.elasticsearch.xpack.watcher.condition.ConditionRegistry;
import org.elasticsearch.xpack.watcher.condition.NeverCondition;
import org.elasticsearch.xpack.watcher.condition.ScriptCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionModule;
import org.elasticsearch.xpack.watcher.execution.AsyncTriggerListener;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.execution.InternalWatchExecutor;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatch;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.xpack.watcher.execution.WatchExecutor;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.input.InputFactory;
import org.elasticsearch.xpack.watcher.input.InputRegistry;
@ -93,6 +99,8 @@ import org.elasticsearch.xpack.watcher.rest.action.RestWatchServiceAction;
import org.elasticsearch.xpack.watcher.rest.action.RestWatcherStatsAction;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.TemplateConfig;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.transform.TransformFactory;
import org.elasticsearch.xpack.watcher.transform.TransformRegistry;
import org.elasticsearch.xpack.watcher.transform.script.ScriptTransform;
@ -115,9 +123,22 @@ import org.elasticsearch.xpack.watcher.transport.actions.service.TransportWatche
import org.elasticsearch.xpack.watcher.transport.actions.service.WatcherServiceAction;
import org.elasticsearch.xpack.watcher.transport.actions.stats.TransportWatcherStatsAction;
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsAction;
import org.elasticsearch.xpack.watcher.trigger.TriggerModule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleModule;
import org.elasticsearch.xpack.watcher.watch.WatchModule;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.CronSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.DailySchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.HourlySchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.MonthlySchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.WeeklySchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.YearlySchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchLockService;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -128,12 +149,15 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
public class Watcher implements ActionPlugin, ScriptPlugin {
@ -143,6 +167,19 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
Setting.boolSetting("xpack.watcher.encrypt_sensitive_data", false, Setting.Property.NodeScope);
public static final Setting<TimeValue> MAX_STOP_TIMEOUT_SETTING =
Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope);
public static final Setting<String> TRIGGER_SCHEDULE_ENGINE_SETTING =
new Setting<>("xpack.watcher.trigger.schedule.engine", "ticker", s -> {
switch (s) {
case "ticker":
case "scheduler":
return s;
default:
throw new IllegalArgumentException("Can't parse [xpack.watcher.trigger.schedule.engine] must be one of [ticker, " +
"scheduler], was [" + s + "]");
}
}, Setting.Property.NodeScope);
private static final ScriptContext.Plugin SCRIPT_PLUGIN = new ScriptContext.Plugin("xpack", "watch");
public static final ScriptContext SCRIPT_CONTEXT = SCRIPT_PLUGIN::getKey;
@ -156,18 +193,25 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
protected final Settings settings;
protected final boolean transportClient;
protected final boolean enabled;
private final boolean transportClientMode;
public Watcher(Settings settings) {
this.settings = settings;
transportClient = "transport".equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey()));
this.enabled = XPackSettings.WATCHER_ENABLED.get(settings);
this.transportClientMode = XPackPlugin.transportClientMode(settings);
validAutoCreateIndex(settings);
}
public Collection<Object> createComponents(Clock clock, ScriptService scriptService, InternalClient internalClient,
SearchRequestParsers searchRequestParsers, XPackLicenseState licenseState,
HttpClient httpClient, HttpRequestTemplate.Parser httpTemplateParser,
ThreadPool threadPool, ClusterService clusterService, CryptoService cryptoService,
Collection<Object> components) {
if (enabled == false) {
return Collections.emptyList();
}
final Map<String, ConditionFactory> parsers = new HashMap<>();
parsers.put(AlwaysCondition.TYPE, (c, id, p, upgrade) -> AlwaysCondition.parse(id, p));
parsers.put(NeverCondition.TYPE, (c, id, p, upgrade) -> NeverCondition.parse(id, p));
@ -208,11 +252,79 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
final InputRegistry inputRegistry = new InputRegistry(settings, inputFactories);
inputFactories.put(ChainInput.TYPE, new ChainInputFactory(settings, inputRegistry));
// TODO replace internal client where needed, so we can remove ctors
final WatcherClientProxy watcherClientProxy = new WatcherClientProxy(settings, internalClient);
final WatcherClient watcherClient = new WatcherClient(internalClient);
final HistoryStore historyStore = new HistoryStore(settings, internalClient);
final HistoryStore historyStore = new HistoryStore(settings, watcherClientProxy);
return Arrays.asList(registry, watcherClient, inputRegistry, historyStore);
final Set<Schedule.Parser> scheduleParsers = new HashSet<>();
scheduleParsers.add(new CronSchedule.Parser());
scheduleParsers.add(new DailySchedule.Parser());
scheduleParsers.add(new HourlySchedule.Parser());
scheduleParsers.add(new IntervalSchedule.Parser());
scheduleParsers.add(new MonthlySchedule.Parser());
scheduleParsers.add(new WeeklySchedule.Parser());
scheduleParsers.add(new YearlySchedule.Parser());
final ScheduleRegistry scheduleRegistry = new ScheduleRegistry(scheduleParsers);
TriggerEngine manualTriggerEngine = new ManualTriggerEngine();
TriggerEngine configuredTriggerEngine = getTriggerEngine(clock, scheduleRegistry);
final Set<TriggerEngine> triggerEngines = new HashSet<>();
triggerEngines.add(manualTriggerEngine);
triggerEngines.add(configuredTriggerEngine);
final TriggerService triggerService = new TriggerService(settings, triggerEngines);
final TriggeredWatch.Parser triggeredWatchParser = new TriggeredWatch.Parser(settings, triggerService);
final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, watcherClientProxy, triggeredWatchParser);
final WatcherSearchTemplateService watcherSearchTemplateService =
new WatcherSearchTemplateService(settings, scriptService, searchRequestParsers);
final WatchLockService watchLockService = new WatchLockService(settings);
final WatchExecutor watchExecutor = getWatchExecutor(threadPool);
final Watch.Parser watchParser = new Watch.Parser(settings, triggerService, registry, inputRegistry, cryptoService, clock);
final WatchStore watchStore = new WatchStore(settings, watcherClientProxy, watchParser);
final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor,
watchStore, watchLockService, clock, threadPool);
final TriggerEngine.Listener triggerEngineListener = getTriggerEngineListener(executionService);
triggerService.register(triggerEngineListener);
final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = new WatcherIndexTemplateRegistry(settings,
clusterService.getClusterSettings(), clusterService, threadPool, internalClient);
final WatcherService watcherService = new WatcherService(settings, clock, triggerService, watchStore,
watchParser, executionService, watchLockService, watcherIndexTemplateRegistry);
final WatcherLifeCycleService watcherLifeCycleService =
new WatcherLifeCycleService(settings, threadPool, clusterService, watcherService);
return Arrays.asList(registry, watcherClient, inputRegistry, historyStore, triggerService, triggeredWatchParser,
watcherLifeCycleService, executionService, watchStore, triggerEngineListener, watcherService, watchParser,
configuredTriggerEngine, triggeredWatchStore, watcherSearchTemplateService);
}
protected TriggerEngine getTriggerEngine(Clock clock, ScheduleRegistry scheduleRegistry) {
String engine = TRIGGER_SCHEDULE_ENGINE_SETTING.get(settings);
switch (engine) {
case "scheduler":
return new SchedulerScheduleTriggerEngine(settings, scheduleRegistry, clock);
case "ticker":
return new TickerScheduleTriggerEngine(settings, scheduleRegistry, clock);
default: // should never happen, as the setting is already parsing for scheduler/ticker
throw illegalState("schedule engine must be either set to [scheduler] or [ticker], but was []", engine);
}
}
protected WatchExecutor getWatchExecutor(ThreadPool threadPool) {
return new InternalWatchExecutor(threadPool);
}
protected TriggerEngine.Listener getTriggerEngineListener(ExecutionService executionService) {
return new AsyncTriggerListener(settings, executionService);
}
private <T> T getService(Class<T> serviceClass, Collection<Object> services) {
@ -227,13 +339,13 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
public Collection<Module> nodeModules() {
List<Module> modules = new ArrayList<>();
modules.add(new WatcherModule(enabled, transportClient));
if (enabled && transportClient == false) {
modules.add(new WatchModule());
modules.add(new TriggerModule(settings));
modules.add(new ScheduleModule());
modules.add(new ExecutionModule());
modules.add(b -> {
XPackPlugin.bindFeatureSet(b, WatcherFeatureSet.class);
if (transportClientMode || enabled == false) {
b.bind(WatcherService.class).toProvider(Providers.of(null));
}
});
return modules;
}
@ -241,12 +353,12 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
return Settings.EMPTY;
}
public List<Setting<?>> getSettings() {
List<Setting<?>> settings = new ArrayList<>();
for (TemplateConfig templateConfig : WatcherIndexTemplateRegistry.TEMPLATE_CONFIGS) {
settings.add(templateConfig.getSetting());
}
settings.add(TRIGGER_SCHEDULE_ENGINE_SETTING);
settings.add(INDEX_WATCHER_TEMPLATE_VERSION_SETTING);
settings.add(MAX_STOP_TIMEOUT_SETTING);
settings.add(ExecutionService.DEFAULT_THROTTLE_PERIOD_SETTING);
@ -259,7 +371,6 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
settings.add(Setting.simpleString("xpack.watcher.internal.ops.index.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.actions.index.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.index.rest.direct_access", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.trigger.schedule.engine", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.input.search.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.transform.search.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.trigger.schedule.ticker.tick_interval", Setting.Property.NodeScope));

View File

@ -18,7 +18,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayService;
@ -35,7 +34,6 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
private volatile WatcherMetaData watcherMetaData;
@Inject
public WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
WatcherService watcherService) {
super(settings);

View File

@ -1,41 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
public class WatcherModule extends AbstractModule {
private final boolean enabled;
private final boolean transportClientMode;
public WatcherModule(boolean enabled, boolean transportClientMode) {
this.enabled = enabled;
this.transportClientMode = transportClientMode;
}
@Override
protected void configure() {
if (transportClientMode) {
return;
}
if (enabled == false) {
// watcher service must be null, so that the watcher feature set can be instantiated even if watcher is not enabled
bind(WatcherService.class).toProvider(Providers.of(null));
} else {
bind(WatcherLifeCycleService.class).asEagerSingleton();
bind(WatcherIndexTemplateRegistry.class).asEagerSingleton();
}
XPackPlugin.bindFeatureSet(binder(), WatcherFeatureSet.class);
}
}

View File

@ -12,7 +12,6 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
@ -47,7 +46,6 @@ public class WatcherService extends AbstractComponent {
// package-private for testing
final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STOPPED);
@Inject
public WatcherService(Settings settings, Clock clock, TriggerService triggerService, WatchStore watchStore,
Watch.Parser watchParser, ExecutionService executionService, WatchLockService watchLockService,
WatcherIndexTemplateRegistry watcherIndexTemplateRegistry) {

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.watcher.client;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.xpack.watcher.transport.actions.ack.AckWatchAction;
import org.elasticsearch.xpack.watcher.transport.actions.ack.AckWatchRequest;
import org.elasticsearch.xpack.watcher.transport.actions.ack.AckWatchRequestBuilder;
@ -48,7 +47,6 @@ public class WatcherClient {
private final Client client;
@Inject
public WatcherClient(Client client) {
this.client = client;
}

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.watcher.execution;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import static java.util.stream.StreamSupport.stream;
@ -22,11 +20,9 @@ public class AsyncTriggerListener implements TriggerEngine.Listener {
private final Logger logger;
private final ExecutionService executionService;
@Inject
public AsyncTriggerListener(Settings settings, ExecutionService executionService, TriggerService triggerService) {
public AsyncTriggerListener(Settings settings, ExecutionService executionService) {
this.logger = Loggers.getLogger(SyncTriggerListener.class, settings);
this.executionService = executionService;
triggerService.register(this);
}
@Override

View File

@ -1,35 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
public class ExecutionModule extends AbstractModule {
private final Class<? extends WatchExecutor> executorClass;
private final Class<? extends TriggerEngine.Listener> triggerEngineListenerClass;
public ExecutionModule() {
this(InternalWatchExecutor.class, AsyncTriggerListener.class);
}
protected ExecutionModule(Class<? extends WatchExecutor> executorClass,
Class<? extends TriggerEngine.Listener> triggerEngineListenerClass) {
this.executorClass = executorClass;
this.triggerEngineListenerClass = triggerEngineListenerClass;
}
@Override
protected void configure() {
bind(TriggeredWatch.Parser.class).asEagerSingleton();
bind(TriggeredWatchStore.class).asEagerSingleton();
bind(ExecutionService.class).asEagerSingleton();
bind(executorClass).asEagerSingleton();
bind(triggerEngineListenerClass).asEagerSingleton();
bind(WatchExecutor.class).to(executorClass);
}
}

View File

@ -11,7 +11,6 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Setting;
@ -68,7 +67,6 @@ public class ExecutionService extends AbstractComponent {
private volatile CurrentExecutions currentExecutions;
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor,
WatchStore watchStore, WatchLockService watchLockService, Clock clock, ThreadPool threadPool) {
super(settings);

View File

@ -5,21 +5,19 @@
*/
package org.elasticsearch.xpack.watcher.execution;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.XPackPlugin;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;
public class InternalWatchExecutor implements WatchExecutor {
public static final String THREAD_POOL_NAME = XPackPlugin.WATCHER;
private final ThreadPool threadPool;
@Inject
public InternalWatchExecutor(ThreadPool threadPool) {
this.threadPool = threadPool;
}

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.watcher.execution;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import static java.util.stream.StreamSupport.stream;
@ -22,11 +20,9 @@ public class SyncTriggerListener implements TriggerEngine.Listener {
private final ExecutionService executionService;
private final Logger logger;
@Inject
public SyncTriggerListener(Settings settings, ExecutionService executionService, TriggerService triggerService) {
public SyncTriggerListener(Settings settings, ExecutionService executionService) {
this.logger = Loggers.getLogger(SyncTriggerListener.class, settings);
this.executionService = executionService;
triggerService.register(this);
}
@Override

View File

@ -10,7 +10,6 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -53,7 +52,6 @@ public class TriggeredWatch implements ToXContent {
private final TriggerService triggerService;
@Inject
public Parser(Settings settings, TriggerService triggerService) {
super(settings);
this.triggerService = triggerService;

View File

@ -21,7 +21,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -29,7 +28,6 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
@ -61,11 +59,6 @@ public class TriggeredWatchStore extends AbstractComponent {
private final Lock stopLock = readWriteLock.writeLock();
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
public TriggeredWatchStore(Settings settings, InternalClient client, TriggeredWatch.Parser triggeredWatchParser) {
this(settings, new WatcherClientProxy(settings, client), triggeredWatchParser);
}
public TriggeredWatchStore(Settings settings, WatcherClientProxy client, TriggeredWatch.Parser triggeredWatchParser) {
super(settings);
this.scrollSize = settings.getAsInt("xpack.watcher.execution.scroll.size", 100);

View File

@ -11,7 +11,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
@ -42,10 +41,6 @@ public class HistoryStore extends AbstractComponent {
private final Lock stopLock = readWriteLock.writeLock();
private final AtomicBoolean started = new AtomicBoolean(false);
public HistoryStore(Settings settings, InternalClient client) {
this(settings, new WatcherClientProxy(settings, client));
}
public HistoryStore(Settings settings, WatcherClientProxy client) {
super(settings);
this.client = client;

View File

@ -15,7 +15,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -62,7 +61,6 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
private volatile Map<String, Settings> customIndexSettings;
@Inject
public WatcherIndexTemplateRegistry(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService,
ThreadPool threadPool, InternalClient client) {
super(settings);

View File

@ -9,7 +9,6 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
@ -37,7 +36,6 @@ public class WatcherSearchTemplateService extends AbstractComponent {
private final ParseFieldMatcher parseFieldMatcher;
private final SearchRequestParsers searchRequestParsers;
@Inject
public WatcherSearchTemplateService(Settings settings, ScriptService scriptService, SearchRequestParsers searchRequestParsers) {
super(settings);
this.scriptService = scriptService;

View File

@ -1,47 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.trigger;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleModule;
import java.util.HashSet;
import java.util.Set;
public class TriggerModule extends AbstractModule {
private final Settings settings;
private final Set<Class<? extends TriggerEngine>> engines = new HashSet<>();
public TriggerModule(Settings settings) {
this.settings = settings;
registerStandardEngines();
}
public void registerEngine(Class<? extends TriggerEngine> engineType) {
engines.add(engineType);
}
protected void registerStandardEngines() {
registerEngine(ScheduleModule.triggerEngineType(settings));
registerEngine(ManualTriggerEngine.class);
}
@Override
protected void configure() {
Multibinder<TriggerEngine> mbinder = Multibinder.newSetBinder(binder(), TriggerEngine.class);
for (Class<? extends TriggerEngine> engine : engines) {
bind(engine).asEagerSingleton();
mbinder.addBinding().to(engine);
}
bind(TriggerService.class).asEagerSingleton();
}
}

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.watcher.trigger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
@ -27,7 +26,6 @@ public class TriggerService extends AbstractComponent {
private final Listeners listeners;
private final Map<String, TriggerEngine> engines;
@Inject
public TriggerService(Settings settings, Set<TriggerEngine> engines) {
super(settings);
listeners = new Listeners();

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.watcher.trigger.manual;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
@ -21,10 +20,6 @@ public class ManualTriggerEngine implements TriggerEngine<ManualTrigger, ManualT
static final String TYPE = "manual";
@Inject
public ManualTriggerEngine() {
}
@Override
public String type() {
return TYPE;

View File

@ -1,86 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.trigger.schedule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
public class ScheduleModule extends AbstractModule {
private final Map<String, Class<? extends Schedule.Parser>> parsers = new HashMap<>();
public ScheduleModule() {
registerScheduleParser(CronSchedule.TYPE, CronSchedule.Parser.class);
registerScheduleParser(DailySchedule.TYPE, DailySchedule.Parser.class);
registerScheduleParser(HourlySchedule.TYPE, HourlySchedule.Parser.class);
registerScheduleParser(IntervalSchedule.TYPE, IntervalSchedule.Parser.class);
registerScheduleParser(MonthlySchedule.TYPE, MonthlySchedule.Parser.class);
registerScheduleParser(WeeklySchedule.TYPE, WeeklySchedule.Parser.class);
registerScheduleParser(YearlySchedule.TYPE, YearlySchedule.Parser.class);
}
public static Class<? extends TriggerEngine> triggerEngineType(Settings nodeSettings) {
Engine engine = Engine.resolve(nodeSettings);
Loggers.getLogger(ScheduleModule.class, nodeSettings).debug("using [{}] schedule trigger engine",
engine.name().toLowerCase(Locale.ROOT));
return engine.engineType();
}
public void registerScheduleParser(String parserType, Class<? extends Schedule.Parser> parserClass) {
parsers.put(parserType, parserClass);
}
@Override
protected void configure() {
MapBinder<String, Schedule.Parser> mbinder = MapBinder.newMapBinder(binder(), String.class, Schedule.Parser.class);
for (Map.Entry<String, Class<? extends Schedule.Parser>> entry : parsers.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
mbinder.addBinding(entry.getKey()).to(entry.getValue());
}
bind(ScheduleRegistry.class).asEagerSingleton();
}
public enum Engine {
SCHEDULER() {
@Override
protected Class<? extends TriggerEngine> engineType() {
return SchedulerScheduleTriggerEngine.class;
}
},
TICKER() {
@Override
protected Class<? extends TriggerEngine> engineType() {
return TickerScheduleTriggerEngine.class;
}
};
protected abstract Class<? extends TriggerEngine> engineType();
public static Engine resolve(Settings settings) {
String engine = settings.get("xpack.watcher.trigger.schedule.engine", "ticker");
switch (engine.toLowerCase(Locale.ROOT)) {
case "ticker" : return TICKER;
case "scheduler" : return SCHEDULER;
default:
return TICKER;
}
}
}
}

View File

@ -6,19 +6,18 @@
package org.elasticsearch.xpack.watcher.trigger.schedule;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class ScheduleRegistry {
private final Map<String, Schedule.Parser> parsers;
private final Map<String, Schedule.Parser> parsers = new HashMap<>();
@Inject
public ScheduleRegistry(Map<String, Schedule.Parser> parsers) {
this.parsers = parsers;
public ScheduleRegistry(Set<Schedule.Parser> parsers) {
parsers.stream().forEach(parser -> this.parsers.put(parser.type(), parser));
}
public Set<String> types() {

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.trigger.schedule.engine;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
@ -27,7 +26,6 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
private final SchedulerEngine schedulerEngine;
@Inject
public SchedulerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings, scheduleRegistry, clock);
this.schedulerEngine = new SchedulerEngine(clock);

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.trigger.schedule.engine;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
@ -32,7 +31,6 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
private volatile Map<String, ActiveSchedule> schedules;
private Ticker ticker;
@Inject
public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings, scheduleRegistry, clock);
this.tickInterval = settings.getAsTime("xpack.watcher.trigger.schedule.ticker.tick_interval", TimeValue.timeValueMillis(500));

View File

@ -12,7 +12,6 @@ import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -219,7 +218,6 @@ public class Watch implements TriggerEngine.Job, ToXContent {
private final List<ActionWrapper> defaultActions;
private final Clock clock;
@Inject
public Parser(Settings settings, TriggerService triggerService, ActionRegistry actionRegistry, InputRegistry inputRegistry,
@Nullable CryptoService cryptoService, Clock clock) {

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.watcher.watch;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -20,24 +19,19 @@ import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
public class WatchLockService extends AbstractComponent {
public static final String DEFAULT_MAX_STOP_TIMEOUT_SETTING = "xpack.watcher.stop.timeout";
private final KeyedLock<String> watchLocks = new KeyedLock<>(true);
private final AtomicBoolean running = new AtomicBoolean(false);
private static final TimeValue DEFAULT_MAX_STOP_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
private static final String DEFAULT_MAX_STOP_TIMEOUT_SETTING = "xpack.watcher.stop.timeout";
private final TimeValue maxStopTimeout;
@Inject
public WatchLockService(Settings settings){
super(settings);
maxStopTimeout = settings.getAsTime(DEFAULT_MAX_STOP_TIMEOUT_SETTING, DEFAULT_MAX_STOP_TIMEOUT);
}
WatchLockService(TimeValue maxStopTimeout){
super(Settings.EMPTY);
this.maxStopTimeout = maxStopTimeout;
}
public Releasable acquire(String name) {
if (!running.get()) {
throw illegalState("cannot acquire lock for watch [{}]. lock service is not running", name);

View File

@ -1,20 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.watch;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.xpack.watcher.WatcherService;
public class WatchModule extends AbstractModule {
@Override
protected void configure() {
bind(Watch.Parser.class).asEagerSingleton();
bind(WatchLockService.class).asEagerSingleton();
bind(WatcherService.class).asEagerSingleton();
bind(WatchStore.class).asEagerSingleton();
}
}

View File

@ -22,7 +22,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -36,7 +35,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.common.stats.Counters;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
@ -66,11 +64,6 @@ public class WatchStore extends AbstractComponent {
private final int scrollSize;
private final TimeValue scrollTimeout;
@Inject
public WatchStore(Settings settings, InternalClient client, Watch.Parser watchParser) {
this(settings, new WatcherClientProxy(settings, client), watchParser);
}
public WatchStore(Settings settings, WatcherClientProxy client, Watch.Parser watchParser) {
super(settings);
this.client = client;

View File

@ -10,7 +10,6 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.WatcherModule;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.execution.Wid;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;

View File

@ -68,7 +68,6 @@ import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerEngineMock;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleModule;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.hamcrest.Matcher;
@ -89,7 +88,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@ -121,7 +119,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
private static Boolean securityEnabled;
private static ScheduleModule.Engine scheduleEngine;
private static String scheduleEngineName;
private boolean useSecurity3;
@ -134,15 +132,14 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
if (securityEnabled == null) {
securityEnabled = enableSecurity();
scheduleEngine = randomFrom(ScheduleModule.Engine.values());
scheduleEngineName = randomFrom("ticker", "scheduler");
}
return super.buildTestCluster(scope, seed);
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
String scheduleImplName = scheduleEngine().name().toLowerCase(Locale.ROOT);
logger.info("using schedule engine [{}]", scheduleImplName);
logger.info("using schedule engine [{}]", scheduleEngineName);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
//TODO: for now lets isolate watcher tests from monitoring (randomize this later)
@ -152,7 +149,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
.put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100))
.put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100))
.put(SecuritySettings.settings(securityEnabled, useSecurity3))
.put("xpack.watcher.trigger.schedule.engine", scheduleImplName)
.put("xpack.watcher.trigger.schedule.engine", scheduleEngineName)
.put("script.inline", "true")
.build();
}
@ -232,13 +229,6 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
return securityEnabled;
}
/**
* @return The schedule trigger engine that will be used for the nodes.
*/
protected final ScheduleModule.Engine scheduleEngine() {
return scheduleEngine;
}
/**
* Override and returns {@code false} to force running without security
*/
@ -270,7 +260,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
@AfterClass
public static void _cleanupClass() {
securityEnabled = null;
scheduleEngine = null;
scheduleEngineName = null;
}
@Override

View File

@ -5,20 +5,18 @@
*/
package org.elasticsearch.xpack.watcher.test;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.execution.ExecutionModule;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.execution.SyncTriggerListener;
import org.elasticsearch.xpack.watcher.execution.WatchExecutor;
import org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerEngineMock;
import org.elasticsearch.xpack.watcher.trigger.TriggerModule;
import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.time.Clock;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;
@ -32,44 +30,18 @@ public class TimeWarpedWatcher extends Watcher {
}
@Override
public Collection<Module> nodeModules() {
if (!enabled) {
return super.nodeModules();
}
List<Module> modules = new ArrayList<>(super.nodeModules());
for (int i = 0; i < modules.size(); ++i) {
Module module = modules.get(i);
if (module instanceof TriggerModule) {
// replacing scheduler module so we'll
// have control on when it fires a job
modules.set(i, new MockTriggerModule(settings));
} else if (module instanceof ExecutionModule) {
// replacing the execution module so all the watches will be
// executed on the same thread as the trigger engine
modules.set(i, new MockExecutionModule());
}
}
return modules;
}
public static class MockTriggerModule extends TriggerModule {
public MockTriggerModule(Settings settings) {
super(settings);
protected TriggerEngine getTriggerEngine(Clock clock, ScheduleRegistry scheduleRegistry) {
return new ScheduleTriggerEngineMock(settings, scheduleRegistry, clock);
}
@Override
protected void registerStandardEngines() {
registerEngine(ScheduleTriggerEngineMock.class);
registerEngine(ManualTriggerEngine.class);
}
protected WatchExecutor getWatchExecutor(ThreadPool threadPool) {
return new SameThreadExecutor();
}
public static class MockExecutionModule extends ExecutionModule {
public MockExecutionModule() {
super(SameThreadExecutor.class, SyncTriggerListener.class);
@Override
protected TriggerEngine.Listener getTriggerEngineListener(ExecutionService executionService) {
return new SyncTriggerListener(settings, executionService);
}
public static class SameThreadExecutor implements WatchExecutor {
@ -95,5 +67,3 @@ public class TimeWarpedWatcher extends Watcher {
}
}
}
}

View File

@ -28,7 +28,7 @@ import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
@SuppressForbidden(reason = "benchmark")
@ -61,7 +61,7 @@ public class ScheduleEngineTriggerBenchmark {
for (int i = 0; i < numWatches; i++) {
jobs.add(new SimpleJob("job_" + i, interval(interval + "s")));
}
ScheduleRegistry scheduleRegistry = new ScheduleRegistry(emptyMap());
ScheduleRegistry scheduleRegistry = new ScheduleRegistry(emptySet());
List<String> impls = new ArrayList<>(Arrays.asList(new String[]{"schedule", "ticker"}));
Randomness.shuffle(impls);

View File

@ -6,28 +6,26 @@
package org.elasticsearch.xpack.watcher.test.bench;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.condition.ScriptCondition;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchRequest;
import org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerEngineMock;
import org.elasticsearch.xpack.watcher.trigger.TriggerModule;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import java.io.IOException;
import java.util.ArrayList;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.httpInput;
@ -215,33 +213,11 @@ public class WatcherExecutorServiceBenchmark {
}
@Override
public Collection<Module> nodeModules() {
List<Module> modules = new ArrayList<>(super.nodeModules());
for (int i = 0; i < modules.size(); ++i) {
Module module = modules.get(i);
if (module instanceof TriggerModule) {
// replacing scheduler module so we'll
// have control on when it fires a job
modules.set(i, new MockTriggerModule(settings));
}
}
return modules;
protected TriggerEngine getTriggerEngine(Clock clock, ScheduleRegistry scheduleRegistry) {
return new ScheduleTriggerEngineMock(settings, scheduleRegistry, clock);
}
public static class MockTriggerModule extends TriggerModule {
public MockTriggerModule(Settings settings) {
super(settings);
}
@Override
protected void registerStandardEngines() {
registerEngine(ScheduleTriggerEngineMock.class);
}
}
}
}
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.watcher.trigger;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -34,7 +33,6 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
private final Logger logger;
private final ConcurrentMap<String, Job> jobs = new ConcurrentHashMap<>();
@Inject
public ScheduleTriggerEngineMock(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(settings, scheduleRegistry, clock);
this.logger = Loggers.getLogger(ScheduleTriggerEngineMock.class, settings);

View File

@ -12,8 +12,8 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.Before;
import java.util.HashMap;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
@ -26,13 +26,13 @@ public class ScheduleRegistryTests extends ScheduleTestCase {
@Before
public void init() throws Exception {
Map<String, Schedule.Parser> parsers = new HashMap<>();
parsers.put(IntervalSchedule.TYPE, new IntervalSchedule.Parser());
parsers.put(CronSchedule.TYPE, new CronSchedule.Parser());
parsers.put(HourlySchedule.TYPE, new HourlySchedule.Parser());
parsers.put(DailySchedule.TYPE, new DailySchedule.Parser());
parsers.put(WeeklySchedule.TYPE, new WeeklySchedule.Parser());
parsers.put(MonthlySchedule.TYPE, new MonthlySchedule.Parser());
Set<Schedule.Parser> parsers = new HashSet<>();
parsers.add(new IntervalSchedule.Parser());
parsers.add(new CronSchedule.Parser());
parsers.add(new HourlySchedule.Parser());
parsers.add(new DailySchedule.Parser());
parsers.add(new WeeklySchedule.Parser());
parsers.add(new MonthlySchedule.Parser());
registry = new ScheduleRegistry(parsers);
}

View File

@ -5,16 +5,15 @@
*/
package org.elasticsearch.xpack.watcher.watch;
import junit.framework.AssertionFailedError;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.containsString;
@ -23,8 +22,12 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
public class WatchLockServiceTests extends ESTestCase {
private final Settings settings =
Settings.builder().put(WatchLockService.DEFAULT_MAX_STOP_TIMEOUT_SETTING, TimeValue.timeValueSeconds(1)).build();
public void testLockingNotStarted() {
WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
WatchLockService lockService = new WatchLockService(settings);
try {
lockService.acquire("_name");
fail("exception expected");
@ -34,7 +37,7 @@ public class WatchLockServiceTests extends ESTestCase {
}
public void testLocking() {
WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
WatchLockService lockService = new WatchLockService(settings);
lockService.start();
Releasable releasable = lockService.acquire("_name");
assertThat(lockService.getWatchLocks().hasLockedKeys(), is(true));
@ -44,7 +47,7 @@ public class WatchLockServiceTests extends ESTestCase {
}
public void testLockingStopTimeout(){
final WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
final WatchLockService lockService = new WatchLockService(settings);
lockService.start();
lockService.acquire("_name");
try {
@ -56,7 +59,7 @@ public class WatchLockServiceTests extends ESTestCase {
}
public void testLockingFair() throws Exception {
final WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
final WatchLockService lockService = new WatchLockService(settings);
lockService.start();
final AtomicInteger value = new AtomicInteger(0);
List<Thread> threads = new ArrayList<>();

View File

@ -117,8 +117,10 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
@ -371,28 +373,28 @@ public class WatchTests extends ESTestCase {
}
private static ScheduleRegistry registry(Schedule schedule) {
Map<String, Schedule.Parser> parsers = new HashMap<>();
Set<Schedule.Parser> parsers = new HashSet<>();
switch (schedule.type()) {
case CronSchedule.TYPE:
parsers.put(CronSchedule.TYPE, new CronSchedule.Parser());
parsers.add(new CronSchedule.Parser());
return new ScheduleRegistry(parsers);
case HourlySchedule.TYPE:
parsers.put(HourlySchedule.TYPE, new HourlySchedule.Parser());
parsers.add(new HourlySchedule.Parser());
return new ScheduleRegistry(parsers);
case DailySchedule.TYPE:
parsers.put(DailySchedule.TYPE, new DailySchedule.Parser());
parsers.add(new DailySchedule.Parser());
return new ScheduleRegistry(parsers);
case WeeklySchedule.TYPE:
parsers.put(WeeklySchedule.TYPE, new WeeklySchedule.Parser());
parsers.add(new WeeklySchedule.Parser());
return new ScheduleRegistry(parsers);
case MonthlySchedule.TYPE:
parsers.put(MonthlySchedule.TYPE, new MonthlySchedule.Parser());
parsers.add(new MonthlySchedule.Parser());
return new ScheduleRegistry(parsers);
case YearlySchedule.TYPE:
parsers.put(YearlySchedule.TYPE, new YearlySchedule.Parser());
parsers.add(new YearlySchedule.Parser());
return new ScheduleRegistry(parsers);
case IntervalSchedule.TYPE:
parsers.put(IntervalSchedule.TYPE, new IntervalSchedule.Parser());
parsers.add(new IntervalSchedule.Parser());
return new ScheduleRegistry(parsers);
default:
throw new IllegalArgumentException("unknown schedule [" + schedule + "]");