From 139128856a5f63deb41243756e2b3b3a843fa57c Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Tue, 18 Sep 2018 10:25:16 +0200 Subject: [PATCH] Watcher: Use Bulkprocessor in HistoryStore/TriggeredWatchStore (#32490) Currently a watch execution results in one bulk request, when the triggered watches are written into the that index, that need to be executed. However the update of the watch status, the creation of the watch history entry as well as the deletion of the triggered watches index are all single document operations. This can have quite a negative impact, once you are executing a lot of watches, as each execution results in 4 documents writes, three of them being single document actions. This commit switches to a bulk processor instead of a single document action for writing watch history entries and deleting triggered watch entries. However the defaults are to run synchronous as before because the number of concurrent requests is set to 0. This also fixes a bug, where the deletion of the triggered watch entry was done asynchronously. However if you have a high number of watches being executed, you can configure watcher to delete the triggered watches entries as well as writing the watch history entries via bulk requests. The triggered watches deletions should still happen in a timely manner, where as the history entries might actually be bound by size as one entry can easily have 20kb. The following settings have been added: - xpack.watcher.bulk.actions (default 1) - xpack.watcher.bulk.concurrent_requests (default 0) - xpack.watcher.bulk.flush_interval (default 1s) - xpack.watcher.bulk.size (default 1mb) The drawback of this is of course, that on a node outage you might end up with watch history entries not being written or watches needing to be executing again because they have not been deleted from the triggered watches index. The window of these two cases increases configuring the bulk processor to wait to reach certain thresholds. --- .../elasticsearch/xpack/watcher/Watcher.java | 86 ++++++++- .../watcher/execution/ExecutionService.java | 17 +- .../execution/TriggeredWatchStore.java | 59 ++++--- .../xpack/watcher/history/HistoryStore.java | 73 ++------ .../hipchat/IntegrationAccount.java | 1 - .../execution/TriggeredWatchStoreTests.java | 166 ++++++++++++++---- .../watcher/history/HistoryStoreTests.java | 52 ++++-- 7 files changed, 291 insertions(+), 163 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 33b79c38cca..32d492b78a7 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -5,9 +5,14 @@ */ package org.elasticsearch.xpack.watcher; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -20,13 +25,14 @@ import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.util.Providers; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.logging.LoggerMessageFormat; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -51,6 +57,7 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ssl.SSLService; @@ -184,12 +191,16 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; +import static org.elasticsearch.common.settings.Setting.Property.NodeScope; +import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, ReloadablePlugin { @@ -201,6 +212,16 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa Setting.boolSetting("xpack.watcher.encrypt_sensitive_data", false, Setting.Property.NodeScope); public static final Setting MAX_STOP_TIMEOUT_SETTING = Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope); + private static final Setting SETTING_BULK_ACTIONS = + Setting.intSetting("xpack.watcher.bulk.actions", 1, 1, 10000, NodeScope); + private static final Setting SETTING_BULK_CONCURRENT_REQUESTS = + Setting.intSetting("xpack.watcher.bulk.concurrent_requests", 0, 0, 20, NodeScope); + private static final Setting SETTING_BULK_FLUSH_INTERVAL = + Setting.timeSetting("xpack.watcher.bulk.flush_interval", TimeValue.timeValueSeconds(1), NodeScope); + private static final Setting SETTING_BULK_SIZE = + Setting.byteSizeSetting("xpack.watcher.bulk.size", new ByteSizeValue(1, ByteSizeUnit.MB), + new ByteSizeValue(1, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.MB), NodeScope); + public static final ScriptContext SCRIPT_SEARCH_CONTEXT = new ScriptContext<>("xpack", SearchScript.Factory.class); @@ -210,9 +231,10 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa public static final ScriptContext SCRIPT_TEMPLATE_CONTEXT = new ScriptContext<>("xpack_template", TemplateScript.Factory.class); - private static final Logger logger = Loggers.getLogger(Watcher.class); + private static final Logger logger = LogManager.getLogger(Watcher.class); private WatcherIndexingListener listener; private HttpClient httpClient; + private BulkProcessor bulkProcessor; protected final Settings settings; protected final boolean transportClient; @@ -318,7 +340,49 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa final InputRegistry inputRegistry = new InputRegistry(settings, inputFactories); inputFactories.put(ChainInput.TYPE, new ChainInputFactory(settings, inputRegistry)); - final HistoryStore historyStore = new HistoryStore(settings, client); + bulkProcessor = BulkProcessor.builder(ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN), new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + Map triggeredWatches = Arrays.stream(response.getItems()) + .filter(BulkItemResponse::isFailed) + .filter(r -> r.getIndex().startsWith(TriggeredWatchStoreField.INDEX_NAME)) + .collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage)); + if (triggeredWatches.isEmpty() == false) { + String failure = triggeredWatches.values().stream().collect(Collectors.joining(", ")); + logger.error("triggered watches could not be deleted {}, failure [{}]", + triggeredWatches.keySet(), Strings.substring(failure, 0, 2000)); + } + + Map overwrittenIds = Arrays.stream(response.getItems()) + .filter(BulkItemResponse::isFailed) + .filter(r -> r.getIndex().startsWith(HistoryStoreField.INDEX_PREFIX)) + .filter(r -> r.getVersion() > 1) + .collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage)); + if (overwrittenIds.isEmpty() == false) { + String failure = overwrittenIds.values().stream().collect(Collectors.joining(", ")); + logger.info("overwrote watch history entries {}, possible second execution of a triggered watch, failure [{}]", + overwrittenIds.keySet(), Strings.substring(failure, 0, 2000)); + } + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + logger.error("error executing bulk", failure); + } + }) + .setFlushInterval(SETTING_BULK_FLUSH_INTERVAL.get(settings)) + .setBulkActions(SETTING_BULK_ACTIONS.get(settings)) + .setBulkSize(SETTING_BULK_SIZE.get(settings)) + .setConcurrentRequests(SETTING_BULK_CONCURRENT_REQUESTS.get(settings)) + .build(); + + HistoryStore historyStore = new HistoryStore(settings, bulkProcessor); // schedulers final Set scheduleParsers = new HashSet<>(); @@ -340,7 +404,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa final TriggerService triggerService = new TriggerService(settings, triggerEngines); final TriggeredWatch.Parser triggeredWatchParser = new TriggeredWatch.Parser(settings, triggerService); - final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, client, triggeredWatchParser); + final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, client, triggeredWatchParser, bulkProcessor); final WatcherSearchTemplateService watcherSearchTemplateService = new WatcherSearchTemplateService(settings, scriptService, xContentRegistry); @@ -416,6 +480,12 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa settings.add(Setting.simpleString("xpack.watcher.execution.scroll.timeout", Setting.Property.NodeScope)); settings.add(WatcherLifeCycleService.SETTING_REQUIRE_MANUAL_START); + // bulk processor configuration + settings.add(SETTING_BULK_ACTIONS); + settings.add(SETTING_BULK_CONCURRENT_REQUESTS); + settings.add(SETTING_BULK_FLUSH_INTERVAL); + settings.add(SETTING_BULK_SIZE); + // notification services settings.addAll(SlackService.getSettings()); settings.addAll(EmailService.getSettings()); @@ -608,7 +678,15 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa @Override public void close() throws IOException { + bulkProcessor.flush(); IOUtils.closeWhileHandlingException(httpClient); + try { + if (bulkProcessor.awaitClose(10, TimeUnit.SECONDS) == false) { + logger.warn("failed to properly close watcher bulk processor"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } /** diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 7b77afb225e..3507bd4eb36 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -320,11 +320,8 @@ public class ExecutionService extends AbstractComponent { // TODO log watch record in logger, when saving in history store failed, otherwise the info is gone! } } - try { - triggeredWatchStore.delete(ctx.id()); - } catch (Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e); - } + + triggeredWatchStore.delete(ctx.id()); } currentExecutions.get().remove(watchId); logger.debug("finished [{}]/[{}]", watchId, ctx.id()); @@ -412,14 +409,8 @@ public class ExecutionService extends AbstractComponent { triggeredWatch.id()), exc); } - try { - triggeredWatchStore.delete(triggeredWatch.id()); - } catch (Exception exc) { - logger.error((Supplier) () -> - new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " + - "rejection", triggeredWatch.id()), exc); - } - }; + triggeredWatchStore.delete(triggeredWatch.id()); + } } WatchRecord executeInner(WatchExecutionContext ctx) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java index e0164b5bdbd..9a4b555d633 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.execution; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -24,7 +25,6 @@ import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -32,6 +32,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.execution.Wid; import org.elasticsearch.xpack.core.watcher.watch.Watch; @@ -46,8 +47,6 @@ import java.util.Set; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; -import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; public class TriggeredWatchStore extends AbstractComponent { @@ -58,21 +57,17 @@ public class TriggeredWatchStore extends AbstractComponent { private final TimeValue defaultBulkTimeout; private final TimeValue defaultSearchTimeout; + private final BulkProcessor bulkProcessor; - public TriggeredWatchStore(Settings settings, Client client, TriggeredWatch.Parser triggeredWatchParser) { + public TriggeredWatchStore(Settings settings, Client client, TriggeredWatch.Parser triggeredWatchParser, BulkProcessor bulkProcessor) { super(settings); this.scrollSize = settings.getAsInt("xpack.watcher.execution.scroll.size", 1000); - this.client = client; + this.client = ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN); this.scrollTimeout = settings.getAsTime("xpack.watcher.execution.scroll.timeout", TimeValue.timeValueMinutes(5)); this.defaultBulkTimeout = settings.getAsTime("xpack.watcher.internal.ops.bulk.default_timeout", TimeValue.timeValueSeconds(120)); this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30)); this.triggeredWatchParser = triggeredWatchParser; - } - - public static boolean validate(ClusterState state) { - IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metaData()); - return indexMetaData == null || (indexMetaData.getState() == IndexMetaData.State.OPEN && - state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive()); + this.bulkProcessor = bulkProcessor; } public void putAll(final List triggeredWatches, final ActionListener listener) throws IOException { @@ -81,8 +76,7 @@ public class TriggeredWatchStore extends AbstractComponent { return; } - executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, createBulkRequest(triggeredWatches, - TriggeredWatchStoreField.DOC_TYPE), listener, client::bulk); + client.bulk(createBulkRequest(triggeredWatches), listener); } public BulkResponse putAll(final List triggeredWatches) throws IOException { @@ -94,14 +88,14 @@ public class TriggeredWatchStore extends AbstractComponent { /** * Create a bulk request from the triggered watches with a specified document type * @param triggeredWatches The list of triggered watches - * @param docType The document type to use, either the current one or legacy * @return The bulk request for the triggered watches * @throws IOException If a triggered watch could not be parsed to JSON, this exception is thrown */ - private BulkRequest createBulkRequest(final List triggeredWatches, String docType) throws IOException { + private BulkRequest createBulkRequest(final List triggeredWatches) throws IOException { BulkRequest request = new BulkRequest(); for (TriggeredWatch triggeredWatch : triggeredWatches) { - IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, docType, triggeredWatch.id().value()); + IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, + triggeredWatch.id().value()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { triggeredWatch.toXContent(builder, ToXContent.EMPTY_PARAMS); indexRequest.source(builder); @@ -112,12 +106,15 @@ public class TriggeredWatchStore extends AbstractComponent { return request; } + /** + * Delete a triggered watch entry. + * Note that this happens asynchronously, as these kind of requests are batched together to reduce the amount of concurrent requests. + * + * @param wid The ID os the triggered watch id + */ public void delete(Wid wid) { DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, wid.value()); - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { - client.delete(request); // FIXME shouldn't we wait before saying the delete was successful - } - logger.trace("successfully deleted triggered watch with id [{}]", wid); + bulkProcessor.add(request); } /** @@ -140,9 +137,9 @@ public class TriggeredWatchStore extends AbstractComponent { return Collections.emptyList(); } - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { - client.admin().indices().refresh(new RefreshRequest(TriggeredWatchStoreField.INDEX_NAME)) - .actionGet(TimeValue.timeValueSeconds(5)); + try { + RefreshRequest request = new RefreshRequest(TriggeredWatchStoreField.INDEX_NAME); + client.admin().indices().refresh(request).actionGet(TimeValue.timeValueSeconds(5)); } catch (IndexNotFoundException e) { return Collections.emptyList(); } @@ -159,7 +156,7 @@ public class TriggeredWatchStore extends AbstractComponent { .version(true)); SearchResponse response = null; - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { + try { response = client.search(searchRequest).actionGet(defaultSearchTimeout); logger.debug("trying to find triggered watches for ids {}: found [{}] docs", ids, response.getHits().getTotalHits()); while (response.getHits().getHits().length != 0) { @@ -176,14 +173,18 @@ public class TriggeredWatchStore extends AbstractComponent { } } finally { if (response != null) { - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(response.getScrollId()); - client.clearScroll(clearScrollRequest).actionGet(scrollTimeout); - } + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(response.getScrollId()); + client.clearScroll(clearScrollRequest).actionGet(scrollTimeout); } } return triggeredWatches; } + + public static boolean validate(ClusterState state) { + IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metaData()); + return indexMetaData == null || (indexMetaData.getState() == IndexMetaData.State.OPEN && + state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive()); + } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java index 64e909a2f73..723568f8ba7 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java @@ -7,17 +7,14 @@ package org.elasticsearch.xpack.watcher.history; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.xpack.core.watcher.execution.ExecutionState; import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField; import org.elasticsearch.xpack.core.watcher.history.WatchRecord; import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams; @@ -26,37 +23,18 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; import static org.elasticsearch.xpack.core.watcher.support.Exceptions.ioException; -public class HistoryStore extends AbstractComponent implements AutoCloseable { +public class HistoryStore extends AbstractComponent { public static final String DOC_TYPE = "doc"; - private final Client client; + private final BulkProcessor bulkProcessor; - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock putUpdateLock = readWriteLock.readLock(); - private final Lock stopLock = readWriteLock.writeLock(); - - public HistoryStore(Settings settings, Client client) { + public HistoryStore(Settings settings, BulkProcessor bulkProcessor) { super(settings); - this.client = client; - } - - @Override - public void close() { - // This will block while put or update actions are underway - stopLock.lock(); - stopLock.unlock(); + this.bulkProcessor = bulkProcessor; } /** @@ -65,20 +43,14 @@ public class HistoryStore extends AbstractComponent implements AutoCloseable { */ public void put(WatchRecord watchRecord) throws Exception { String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); - putUpdateLock.lock(); - try (XContentBuilder builder = XContentFactory.jsonBuilder(); - ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); - IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) - .source(builder) - .opType(IndexRequest.OpType.CREATE); - client.index(request).actionGet(30, TimeUnit.SECONDS); - logger.debug("indexed watch history record [{}]", watchRecord.id().value()); + IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()).source(builder); + request.opType(IndexRequest.OpType.CREATE); + bulkProcessor.add(request); } catch (IOException ioe) { throw ioException("failed to persist watch record [{}]", ioe, watchRecord); - } finally { - putUpdateLock.unlock(); } } @@ -88,33 +60,14 @@ public class HistoryStore extends AbstractComponent implements AutoCloseable { */ public void forcePut(WatchRecord watchRecord) { String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); - putUpdateLock.lock(); - try { - try (XContentBuilder builder = XContentFactory.jsonBuilder(); - ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); - IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) - .source(builder) - .opType(IndexRequest.OpType.CREATE); - client.index(request).get(30, TimeUnit.SECONDS); - logger.debug("indexed watch history record [{}]", watchRecord.id().value()); - } catch (VersionConflictEngineException vcee) { - watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES, - "watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]"); - try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(); - ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { - IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) - .source(xContentBuilder.value(watchRecord)); - client.index(request).get(30, TimeUnit.SECONDS); - } - logger.debug("overwrote watch history record [{}]", watchRecord.id().value()); - } - } catch (InterruptedException | ExecutionException | TimeoutException | IOException ioe) { + IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()).source(builder); + bulkProcessor.add(request); + } catch (IOException ioe) { final WatchRecord wr = watchRecord; logger.error((Supplier) () -> new ParameterizedMessage("failed to persist watch record [{}]", wr), ioe); - } finally { - putUpdateLock.unlock(); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/notification/hipchat/IntegrationAccount.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/notification/hipchat/IntegrationAccount.java index 8af00ae8f81..c33e788b614 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/notification/hipchat/IntegrationAccount.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/notification/hipchat/IntegrationAccount.java @@ -88,7 +88,6 @@ public class IntegrationAccount extends HipChatAccount { sentMessages.add(SentMessages.SentMessage.responded(room, SentMessages.SentMessage.TargetType.ROOM, message, request, response)); } catch (Exception e) { - logger.error("failed to execute hipchat api http request", e); sentMessages.add(SentMessages.SentMessage.error(room, SentMessages.SentMessage.TargetType.ROOM, message, e)); } return new SentMessages(name, sentMessages); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index f38f4ad6a86..4012c8d24b5 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -7,15 +7,23 @@ package org.elasticsearch.xpack.watcher.execution; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.ClearScrollAction; import org.elasticsearch.action.search.ClearScrollResponse; -import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; -import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetaData; @@ -65,6 +73,9 @@ import org.junit.Before; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import static java.util.Collections.singleton; import static org.hamcrest.Matchers.equalTo; @@ -79,7 +90,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class TriggeredWatchStoreTests extends ESTestCase { @@ -92,15 +102,34 @@ public class TriggeredWatchStoreTests extends ESTestCase { private Client client; private TriggeredWatch.Parser parser; private TriggeredWatchStore triggeredWatchStore; + private final Map bulks = new LinkedHashMap<>(); + private BulkProcessor.Listener listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + bulks.put(request, response); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + throw new ElasticsearchException(failure); + } + }; @Before public void init() { + Settings settings = Settings.builder().put("node.name", randomAlphaOfLength(10)).build(); client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); + when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); parser = mock(TriggeredWatch.Parser.class); - triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, client, parser); + BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build(); + triggeredWatchStore = new TriggeredWatchStore(settings, client, parser, bulkProcessor); } public void testFindTriggeredWatchesEmptyCollection() { @@ -174,14 +203,11 @@ public class TriggeredWatchStoreTests extends ESTestCase { csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); - RefreshResponse refreshResponse = mockRefreshResponse(1, 1); - AdminClient adminClient = mock(AdminClient.class); - when(client.admin()).thenReturn(adminClient); - IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); - when(adminClient.indices()).thenReturn(indicesAdminClient); - PlainActionFuture future = PlainActionFuture.newFuture(); - when(indicesAdminClient.refresh(any())).thenReturn(future); - future.onResponse(refreshResponse); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(mockRefreshResponse(1, 1)); + return null; + }).when(client).execute(eq(RefreshAction.INSTANCE), any(), any()); SearchResponse searchResponse1 = mock(SearchResponse.class); when(searchResponse1.getSuccessfulShards()).thenReturn(1); @@ -194,9 +220,11 @@ public class TriggeredWatchStoreTests extends ESTestCase { SearchHits hits = new SearchHits(new SearchHit[]{hit}, 1, 1.0f); when(searchResponse1.getHits()).thenReturn(hits); when(searchResponse1.getScrollId()).thenReturn("_scrollId"); - PlainActionFuture searchFuture = PlainActionFuture.newFuture(); - when(client.search(any(SearchRequest.class))).thenReturn(searchFuture); - searchFuture.onResponse(searchResponse1); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(searchResponse1); + return null; + }).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); // First return a scroll response with a single hit and then with no hits hit = new SearchHit(0, "second_foo", new Text(TriggeredWatchStoreField.DOC_TYPE), null); @@ -209,24 +237,27 @@ public class TriggeredWatchStoreTests extends ESTestCase { SearchResponse searchResponse3 = new SearchResponse(InternalSearchResponse.empty(), "_scrollId2", 1, 1, 0, 1, null, null); doAnswer(invocation -> { - SearchScrollRequest request = (SearchScrollRequest) invocation.getArguments()[0]; - PlainActionFuture searchScrollFuture = PlainActionFuture.newFuture(); + SearchScrollRequest request = (SearchScrollRequest) invocation.getArguments()[1]; + ActionListener listener = (ActionListener) invocation.getArguments()[2]; if (request.scrollId().equals("_scrollId")) { - searchScrollFuture.onResponse(searchResponse2); + listener.onResponse(searchResponse2); } else if (request.scrollId().equals("_scrollId1")) { - searchScrollFuture.onResponse(searchResponse3); + listener.onResponse(searchResponse3); } else { - searchScrollFuture.onFailure(new ElasticsearchException("test issue")); + listener.onFailure(new ElasticsearchException("test issue")); } - return searchScrollFuture; - }).when(client).searchScroll(any()); + return null; + }).when(client).execute(eq(SearchScrollAction.INSTANCE), any(), any()); TriggeredWatch triggeredWatch = mock(TriggeredWatch.class); when(parser.parse(eq("_id"), eq(1L), any(BytesReference.class))).thenReturn(triggeredWatch); - PlainActionFuture clearScrollResponseFuture = PlainActionFuture.newFuture(); - when(client.clearScroll(any())).thenReturn(clearScrollResponseFuture); - clearScrollResponseFuture.onResponse(new ClearScrollResponse(true, 1)); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(new ClearScrollResponse(true, 1)); + return null; + + }).when(client).execute(eq(ClearScrollAction.INSTANCE), any(), any()); assertThat(TriggeredWatchStore.validate(cs), is(true)); DateTime now = DateTime.now(UTC); @@ -251,10 +282,10 @@ public class TriggeredWatchStoreTests extends ESTestCase { assertThat(triggeredWatches, notNullValue()); assertThat(triggeredWatches, hasSize(watches.size())); - verify(client.admin().indices(), times(1)).refresh(any()); - verify(client, times(1)).search(any(SearchRequest.class)); - verify(client, times(2)).searchScroll(any()); - verify(client, times(1)).clearScroll(any()); + verify(client, times(1)).execute(eq(RefreshAction.INSTANCE), any(), any()); + verify(client, times(1)).execute(eq(SearchAction.INSTANCE), any(), any()); + verify(client, times(2)).execute(eq(SearchScrollAction.INSTANCE), any(), any()); + verify(client, times(1)).execute(eq(ClearScrollAction.INSTANCE), any(), any()); } // the elasticsearch migration helper is doing reindex using aliases, so we have to @@ -332,7 +363,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { assertThat(TriggeredWatchStore.validate(cs), is(true)); Watch watch = mock(Watch.class); triggeredWatchStore.findTriggeredWatches(Collections.singletonList(watch), cs); - verifyZeroInteractions(client); + verify(client, times(0)).execute(any(), any(), any()); } public void testIndexNotFoundButInMetaData() { @@ -344,13 +375,11 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState cs = csBuilder.build(); Watch watch = mock(Watch.class); - AdminClient adminClient = mock(AdminClient.class); - when(client.admin()).thenReturn(adminClient); - IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); - when(adminClient.indices()).thenReturn(indicesAdminClient); - PlainActionFuture future = PlainActionFuture.newFuture(); - when(indicesAdminClient.refresh(any())).thenReturn(future); - future.onFailure(new IndexNotFoundException(TriggeredWatchStoreField.INDEX_NAME)); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onFailure(new IndexNotFoundException(TriggeredWatchStoreField.INDEX_NAME)); + return null; + }).when(client).execute(eq(RefreshAction.INSTANCE), any(), any()); Collection triggeredWatches = triggeredWatchStore.findTriggeredWatches(Collections.singletonList(watch), cs); assertThat(triggeredWatches, hasSize(0)); @@ -381,6 +410,65 @@ public class TriggeredWatchStoreTests extends ESTestCase { assertThat(BytesReference.bytes(jsonBuilder).utf8ToString(), equalTo(BytesReference.bytes(jsonBuilder2).utf8ToString())); } + public void testPutTriggeredWatches() throws Exception { + DateTime now = DateTime.now(UTC); + int numberOfTriggeredWatches = randomIntBetween(1, 100); + + List triggeredWatches = new ArrayList<>(numberOfTriggeredWatches); + for (int i = 0; i < numberOfTriggeredWatches; i++) { + triggeredWatches.add(new TriggeredWatch(new Wid("watch_id_", now), new ScheduleTriggerEvent("watch_id", now, now))); + } + + doAnswer(invocation -> { + BulkRequest bulkRequest = (BulkRequest) invocation.getArguments()[1]; + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + + int size = bulkRequest.requests().size(); + BulkItemResponse[] bulkItemResponse = new BulkItemResponse[size]; + for (int i = 0; i < size; i++) { + DocWriteRequest writeRequest = bulkRequest.requests().get(i); + ShardId shardId = new ShardId(TriggeredWatchStoreField.INDEX_NAME, "uuid", 0); + IndexResponse indexResponse = new IndexResponse(shardId, writeRequest.type(), writeRequest.id(), 1, 1, 1, true); + bulkItemResponse[i] = new BulkItemResponse(0, writeRequest.opType(), indexResponse); + } + + listener.onResponse(new BulkResponse(bulkItemResponse, 123)); + return null; + }).when(client).execute(eq(BulkAction.INSTANCE), any(), any()); + + + BulkResponse response = triggeredWatchStore.putAll(triggeredWatches); + assertThat(response.hasFailures(), is(false)); + assertThat(response.getItems().length, is(numberOfTriggeredWatches)); + } + + public void testDeleteTriggeredWatches() throws Exception { + DateTime now = DateTime.now(UTC); + + doAnswer(invocation -> { + BulkRequest bulkRequest = (BulkRequest) invocation.getArguments()[0]; + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + + int size = bulkRequest.requests().size(); + BulkItemResponse[] bulkItemResponse = new BulkItemResponse[size]; + for (int i = 0; i < size; i++) { + DocWriteRequest writeRequest = bulkRequest.requests().get(i); + ShardId shardId = new ShardId(TriggeredWatchStoreField.INDEX_NAME, "uuid", 0); + IndexResponse indexResponse = new IndexResponse(shardId, writeRequest.type(), writeRequest.id(), 1, 1, 1, true); + bulkItemResponse[i] = new BulkItemResponse(0, writeRequest.opType(), indexResponse); + } + + listener.onResponse(new BulkResponse(bulkItemResponse, 123)); + return null; + }).when(client).bulk(any(), any()); + + triggeredWatchStore.delete(new Wid("watch_id_", now)); + assertThat(bulks.keySet(), hasSize(1)); + BulkResponse response = bulks.values().iterator().next(); + assertThat(response.hasFailures(), is(false)); + assertThat(response.getItems().length, is(1)); + } + private RefreshResponse mockRefreshResponse(int total, int successful) { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getTotalShards()).thenReturn(total); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java index 8f1cce93055..19bf1ba5a1f 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java @@ -7,10 +7,14 @@ package org.elasticsearch.xpack.watcher.history; import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest.OpType; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -42,6 +46,7 @@ import static org.elasticsearch.xpack.core.watcher.history.HistoryStoreField.get import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.core.IsEqual.equalTo; import static org.joda.time.DateTimeZone.UTC; @@ -58,11 +63,15 @@ public class HistoryStoreTests extends ESTestCase { @Before public void init() { + Settings settings = Settings.builder().put("node.name", randomAlphaOfLength(10)).build(); client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - historyStore = new HistoryStore(Settings.EMPTY, client); + when(client.settings()).thenReturn(settings); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings)); + BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class); + BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener).setConcurrentRequests(0).setBulkActions(1).build(); + historyStore = new HistoryStore(settings, bulkProcessor); } public void testPut() throws Exception { @@ -75,19 +84,21 @@ public class HistoryStoreTests extends ESTestCase { IndexResponse indexResponse = mock(IndexResponse.class); doAnswer(invocation -> { - IndexRequest request = (IndexRequest) invocation.getArguments()[0]; - PlainActionFuture indexFuture = PlainActionFuture.newFuture(); - if (request.id().equals(wid.value()) && request.type().equals(HistoryStore.DOC_TYPE) && request.opType() == OpType.CREATE - && request.index().equals(index)) { - indexFuture.onResponse(indexResponse); + BulkRequest request = (BulkRequest) invocation.getArguments()[1]; + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + + IndexRequest indexRequest = (IndexRequest) request.requests().get(0); + if (indexRequest.id().equals(wid.value()) && indexRequest.type().equals(HistoryStore.DOC_TYPE) && + indexRequest.opType() == OpType.CREATE && indexRequest.index().equals(index)) { + listener.onResponse(new BulkResponse(new BulkItemResponse[]{ new BulkItemResponse(1, OpType.CREATE, indexResponse) }, 1)); } else { - indexFuture.onFailure(new ElasticsearchException("test issue")); + listener.onFailure(new ElasticsearchException("test issue")); } - return indexFuture; - }).when(client).index(any()); + return null; + }).when(client).bulk(any(), any()); historyStore.put(watchRecord); - verify(client).index(any()); + verify(client).bulk(any(), any()); } public void testIndexNameGeneration() { @@ -139,10 +150,15 @@ public class HistoryStoreTests extends ESTestCase { } watchRecord.result().actionsResults().put(JiraAction.TYPE, result); - PlainActionFuture indexResponseFuture = PlainActionFuture.newFuture(); - indexResponseFuture.onResponse(mock(IndexResponse.class)); - ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(IndexRequest.class); - when(client.index(requestCaptor.capture())).thenReturn(indexResponseFuture); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(BulkRequest.class); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + + IndexResponse indexResponse = mock(IndexResponse.class); + listener.onResponse(new BulkResponse(new BulkItemResponse[]{ new BulkItemResponse(1, OpType.CREATE, indexResponse) }, 1)); + return null; + }).when(client).bulk(requestCaptor.capture(), any()); + if (randomBoolean()) { historyStore.put(watchRecord); } else { @@ -150,7 +166,9 @@ public class HistoryStoreTests extends ESTestCase { } assertThat(requestCaptor.getAllValues(), hasSize(1)); - String indexedJson = requestCaptor.getValue().source().utf8ToString(); + assertThat(requestCaptor.getValue().requests().get(0), instanceOf(IndexRequest.class)); + IndexRequest capturedIndexRequest = (IndexRequest) requestCaptor.getValue().requests().get(0); + String indexedJson = capturedIndexRequest.source().utf8ToString(); assertThat(indexedJson, containsString(username)); assertThat(indexedJson, not(containsString(password))); }