Watcher: Use Bulkprocessor in HistoryStore/TriggeredWatchStore (#32490)

Currently a watch execution results in one bulk request, when the
triggered watches are written into the that index, that need to be
executed. However the update of the watch status, the creation of the
watch history entry as well as the deletion of the triggered watches
index are all single document operations.

This can have quite a negative impact, once you are executing a lot of
watches, as each execution results in 4 documents writes, three of them
being single document actions.

This commit switches to a bulk processor instead of a single document
action for writing watch history entries and deleting triggered watch
entries. However the defaults are to run synchronous as before because
the number of concurrent requests is set to 0. This also fixes a bug,
where the deletion of the triggered watch entry was done asynchronously.

However if you have a high number of watches being executed, you can
configure watcher to delete the triggered watches entries as well as
writing the watch history entries via bulk requests.

The triggered watches deletions should still happen in a timely manner,
where as the history entries might actually be bound by size as one
entry can easily have 20kb.

The following settings have been added:

- xpack.watcher.bulk.actions (default 1)
- xpack.watcher.bulk.concurrent_requests (default 0)
- xpack.watcher.bulk.flush_interval (default 1s)
- xpack.watcher.bulk.size (default 1mb)

The drawback of this is of course, that on a node outage you might end
up with watch history entries not being written or watches needing to be
executing again because they have not been deleted from the triggered
watches index. The window of these two cases increases configuring the bulk processor to wait to reach certain thresholds.
This commit is contained in:
Alexander Reelsen 2018-09-18 10:25:16 +02:00 committed by GitHub
parent e075b872f6
commit 139128856a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 291 additions and 163 deletions

View File

@ -5,9 +5,14 @@
*/
package org.elasticsearch.xpack.watcher;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -20,13 +25,14 @@ import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -51,6 +57,7 @@ import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ssl.SSLService;
@ -184,12 +191,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.elasticsearch.common.settings.Setting.Property.NodeScope;
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, ReloadablePlugin {
@ -201,6 +212,16 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
Setting.boolSetting("xpack.watcher.encrypt_sensitive_data", false, Setting.Property.NodeScope);
public static final Setting<TimeValue> MAX_STOP_TIMEOUT_SETTING =
Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope);
private static final Setting<Integer> SETTING_BULK_ACTIONS =
Setting.intSetting("xpack.watcher.bulk.actions", 1, 1, 10000, NodeScope);
private static final Setting<Integer> SETTING_BULK_CONCURRENT_REQUESTS =
Setting.intSetting("xpack.watcher.bulk.concurrent_requests", 0, 0, 20, NodeScope);
private static final Setting<TimeValue> SETTING_BULK_FLUSH_INTERVAL =
Setting.timeSetting("xpack.watcher.bulk.flush_interval", TimeValue.timeValueSeconds(1), NodeScope);
private static final Setting<ByteSizeValue> SETTING_BULK_SIZE =
Setting.byteSizeSetting("xpack.watcher.bulk.size", new ByteSizeValue(1, ByteSizeUnit.MB),
new ByteSizeValue(1, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.MB), NodeScope);
public static final ScriptContext<SearchScript.Factory> SCRIPT_SEARCH_CONTEXT =
new ScriptContext<>("xpack", SearchScript.Factory.class);
@ -210,9 +231,10 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
public static final ScriptContext<TemplateScript.Factory> SCRIPT_TEMPLATE_CONTEXT
= new ScriptContext<>("xpack_template", TemplateScript.Factory.class);
private static final Logger logger = Loggers.getLogger(Watcher.class);
private static final Logger logger = LogManager.getLogger(Watcher.class);
private WatcherIndexingListener listener;
private HttpClient httpClient;
private BulkProcessor bulkProcessor;
protected final Settings settings;
protected final boolean transportClient;
@ -318,7 +340,49 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
final InputRegistry inputRegistry = new InputRegistry(settings, inputFactories);
inputFactories.put(ChainInput.TYPE, new ChainInputFactory(settings, inputRegistry));
final HistoryStore historyStore = new HistoryStore(settings, client);
bulkProcessor = BulkProcessor.builder(ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN), new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
Map<String, String> triggeredWatches = Arrays.stream(response.getItems())
.filter(BulkItemResponse::isFailed)
.filter(r -> r.getIndex().startsWith(TriggeredWatchStoreField.INDEX_NAME))
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
if (triggeredWatches.isEmpty() == false) {
String failure = triggeredWatches.values().stream().collect(Collectors.joining(", "));
logger.error("triggered watches could not be deleted {}, failure [{}]",
triggeredWatches.keySet(), Strings.substring(failure, 0, 2000));
}
Map<String, String> overwrittenIds = Arrays.stream(response.getItems())
.filter(BulkItemResponse::isFailed)
.filter(r -> r.getIndex().startsWith(HistoryStoreField.INDEX_PREFIX))
.filter(r -> r.getVersion() > 1)
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
if (overwrittenIds.isEmpty() == false) {
String failure = overwrittenIds.values().stream().collect(Collectors.joining(", "));
logger.info("overwrote watch history entries {}, possible second execution of a triggered watch, failure [{}]",
overwrittenIds.keySet(), Strings.substring(failure, 0, 2000));
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("error executing bulk", failure);
}
})
.setFlushInterval(SETTING_BULK_FLUSH_INTERVAL.get(settings))
.setBulkActions(SETTING_BULK_ACTIONS.get(settings))
.setBulkSize(SETTING_BULK_SIZE.get(settings))
.setConcurrentRequests(SETTING_BULK_CONCURRENT_REQUESTS.get(settings))
.build();
HistoryStore historyStore = new HistoryStore(settings, bulkProcessor);
// schedulers
final Set<Schedule.Parser> scheduleParsers = new HashSet<>();
@ -340,7 +404,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
final TriggerService triggerService = new TriggerService(settings, triggerEngines);
final TriggeredWatch.Parser triggeredWatchParser = new TriggeredWatch.Parser(settings, triggerService);
final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, client, triggeredWatchParser);
final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, client, triggeredWatchParser, bulkProcessor);
final WatcherSearchTemplateService watcherSearchTemplateService =
new WatcherSearchTemplateService(settings, scriptService, xContentRegistry);
@ -416,6 +480,12 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
settings.add(Setting.simpleString("xpack.watcher.execution.scroll.timeout", Setting.Property.NodeScope));
settings.add(WatcherLifeCycleService.SETTING_REQUIRE_MANUAL_START);
// bulk processor configuration
settings.add(SETTING_BULK_ACTIONS);
settings.add(SETTING_BULK_CONCURRENT_REQUESTS);
settings.add(SETTING_BULK_FLUSH_INTERVAL);
settings.add(SETTING_BULK_SIZE);
// notification services
settings.addAll(SlackService.getSettings());
settings.addAll(EmailService.getSettings());
@ -608,7 +678,15 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
@Override
public void close() throws IOException {
bulkProcessor.flush();
IOUtils.closeWhileHandlingException(httpClient);
try {
if (bulkProcessor.awaitClose(10, TimeUnit.SECONDS) == false) {
logger.warn("failed to properly close watcher bulk processor");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**

View File

@ -320,11 +320,8 @@ public class ExecutionService extends AbstractComponent {
// TODO log watch record in logger, when saving in history store failed, otherwise the info is gone!
}
}
try {
triggeredWatchStore.delete(ctx.id());
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e);
}
}
currentExecutions.get().remove(watchId);
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
@ -412,14 +409,8 @@ public class ExecutionService extends AbstractComponent {
triggeredWatch.id()), exc);
}
try {
triggeredWatchStore.delete(triggeredWatch.id());
} catch (Exception exc) {
logger.error((Supplier<?>) () ->
new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " +
"rejection", triggeredWatch.id()), exc);
}
};
}
WatchRecord executeInner(WatchExecutionContext ctx) {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@ -24,7 +25,6 @@ import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -32,6 +32,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
import org.elasticsearch.xpack.core.watcher.execution.Wid;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
@ -46,8 +47,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
public class TriggeredWatchStore extends AbstractComponent {
@ -58,21 +57,17 @@ public class TriggeredWatchStore extends AbstractComponent {
private final TimeValue defaultBulkTimeout;
private final TimeValue defaultSearchTimeout;
private final BulkProcessor bulkProcessor;
public TriggeredWatchStore(Settings settings, Client client, TriggeredWatch.Parser triggeredWatchParser) {
public TriggeredWatchStore(Settings settings, Client client, TriggeredWatch.Parser triggeredWatchParser, BulkProcessor bulkProcessor) {
super(settings);
this.scrollSize = settings.getAsInt("xpack.watcher.execution.scroll.size", 1000);
this.client = client;
this.client = ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN);
this.scrollTimeout = settings.getAsTime("xpack.watcher.execution.scroll.timeout", TimeValue.timeValueMinutes(5));
this.defaultBulkTimeout = settings.getAsTime("xpack.watcher.internal.ops.bulk.default_timeout", TimeValue.timeValueSeconds(120));
this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30));
this.triggeredWatchParser = triggeredWatchParser;
}
public static boolean validate(ClusterState state) {
IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metaData());
return indexMetaData == null || (indexMetaData.getState() == IndexMetaData.State.OPEN &&
state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive());
this.bulkProcessor = bulkProcessor;
}
public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionListener<BulkResponse> listener) throws IOException {
@ -81,8 +76,7 @@ public class TriggeredWatchStore extends AbstractComponent {
return;
}
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, createBulkRequest(triggeredWatches,
TriggeredWatchStoreField.DOC_TYPE), listener, client::bulk);
client.bulk(createBulkRequest(triggeredWatches), listener);
}
public BulkResponse putAll(final List<TriggeredWatch> triggeredWatches) throws IOException {
@ -94,14 +88,14 @@ public class TriggeredWatchStore extends AbstractComponent {
/**
* Create a bulk request from the triggered watches with a specified document type
* @param triggeredWatches The list of triggered watches
* @param docType The document type to use, either the current one or legacy
* @return The bulk request for the triggered watches
* @throws IOException If a triggered watch could not be parsed to JSON, this exception is thrown
*/
private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatches, String docType) throws IOException {
private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatches) throws IOException {
BulkRequest request = new BulkRequest();
for (TriggeredWatch triggeredWatch : triggeredWatches) {
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, docType, triggeredWatch.id().value());
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE,
triggeredWatch.id().value());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
triggeredWatch.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder);
@ -112,12 +106,15 @@ public class TriggeredWatchStore extends AbstractComponent {
return request;
}
/**
* Delete a triggered watch entry.
* Note that this happens asynchronously, as these kind of requests are batched together to reduce the amount of concurrent requests.
*
* @param wid The ID os the triggered watch id
*/
public void delete(Wid wid) {
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, wid.value());
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
client.delete(request); // FIXME shouldn't we wait before saying the delete was successful
}
logger.trace("successfully deleted triggered watch with id [{}]", wid);
bulkProcessor.add(request);
}
/**
@ -140,9 +137,9 @@ public class TriggeredWatchStore extends AbstractComponent {
return Collections.emptyList();
}
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
client.admin().indices().refresh(new RefreshRequest(TriggeredWatchStoreField.INDEX_NAME))
.actionGet(TimeValue.timeValueSeconds(5));
try {
RefreshRequest request = new RefreshRequest(TriggeredWatchStoreField.INDEX_NAME);
client.admin().indices().refresh(request).actionGet(TimeValue.timeValueSeconds(5));
} catch (IndexNotFoundException e) {
return Collections.emptyList();
}
@ -159,7 +156,7 @@ public class TriggeredWatchStore extends AbstractComponent {
.version(true));
SearchResponse response = null;
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
try {
response = client.search(searchRequest).actionGet(defaultSearchTimeout);
logger.debug("trying to find triggered watches for ids {}: found [{}] docs", ids, response.getHits().getTotalHits());
while (response.getHits().getHits().length != 0) {
@ -176,14 +173,18 @@ public class TriggeredWatchStore extends AbstractComponent {
}
} finally {
if (response != null) {
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(response.getScrollId());
client.clearScroll(clearScrollRequest).actionGet(scrollTimeout);
}
}
}
return triggeredWatches;
}
public static boolean validate(ClusterState state) {
IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metaData());
return indexMetaData == null || (indexMetaData.getState() == IndexMetaData.State.OPEN &&
state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive());
}
}

View File

@ -7,17 +7,14 @@ package org.elasticsearch.xpack.watcher.history;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField;
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams;
@ -26,37 +23,18 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
import static org.elasticsearch.xpack.core.watcher.support.Exceptions.ioException;
public class HistoryStore extends AbstractComponent implements AutoCloseable {
public class HistoryStore extends AbstractComponent {
public static final String DOC_TYPE = "doc";
private final Client client;
private final BulkProcessor bulkProcessor;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock putUpdateLock = readWriteLock.readLock();
private final Lock stopLock = readWriteLock.writeLock();
public HistoryStore(Settings settings, Client client) {
public HistoryStore(Settings settings, BulkProcessor bulkProcessor) {
super(settings);
this.client = client;
}
@Override
public void close() {
// This will block while put or update actions are underway
stopLock.lock();
stopLock.unlock();
this.bulkProcessor = bulkProcessor;
}
/**
@ -65,20 +43,14 @@ public class HistoryStore extends AbstractComponent implements AutoCloseable {
*/
public void put(WatchRecord watchRecord) throws Exception {
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
putUpdateLock.lock();
try (XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(builder)
.opType(IndexRequest.OpType.CREATE);
client.index(request).actionGet(30, TimeUnit.SECONDS);
logger.debug("indexed watch history record [{}]", watchRecord.id().value());
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()).source(builder);
request.opType(IndexRequest.OpType.CREATE);
bulkProcessor.add(request);
} catch (IOException ioe) {
throw ioException("failed to persist watch record [{}]", ioe, watchRecord);
} finally {
putUpdateLock.unlock();
}
}
@ -88,33 +60,14 @@ public class HistoryStore extends AbstractComponent implements AutoCloseable {
*/
public void forcePut(WatchRecord watchRecord) {
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
putUpdateLock.lock();
try {
try (XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(builder)
.opType(IndexRequest.OpType.CREATE);
client.index(request).get(30, TimeUnit.SECONDS);
logger.debug("indexed watch history record [{}]", watchRecord.id().value());
} catch (VersionConflictEngineException vcee) {
watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES,
"watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]");
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(xContentBuilder.value(watchRecord));
client.index(request).get(30, TimeUnit.SECONDS);
}
logger.debug("overwrote watch history record [{}]", watchRecord.id().value());
}
} catch (InterruptedException | ExecutionException | TimeoutException | IOException ioe) {
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()).source(builder);
bulkProcessor.add(request);
} catch (IOException ioe) {
final WatchRecord wr = watchRecord;
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to persist watch record [{}]", wr), ioe);
} finally {
putUpdateLock.unlock();
}
}

