Watcher: Remote WatcherClientProxy/ClientProxy class (elastic/x-pack-elasticsearch#1561)

This commit removes ClientProxy and WatcherClientProxy classes. They
were added in times, where there were issues with guice and circular
dependencies. However there is no guice anymore and on top of that
the classes do not add any value.

We can switch to use a regular client, but have to make sure that
the InternalClient is injected in all the transport actions as those
is able to query data, when security is enabled.

Original commit: elastic/x-pack-elasticsearch@763a79b2f7
This commit is contained in:
Alexander Reelsen 2017-06-01 16:30:21 +02:00 committed by GitHub
parent 34f526b60b
commit 730cfd7c7a
32 changed files with 373 additions and 448 deletions

View File

@ -1,49 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.common.init.proxy;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.xpack.security.InternalClient;
/**
* A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client
* needs to injected to be avoid circular dependencies issues.
*/
public class ClientProxy {
protected final InternalClient client;
public ClientProxy(InternalClient client) {
this.client = client;
}
public AdminClient admin() {
return client.admin();
}
public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
client.bulk(preProcess(request), listener);
}
public BulkRequestBuilder prepareBulk() {
return client.prepareBulk();
}
protected <M extends TransportMessage> M preProcess(M message) {
return message;
}
public static InternalClient fromClient(Client client) {
return client instanceof InternalClient ? (InternalClient) client :
new InternalClient(client.settings(), client.threadPool(), client);
}
}

View File