View File

@ -88,7 +88,6 @@ public class IntegrationAccount extends HipChatAccount {
sentMessages.add(SentMessages.SentMessage.responded(room, SentMessages.SentMessage.TargetType.ROOM, message, request,
response));
} catch (Exception e) {
logger.error("failed to execute hipchat api http request", e);
sentMessages.add(SentMessages.SentMessage.error(room, SentMessages.SentMessage.TargetType.ROOM, message, e));
}
return new SentMessages(name, sentMessages);

View File

@ -7,15 +7,23 @@ package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
@ -65,6 +73,9 @@ import org.junit.Before;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.singleton;
import static org.hamcrest.Matchers.equalTo;
@ -79,7 +90,6 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class TriggeredWatchStoreTests extends ESTestCase {
@ -92,15 +102,34 @@ public class TriggeredWatchStoreTests extends ESTestCase {
private Client client;
private TriggeredWatch.Parser parser;
private TriggeredWatchStore triggeredWatchStore;
private final Map<BulkRequest, BulkResponse> bulks = new LinkedHashMap<>();
private BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
bulks.put(request, response);
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
throw new ElasticsearchException(failure);
}
};
@Before
public void init() {
Settings settings = Settings.builder().put("node.name", randomAlphaOfLength(10)).build();
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(client.settings()).thenReturn(settings);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
parser = mock(TriggeredWatch.Parser.class);
triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, client, parser);
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build();
triggeredWatchStore = new TriggeredWatchStore(settings, client, parser, bulkProcessor);
}
public void testFindTriggeredWatchesEmptyCollection() {
@ -174,14 +203,11 @@ public class TriggeredWatchStoreTests extends ESTestCase {
csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build();
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
AdminClient adminClient = mock(AdminClient.class);
when(client.admin()).thenReturn(adminClient);
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient);
PlainActionFuture<RefreshResponse> future = PlainActionFuture.newFuture();
when(indicesAdminClient.refresh(any())).thenReturn(future);
future.onResponse(refreshResponse);
doAnswer(invocation -> {
ActionListener<RefreshResponse> listener = (ActionListener<RefreshResponse>) invocation.getArguments()[2];
listener.onResponse(mockRefreshResponse(1, 1));
return null;
}).when(client).execute(eq(RefreshAction.INSTANCE), any(), any());
SearchResponse searchResponse1 = mock(SearchResponse.class);
when(searchResponse1.getSuccessfulShards()).thenReturn(1);
@ -194,9 +220,11 @@ public class TriggeredWatchStoreTests extends ESTestCase {
SearchHits hits = new SearchHits(new SearchHit[]{hit}, 1, 1.0f);
when(searchResponse1.getHits()).thenReturn(hits);
when(searchResponse1.getScrollId()).thenReturn("_scrollId");
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
when(client.search(any(SearchRequest.class))).thenReturn(searchFuture);
searchFuture.onResponse(searchResponse1);
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
listener.onResponse(searchResponse1);
return null;
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
// First return a scroll response with a single hit and then with no hits
hit = new SearchHit(0, "second_foo", new Text(TriggeredWatchStoreField.DOC_TYPE), null);
@ -209,24 +237,27 @@ public class TriggeredWatchStoreTests extends ESTestCase {
SearchResponse searchResponse3 = new SearchResponse(InternalSearchResponse.empty(), "_scrollId2", 1, 1, 0, 1, null, null);
doAnswer(invocation -> {
SearchScrollRequest request = (SearchScrollRequest) invocation.getArguments()[0];
PlainActionFuture<SearchResponse> searchScrollFuture = PlainActionFuture.newFuture();
SearchScrollRequest request = (SearchScrollRequest) invocation.getArguments()[1];
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
if (request.scrollId().equals("_scrollId")) {
searchScrollFuture.onResponse(searchResponse2);
listener.onResponse(searchResponse2);
} else if (request.scrollId().equals("_scrollId1")) {
searchScrollFuture.onResponse(searchResponse3);
listener.onResponse(searchResponse3);
} else {
searchScrollFuture.onFailure(new ElasticsearchException("test issue"));
listener.onFailure(new ElasticsearchException("test issue"));
}
return searchScrollFuture;
}).when(client).searchScroll(any());
return null;
}).when(client).execute(eq(SearchScrollAction.INSTANCE), any(), any());
TriggeredWatch triggeredWatch = mock(TriggeredWatch.class);
when(parser.parse(eq("_id"), eq(1L), any(BytesReference.class))).thenReturn(triggeredWatch);
PlainActionFuture<ClearScrollResponse> clearScrollResponseFuture = PlainActionFuture.newFuture();
when(client.clearScroll(any())).thenReturn(clearScrollResponseFuture);
clearScrollResponseFuture.onResponse(new ClearScrollResponse(true, 1));
doAnswer(invocation -> {
ActionListener<ClearScrollResponse> listener = (ActionListener<ClearScrollResponse>) invocation.getArguments()[2];
listener.onResponse(new ClearScrollResponse(true, 1));
return null;
}).when(client).execute(eq(ClearScrollAction.INSTANCE), any(), any());
assertThat(TriggeredWatchStore.validate(cs), is(true));
DateTime now = DateTime.now(UTC);
@ -251,10 +282,10 @@ public class TriggeredWatchStoreTests extends ESTestCase {
assertThat(triggeredWatches, notNullValue());
assertThat(triggeredWatches, hasSize(watches.size()));
verify(client.admin().indices(), times(1)).refresh(any());
verify(client, times(1)).search(any(SearchRequest.class));
verify(client, times(2)).searchScroll(any());
verify(client, times(1)).clearScroll(any());
verify(client, times(1)).execute(eq(RefreshAction.INSTANCE), any(), any());
verify(client, times(1)).execute(eq(SearchAction.INSTANCE), any(), any());
verify(client, times(2)).execute(eq(SearchScrollAction.INSTANCE), any(), any());
verify(client, times(1)).execute(eq(ClearScrollAction.INSTANCE), any(), any());
}
// the elasticsearch migration helper is doing reindex using aliases, so we have to
@ -332,7 +363,7 @@ public class TriggeredWatchStoreTests extends ESTestCase {
assertThat(TriggeredWatchStore.validate(cs), is(true));
Watch watch = mock(Watch.class);
triggeredWatchStore.findTriggeredWatches(Collections.singletonList(watch), cs);
verifyZeroInteractions(client);
verify(client, times(0)).execute(any(), any(), any());
}
public void testIndexNotFoundButInMetaData() {
@ -344,13 +375,11 @@ public class TriggeredWatchStoreTests extends ESTestCase {
ClusterState cs = csBuilder.build();
Watch watch = mock(Watch.class);
AdminClient adminClient = mock(AdminClient.class);
when(client.admin()).thenReturn(adminClient);
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient);
PlainActionFuture<RefreshResponse> future = PlainActionFuture.newFuture();
when(indicesAdminClient.refresh(any())).thenReturn(future);
future.onFailure(new IndexNotFoundException(TriggeredWatchStoreField.INDEX_NAME));
doAnswer(invocation -> {
ActionListener<RefreshResponse> listener = (ActionListener<RefreshResponse>) invocation.getArguments()[2];
listener.onFailure(new IndexNotFoundException(TriggeredWatchStoreField.INDEX_NAME));
return null;
}).when(client).execute(eq(RefreshAction.INSTANCE), any(), any());
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.findTriggeredWatches(Collections.singletonList(watch), cs);
assertThat(triggeredWatches, hasSize(0));
@ -381,6 +410,65 @@ public class TriggeredWatchStoreTests extends ESTestCase {
assertThat(BytesReference.bytes(jsonBuilder).utf8ToString(), equalTo(BytesReference.bytes(jsonBuilder2).utf8ToString()));
}
public void testPutTriggeredWatches() throws Exception {
DateTime now = DateTime.now(UTC);
int numberOfTriggeredWatches = randomIntBetween(1, 100);
List<TriggeredWatch> triggeredWatches = new ArrayList<>(numberOfTriggeredWatches);
for (int i = 0; i < numberOfTriggeredWatches; i++) {
triggeredWatches.add(new TriggeredWatch(new Wid("watch_id_", now), new ScheduleTriggerEvent("watch_id", now, now)));
}
doAnswer(invocation -> {
BulkRequest bulkRequest = (BulkRequest) invocation.getArguments()[1];
ActionListener<BulkResponse> listener = (ActionListener<BulkResponse>) invocation.getArguments()[2];
int size = bulkRequest.requests().size();
BulkItemResponse[] bulkItemResponse = new BulkItemResponse[size];
for (int i = 0; i < size; i++) {
DocWriteRequest<?> writeRequest = bulkRequest.requests().get(i);
ShardId shardId = new ShardId(TriggeredWatchStoreField.INDEX_NAME, "uuid", 0);
IndexResponse indexResponse = new IndexResponse(shardId, writeRequest.type(), writeRequest.id(), 1, 1, 1, true);
bulkItemResponse[i] = new BulkItemResponse(0, writeRequest.opType(), indexResponse);
}
listener.onResponse(new BulkResponse(bulkItemResponse, 123));
return null;
}).when(client).execute(eq(BulkAction.INSTANCE), any(), any());
BulkResponse response = triggeredWatchStore.putAll(triggeredWatches);
assertThat(response.hasFailures(), is(false));
assertThat(response.getItems().length, is(numberOfTriggeredWatches));
}
public void testDeleteTriggeredWatches() throws Exception {
DateTime now = DateTime.now(UTC);
doAnswer(invocation -> {
BulkRequest bulkRequest = (BulkRequest) invocation.getArguments()[0];
ActionListener<BulkResponse> listener = (ActionListener<BulkResponse>) invocation.getArguments()[1];
int size = bulkRequest.requests().size();
BulkItemResponse[] bulkItemResponse = new BulkItemResponse[size];
for (int i = 0; i < size; i++) {
DocWriteRequest<?> writeRequest = bulkRequest.requests().get(i);
ShardId shardId = new ShardId(TriggeredWatchStoreField.INDEX_NAME, "uuid", 0);
IndexResponse indexResponse = new IndexResponse(shardId, writeRequest.type(), writeRequest.id(), 1, 1, 1, true);
bulkItemResponse[i] = new BulkItemResponse(0, writeRequest.opType(), indexResponse);
}
listener.onResponse(new BulkResponse(bulkItemResponse, 123));
return null;
}).when(client).bulk(any(), any());
triggeredWatchStore.delete(new Wid("watch_id_", now));
assertThat(bulks.keySet(), hasSize(1));
BulkResponse response = bulks.values().iterator().next();
assertThat(response.hasFailures(), is(false));
assertThat(response.getItems().length, is(1));
}
private RefreshResponse mockRefreshResponse(int total, int successful) {
RefreshResponse refreshResponse = mock(RefreshResponse.class);
when(refreshResponse.getTotalShards()).thenReturn(total);

View File

@ -7,10 +7,14 @@ package org.elasticsearch.xpack.watcher.history;
import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -42,6 +46,7 @@ import static org.elasticsearch.xpack.core.watcher.history.HistoryStoreField.get
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.joda.time.DateTimeZone.UTC;
@ -58,11 +63,15 @@ public class HistoryStoreTests extends ESTestCase {
@Before
public void init() {
Settings settings = Settings.builder().put("node.name", randomAlphaOfLength(10)).build();
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
historyStore = new HistoryStore(Settings.EMPTY, client);
when(client.settings()).thenReturn(settings);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings));
BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class);
BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build();
historyStore = new HistoryStore(settings, bulkProcessor);
}
public void testPut() throws Exception {
@ -75,19 +84,21 @@ public class HistoryStoreTests extends ESTestCase {
IndexResponse indexResponse = mock(IndexResponse.class);
doAnswer(invocation -> {
IndexRequest request = (IndexRequest) invocation.getArguments()[0];
PlainActionFuture<IndexResponse> indexFuture = PlainActionFuture.newFuture();
if (request.id().equals(wid.value()) && request.type().equals(HistoryStore.DOC_TYPE) && request.opType() == OpType.CREATE
&& request.index().equals(index)) {
indexFuture.onResponse(indexResponse);
BulkRequest request = (BulkRequest) invocation.getArguments()[1];
ActionListener<BulkResponse> listener = (ActionListener<BulkResponse>) invocation.getArguments()[2];
IndexRequest indexRequest = (IndexRequest) request.requests().get(0);
if (indexRequest.id().equals(wid.value()) && indexRequest.type().equals(HistoryStore.DOC_TYPE) &&
indexRequest.opType() == OpType.CREATE && indexRequest.index().equals(index)) {
listener.onResponse(new BulkResponse(new BulkItemResponse[]{ new BulkItemResponse(1, OpType.CREATE, indexResponse) }, 1));
} else {
indexFuture.onFailure(new ElasticsearchException("test issue"));
listener.onFailure(new ElasticsearchException("test issue"));
}
return indexFuture;
}).when(client).index(any());
return null;
}).when(client).bulk(any(), any());
historyStore.put(watchRecord);
verify(client).index(any());
verify(client).bulk(any(), any());
}
public void testIndexNameGeneration() {
@ -139,10 +150,15 @@ public class HistoryStoreTests extends ESTestCase {
}
watchRecord.result().actionsResults().put(JiraAction.TYPE, result);
PlainActionFuture<IndexResponse> indexResponseFuture = PlainActionFuture.newFuture();
indexResponseFuture.onResponse(mock(IndexResponse.class));
ArgumentCaptor<IndexRequest> requestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
when(client.index(requestCaptor.capture())).thenReturn(indexResponseFuture);
ArgumentCaptor<BulkRequest> requestCaptor = ArgumentCaptor.forClass(BulkRequest.class);
doAnswer(invocation -> {
ActionListener<BulkResponse> listener = (ActionListener<BulkResponse>) invocation.getArguments()[2];
IndexResponse indexResponse = mock(IndexResponse.class);
listener.onResponse(new BulkResponse(new BulkItemResponse[]{ new BulkItemResponse(1, OpType.CREATE, indexResponse) }, 1));
return null;
}).when(client).bulk(requestCaptor.capture(), any());
if (randomBoolean()) {
historyStore.put(watchRecord);
} else {
@ -150,7 +166,9 @@ public class HistoryStoreTests extends ESTestCase {
}
assertThat(requestCaptor.getAllValues(), hasSize(1));
String indexedJson = requestCaptor.getValue().source().utf8ToString();
assertThat(requestCaptor.getValue().requests().get(0), instanceOf(IndexRequest.class));
IndexRequest capturedIndexRequest = (IndexRequest) requestCaptor.getValue().requests().get(0);
String indexedJson = capturedIndexRequest.source().utf8ToString();
assertThat(indexedJson, containsString(username));
assertThat(indexedJson, not(containsString(password)));
}