@ -112,7 +112,6 @@ import org.elasticsearch.xpack.watcher.rest.action.RestWatchServiceAction;
import org.elasticsearch.xpack.watcher.rest.action.RestWatcherStatsAction;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.TemplateConfig;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.transform.TransformFactory;
import org.elasticsearch.xpack.watcher.transform.TransformRegistry;
@ -267,10 +266,9 @@ public class Watcher implements ActionPlugin {
final InputRegistry inputRegistry = new InputRegistry(settings, inputFactories);
inputFactories.put(ChainInput.TYPE, new ChainInputFactory(settings, inputRegistry));
final WatcherClientProxy watcherClientProxy = new WatcherClientProxy(settings, internalClient);
final WatcherClient watcherClient = new WatcherClient(internalClient);
final HistoryStore historyStore = new HistoryStore(settings, watcherClientProxy);
final HistoryStore historyStore = new HistoryStore(settings, internalClient);
final Set<Schedule.Parser> scheduleParsers = new HashSet<>();
scheduleParsers.add(new CronSchedule.Parser());
scheduleParsers.add(new DailySchedule.Parser());
@ -290,7 +288,7 @@ public class Watcher implements ActionPlugin {
final TriggerService triggerService = new TriggerService(settings, triggerEngines);
final TriggeredWatch.Parser triggeredWatchParser = new TriggeredWatch.Parser(settings, triggerService);
final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, watcherClientProxy, triggeredWatchParser);
final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, internalClient, triggeredWatchParser);
final WatcherSearchTemplateService watcherSearchTemplateService =
new WatcherSearchTemplateService(settings, scriptService, xContentRegistry);
@ -298,7 +296,7 @@ public class Watcher implements ActionPlugin {
final Watch.Parser watchParser = new Watch.Parser(settings, triggerService, registry, inputRegistry, cryptoService, clock);
final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor,
clock, threadPool, watchParser, clusterService, watcherClientProxy);
clock, threadPool, watchParser, clusterService, internalClient);
final Consumer<Iterable<TriggerEvent>> triggerEngineListener = getTriggerEngineListener(executionService);
triggerService.register(triggerEngineListener);
@ -307,7 +305,7 @@ public class Watcher implements ActionPlugin {
clusterService.getClusterSettings(), clusterService, threadPool, internalClient);
WatcherService watcherService = new WatcherService(settings, triggerService, triggeredWatchStore, executionService,
watchParser, watcherClientProxy);
watchParser, internalClient);
final WatcherLifeCycleService watcherLifeCycleService =
new WatcherLifeCycleService(settings, threadPool, clusterService, watcherService);
@ -317,8 +315,7 @@ public class Watcher implements ActionPlugin {
return Arrays.asList(registry, watcherClient, inputRegistry, historyStore, triggerService, triggeredWatchParser,
watcherLifeCycleService, executionService, triggerEngineListener, watcherService, watchParser,
configuredTriggerEngine, triggeredWatchStore, watcherSearchTemplateService, watcherClientProxy,
watcherIndexTemplateRegistry);
configuredTriggerEngine, triggeredWatchStore, watcherSearchTemplateService, watcherIndexTemplateRegistry);
}
protected TriggerEngine getTriggerEngine(Clock clock, ScheduleRegistry scheduleRegistry) {
@ -376,6 +373,7 @@ public class Watcher implements ActionPlugin {
settings.add(Setting.simpleString("xpack.watcher.internal.ops.bulk.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.internal.ops.index.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.actions.index.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.actions.bulk.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.index.rest.direct_access", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.input.search.default_timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.transform.search.default_timeout", Setting.Property.NodeScope));

View File

@ -137,7 +137,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
}
List<String> previousAllocationIds = event.previousState().getRoutingNodes().node(localNode.getId())
.shardsWithState(Watch.INDEX, RELOCATING, STARTED).stream()
.shardsWithState(watchIndex, RELOCATING, STARTED).stream()
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toList());

View File

@ -11,8 +11,11 @@ import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
@ -27,10 +30,10 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatch;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
@ -60,18 +63,20 @@ public class WatcherService extends AbstractComponent {
private final TimeValue scrollTimeout;
private final int scrollSize;
private final Watch.Parser parser;
private final WatcherClientProxy client;
private final Client client;
// package-private for testing
final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STOPPED);
private final TimeValue defaultSearchTimeout;
public WatcherService(Settings settings, TriggerService triggerService, TriggeredWatchStore triggeredWatchStore,
ExecutionService executionService, Watch.Parser parser, WatcherClientProxy client) {
ExecutionService executionService, Watch.Parser parser, InternalClient client) {
super(settings);
this.triggerService = triggerService;
this.triggeredWatchStore = triggeredWatchStore;
this.executionService = executionService;
this.scrollTimeout = settings.getAsTime("xpack.watcher.watch.scroll.timeout", TimeValue.timeValueSeconds(30));
this.scrollSize = settings.getAsInt("xpack.watcher.watch.scroll.size", 100);
this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30));
this.parser = parser;
this.client = client;
}
@ -194,7 +199,8 @@ public class WatcherService extends AbstractComponent {
return Collections.emptyList();
}
RefreshResponse refreshResponse = client.refresh(new RefreshRequest(INDEX));
RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(INDEX))
.actionGet(TimeValue.timeValueSeconds(5));
if (refreshResponse.getSuccessfulShards() < indexMetaData.getNumberOfShards()) {
throw illegalState("not all required shards have been refreshed");
}
@ -220,7 +226,7 @@ public class WatcherService extends AbstractComponent {
.size(scrollSize)
.sort(SortBuilders.fieldSort("_doc"))
.version(true));
SearchResponse response = client.search(searchRequest);
SearchResponse response = client.search(searchRequest).actionGet(defaultSearchTimeout);
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {
throw new ElasticsearchException("Partial response while loading watches");
@ -269,10 +275,14 @@ public class WatcherService extends AbstractComponent {
logger.error((Supplier<?>) () -> new ParameterizedMessage("couldn't load watch [{}], ignoring it...", id), e);
}
}
response = client.searchScroll(response.getScrollId(), scrollTimeout);
SearchScrollRequest request = new SearchScrollRequest(response.getScrollId());
request.scroll(scrollTimeout);
response = client.searchScroll(request).actionGet(defaultSearchTimeout);
}
} finally {
client.clearScroll(response.getScrollId());
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest).actionGet(scrollTimeout);
}
logger.debug("Loaded [{}] watches for execution", watches.size());

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
@ -23,7 +24,6 @@ import org.elasticsearch.xpack.watcher.actions.ExecutableAction;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.ArrayObjectIterator;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.joda.time.DateTime;
@ -31,6 +31,7 @@ import org.joda.time.DateTime;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -38,15 +39,18 @@ import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
public static final String ID_FIELD = "_id";
private static final String ID_FIELD = "_id";
private final WatcherClientProxy client;
private final TimeValue timeout;
private final Client client;
private final TimeValue indexDefaultTimeout;
private final TimeValue bulkDefaultTimeout;
public ExecutableIndexAction(IndexAction action, Logger logger, WatcherClientProxy client, @Nullable TimeValue defaultTimeout) {
public ExecutableIndexAction(IndexAction action, Logger logger, Client client,
TimeValue indexDefaultTimeout, TimeValue bulkDefaultTimeout) {
super(action, logger);
this.client = client;
this.timeout = action.timeout != null ? action.timeout : defaultTimeout;
this.indexDefaultTimeout = action.timeout != null ? action.timeout : indexDefaultTimeout;
this.bulkDefaultTimeout = action.timeout != null ? action.timeout : bulkDefaultTimeout;
}
@Override
@ -99,7 +103,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
XContentType.JSON));
}
response = client.index(indexRequest, timeout);
response = client.index(indexRequest).get(indexDefaultTimeout.millis(), TimeUnit.MILLISECONDS);
try (XContentBuilder builder = jsonBuilder()) {
indexResponseToXContent(builder, response);
bytesReference = builder.bytes();
@ -132,7 +136,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client.bulk(bulkRequest, action.timeout);
BulkResponse bulkResponse = client.bulk(bulkRequest).get(bulkDefaultTimeout.millis(), TimeUnit.MILLISECONDS);
try (XContentBuilder jsonBuilder = jsonBuilder().startArray()) {
for (BulkItemResponse item : bulkResponse) {
itemResponseToXContent(jsonBuilder, item);

View File

@ -5,33 +5,31 @@
*/
package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.actions.ActionFactory;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import java.io.IOException;
public class IndexActionFactory extends ActionFactory {
private final WatcherClientProxy client;
private final TimeValue defaultTimeout;
private final Client client;
private final TimeValue indexDefaultTimeout;
private final TimeValue bulkDefaultTimeout;
public IndexActionFactory(Settings settings, InternalClient client) {
this(settings, new WatcherClientProxy(settings, client));
}
public IndexActionFactory(Settings settings, WatcherClientProxy client ) {
public IndexActionFactory(Settings settings, Client client) {
super(Loggers.getLogger(IndexActionFactory.class, settings));
this.client = client;
this.defaultTimeout = settings.getAsTime("xpack.watcher.actions.index.default_timeout", null);
this.indexDefaultTimeout = settings.getAsTime("xpack.watcher.actions.index.default_timeout", TimeValue.timeValueSeconds(30));
this.bulkDefaultTimeout = settings.getAsTime("xpack.watcher.actions.bulk.default_timeout", TimeValue.timeValueMinutes(1));
}
@Override
public ExecutableIndexAction parseExecutable(String watchId, String actionId, XContentParser parser) throws IOException {
return new ExecutableIndexAction(IndexAction.parse(watchId, actionId, parser), actionLogger, client, defaultTimeout);
return new ExecutableIndexAction(IndexAction.parse(watchId, actionId, parser), actionLogger, client,
indexDefaultTimeout, bulkDefaultTimeout);
}
}

View File

@ -8,8 +8,13 @@ package org.elasticsearch.xpack.watcher.execution;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.metrics.MeanMetric;
@ -17,8 +22,12 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.common.stats.Counters;
import org.elasticsearch.xpack.watcher.Watcher;
@ -27,7 +36,6 @@ import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.history.WatchRecord;
import org.elasticsearch.xpack.watcher.input.Input;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.transform.Transform;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Watch;
@ -65,14 +73,15 @@ public class ExecutionService extends AbstractComponent {
private final ThreadPool threadPool;
private final Watch.Parser parser;
private final ClusterService clusterService;
private final WatcherClientProxy client;
private final Client client;
private final TimeValue indexDefaultTimeout;
private volatile CurrentExecutions currentExecutions;
private final AtomicBoolean started = new AtomicBoolean(false);
public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor,
Clock clock, ThreadPool threadPool, Watch.Parser parser, ClusterService clusterService,
WatcherClientProxy client) {
Client client) {
super(settings);
this.historyStore = historyStore;
this.triggeredWatchStore = triggeredWatchStore;
@ -84,6 +93,7 @@ public class ExecutionService extends AbstractComponent {
this.parser = parser;
this.clusterService = clusterService;
this.client = client;
this.indexDefaultTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(30));
}
public void start(ClusterState state) throws Exception {
@ -186,7 +196,7 @@ public class ExecutionService extends AbstractComponent {
threadPool.generic().execute(() -> {
for (TriggerEvent event : events) {
GetResponse response = client.getWatch(event.jobName());
GetResponse response = getWatch(event.jobName());
if (response.isExists() == false) {
logger.warn("unable to find watch [{}] in watch index, perhaps it has been deleted", event.jobName());
} else {
@ -227,7 +237,7 @@ public class ExecutionService extends AbstractComponent {
DateTime now = new DateTime(clock.millis(), UTC);
for (TriggerEvent event : events) {
GetResponse response = client.getWatch(event.jobName());
GetResponse response = getWatch(event.jobName());
if (response.isExists() == false) {
logger.warn("unable to find watch [{}] in watch index, perhaps it has been deleted", event.jobName());
continue;
@ -263,7 +273,7 @@ public class ExecutionService extends AbstractComponent {
} else {
boolean watchExists = false;
try {
GetResponse response = client.getWatch(ctx.watch().id());
GetResponse response = getWatch(ctx.watch().id());
watchExists = response.isExists();
} catch (IndexNotFoundException e) {}
@ -277,7 +287,7 @@ public class ExecutionService extends AbstractComponent {
record = executeInner(ctx);
if (ctx.recordExecution()) {
client.updateWatchStatus(ctx.watch());
updateWatchStatus(ctx.watch());
}
}
}
@ -308,6 +318,32 @@ public class ExecutionService extends AbstractComponent {
return record;
}
/**
* Updates and persists the status of the given watch
*
* If the watch is missing (because it might have been deleted by the user during an execution), then this method
* does nothing and just returns without throwing an exception
*/
public void updateWatchStatus(Watch watch) throws IOException {
// at the moment we store the status together with the watch,
// so we just need to update the watch itself
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(Watch.INCLUDE_STATUS_KEY, "true"));
XContentBuilder source = JsonXContent.contentBuilder().
startObject()
.field(Watch.Field.STATUS.getPreferredName(), watch.status(), params)
.endObject();
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id());
updateRequest.doc(source);
updateRequest.version(watch.version());
try {
client.update(updateRequest).actionGet(indexDefaultTimeout);
} catch (DocumentMissingException e) {
// do not rethrow this exception, otherwise the watch history will contain an exception
// even though the execution might have been fine
}
}
private WatchRecord createWatchRecord(WatchRecord existingRecord, WatchExecutionContext ctx, Exception e) {
// it is possible that the watch store update failed, the execution phase is finished
if (ctx.executionPhase().sealed()) {
@ -326,7 +362,7 @@ public class ExecutionService extends AbstractComponent {
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("failed to execute watch [{}]", ctx.watch().id()), e);
} else {
logger.warn("Failed to execute watch [{}]", ctx.watch().id());
logger.warn("failed to execute watch [{}]", ctx.watch().id());
}
}
@ -425,7 +461,7 @@ public class ExecutionService extends AbstractComponent {
assert triggeredWatches != null;
int counter = 0;
for (TriggeredWatch triggeredWatch : triggeredWatches) {
GetResponse response = client.getWatch(triggeredWatch.id().watchId());
GetResponse response = getWatch(triggeredWatch.id().watchId());
if (response.isExists() == false) {
String message = "unable to find watch for record [" + triggeredWatch.id().watchId() + "]/[" + triggeredWatch.id() +
"], perhaps it has been deleted, ignoring...";
@ -449,6 +485,18 @@ public class ExecutionService extends AbstractComponent {
logger.debug("triggered execution of [{}] watches", counter);
}
/**
* Gets a watch but in a synchronous way, so that no async calls need to be built
* @param id The id of watch
* @return The GetResponse of calling the get API of this watch
*/
private GetResponse getWatch(String id) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, id).preference(Preference.LOCAL.type()).realtime(true);
PlainActionFuture<GetResponse> future = PlainActionFuture.newFuture();
client.get(getRequest, future);
return future.actionGet();
}
public Map<String, Object> usageStats() {
Counters counters = new Counters();
counters.inc("execution.actions._all.total", totalExecutionsTime.count());

View File

@ -14,8 +14,11 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.Preference;
@ -28,7 +31,6 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
@ -39,6 +41,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -54,7 +57,7 @@ public class TriggeredWatchStore extends AbstractComponent {
public static final String DOC_TYPE = "doc";
private final int scrollSize;
private final WatcherClientProxy client;
private final Client client;
private final TimeValue scrollTimeout;
private final TriggeredWatch.Parser triggeredWatchParser;
@ -62,12 +65,16 @@ public class TriggeredWatchStore extends AbstractComponent {
private final Lock accessLock = readWriteLock.readLock();
private final Lock stopLock = readWriteLock.writeLock();
private final AtomicBoolean started = new AtomicBoolean(false);
private final TimeValue defaultBulkTimeout;
private final TimeValue defaultSearchTimeout;
public TriggeredWatchStore(Settings settings, WatcherClientProxy client, TriggeredWatch.Parser triggeredWatchParser) {
public TriggeredWatchStore(Settings settings, Client client, TriggeredWatch.Parser triggeredWatchParser) {
super(settings);
this.scrollSize = settings.getAsInt("xpack.watcher.execution.scroll.size", 1000);
this.client = client;
this.scrollTimeout = settings.getAsTime("xpack.watcher.execution.scroll.timeout", TimeValue.timeValueSeconds(30));
this.scrollTimeout = settings.getAsTime("xpack.watcher.execution.scroll.timeout", TimeValue.timeValueMinutes(5));
this.defaultBulkTimeout = settings.getAsTime("xpack.watcher.internal.ops.bulk.default_timeout", TimeValue.timeValueSeconds(120));
this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30));
this.triggeredWatchParser = triggeredWatchParser;
this.started.set(true);
}
@ -156,7 +163,7 @@ public class TriggeredWatchStore extends AbstractComponent {
indexRequest.opType(IndexRequest.OpType.CREATE);
request.add(indexRequest);
}
BulkResponse response = client.bulk(request, (TimeValue) null);
BulkResponse response = client.bulk(request).get(defaultBulkTimeout.millis(), TimeUnit.MILLISECONDS);
BitSet successFullSlots = new BitSet(triggeredWatches.size());
for (int i = 0; i < response.getItems().length; i++) {
BulkItemResponse itemResponse = response.getItems()[i];
@ -205,7 +212,7 @@ public class TriggeredWatchStore extends AbstractComponent {
}
try {
client.refresh(new RefreshRequest(TriggeredWatchStore.INDEX_NAME));
client.admin().indices().refresh(new RefreshRequest(TriggeredWatchStore.INDEX_NAME)).actionGet(TimeValue.timeValueSeconds(5));
} catch (IndexNotFoundException e) {
// no index, no problems, we dont need to search further
return Collections.emptyList();
@ -222,7 +229,7 @@ public class TriggeredWatchStore extends AbstractComponent {
.sort(SortBuilders.fieldSort("_doc"))
.version(true));
SearchResponse response = client.search(searchRequest);
SearchResponse response = client.search(searchRequest).actionGet(defaultSearchTimeout);
logger.debug("trying to find triggered watches for ids {}: found [{}] docs", ids, response.getHits().getTotalHits());
try {
while (response.getHits().getHits().length != 0) {
@ -233,10 +240,14 @@ public class TriggeredWatchStore extends AbstractComponent {
triggeredWatches.add(triggeredWatch);
}
}
response = client.searchScroll(response.getScrollId(), scrollTimeout);
SearchScrollRequest request = new SearchScrollRequest(response.getScrollId());
request.scroll(scrollTimeout);
response = client.searchScroll(request).actionGet(defaultSearchTimeout);
}
} finally {
client.clearScroll(response.getScrollId());
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest).actionGet(scrollTimeout);
}
return triggeredWatches;

View File

@ -8,17 +8,16 @@ package org.elasticsearch.xpack.watcher.history;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
import org.joda.time.DateTime;
@ -27,6 +26,9 @@ import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -42,14 +44,14 @@ public class HistoryStore extends AbstractComponent {
static final DateTimeFormatter indexTimeFormat = DateTimeFormat.forPattern("YYYY.MM.dd");
private final WatcherClientProxy client;
private final Client client;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock putUpdateLock = readWriteLock.readLock();
private final Lock stopLock = readWriteLock.writeLock();
private final AtomicBoolean started = new AtomicBoolean(false);
public HistoryStore(Settings settings, WatcherClientProxy client) {
public HistoryStore(Settings settings, Client client) {
super(settings);
this.client = client;
}
@ -83,7 +85,7 @@ public class HistoryStore extends AbstractComponent {
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(builder)
.opType(IndexRequest.OpType.CREATE);
client.index(request, (TimeValue) null);
client.index(request).actionGet(30, TimeUnit.SECONDS);
logger.debug("indexed watch history record [{}]", watchRecord.id().value());
} catch (IOException ioe) {
throw ioException("failed to persist watch record [{}]", ioe, watchRecord);
@ -109,7 +111,7 @@ public class HistoryStore extends AbstractComponent {
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(builder)
.opType(IndexRequest.OpType.CREATE);
client.index(request, (TimeValue) null);
client.index(request).get(30, TimeUnit.SECONDS);
logger.debug("indexed watch history record [{}]", watchRecord.id().value());
} catch (VersionConflictEngineException vcee) {
watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES,
@ -117,11 +119,11 @@ public class HistoryStore extends AbstractComponent {
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(xContentBuilder.value(watchRecord));
client.index(request, (TimeValue) null);
client.index(request).get(30, TimeUnit.SECONDS);
}
logger.debug("overwrote watch history record [{}]", watchRecord.id().value());
}
} catch (IOException ioe) {
} catch (InterruptedException | ExecutionException | TimeoutException | IOException ioe) {
final WatchRecord wr = watchRecord;
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to persist watch record [{}]", wr), ioe);
} finally {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.input.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -18,15 +19,16 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.input.ExecutableInput;
import org.elasticsearch.xpack.watcher.support.XContentFilterKeysUtils;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.watch.Payload;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.watcher.input.search.SearchInput.TYPE;
@ -37,12 +39,12 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
private final WatcherClientProxy client;
private final Client client;
private final WatcherSearchTemplateService searchTemplateService;
@Nullable private final TimeValue timeout;
private final TimeValue timeout;
public ExecutableSearchInput(SearchInput input, Logger logger, WatcherClientProxy client,
WatcherSearchTemplateService searchTemplateService, @Nullable TimeValue defaultTimeout) {
public ExecutableSearchInput(SearchInput input, Logger logger, Client client,
WatcherSearchTemplateService searchTemplateService, TimeValue defaultTimeout) {
super(input, logger);
this.client = client;
this.searchTemplateService = searchTemplateService;
@ -69,7 +71,8 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), request.getSearchSource().utf8ToString());
}
SearchResponse response = client.search(searchTemplateService.toSearchRequest(request), timeout);
SearchResponse response = client.search(searchTemplateService.toSearchRequest(request))
.get(timeout.millis(), TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug("[{}] found [{}] hits", ctx.id(), response.getHits().getTotalHits());

View File

@ -5,34 +5,28 @@
*/
package org.elasticsearch.xpack.watcher.input.search;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.input.InputFactory;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import java.io.IOException;
public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Result, ExecutableSearchInput> {
private final WatcherClientProxy client;
private final Client client;
private final TimeValue defaultTimeout;
private final WatcherSearchTemplateService searchTemplateService;
public SearchInputFactory(Settings settings, InternalClient client, NamedXContentRegistry xContentRegistry,
ScriptService scriptService) {
this(settings, new WatcherClientProxy(settings, client), xContentRegistry, scriptService);
}
public SearchInputFactory(Settings settings, WatcherClientProxy client, NamedXContentRegistry xContentRegistry,
ScriptService scriptService) {
public SearchInputFactory(Settings settings, Client client, NamedXContentRegistry xContentRegistry,
ScriptService scriptService) {
super(Loggers.getLogger(ExecutableSearchInput.class, settings));
this.client = client;
this.defaultTimeout = settings.getAsTime("xpack.watcher.input.search.default_timeout", null);
this.defaultTimeout = settings.getAsTime("xpack.watcher.input.search.default_timeout", TimeValue.timeValueMinutes(1));
this.searchTemplateService = new WatcherSearchTemplateService(settings, scriptService, xContentRegistry);
}

View File

@ -5,11 +5,9 @@
*/
package org.elasticsearch.xpack.watcher.support;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -26,7 +24,6 @@ import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.template.TemplateUtils;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@ -57,7 +54,7 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
new TemplateConfig(WATCHES_TEMPLATE_NAME, "watches", WATCHES_TEMPLATE_SETTING)
};
private final WatcherClientProxy client;
private final InternalClient client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final TemplateConfig[] indexTemplates;
@ -67,7 +64,7 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
public WatcherIndexTemplateRegistry(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService,
ThreadPool threadPool, InternalClient client) {
super(settings);
this.client = new WatcherClientProxy(settings, client);
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indexTemplates = TEMPLATE_CONFIGS;
@ -168,7 +165,7 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
.build();
request.settings(updatedSettings);
}
client.putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
@Override
public void onResponse(PutIndexTemplateResponse response) {
if (response.isAcknowledged() == false) {

View File

@ -1,184 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.support.init.proxy;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
import java.util.Collections;
/**
* A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client
* needs to injected to be avoid circular dependencies issues.
*/
public class WatcherClientProxy extends ClientProxy {
private final TimeValue defaultSearchTimeout;
private final TimeValue defaultIndexTimeout;
private final TimeValue defaultBulkTimeout;
public WatcherClientProxy(Settings settings, InternalClient client) {
super(client);
defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30));
defaultIndexTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(60));
defaultBulkTimeout = settings.getAsTime("xpack.watcher.internal.ops.bulk.default_timeout", TimeValue.timeValueSeconds(120));
}
/**
* Creates a proxy to the given internal client (can be used for testing)
*/
public static WatcherClientProxy of(Client client) {
return new WatcherClientProxy(Settings.EMPTY, client instanceof InternalClient ? (InternalClient) client :
new InternalClient(client.settings(), client.threadPool(), client));
}
public IndexResponse index(IndexRequest request, TimeValue timeout) {
if (timeout == null) {
timeout = defaultIndexTimeout;
}
return client.index(preProcess(request)).actionGet(timeout);
}
public UpdateResponse update(UpdateRequest request) {
return client.update(preProcess(request)).actionGet(defaultIndexTimeout);
}
public void update(UpdateRequest request, ActionListener<UpdateResponse> listener) {
client.update(preProcess(request), listener);
}
public BulkResponse bulk(BulkRequest request, TimeValue timeout) {
if (timeout == null) {
timeout = defaultBulkTimeout;
}
return client.bulk(preProcess(request)).actionGet(timeout);
}
public void index(IndexRequest request, ActionListener<IndexResponse> listener) {
client.index(preProcess(request), listener);
}
public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
client.bulk(preProcess(request), listener);
}
public DeleteResponse delete(DeleteRequest request) {
return client.delete(preProcess(request)).actionGet(defaultIndexTimeout);
}
public SearchResponse search(SearchRequest request) {
return search(request, defaultSearchTimeout);
}
public SearchResponse search(SearchRequest request, TimeValue timeout) {
if (timeout == null) {
timeout = defaultSearchTimeout;
}
return client.search(preProcess(request)).actionGet(timeout);
}
public SearchResponse searchScroll(String scrollId, TimeValue timeout) {
SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeout);
return client.searchScroll(preProcess(request)).actionGet(defaultSearchTimeout);
}
public ClearScrollResponse clearScroll(String scrollId) {
ClearScrollRequest request = new ClearScrollRequest();
request.addScrollId(scrollId);
return client.clearScroll(preProcess(request)).actionGet(defaultSearchTimeout);
}
public RefreshResponse refresh(RefreshRequest request) {
return client.admin().indices().refresh(preProcess(request)).actionGet(defaultSearchTimeout);
}
public void putTemplate(PutIndexTemplateRequest request, ActionListener<PutIndexTemplateResponse> listener) {
client.admin().indices().putTemplate(preProcess(request), listener);
}
public void deleteTemplate(DeleteIndexTemplateRequest request, ActionListener<DeleteIndexTemplateResponse> listener) {
client.admin().indices().deleteTemplate(preProcess(request), listener);
}
public GetResponse getWatch(String id) {
PlainActionFuture<GetResponse> future = PlainActionFuture.newFuture();
getWatch(id, future);
return future.actionGet();
}
public void getWatch(String id, ActionListener<GetResponse> listener) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, id).preference(Preference.LOCAL.type()).realtime(true);
getRequest.realtime(true);
client.get(preProcess(getRequest), listener);
}
public void deleteWatch(String id, ActionListener<DeleteResponse> listener) {
DeleteRequest request = new DeleteRequest(Watch.INDEX, Watch.DOC_TYPE, id);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(preProcess(request), listener);
}
/**
* Updates and persists the status of the given watch
*
* If the watch is missing (because it might have been deleted by the user during an execution), then this method
* does nothing and just returns without throwing an exception
*/
public void updateWatchStatus(Watch watch) throws IOException {
// at the moment we store the status together with the watch,
// so we just need to update the watch itself
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(Watch.INCLUDE_STATUS_KEY, "true"));
XContentBuilder source = JsonXContent.contentBuilder().
startObject()
.field(Watch.Field.STATUS.getPreferredName(), watch.status(), params)
.endObject();
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id());
updateRequest.doc(source);
updateRequest.version(watch.version());
try {
this.update(updateRequest);
} catch (DocumentMissingException e) {
// do not rethrow this exception, otherwise the watch history will contain an exception
// even though the execution might have been fine
}
}
}

View File

@ -10,30 +10,31 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.script.Script;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.watch.Payload;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.watcher.transform.search.SearchTransform.TYPE;
public class ExecutableSearchTransform extends ExecutableTransform<SearchTransform, SearchTransform.Result> {
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
protected final WatcherClientProxy client;
private final WatcherSearchTemplateService searchTemplateService;
@Nullable protected final TimeValue timeout;
protected final Client client;
protected final WatcherSearchTemplateService searchTemplateService;
protected final TimeValue timeout;
public ExecutableSearchTransform(SearchTransform transform, Logger logger, WatcherClientProxy client,
WatcherSearchTemplateService searchTemplateService, @Nullable TimeValue defaultTimeout) {
public ExecutableSearchTransform(SearchTransform transform, Logger logger, Client client,
WatcherSearchTemplateService searchTemplateService, TimeValue defaultTimeout) {
super(transform, logger);
this.client = client;
this.searchTemplateService = searchTemplateService;
@ -48,7 +49,8 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
String renderedTemplate = searchTemplateService.renderTemplate(template, ctx, payload);
// We need to make a copy, so that we don't modify the original instance that we keep around in a watch:
request = new WatcherSearchTemplateRequest(transform.getRequest(), new BytesArray(renderedTemplate));
SearchResponse resp = client.search(searchTemplateService.toSearchRequest(request), timeout);
SearchResponse resp = client.search(searchTemplateService.toSearchRequest(request))
.get(timeout.millis(), TimeUnit.MILLISECONDS);
return new SearchTransform.Result(request, new Payload.XContent(resp));
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to execute [{}] transform for [{}]", TYPE, ctx.id()), e);

View File

@ -5,34 +5,29 @@
*/
package org.elasticsearch.xpack.watcher.transform.search;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.transform.TransformFactory;
import java.io.IOException;
public class SearchTransformFactory extends TransformFactory<SearchTransform, SearchTransform.Result, ExecutableSearchTransform> {
protected final WatcherClientProxy client;
protected final Client client;
private final TimeValue defaultTimeout;
private final WatcherSearchTemplateService searchTemplateService;
public SearchTransformFactory(Settings settings, InternalClient client, NamedXContentRegistry xContentRegistry,
ScriptService scriptService) {
this(settings, new WatcherClientProxy(settings, client), xContentRegistry, scriptService);
}
public SearchTransformFactory(Settings settings, WatcherClientProxy client, NamedXContentRegistry xContentRegistry,
public SearchTransformFactory(Settings settings, Client client, NamedXContentRegistry xContentRegistry,
ScriptService scriptService) {
super(Loggers.getLogger(ExecutableSearchTransform.class, settings));
this.client = client;
this.defaultTimeout = settings.getAsTime("xpack.watcher.transform.search.default_timeout", null);
this.defaultTimeout = settings.getAsTime("xpack.watcher.transform.search.default_timeout", TimeValue.timeValueMinutes(1));
this.searchTemplateService = new WatcherSearchTemplateService(settings, scriptService, xContentRegistry);
}

View File

@ -7,10 +7,13 @@ package org.elasticsearch.xpack.watcher.transport.actions.ack;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
@ -19,8 +22,8 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
@ -36,12 +39,12 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
private final Clock clock;
private final Watch.Parser parser;
private final WatcherClientProxy client;
private final Client client;
@Inject
public TransportAckWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Clock clock, XPackLicenseState licenseState,
Watch.Parser parser, WatcherClientProxy client) {
Watch.Parser parser, InternalClient client) {
super(settings, AckWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, AckWatchRequest::new);
this.clock = clock;
@ -51,7 +54,10 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
@Override
protected void doExecute(AckWatchRequest request, ActionListener<AckWatchResponse> listener) {
client.getWatch(request.getWatchId(), ActionListener.wrap((response) -> {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);
client.get(getRequest, ActionListener.wrap((response) -> {
if (response.isExists() == false) {
listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId()));
} else {

View File

@ -7,10 +7,13 @@ package org.elasticsearch.xpack.watcher.transport.actions.activate;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -18,7 +21,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
@ -38,13 +41,13 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
private final Clock clock;
private final Watch.Parser parser;
private final WatcherClientProxy client;
private final Client client;
@Inject
public TransportActivateWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Clock clock,
XPackLicenseState licenseState, Watch.Parser parser,
WatcherClientProxy client) {
InternalClient client) {
super(settings, ActivateWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, ActivateWatchRequest::new);
this.clock = clock;
@ -66,7 +69,9 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
updateRequest.retryOnConflict(2);
client.update(updateRequest, ActionListener.wrap(updateResponse -> {
client.getWatch(request.getWatchId(), ActionListener.wrap(getResponse -> {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse.isExists()) {
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(), now,
XContentType.JSON);

View File

@ -7,14 +7,18 @@ package org.elasticsearch.xpack.watcher.transport.actions.delete;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.watch.Watch;
/**
* Performs the delete operation. This inherits directly from HandledTransportAction, because deletion should always work
@ -22,12 +26,12 @@ import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
*/
public class TransportDeleteWatchAction extends HandledTransportAction<DeleteWatchRequest, DeleteWatchResponse> {
private final WatcherClientProxy client;
private final Client client;
@Inject
public TransportDeleteWatchAction(Settings settings, TransportService transportService,ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
WatcherClientProxy client) {
InternalClient client) {
super(settings, DeleteWatchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
DeleteWatchRequest::new);
this.client = client;
@ -35,7 +39,9 @@ public class TransportDeleteWatchAction extends HandledTransportAction<DeleteWat
@Override
protected void doExecute(DeleteWatchRequest request, ActionListener<DeleteWatchResponse> listener) {
client.deleteWatch(request.getId(), ActionListener.wrap(deleteResponse -> {
DeleteRequest deleteRequest = new DeleteRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.wrap(deleteResponse -> {
boolean deleted = deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
DeleteWatchResponse response = new DeleteWatchResponse(deleteResponse.getId(), deleteResponse.getVersion(), deleted);
listener.onResponse(response);

View File

@ -9,8 +9,11 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -20,13 +23,13 @@ import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.ActionExecutionMode;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.execution.ManualExecutionContext;
import org.elasticsearch.xpack.watcher.history.WatchRecord;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
@ -51,13 +54,13 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
private final Clock clock;
private final TriggerService triggerService;
private final Watch.Parser watchParser;
private final WatcherClientProxy client;
private final Client client;
@Inject
public TransportExecuteWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ExecutionService executionService, Clock clock, XPackLicenseState licenseState,
Watch.Parser watchParser, WatcherClientProxy client, TriggerService triggerService) {
Watch.Parser watchParser, InternalClient client, TriggerService triggerService) {
super(settings, ExecuteWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, ExecuteWatchRequest::new);
this.executionService = executionService;
@ -70,7 +73,10 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
@Override
protected void doExecute(ExecuteWatchRequest request, ActionListener<ExecuteWatchResponse> listener) {
if (request.getId() != null) {
client.getWatch(request.getId(), ActionListener.wrap(response -> {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);
client.get(getRequest, ActionListener.wrap(response -> {
if (response.isExists()) {
Watch watch = watchParser.parse(request.getId(), true, response.getSourceAsBytesRef(), request.getXContentType());
watch.version(response.getVersion());

View File

@ -6,8 +6,11 @@
package org.elasticsearch.xpack.watcher.transport.actions.get;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -16,7 +19,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.watch.Watch;
@ -31,12 +34,12 @@ public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequ
private final Watch.Parser parser;
private final Clock clock;
private final WatcherClientProxy client;
private final Client client;
@Inject
public TransportGetWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, XPackLicenseState licenseState,
Watch.Parser parser, Clock clock, WatcherClientProxy client) {
Watch.Parser parser, Clock clock, InternalClient client) {
super(settings, GetWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, GetWatchRequest::new);
this.parser = parser;
@ -46,7 +49,10 @@ public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequ
@Override
protected void doExecute(GetWatchRequest request, ActionListener<GetWatchResponse> listener) {
client.getWatch(request.getId(), ActionListener.wrap(getResponse -> {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse.isExists()) {
try (XContentBuilder builder = jsonBuilder()) {
// When we return the watch via the Get Watch REST API, we want to return the watch as was specified in the put api,

View File

@ -13,10 +13,6 @@ import org.elasticsearch.xpack.watcher.test.TimeWarpedWatcher;
import javax.security.auth.DestroyFailedException;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.time.Clock;
public class TimeWarpedXPackPlugin extends XPackPlugin {

View File

@ -12,9 +12,9 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Watch;
@ -31,7 +31,7 @@ public class WatcherServiceTests extends ESTestCase {
ExecutionService executionService = mock(ExecutionService.class);
when(executionService.validate(anyObject())).thenReturn(true);
Watch.Parser parser = mock(Watch.Parser.class);
WatcherClientProxy client = mock(WatcherClientProxy.class);
InternalClient client = mock(InternalClient.class);
WatcherService service = new WatcherService(Settings.EMPTY, triggerService, triggeredWatchStore,
executionService, parser, client);

View File

@ -24,7 +24,6 @@ import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.watch.Payload;
@ -62,7 +61,8 @@ public class IndexActionTests extends ESIntegTestCase {
boolean customTimestampField = timestampField != null;
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, WatcherClientProxy.of(client()), null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
DateTime executionTime = DateTime.now(UTC);
Payload payload;
@ -143,7 +143,8 @@ public class IndexActionTests extends ESIntegTestCase {
boolean customId = list == idList;
IndexAction action = new IndexAction("test-index", "test-type", null, timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, WatcherClientProxy.of(client()), null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
DateTime executionTime = DateTime.now(UTC);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("watch_id", executionTime, new Payload.Simple("_doc", list));
@ -256,7 +257,8 @@ public class IndexActionTests extends ESIntegTestCase {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, WatcherClientProxy.of(client()), null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
List<Map<String, Object>> docs = new ArrayList<>();
boolean addSuccessfulIndexedDoc = randomBoolean();
@ -278,7 +280,8 @@ public class IndexActionTests extends ESIntegTestCase {
public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, WatcherClientProxy.of(client()), null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithId = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap();
final DateTime executionTime = DateTime.now(UTC);

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.watcher.actions.webhook;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -33,7 +34,6 @@ import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.test.MockTextTemplateEngine;
@ -45,11 +45,10 @@ import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.junit.Before;
import javax.mail.internet.AddressException;
import java.io.IOException;
import java.util.Map;
import javax.mail.internet.AddressException;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -269,7 +268,7 @@ public class WebhookActionTests extends ESTestCase {
private Watch createWatch(String watchId, final String account) throws AddressException, IOException {
return WatcherTestUtils.createTestWatch(watchId,
mock(WatcherClientProxy.class),
mock(Client.class),
ExecuteScenario.Success.client(),
new AbstractWatcherIntegrationTestCase.NoopEmailService() {

View File

@ -7,7 +7,10 @@ package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
@ -16,6 +19,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.support.clock.ClockMock;
@ -32,7 +36,6 @@ import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.history.WatchRecord;
import org.elasticsearch.xpack.watcher.input.ExecutableInput;
import org.elasticsearch.xpack.watcher.input.Input;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.transform.Transform;
@ -63,6 +66,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.joda.time.DateTime.now;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@ -83,7 +87,7 @@ public class ExecutionServiceTests extends ESTestCase {
private ExecutionService executionService;
private Clock clock;
private ThreadPool threadPool;
private WatcherClientProxy client;
private Client client;
private Watch.Parser parser;
@Before
@ -104,7 +108,7 @@ public class ExecutionServiceTests extends ESTestCase {
clock = ClockMock.frozen();
threadPool = mock(ThreadPool.class);
client = mock(WatcherClientProxy.class);
client = mock(Client.class);
parser = mock(Watch.Parser.class);
DiscoveryNode discoveryNode = new DiscoveryNode("node_1", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
@ -124,7 +128,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.id()).thenReturn("_id");
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
when(client.getWatch("_id")).thenReturn(getResponse);
mockGetWatchResponse(client, "_id", getResponse);
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
@ -214,7 +218,7 @@ public class ExecutionServiceTests extends ESTestCase {
public void testExecuteFailedInput() throws Exception {
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
when(client.getWatch("_id")).thenReturn(getResponse);
mockGetWatchResponse(client, "_id", getResponse);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
@ -285,7 +289,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.id()).thenReturn("_id");
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
when(client.getWatch("_id")).thenReturn(getResponse);
mockGetWatchResponse(client, "_id", getResponse);
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
@ -349,7 +353,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.id()).thenReturn("_id");
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
when(client.getWatch("_id")).thenReturn(getResponse);
mockGetWatchResponse(client, "_id", getResponse);
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
@ -412,7 +416,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.id()).thenReturn("_id");
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
when(client.getWatch("_id")).thenReturn(getResponse);
mockGetWatchResponse(client, "_id", getResponse);
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
@ -778,7 +782,7 @@ public class ExecutionServiceTests extends ESTestCase {
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
when(getResponse.getId()).thenReturn("foo");
when(client.getWatch(any())).thenReturn(getResponse);
mockGetWatchResponse(client, "foo", getResponse);
when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any())).thenReturn(watch);
// execute needs to fail as well as storing the history
@ -822,18 +826,18 @@ public class ExecutionServiceTests extends ESTestCase {
when(getResponse.isExists()).thenReturn(false);
boolean exceptionThrown = false;
if (randomBoolean()) {
when(client.getWatch("_id")).thenReturn(getResponse);
mockGetWatchResponse(client, "_id", getResponse);
} else {
// this emulates any failure while getting the watch, while index not found is an accepted issue
if (randomBoolean()) {
exceptionThrown = true;
ElasticsearchException e = new ElasticsearchException("something went wrong, i.e. index not found");
when(client.getWatch("_id")).thenThrow(e);
mockGetWatchException(client, "_id", e);
WatchExecutionResult result = new WatchExecutionResult(ctx, randomInt(10));
WatchRecord wr = new WatchRecord.ExceptionWatchRecord(ctx, result, e);
when(ctx.abortFailedExecution(eq(e))).thenReturn(wr);
} else {
when(client.getWatch("_id")).thenThrow(new IndexNotFoundException(".watch"));
mockGetWatchException(client, "_id", new IndexNotFoundException(".watch"));
}
}
@ -885,4 +889,32 @@ public class ExecutionServiceTests extends ESTestCase {
return new Tuple<>(transform, transformResult);
}
private void mockGetWatchResponse(Client client, String id, GetResponse response) {
doAnswer(invocation -> {
GetRequest request = (GetRequest) invocation.getArguments()[0];
ActionListener<GetResponse> listener = (ActionListener) invocation.getArguments()[1];
if (request.id().equals(id)) {
listener.onResponse(response);
} else {
GetResult notFoundResult = new GetResult(request.index(), request.type(), request.id(), -1, false, null, null);
listener.onResponse(new GetResponse(notFoundResult));
}
return null;
}).when(client).get(any(), any());
}
private void mockGetWatchException(Client client, String id, Exception e) {
doAnswer(invocation -> {
GetRequest request = (GetRequest) invocation.getArguments()[0];
ActionListener<GetResponse> listener = (ActionListener) invocation.getArguments()[1];
if (request.id().equals(id)) {
listener.onFailure(e);
} else {
GetResult notFoundResult = new GetResult(request.index(), request.type(), request.id(), -1, false, null, null);
listener.onResponse(new GetResponse(notFoundResult));
}
return null;
}).when(client).get(any(), any());
}
}

View File

@ -7,11 +7,15 @@ package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
@ -27,18 +31,15 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.hamcrest.core.IsNull;
import org.joda.time.DateTime;
import org.junit.Before;
@ -49,16 +50,14 @@ import java.util.Collections;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class TriggeredWatchStoreTests extends ESTestCase {
@ -68,15 +67,15 @@ public class TriggeredWatchStoreTests extends ESTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
private WatcherClientProxy clientProxy;
private Client client;
private TriggeredWatch.Parser parser;
private TriggeredWatchStore triggeredWatchStore;
@Before
public void init() {
clientProxy = mock(WatcherClientProxy.class);
client = mock(Client.class);
parser = mock(TriggeredWatch.Parser.class);
triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, clientProxy, parser);
triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, client, parser);
triggeredWatchStore.start();
}
@ -148,7 +147,13 @@ public class TriggeredWatchStoreTests extends ESTestCase {
ClusterState cs = csBuilder.build();
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
AdminClient adminClient = mock(AdminClient.class);
when(client.admin()).thenReturn(adminClient);
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient);
PlainActionFuture<RefreshResponse> future = PlainActionFuture.newFuture();
when(indicesAdminClient.refresh(any())).thenReturn(future);
future.onResponse(refreshResponse);
SearchResponse searchResponse1 = mock(SearchResponse.class);
when(searchResponse1.getSuccessfulShards()).thenReturn(1);
@ -161,7 +166,9 @@ public class TriggeredWatchStoreTests extends ESTestCase {
SearchHits hits = new SearchHits(new SearchHit[]{hit}, 1, 1.0f);
when(searchResponse1.getHits()).thenReturn(hits);
when(searchResponse1.getScrollId()).thenReturn("_scrollId");
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse1);
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
when(client.search(any(SearchRequest.class))).thenReturn(searchFuture);
searchFuture.onResponse(searchResponse1);
// First return a scroll response with a single hit and then with no hits
hit = new SearchHit(0, "second_foo", new Text(TriggeredWatchStore.DOC_TYPE), null);
@ -170,15 +177,28 @@ public class TriggeredWatchStoreTests extends ESTestCase {
hit.sourceRef(source);
hits = new SearchHits(new SearchHit[]{hit}, 1, 1.0f);
SearchResponse searchResponse2 = new SearchResponse(
new InternalSearchResponse(hits, null, null, null, false, null, 1), "_scrollId", 1, 1, 1, null);
SearchResponse searchResponse3 = new SearchResponse(InternalSearchResponse.empty(), "_scrollId", 1, 1, 1, null);
when(clientProxy.searchScroll(eq("_scrollId"), any(TimeValue.class))).thenReturn(searchResponse2, searchResponse3);
new InternalSearchResponse(hits, null, null, null, false, null, 1), "_scrollId1", 1, 1, 1, null);
SearchResponse searchResponse3 = new SearchResponse(InternalSearchResponse.empty(), "_scrollId2", 1, 1, 1, null);
doAnswer(invocation -> {
SearchScrollRequest request = (SearchScrollRequest) invocation.getArguments()[0];
PlainActionFuture<SearchResponse> searchScrollFuture = PlainActionFuture.newFuture();
if (request.scrollId().equals("_scrollId")) {
searchScrollFuture.onResponse(searchResponse2);
} else if (request.scrollId().equals("_scrollId1")) {
searchScrollFuture.onResponse(searchResponse3);
} else {
searchScrollFuture.onFailure(new ElasticsearchException("test issue"));
}
return searchScrollFuture;
}).when(client).searchScroll(any());
TriggeredWatch triggeredWatch = mock(TriggeredWatch.class);
when(parser.parse(eq("_id"), eq(1L), any(BytesReference.class))).thenReturn(triggeredWatch);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1));
PlainActionFuture<ClearScrollResponse> clearScrollResponseFuture = PlainActionFuture.newFuture();
when(client.clearScroll(any())).thenReturn(clearScrollResponseFuture);
clearScrollResponseFuture.onResponse(new ClearScrollResponse(true, 1));
assertThat(triggeredWatchStore.validate(cs), is(true));
DateTime now = DateTime.now(UTC);
@ -203,10 +223,10 @@ public class TriggeredWatchStoreTests extends ESTestCase {
assertThat(triggeredWatches, notNullValue());
assertThat(triggeredWatches, hasSize(watches.size()));
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
verify(client.admin().indices(), times(1)).refresh(any());
verify(client, times(1)).search(any(SearchRequest.class));
verify(client, times(2)).searchScroll(any());
verify(client, times(1)).clearScroll(any());
}
// the elasticsearch migration helper is doing reindex using aliases, so we have to

View File

@ -6,10 +6,13 @@
package org.elasticsearch.xpack.watcher.history;
import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.common.http.HttpClient;
import org.elasticsearch.xpack.common.http.HttpRequest;
@ -24,7 +27,6 @@ import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionResult;
import org.elasticsearch.xpack.watcher.execution.Wid;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
@ -33,10 +35,10 @@ import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.DateTime;
import org.junit.Before;
import org.mockito.Matchers;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.watcher.history.HistoryStore.getHistoryIndexNameForTime;
import static org.elasticsearch.xpack.watcher.test.WatcherMatchers.indexRequest;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
@ -45,6 +47,7 @@ import static org.hamcrest.core.IsEqual.equalTo;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -52,26 +55,38 @@ import static org.mockito.Mockito.when;
public class HistoryStoreTests extends ESTestCase {
private HistoryStore historyStore;
private WatcherClientProxy clientProxy;
private Client client;
@Before
public void init() {
clientProxy = mock(WatcherClientProxy.class);
historyStore = new HistoryStore(Settings.EMPTY, clientProxy);
client = mock(Client.class);
historyStore = new HistoryStore(Settings.EMPTY, client);
historyStore.start();
}
public void testPut() throws Exception {
Wid wid = new Wid("_name", new DateTime(0, UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), new DateTime(0, UTC), new DateTime(0, UTC));
DateTime now = new DateTime(0, UTC);
Wid wid = new Wid("_name", now);
String index = getHistoryIndexNameForTime(now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), now, now);
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null, randomAlphaOfLength(10));
IndexResponse indexResponse = mock(IndexResponse.class);
IndexRequest indexRequest = indexRequest(".watcher-history-1970.01.01", HistoryStore.DOC_TYPE, wid.value()
, IndexRequest.OpType.CREATE);
when(clientProxy.index(indexRequest, Matchers.<TimeValue>any())).thenReturn(indexResponse);
doAnswer(invocation -> {
IndexRequest request = (IndexRequest) invocation.getArguments()[0];
PlainActionFuture<IndexResponse> indexFuture = PlainActionFuture.newFuture();
if (request.id().equals(wid.value()) && request.type().equals(HistoryStore.DOC_TYPE) && request.opType() == OpType.CREATE
&& request.index().equals(index)) {
indexFuture.onResponse(indexResponse);
} else {
indexFuture.onFailure(new ElasticsearchException("test issue"));
}
return indexFuture;
}).when(client).index(any());
historyStore.put(watchRecord);
verify(clientProxy).index(Matchers.<IndexRequest>any(), Matchers.<TimeValue>any());
verify(client).index(any());
}
public void testPutStopped() throws Exception {
@ -92,13 +107,13 @@ public class HistoryStoreTests extends ESTestCase {
public void testIndexNameGeneration() {
String indexTemplateVersion = WatcherIndexTemplateRegistry.INDEX_TEMPLATE_VERSION;
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(0, UTC)),
assertThat(getHistoryIndexNameForTime(new DateTime(0, UTC)),
equalTo(".watcher-history-"+ indexTemplateVersion +"-1970.01.01"));
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(100000000000L, UTC)),
assertThat(getHistoryIndexNameForTime(new DateTime(100000000000L, UTC)),
equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03"));
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(1416582852000L, UTC)),
assertThat(getHistoryIndexNameForTime(new DateTime(1416582852000L, UTC)),
equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21"));
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(2833165811000L, UTC)),
assertThat(getHistoryIndexNameForTime(new DateTime(2833165811000L, UTC)),
equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12"));
}
@ -139,13 +154,15 @@ public class HistoryStoreTests extends ESTestCase {
}
watchRecord.result().actionsResults().put(JiraAction.TYPE, result);
when(clientProxy.index(Matchers.any(), Matchers.<TimeValue>any())).thenReturn(mock(IndexResponse.class));
PlainActionFuture<IndexResponse> indexResponseFuture = PlainActionFuture.newFuture();
indexResponseFuture.onResponse(mock(IndexResponse.class));
when(client.index(any())).thenReturn(indexResponseFuture);
if (randomBoolean()) {
historyStore.put(watchRecord);
} else {
historyStore.forcePut(watchRecord);
}
verify(clientProxy).index(argThat(indexRequestDoesNotContainPassword(username, password)), Matchers.<TimeValue>any());
verify(client).index(argThat(indexRequestDoesNotContainPassword(username, password)));
}
private static Matcher<IndexRequest> indexRequestDoesNotContainPassword(String username, String password) {

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@ -45,7 +46,6 @@ import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.execution.Wid;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
@ -61,6 +61,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.hamcrest.Matcher;
import org.joda.time.DateTime;
import javax.mail.internet.AddressException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -69,9 +70,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.mail.internet.AddressException;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomInt;
import static java.util.Collections.emptyMap;
import static org.apache.lucene.util.LuceneTestCase.createTempDir;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -166,12 +164,12 @@ public final class WatcherTestUtils {
public static Watch createTestWatch(String watchName, HttpClient httpClient, EmailService emailService,
WatcherSearchTemplateService searchTemplateService, Logger logger) throws AddressException {
WatcherClientProxy client = WatcherClientProxy.of(ESIntegTestCase.client());
Client client = ESIntegTestCase.client();
return createTestWatch(watchName, client, httpClient, emailService, searchTemplateService, logger);
}
public static Watch createTestWatch(String watchName, WatcherClientProxy client, HttpClient httpClient, EmailService emailService,
public static Watch createTestWatch(String watchName, Client client, HttpClient httpClient, EmailService emailService,
WatcherSearchTemplateService searchTemplateService, Logger logger) throws AddressException {
WatcherSearchTemplateRequest transformRequest =
@ -226,7 +224,7 @@ public final class WatcherTestUtils {
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger),
AlwaysCondition.INSTANCE,
new ExecutableSearchTransform(searchTransform, logger, client, searchTemplateService, null),
new ExecutableSearchTransform(searchTransform, logger, client, searchTemplateService, TimeValue.timeValueMinutes(1)),
new TimeValue(0),
actions,
metadata,

View File

@ -30,7 +30,6 @@ import org.elasticsearch.xpack.watcher.input.search.SearchInput;
import org.elasticsearch.xpack.watcher.input.search.SearchInputFactory;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
@ -79,7 +78,7 @@ public class SearchInputTests extends ESIntegTestCase {
WatcherSearchTemplateRequest request = WatcherTestUtils.templateRequest(searchSourceBuilder);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger,
WatcherClientProxy.of(client()), watcherSearchTemplateService(), null);
client(), watcherSearchTemplateService(), TimeValue.timeValueMinutes(1));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
@ -95,6 +94,7 @@ public class SearchInputTests extends ESIntegTestCase {
timeValueSeconds(5));
SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple());
assertThat(result.status(), is(Input.Result.Status.SUCCESS));
assertThat(XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertNotNull(result.executedRequest());
assertThat(result.status(), is(Input.Result.Status.SUCCESS));
@ -111,7 +111,7 @@ public class SearchInputTests extends ESIntegTestCase {
WatcherSearchTemplateRequest request = WatcherTestUtils.templateRequest(searchSourceBuilder, searchType);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger,
WatcherClientProxy.of(client()), watcherSearchTemplateService(), null);
client(), watcherSearchTemplateService(), TimeValue.timeValueMinutes(1));
WatchExecutionContext ctx = new TriggeredExecutionContext(
new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
@ -127,6 +127,7 @@ public class SearchInputTests extends ESIntegTestCase {
timeValueSeconds(5));
SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple());
assertThat(result.status(), is(Input.Result.Status.SUCCESS));
assertThat(XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertNotNull(result.executedRequest());
assertThat(result.status(), is(Input.Result.Status.SUCCESS));
@ -146,8 +147,7 @@ public class SearchInputTests extends ESIntegTestCase {
XContentParser parser = createParser(builder);
parser.nextToken();
SearchInputFactory factory = new SearchInputFactory(Settings.EMPTY, WatcherClientProxy.of(client()),
xContentRegistry(), scriptService());
SearchInputFactory factory = new SearchInputFactory(Settings.EMPTY, client(), xContentRegistry(), scriptService());
SearchInput searchInput = factory.parseInput("_id", parser);
assertEquals(SearchInput.TYPE, searchInput.type());

View File

@ -31,7 +31,6 @@ import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.transform.Transform;
@ -106,8 +105,8 @@ public class SearchTransformTests extends ESIntegTestCase {
WatcherSearchTemplateRequest request = templateRequest(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()), "idx");
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, WatcherClientProxy.of(client()),
watcherSearchTemplateService(), null);
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, client(),
watcherSearchTemplateService(), TimeValue.timeValueMinutes(1));
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
@ -143,8 +142,8 @@ public class SearchTransformTests extends ESIntegTestCase {
QueryBuilders.wrapperQuery(jsonBuilder().startObject()
.startObject("_unknown_query_").endObject().endObject().bytes())), "idx");
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, WatcherClientProxy.of(client()),
watcherSearchTemplateService(), null);
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, client(),
watcherSearchTemplateService(), TimeValue.timeValueMinutes(1));
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
@ -215,8 +214,7 @@ public class SearchTransformTests extends ESIntegTestCase {
XContentParser parser = createParser(builder);
parser.nextToken();
SearchTransformFactory transformFactory = new SearchTransformFactory(Settings.EMPTY, WatcherClientProxy.of(client()),
xContentRegistry(), scriptService());
SearchTransformFactory transformFactory = new SearchTransformFactory(Settings.EMPTY, client(), xContentRegistry(), scriptService());
ExecutableSearchTransform executable = transformFactory.parseExecutable("_id", parser);
assertThat(executable, notNullValue());
@ -277,7 +275,7 @@ public class SearchTransformTests extends ESIntegTestCase {
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform executableSearchTransform = new ExecutableSearchTransform(searchTransform, logger,
WatcherClientProxy.of(client()), watcherSearchTemplateService(), null);
client(), watcherSearchTemplateService(), TimeValue.timeValueMinutes(1));
return executableSearchTransform.execute(ctx, Payload.Simple.EMPTY);
}

View File

@ -47,7 +47,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
@TestLogging("org.elasticsearch.xpack.watcher:TRACE")
@TestLogging("org.elasticsearch.xpack.watcher:DEBUG")
public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
private String id = randomAlphaOfLength(10);
@ -98,7 +98,9 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
assertThat(a1CountAfterAck, greaterThan(0L));
assertThat(a2CountAfterAck, greaterThan(0L));
logger.info("###3");
timeWarp().trigger("_id", 4, TimeValue.timeValueSeconds(5));
logger.info("###4");
flush();
refresh();
@ -115,7 +117,9 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
assertEquals(DocWriteResponse.Result.DELETED, response.getResult());
refresh();
logger.info("###5");
timeWarp().trigger("_id", 4, TimeValue.timeValueSeconds(5));
logger.info("###6");
GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_id").get();
assertThat(getWatchResponse.isFound(), is(true));

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.watch;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
@ -74,7 +75,6 @@ import org.elasticsearch.xpack.watcher.input.search.SearchInputFactory;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInputFactory;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
@ -143,7 +143,7 @@ import static org.mockito.Mockito.mock;
public class WatchTests extends ESTestCase {
private ScriptService scriptService;
private WatcherClientProxy client;
private Client client;
private HttpClient httpClient;
private EmailService emailService;
private TextTemplateEngine templateEngine;
@ -157,7 +157,7 @@ public class WatchTests extends ESTestCase {
@Before
public void init() throws Exception {
scriptService = mock(ScriptService.class);
client = mock(WatcherClientProxy.class);
client = mock(Client.class);
httpClient = mock(HttpClient.class);
emailService = mock(EmailService.class);
templateEngine = mock(TextTemplateEngine.class);
@ -482,7 +482,7 @@ public class WatchTests extends ESTestCase {
case SearchTransform.TYPE:
SearchTransform transform = new SearchTransform(
templateRequest(searchSource()), timeout, timeZone);
return new ExecutableSearchTransform(transform, logger, client, searchTemplateService, null);
return new ExecutableSearchTransform(transform, logger, client, searchTemplateService, TimeValue.timeValueMinutes(1));
default: // chain
SearchTransform searchTransform = new SearchTransform(
templateRequest(searchSource()), timeout, timeZone);
@ -492,7 +492,7 @@ public class WatchTests extends ESTestCase {
return new ExecutableChainTransform(chainTransform, logger, Arrays.<ExecutableTransform>asList(
new ExecutableSearchTransform(new SearchTransform(
templateRequest(searchSource()), timeout, timeZone),
logger, client, searchTemplateService, null),
logger, client, searchTemplateService, TimeValue.timeValueMinutes(1)),
new ExecutableScriptTransform(new ScriptTransform(mockScript("_script")),
logger, scriptService)));
}
@ -520,7 +520,7 @@ public class WatchTests extends ESTestCase {
IndexAction action = new IndexAction("_index", "_type", randomBoolean() ? "123" : null, null, timeout, timeZone);
list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(),
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
new ExecutableIndexAction(action, logger, client, null)));
new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30))));
}
if (randomBoolean()) {
HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000))