From 5fd68959a0056d6e3b3403fcea2d32dfdff4ef7e Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 11 Dec 2017 13:01:16 +0000 Subject: [PATCH] [ML] Make datafeeds run-as the user who created/updated them (elastic/x-pack-elasticsearch#3254) This is the ML equivalent of what was done for Watcher in elastic/x-pack-elasticsearch#2808. For security reasons, ML datafeeds should not run as the _xpack user. Instead, they record the security headers from the request to create/update them, and reuse these when performing the search to retrieve data for analysis. Relates elastic/x-pack-elasticsearch#1071 Original commit: elastic/x-pack-elasticsearch@29f85de404a8afb26d9f43cfe1fbef103446ea5f --- .../xpack/ml/MlClientHelper.java | 72 +++++++++++ .../elasticsearch/xpack/ml/MlMetadata.java | 24 +++- .../xpack/ml/action/PutDatafeedAction.java | 14 +-- .../xpack/ml/action/StartDatafeedAction.java | 7 +- .../xpack/ml/action/UpdateDatafeedAction.java | 5 +- .../xpack/ml/datafeed/DatafeedConfig.java | 36 +++++- .../xpack/ml/datafeed/DatafeedJobBuilder.java | 5 +- .../xpack/ml/datafeed/DatafeedManager.java | 12 +- .../xpack/ml/datafeed/DatafeedUpdate.java | 15 ++- .../aggregation/AggregationDataExtractor.java | 3 +- .../AggregationDataExtractorContext.java | 6 +- .../AggregationDataExtractorFactory.java | 3 +- .../chunked/ChunkedDataExtractor.java | 3 +- .../chunked/ChunkedDataExtractorContext.java | 5 +- .../chunked/ChunkedDataExtractorFactory.java | 3 +- .../extractor/scroll/ScrollDataExtractor.java | 11 +- .../scroll/ScrollDataExtractorContext.java | 5 +- .../scroll/ScrollDataExtractorFactory.java | 4 +- .../xpack/ml/MlClientHelperTests.java | 117 ++++++++++++++++++ .../xpack/ml/MlMetadataTests.java | 55 ++++---- .../ml/action/CloseJobActionRequestTests.java | 20 +-- .../ml/action/StartDatafeedActionTests.java | 6 +- .../StopDatafeedActionRequestTests.java | 21 ++-- .../ml/datafeed/DatafeedManagerTests.java | 11 +- .../datafeed/DatafeedNodeSelectorTests.java | 36 +++--- .../ml/datafeed/DatafeedUpdateTests.java | 40 +++--- .../AggregationDataExtractorTests.java | 11 +- .../chunked/ChunkedDataExtractorTests.java | 6 +- .../scroll/ScrollDataExtractorTests.java | 35 +++--- .../ml/integration/DatafeedJobsRestIT.java | 112 +++++++++++++---- qa/smoke-test-ml-with-security/roles.yml | 2 +- 31 files changed, 520 insertions(+), 185 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/MlClientHelper.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlClientHelper.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlClientHelper.java new file mode 100644 index 00000000000..aca1acd4b3b --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlClientHelper.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
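A minimal, self-contained sketch of the header capture the commit message describes — filtering a request's headers down to the security-related ones that get stored on the datafeed config (the class name and header values here are illustrative; the real constants are Authentication.AUTHENTICATION_KEY and AuthenticationService.RUN_AS_USER_HEADER):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class HeaderFilterSketch {
        // Assumed literal values of the two security header constants used in this patch.
        private static final Set<String> SECURITY_HEADER_FILTERS = new HashSet<>(
                Arrays.asList("_xpack_security_authentication", "es-security-runas-user"));

        public static void main(String[] args) {
            Map<String, String> requestHeaders = new HashMap<>();
            requestHeaders.put("_xpack_security_authentication", "creator-user-token");
            requestHeaders.put("Content-Type", "application/json");

            // Keep only the security headers; this filtered map is what putDatafeed/updateDatafeed
            // store on the DatafeedConfig and what the data extractors later replay at search time.
            Map<String, String> stored = requestHeaders.entrySet().stream()
                    .filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey()))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

            System.out.println(stored); // {_xpack_security_authentication=creator-user-token}
        }
    }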
+ */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.security.authc.Authentication; +import org.elasticsearch.xpack.security.authc.AuthenticationService; + +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.ClientHelper.stashWithOrigin; + +/** + * A helper class for actions which decides if we should run via the _xpack user and set ML as origin + * or if we should use the run_as functionality by setting the correct headers + */ +public class MlClientHelper { + + /** + * List of headers that are related to security + */ + public static final Set SECURITY_HEADER_FILTERS = Sets.newHashSet(AuthenticationService.RUN_AS_USER_HEADER, + Authentication.AUTHENTICATION_KEY); + + /** + * Execute a client operation and return the response, try to run a datafeed search with least privileges, when headers exist + * + * @param datafeedConfig The config for a datafeed + * @param client The client used to query + * @param supplier The action to run + * @return An instance of the response class + */ + public static T execute(DatafeedConfig datafeedConfig, Client client, Supplier supplier) { + return execute(datafeedConfig.getHeaders(), client, supplier); + } + + /** + * Execute a client operation and return the response, try to run an action with least privileges, when headers exist + * + * @param headers Request headers, ideally including security headers + * @param client The client used to query + * @param supplier The action to run + * @return An instance of the response class + */ + public static T execute(Map headers, Client client, Supplier supplier) { + // no headers, we will have to use the xpack internal user for our execution by specifying the ml origin + if (headers == null || headers.isEmpty()) { + try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) { + return supplier.get(); + } + } else { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + Map filteredHeaders = headers.entrySet().stream() + .filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet()); + return supplier.get(); + } + } + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java index 54075448a23..fb7cba02392 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -48,6 +49,7 @@ import java.util.Set; 
import java.util.SortedMap; import java.util.TreeMap; import java.util.function.Supplier; +import java.util.stream.Collectors; public class MlMetadata implements MetaData.Custom { @@ -101,7 +103,7 @@ public class MlMetadata implements MetaData.Custom { } public Set expandDatafeedIds(String expression, boolean allowNoDatafeeds) { - return NameResolver.newUnaliased(datafeeds.keySet(), datafeedId -> ExceptionsHelper.missingDatafeedException(datafeedId)) + return NameResolver.newUnaliased(datafeeds.keySet(), ExceptionsHelper::missingDatafeedException) .expand(expression, allowNoDatafeeds); } @@ -285,7 +287,7 @@ public class MlMetadata implements MetaData.Custom { return this; } - public Builder putDatafeed(DatafeedConfig datafeedConfig) { + public Builder putDatafeed(DatafeedConfig datafeedConfig, ThreadContext threadContext) { if (datafeeds.containsKey(datafeedConfig.getId())) { throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists"); } @@ -293,6 +295,17 @@ public class MlMetadata implements MetaData.Custom { checkJobIsAvailableForDatafeed(jobId); Job job = jobs.get(jobId); DatafeedJobValidator.validate(datafeedConfig, job); + + if (threadContext != null) { + // Adjust the request, adding security headers from the current thread context + DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig); + Map headers = threadContext.getHeaders().entrySet().stream() + .filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + builder.setHeaders(headers); + datafeedConfig = builder.build(); + } + datafeeds.put(datafeedConfig.getId(), datafeedConfig); return this; } @@ -309,7 +322,7 @@ public class MlMetadata implements MetaData.Custom { } } - public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaData persistentTasks) { + public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksCustomMetaData persistentTasks, ThreadContext threadContext) { String datafeedId = update.getId(); DatafeedConfig oldDatafeedConfig = datafeeds.get(datafeedId); if (oldDatafeedConfig == null) { @@ -317,7 +330,7 @@ public class MlMetadata implements MetaData.Custom { } checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId, DatafeedState.STARTED), datafeedId, persistentTasks); - DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig); + DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig, threadContext); if (newDatafeedConfig.getJobId().equals(oldDatafeedConfig.getJobId()) == false) { checkJobIsAvailableForDatafeed(newDatafeedConfig.getJobId()); } @@ -393,14 +406,13 @@ public class MlMetadata implements MetaData.Custom { putJob(jobBuilder.build(), true); } - public void checkJobHasNoDatafeed(String jobId) { + void checkJobHasNoDatafeed(String jobId) { Optional datafeed = getDatafeedByJobId(jobId); if (datafeed.isPresent()) { throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed [" + datafeed.get().getId() + "] refers to it"); } } - } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutDatafeedAction.java index bd69afd1046..0175e71c50f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutDatafeedAction.java @@ -49,7 
+49,9 @@ import org.elasticsearch.xpack.security.authz.RoleDescriptor; import org.elasticsearch.xpack.security.support.Exceptions; import java.io.IOException; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; public class PutDatafeedAction extends Action { @@ -218,8 +220,7 @@ public class PutDatafeedAction extends Action listener) throws Exception { + protected void masterOperation(Request request, ClusterState state, ActionListener listener) { // If security is enabled only create the datafeed if the user requesting creation has // permission to read the indices the datafeed is going to read from if (securityEnabled) { @@ -266,6 +267,7 @@ public class PutDatafeedAction extends Action listener) { + clusterService.submitStateUpdateTask( "put-datafeed-" + request.getDatafeed().getId(), new AckedClusterStateUpdateTask(request, listener) { @@ -275,13 +277,11 @@ public class PutDatafeedAction extends Action { @@ -437,7 +434,7 @@ public class StartDatafeedAction super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new); this.licenseState = licenseState; this.persistentTasksService = persistentTasksService; - this.client = clientWithOrigin(client, ML_ORIGIN); + this.client = client; } @Override @@ -453,7 +450,7 @@ public class StartDatafeedAction } @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + protected void masterOperation(Request request, ClusterState state, ActionListener listener) { DatafeedParams params = request.params; if (licenseState.isMachineLearningAllowed()) { ActionListener> finalListener = new ActionListener>() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java index 2b64916dea4..e1c2591a7a6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java @@ -143,8 +143,7 @@ public class UpdateDatafeedAction extends Action listener) - throws Exception { + protected void masterOperation(Request request, ClusterState state, ActionListener listener) { clusterService.submitStateUpdateTask("update-datafeed-" + request.getUpdate().getId(), new AckedClusterStateUpdateTask(request, listener) { private volatile DatafeedConfig updatedDatafeed; @@ -164,7 +163,7 @@ public class UpdateDatafeedAction extends Action implements public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields"); public static final ParseField SOURCE = new ParseField("_source"); public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config"); + public static final ParseField HEADERS = new ParseField("headers"); // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly public static final ObjectParser METADATA_PARSER = new ObjectParser<>("datafeed_config", true, Builder::new); @@ -117,6 +119,7 @@ public class DatafeedConfig extends AbstractDiffable implements // TODO this is to read former _source field. 
Remove in v7.0.0 parser.declareBoolean((builder, value) -> {}, SOURCE); parser.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSERS.get(parserType), CHUNKING_CONFIG); + parser.declareObject(Builder::setHeaders, (p, c) -> p.mapStrings(), HEADERS); } } @@ -140,10 +143,11 @@ public class DatafeedConfig extends AbstractDiffable implements private final List scriptFields; private final Integer scrollSize; private final ChunkingConfig chunkingConfig; + private final Map headers; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, List types, QueryBuilder query, AggregatorFactories.Builder aggregations, List scriptFields, - Integer scrollSize, ChunkingConfig chunkingConfig) { + Integer scrollSize, ChunkingConfig chunkingConfig, Map headers) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -155,6 +159,7 @@ public class DatafeedConfig extends AbstractDiffable implements this.scriptFields = scriptFields; this.scrollSize = scrollSize; this.chunkingConfig = chunkingConfig; + this.headers = Objects.requireNonNull(headers); } public DatafeedConfig(StreamInput in) throws IOException { @@ -185,6 +190,11 @@ public class DatafeedConfig extends AbstractDiffable implements in.readBoolean(); } this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new); + if (in.getVersion().onOrAfter(Version.V_6_2_0)) { + this.headers = in.readMap(StreamInput::readString, StreamInput::readString); + } else { + this.headers = Collections.emptyMap(); + } } public String getId() { @@ -245,6 +255,10 @@ public class DatafeedConfig extends AbstractDiffable implements return chunkingConfig; } + public Map getHeaders() { + return headers; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); @@ -277,6 +291,9 @@ public class DatafeedConfig extends AbstractDiffable implements out.writeBoolean(false); } out.writeOptionalWriteable(chunkingConfig); + if (out.getVersion().onOrAfter(Version.V_6_2_0)) { + out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); + } } @Override @@ -311,6 +328,10 @@ public class DatafeedConfig extends AbstractDiffable implements if (chunkingConfig != null) { builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig); } + if (headers != null && headers.isEmpty() == false + && params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == true) { + builder.field(HEADERS.getPreferredName(), headers); + } return builder; } @@ -341,13 +362,14 @@ public class DatafeedConfig extends AbstractDiffable implements && Objects.equals(this.scrollSize, that.scrollSize) && Objects.equals(this.aggregations, that.aggregations) && Objects.equals(this.scriptFields, that.scriptFields) - && Objects.equals(this.chunkingConfig, that.chunkingConfig); + && Objects.equals(this.chunkingConfig, that.chunkingConfig) + && Objects.equals(this.headers, that.headers); } @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields, - chunkingConfig); + chunkingConfig, headers); } @Override @@ -420,6 +442,7 @@ public class DatafeedConfig extends AbstractDiffable implements private List scriptFields; private Integer scrollSize = DEFAULT_SCROLL_SIZE; private ChunkingConfig chunkingConfig; + private Map headers = Collections.emptyMap(); public Builder() { } @@ -442,6 +465,7 @@ public class DatafeedConfig extends AbstractDiffable implements this.scriptFields = config.scriptFields; this.scrollSize = 
config.scrollSize; this.chunkingConfig = config.chunkingConfig; + this.headers = config.headers; } public void setId(String datafeedId) { @@ -452,6 +476,10 @@ public class DatafeedConfig extends AbstractDiffable implements this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); } + public void setHeaders(Map headers) { + this.headers = headers; + } + public void setIndices(List indices) { this.indices = ExceptionsHelper.requireNonNull(indices, INDICES.getPreferredName()); } @@ -516,7 +544,7 @@ public class DatafeedConfig extends AbstractDiffable implements setDefaultChunkingConfig(); setDefaultQueryDelay(); return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, - chunkingConfig); + chunkingConfig, headers); } void validateAggregations() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index 181634411c1..3ada8792747 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -25,9 +25,6 @@ import java.util.Objects; import java.util.function.Consumer; import java.util.function.Supplier; -import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN; -import static org.elasticsearch.xpack.ClientHelper.clientWithOrigin; - public class DatafeedJobBuilder { private final Client client; @@ -36,7 +33,7 @@ public class DatafeedJobBuilder { private final Supplier currentTimeSupplier; public DatafeedJobBuilder(Client client, JobProvider jobProvider, Auditor auditor, Supplier currentTimeSupplier) { - this.client = clientWithOrigin(client, ML_ORIGIN); + this.client = client; this.jobProvider = Objects.requireNonNull(jobProvider); this.auditor = Objects.requireNonNull(auditor); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index a07c0dec464..928f8ecb0cf 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; @@ -463,7 +464,16 @@ public class DatafeedManager extends AbstractComponent { } private void runTask(StartDatafeedAction.DatafeedTask task) { - innerRun(runningDatafeedsOnThisNode.get(task.getAllocationId()), task.getDatafeedStartTime(), task.getEndTime()); + // This clearing of the thread context is not strictly necessary. Every action performed by the + // datafeed _should_ be done using the MlClientHelper, which will set the appropriate thread + // context. However, by clearing the thread context here if anyone forgets to use MlClientHelper + // somewhere else in the datafeed code then it should cause a failure in the same way in single + // and multi node clusters. 
If we didn't clear the thread context here then there's a risk that + // a context with sufficient permissions would coincidentally be in force in some single node + // tests, leading to bugs not caught in CI due to many tests running in single node test clusters. + try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { + innerRun(runningDatafeedsOnThisNode.get(task.getAllocationId()), task.getDatafeedStartTime(), task.getEndTime()); + } } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdate.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdate.java index 00569b6fab0..a3b856967f2 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdate.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdate.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -20,6 +21,7 @@ import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.ml.MlClientHelper; import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; @@ -29,7 +31,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * A datafeed update contains partial properties to update a {@link DatafeedConfig}. 
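The defensive context-clearing in runTask above reduces to the stash pattern below — a self-contained sketch (the header key is the assumed value of Authentication.AUTHENTICATION_KEY):

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.util.concurrent.ThreadContext;

    public class StashContextSketch {
        public static void main(String[] args) {
            ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
            threadContext.putHeader("_xpack_security_authentication", "ambient-credentials");
            try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
                // Inside the stash the context is empty, so a datafeed action that skips
                // MlClientHelper runs with no credentials and fails the same way in
                // single-node test clusters as in multi-node production clusters.
                assert threadContext.getHeader("_xpack_security_authentication") == null;
            }
            // Closing the StoredContext restores the original headers.
            assert threadContext.getHeader("_xpack_security_authentication") != null;
        }
    }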
@@ -260,7 +264,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { * Applies the update to the given {@link DatafeedConfig} * @return a new {@link DatafeedConfig} that contains the update */ - public DatafeedConfig apply(DatafeedConfig datafeedConfig) { + public DatafeedConfig apply(DatafeedConfig datafeedConfig, ThreadContext threadContext) { if (id.equals(datafeedConfig.getId()) == false) { throw new IllegalArgumentException("Cannot apply update to datafeedConfig with different id"); } @@ -296,6 +300,15 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { if (chunkingConfig != null) { builder.setChunkingConfig(chunkingConfig); } + + if (threadContext != null) { + // Adjust the request, adding security headers from the current thread context + Map headers = threadContext.getHeaders().entrySet().stream() + .filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + builder.setHeaders(headers); + } + return builder.build(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index 6624299fd7f..61cef8e9643 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.xpack.ml.MlClientHelper; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils; @@ -111,7 +112,7 @@ class AggregationDataExtractor implements DataExtractor { } protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - return searchRequestBuilder.get(); + return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get); } private SearchRequestBuilder buildSearchRequest() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java index eefd32ef1fd..4958e39decc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java @@ -9,6 +9,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -24,9 +25,11 @@ class AggregationDataExtractorContext { final long start; final long end; final boolean includeDocCount; + final Map headers; AggregationDataExtractorContext(String jobId, String timeField, Set fields, List indices, List types, - QueryBuilder query, AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount) { + QueryBuilder query, AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount, + Map headers) 
{ this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.fields = Objects.requireNonNull(fields); @@ -37,5 +40,6 @@ class AggregationDataExtractorContext { this.start = start; this.end = end; this.includeDocCount = includeDocCount; + this.headers = headers; } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java index db081637624..a353b5a6b67 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java @@ -39,7 +39,8 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory { datafeedConfig.getAggregations(), Intervals.alignToCeil(start, histogramInterval), Intervals.alignToFloor(end, histogramInterval), - job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT)); + job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT), + datafeedConfig.getHeaders()); return new AggregationDataExtractor(client, dataExtractorContext); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java index c7a87086aa9..158fdfa8c58 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java @@ -15,6 +15,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.search.aggregations.metrics.min.Min; +import org.elasticsearch.xpack.ml.MlClientHelper; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils; @@ -133,7 +134,7 @@ public class ChunkedDataExtractor implements DataExtractor { } protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - return searchRequestBuilder.get(); + return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get); } private Optional getNextStream() throws IOException { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java index 4c2cdfd1b12..8efc1e2f7ea 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import java.util.List; +import java.util.Map; import java.util.Objects; class ChunkedDataExtractorContext { @@ -29,10 +30,11 @@ class ChunkedDataExtractorContext { final long end; final TimeValue chunkSpan; final TimeAligner timeAligner; + final 
Map headers; ChunkedDataExtractorContext(String jobId, String timeField, List indices, List types, QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan, - TimeAligner timeAligner) { + TimeAligner timeAligner, Map headers) { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.indices = indices.toArray(new String[indices.size()]); @@ -43,5 +45,6 @@ class ChunkedDataExtractorContext { this.end = end; this.chunkSpan = chunkSpan; this.timeAligner = Objects.requireNonNull(timeAligner); + this.headers = headers; } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java index fe122bce379..fc438e1b017 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java @@ -41,7 +41,8 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory { timeAligner.alignToCeil(start), timeAligner.alignToFloor(end), datafeedConfig.getChunkingConfig().getTimeSpan(), - timeAligner); + timeAligner, + datafeedConfig.getHeaders()); return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index 2e0fec5da3e..c0a6cb3df54 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -20,6 +20,7 @@ import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.fetch.StoredFieldsContext; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.xpack.ml.MlClientHelper; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.ml.utils.DomainSplitFunction; @@ -98,7 +99,7 @@ class ScrollDataExtractor implements DataExtractor { } protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - return searchRequestBuilder.get(); + return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get); } private SearchRequestBuilder buildSearchRequest(long start) { @@ -182,7 +183,7 @@ class ScrollDataExtractor implements DataExtractor { private InputStream continueScroll() throws IOException { LOGGER.debug("[{}] Continuing scroll with id [{}]", context.jobId, scrollId); - SearchResponse searchResponse = null; + SearchResponse searchResponse; try { searchResponse = executeSearchScrollRequest(scrollId); } catch (SearchPhaseExecutionException searchExecutionException) { @@ -208,10 +209,10 @@ class ScrollDataExtractor implements DataExtractor { } protected SearchResponse executeSearchScrollRequest(String scrollId) { - return SearchScrollAction.INSTANCE.newRequestBuilder(client) + return MlClientHelper.execute(context.headers, client, () -> SearchScrollAction.INSTANCE.newRequestBuilder(client) .setScroll(SCROLL_TIMEOUT) .setScrollId(scrollId) - .get(); + .get()); } private void 
resetScroll() { @@ -223,7 +224,7 @@ class ScrollDataExtractor implements DataExtractor { if (scrollId != null) { ClearScrollRequest request = new ClearScrollRequest(); request.addScrollId(scrollId); - client.execute(ClearScrollAction.INSTANCE, request).actionGet(); + MlClientHelper.execute(context.headers, client, () -> client.execute(ClearScrollAction.INSTANCE, request).actionGet()); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java index 9a852e7c3d1..d1666497d24 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java @@ -9,6 +9,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.util.List; +import java.util.Map; import java.util.Objects; class ScrollDataExtractorContext { @@ -22,10 +23,11 @@ class ScrollDataExtractorContext { final int scrollSize; final long start; final long end; + final Map headers; ScrollDataExtractorContext(String jobId, ExtractedFields extractedFields, List indices, List types, QueryBuilder query, List scriptFields, int scrollSize, - long start, long end) { + long start, long end, Map headers) { this.jobId = Objects.requireNonNull(jobId); this.extractedFields = Objects.requireNonNull(extractedFields); this.indices = indices.toArray(new String[indices.size()]); @@ -35,5 +37,6 @@ class ScrollDataExtractorContext { this.scrollSize = scrollSize; this.start = start; this.end = end; + this.headers = headers; } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java index a57609b1069..d059cf380bf 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; @@ -46,7 +45,8 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory { datafeedConfig.getScriptFields(), datafeedConfig.getScrollSize(), start, - end); + end, + datafeedConfig.getHeaders()); return new ScrollDataExtractor(client, dataExtractorContext); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java new file mode 100644 index 00000000000..dfbc479bc48 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
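The tests below distinguish the two MlClientHelper paths by inspecting the thread context from inside the supplier. In sketch form (ACTION_ORIGIN_TRANSIENT_NAME is assumed to be ClientHelper's "action.origin" transient, with ML_ORIGIN = "ml"):

    // Inside the supplier passed to MlClientHelper.execute(...): which path are we on?
    String origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME);
    if (origin != null) {
        // No stored headers -> stashWithOrigin path: the search runs as the _xpack internal user.
    } else {
        // Stored headers were copied in -> run-as path: the search runs as the datafeed's creator.
    }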
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.security.authc.Authentication; +import org.elasticsearch.xpack.security.authc.AuthenticationService; +import org.junit.Before; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Consumer; + +import static org.elasticsearch.xpack.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME; +import static org.elasticsearch.xpack.ClientHelper.ML_ORIGIN; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MlClientHelperTests extends ESTestCase { + + private Client client = mock(Client.class); + + @Before + public void setupMocks() { + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); + + PlainActionFuture searchFuture = PlainActionFuture.newFuture(); + searchFuture.onResponse(new SearchResponse()); + when(client.search(any())).thenReturn(searchFuture); + } + + public void testEmptyHeaders() { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo"); + builder.setIndices(Collections.singletonList("foo-index")); + + assertExecutionWithOrigin(builder.build()); + } + + public void testWithHeaders() { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo"); + builder.setIndices(Collections.singletonList("foo-index")); + Map headers = MapBuilder.newMapBuilder() + .put(Authentication.AUTHENTICATION_KEY, "anything") + .put(AuthenticationService.RUN_AS_USER_HEADER, "anything") + .map(); + builder.setHeaders(headers); + + assertRunAsExecution(builder.build(), h -> { + assertThat(h.keySet(), hasSize(2)); + assertThat(h, hasEntry(Authentication.AUTHENTICATION_KEY, "anything")); + assertThat(h, hasEntry(AuthenticationService.RUN_AS_USER_HEADER, "anything")); + }); + } + + public void testFilteredHeaders() { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo"); + builder.setIndices(Collections.singletonList("foo-index")); + Map unrelatedHeaders = MapBuilder.newMapBuilder() + .put(randomAlphaOfLength(10), "anything") + .map(); + builder.setHeaders(unrelatedHeaders); + + assertRunAsExecution(builder.build(), h -> assertThat(h.keySet(), hasSize(0))); + } + + /** + * This method executes a search and checks if the thread context was enriched with the ml origin + */ + private void assertExecutionWithOrigin(DatafeedConfig datafeedConfig) { + MlClientHelper.execute(datafeedConfig, client, () -> { + Object origin = 
client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); + assertThat(origin, is(ML_ORIGIN)); + + // Check that headers are not set + Map headers = client.threadPool().getThreadContext().getHeaders(); + assertThat(headers, not(hasEntry(Authentication.AUTHENTICATION_KEY, "anything"))); + assertThat(headers, not(hasEntry(AuthenticationService.RUN_AS_USER_HEADER, "anything"))); + + return client.search(new SearchRequest()).actionGet(); + }); + } + + /** + * This method executes a search and ensures no stashed origin thread context was created, so that the regular node + * client was used, to emulate a run_as function + */ + public void assertRunAsExecution(DatafeedConfig datafeedConfig, Consumer> consumer) { + MlClientHelper.execute(datafeedConfig, client, () -> { + Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); + assertThat(origin, is(nullValue())); + + consumer.accept(client.threadPool().getThreadContext().getHeaders()); + return client.search(new SearchRequest()).actionGet(); + }); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 23c3a73b438..3ff569f81e1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.config.JobTests; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; -import java.io.IOException; import java.util.Collections; import java.util.Date; import java.util.Map; @@ -62,7 +61,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { } job = new Job.Builder(job).setAnalysisConfig(analysisConfig).build(); builder.putJob(job, false); - builder.putDatafeed(datafeedConfig); + builder.putDatafeed(datafeedConfig, null); } else { builder.putJob(job, false); } @@ -163,7 +162,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1); + builder.putDatafeed(datafeedConfig1, null); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder.deleteJob(job1.getId(), new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))); @@ -183,7 +182,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1); + builder.putDatafeed(datafeedConfig1, null); MlMetadata result = builder.build(); assertThat(result.getJobs().get("job_id"), sameInstance(job1)); @@ -200,7 +199,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", "missing-job").build(); MlMetadata.Builder builder = new MlMetadata.Builder(); - expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1)); + expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, null)); } public void testPutDatafeed_failBecauseJobIsBeingDeleted() { @@ -209,7 +208,7 @@ public class 
MlMetadataTests extends AbstractSerializingTestCase { MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1)); + expectThrows(ResourceNotFoundException.class, () -> builder.putDatafeed(datafeedConfig1, null)); } public void testPutDatafeed_failBecauseDatafeedIdIsAlreadyTaken() { @@ -217,9 +216,9 @@ public class MlMetadataTests extends AbstractSerializingTestCase { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1); + builder.putDatafeed(datafeedConfig1, null); - expectThrows(ResourceAlreadyExistsException.class, () -> builder.putDatafeed(datafeedConfig1)); + expectThrows(ResourceAlreadyExistsException.class, () -> builder.putDatafeed(datafeedConfig1, null)); } public void testPutDatafeed_failBecauseJobAlreadyHasDatafeed() { @@ -228,10 +227,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase { DatafeedConfig datafeedConfig2 = createDatafeedConfig("datafeed2", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1); + builder.putDatafeed(datafeedConfig1, null); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder.putDatafeed(datafeedConfig2)); + () -> builder.putDatafeed(datafeedConfig2, null)); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); } @@ -245,7 +244,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1.build(now), false); - expectThrows(ElasticsearchStatusException.class, () -> builder.putDatafeed(datafeedConfig1)); + expectThrows(ElasticsearchStatusException.class, () -> builder.putDatafeed(datafeedConfig1, null)); } public void testUpdateDatafeed() { @@ -253,12 +252,12 @@ public class MlMetadataTests extends AbstractSerializingTestCase { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1); + builder.putDatafeed(datafeedConfig1, null); MlMetadata beforeMetadata = builder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setScrollSize(5000); - MlMetadata updatedMetadata = new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null).build(); + MlMetadata updatedMetadata = new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, null).build(); DatafeedConfig updatedDatafeed = updatedMetadata.getDatafeed(datafeedConfig1.getId()); assertThat(updatedDatafeed.getJobId(), equalTo(datafeedConfig1.getJobId())); @@ -270,7 +269,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { public void testUpdateDatafeed_failBecauseDatafeedDoesNotExist() { DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("job_id"); update.setScrollSize(5000); - expectThrows(ResourceNotFoundException.class, () -> new MlMetadata.Builder().updateDatafeed(update.build(), null).build()); + expectThrows(ResourceNotFoundException.class, () -> new MlMetadata.Builder().updateDatafeed(update.build(), null, null).build()); } public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() { @@ -278,7 +277,7 @@ public class 
MlMetadataTests extends AbstractSerializingTestCase { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1); + builder.putDatafeed(datafeedConfig1, null); MlMetadata beforeMetadata = builder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -290,7 +289,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { update.setScrollSize(5000); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), tasksInProgress)); + () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), tasksInProgress, null)); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); } @@ -299,14 +298,14 @@ public class MlMetadataTests extends AbstractSerializingTestCase { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1); + builder.putDatafeed(datafeedConfig1, null); MlMetadata beforeMetadata = builder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setJobId(job1.getId() + "_2"); expectThrows(ResourceNotFoundException.class, - () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null)); + () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, null)); } public void testUpdateDatafeed_failBecauseNewJobHasAnotherDatafeedAttached() { @@ -318,15 +317,15 @@ public class MlMetadataTests extends AbstractSerializingTestCase { MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); builder.putJob(job2.build(), false); - builder.putDatafeed(datafeedConfig1); - builder.putDatafeed(datafeedConfig2); + builder.putDatafeed(datafeedConfig1, null); + builder.putDatafeed(datafeedConfig2, null); MlMetadata beforeMetadata = builder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setJobId(job2.getId()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null)); + () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null, null)); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [job_id_2]")); } @@ -336,7 +335,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); - builder.putDatafeed(datafeedConfig1); + builder.putDatafeed(datafeedConfig1, null); MlMetadata result = builder.build(); assertThat(result.getJobs().get("job_id"), sameInstance(job1)); @@ -377,9 +376,9 @@ public class MlMetadataTests extends AbstractSerializingTestCase { public void testExpandDatafeedIds() { MlMetadata.Builder mlMetadataBuilder = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2"); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("bar-1-feed", "bar-1").build()); - mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-1-feed", "foo-1").build()); - 
mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-2-feed", "foo-2").build()); + mlMetadataBuilder.putDatafeed(createDatafeedConfig("bar-1-feed", "bar-1").build(), null); + mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-1-feed", "foo-1").build(), null); + mlMetadataBuilder.putDatafeed(createDatafeedConfig("foo-2-feed", "foo-2").build(), null); MlMetadata mlMetadata = mlMetadataBuilder.build(); @@ -399,7 +398,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { } @Override - protected MlMetadata mutateInstance(MlMetadata instance) throws IOException { + protected MlMetadata mutateInstance(MlMetadata instance) { Map jobs = instance.getJobs(); Map datafeeds = instance.getDatafeeds(); MlMetadata.Builder metadataBuilder = new MlMetadata.Builder(); @@ -408,7 +407,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { metadataBuilder.putJob(entry.getValue(), true); } for (Map.Entry entry : datafeeds.entrySet()) { - metadataBuilder.putDatafeed(entry.getValue()); + metadataBuilder.putDatafeed(entry.getValue(), null); } switch (between(0, 1)) { @@ -429,7 +428,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { } randomJob = new Job.Builder(randomJob).setAnalysisConfig(analysisConfig).build(); metadataBuilder.putJob(randomJob, false); - metadataBuilder.putDatafeed(datafeedConfig); + metadataBuilder.putDatafeed(datafeedConfig, null); break; default: throw new AssertionError("Illegal randomisation branch"); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java index ff21c6e192b..987116e3101 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java @@ -80,7 +80,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false); mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", - Collections.singletonList("*"))); + Collections.singletonList("*")), null); final PersistentTasksCustomMetaData.Builder startDataFeedTaskBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", null, JobState.OPENED, startDataFeedTaskBuilder); addTask("datafeed_id", 0L, null, DatafeedState.STARTED, startDataFeedTaskBuilder); @@ -147,7 +147,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa request.setForce(true); CloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs); assertEquals(Arrays.asList("job_id_1", "job_id_2", "job_id_3"), openJobs); - assertEquals(Arrays.asList("job_id_4"), closingJobs); + assertEquals(Collections.singletonList("job_id_4"), closingJobs); request.setForce(false); expectThrows(ElasticsearchStatusException.class, @@ -171,7 +171,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa CloseJobAction.Request request = new CloseJobAction.Request("job_id_1"); CloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs); - assertEquals(Arrays.asList("job_id_1"), openJobs); + assertEquals(Collections.singletonList("job_id_1"), openJobs); assertEquals(Collections.emptyList(), closingJobs); // Job without task is closed @@ -219,7 +219,7 @@ public class 
CloseJobActionRequestTests extends AbstractStreamableXContentTestCa request.setForce(true); CloseJobAction.resolveAndValidateJobId(request, cs1, openJobs, closingJobs); - assertEquals(Arrays.asList("job_id_failed"), openJobs); + assertEquals(Collections.singletonList("job_id_failed"), openJobs); assertEquals(Collections.emptyList(), closingJobs); openJobs.clear(); @@ -252,7 +252,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa CloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("_all"), cs1, openJobs, closingJobs); assertEquals(Arrays.asList("job_id_open-1", "job_id_open-2"), openJobs); - assertEquals(Arrays.asList("job_id_closing"), closingJobs); + assertEquals(Collections.singletonList("job_id_closing"), closingJobs); openJobs.clear(); closingJobs.clear(); @@ -264,12 +264,12 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa CloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("job_id_closing"), cs1, openJobs, closingJobs); assertEquals(Collections.emptyList(), openJobs); - assertEquals(Arrays.asList("job_id_closing"), closingJobs); + assertEquals(Collections.singletonList("job_id_closing"), closingJobs); openJobs.clear(); closingJobs.clear(); CloseJobAction.resolveAndValidateJobId(new CloseJobAction.Request("job_id_open-1"), cs1, openJobs, closingJobs); - assertEquals(Arrays.asList("job_id_open-1"), openJobs); + assertEquals(Collections.singletonList("job_id_open-1"), openJobs); assertEquals(Collections.emptyList(), closingJobs); openJobs.clear(); closingJobs.clear(); @@ -316,8 +316,8 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa } public void testBuildWaitForCloseRequest() { - List openJobIds = Arrays.asList(new String[] {"openjob1", "openjob2"}); - List closingJobIds = Arrays.asList(new String[] {"closingjob1"}); + List openJobIds = Arrays.asList("openjob1", "openjob2"); + List closingJobIds = Collections.singletonList("closingjob1"); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("openjob1", null, JobState.OPENED, tasksBuilder); @@ -343,4 +343,4 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa tasks.updateTaskStatus(MlMetadata.datafeedTaskId(datafeedId), state); } -} \ No newline at end of file +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 57fd108fe48..c371e0306d2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -43,7 +43,7 @@ public class StartDatafeedActionTests extends ESTestCase { PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder().build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1) + .putDatafeed(datafeedConfig1, null) .build(); Exception e = expectThrows(ElasticsearchStatusException.class, () -> StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks)); @@ -60,7 +60,7 @@ public class StartDatafeedActionTests extends ESTestCase { PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", 
"job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1) + .putDatafeed(datafeedConfig1, null) .build(); StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); @@ -76,7 +76,7 @@ public class StartDatafeedActionTests extends ESTestCase { PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1) + .putDatafeed(datafeedConfig1, null) .build(); StartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index bd8fd0f6eba..18b7714dfe5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; @@ -66,7 +65,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe tasksBuilder.addTask(MlMetadata.datafeedTaskId("foo"), StartDatafeedAction.TASK_NAME, new StartDatafeedAction.DatafeedParams("foo", 0L), new Assignment("node_id", "")); tasksBuilder.updateTaskStatus(MlMetadata.datafeedTaskId("foo"), DatafeedState.STARTED); - PersistentTasksCustomMetaData tasks = tasksBuilder.build(); + tasksBuilder.build(); Job job = createDatafeedJob().build(new Date()); MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build(); @@ -76,7 +75,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder().putJob(job, false) - .putDatafeed(datafeedConfig) + .putDatafeed(datafeedConfig, null) .build(); StopDatafeedAction.validateDatafeedTask("foo", mlMetadata2); } @@ -88,12 +87,12 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe addTask("datafeed_1", 0L, "node-1", DatafeedState.STARTED, tasksBuilder); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); addTask("datafeed_2", 0L, "node-1", DatafeedState.STOPPED, tasksBuilder); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); MlMetadata mlMetadata = mlMetadataBuilder.build(); @@ -102,7 +101,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe List stoppingDatafeeds = new ArrayList<>(); StopDatafeedAction.resolveDataFeedIds(new 
StopDatafeedAction.Request("datafeed_1"), mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds); - assertEquals(Arrays.asList("datafeed_1"), startedDatafeeds); + assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds); assertEquals(Collections.emptyList(), stoppingDatafeeds); startedDatafeeds.clear(); @@ -120,17 +119,17 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe addTask("datafeed_1", 0L, "node-1", DatafeedState.STARTED, tasksBuilder); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); addTask("datafeed_2", 0L, "node-1", DatafeedState.STOPPED, tasksBuilder); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); addTask("datafeed_3", 0L, "node-1", DatafeedState.STOPPING, tasksBuilder); job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); - mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig, null); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); MlMetadata mlMetadata = mlMetadataBuilder.build(); @@ -139,8 +138,8 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe List stoppingDatafeeds = new ArrayList<>(); StopDatafeedAction.resolveDataFeedIds(new StopDatafeedAction.Request("_all"), mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds); - assertEquals(Arrays.asList("datafeed_1"), startedDatafeeds); - assertEquals(Arrays.asList("datafeed_3"), stoppingDatafeeds); + assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds); + assertEquals(Collections.singletonList("datafeed_3"), stoppingDatafeeds); startedDatafeeds.clear(); stoppingDatafeeds.clear(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 6a44b5179e6..c17cb8f2dde 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -16,8 +16,10 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -80,7 +82,7 @@ public class DatafeedManagerTests extends ESTestCase { Job job = createDatafeedJob().build(new Date()); mlMetadata.putJob(job, false); DatafeedConfig datafeed = createDatafeedConfig("datafeed_id", job.getId()).build(); - mlMetadata.putDatafeed(datafeed); + 
mlMetadata.putDatafeed(datafeed, null); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); @@ -109,6 +111,7 @@ public class DatafeedManagerTests extends ESTestCase { auditor = mock(Auditor.class); threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); ExecutorService executorService = mock(ExecutorService.class); doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); @@ -248,7 +251,7 @@ public class DatafeedManagerTests extends ESTestCase { } } - public void testDatafeedTaskWaitsUntilJobIsOpened() throws Exception { + public void testDatafeedTaskWaitsUntilJobIsOpened() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) @@ -288,7 +291,7 @@ public class DatafeedManagerTests extends ESTestCase { verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); } - public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() throws Exception { + public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) @@ -316,7 +319,7 @@ public class DatafeedManagerTests extends ESTestCase { verify(task).stop("job_never_opened", TimeValue.timeValueSeconds(20)); } - public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() throws Exception { + public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 8ce1fc1c983..ede2aea6a15 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -63,11 +63,11 @@ public class DatafeedNodeSelectorTests extends ESTestCase { .build(); } - public void testSelectNode_GivenJobIsOpened() throws Exception { + public void testSelectNode_GivenJobIsOpened() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -81,11 +81,11 @@ public class DatafeedNodeSelectorTests extends ESTestCase { new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); } - public void testSelectNode_GivenJobIsOpening() throws Exception { + public void 
testSelectNode_GivenJobIsOpening() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -99,13 +99,13 @@ public class DatafeedNodeSelectorTests extends ESTestCase { new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); } - public void testNoJobTask() throws Exception { + public void testNoJobTask() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*"))); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), null); mlMetadata = mlMetadataBuilder.build(); tasks = PersistentTasksCustomMetaData.builder().build(); @@ -123,11 +123,11 @@ public class DatafeedNodeSelectorTests extends ESTestCase { + "[cannot start datafeed [datafeed_id], because job's [job_id] state is [closed] while state [opened] is required]")); } - public void testSelectNode_GivenJobFailedOrClosed() throws Exception { + public void testSelectNode_GivenJobFailedOrClosed() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -149,13 +149,13 @@ public class DatafeedNodeSelectorTests extends ESTestCase { + "] while state [opened] is required]")); } - public void testShardUnassigned() throws Exception { + public void testShardUnassigned() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*"))); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), null); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -175,13 +175,13 @@ public class DatafeedNodeSelectorTests extends ESTestCase { new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); } - public void testShardNotAllActive() throws Exception { + public void testShardNotAllActive() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - 
mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*"))); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), null); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -202,11 +202,11 @@ public class DatafeedNodeSelectorTests extends ESTestCase { new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); } - public void testIndexDoesntExist() throws Exception { + public void testIndexDoesntExist() { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo"))); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), null); mlMetadata = mlMetadataBuilder.build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -230,7 +230,7 @@ public class DatafeedNodeSelectorTests extends ESTestCase { MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); + mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), null); mlMetadata = mlMetadataBuilder.build(); String nodeId = randomBoolean() ? "node_id2" : null; @@ -261,14 +261,14 @@ public class DatafeedNodeSelectorTests extends ESTestCase { new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); } - public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() throws Exception { + public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { // Here we test that when there are 2 problems, the most critical gets reported first. 
         // In this case job is Opening (non-critical) and the index does not exist (critical)
         MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder();
         Job job = createScheduledJob("job_id").build(new Date());
         mlMetadataBuilder.putJob(job, false);
-        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")));
+        mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), null);
         mlMetadata = mlMetadataBuilder.build();
 
         PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
@@ -339,4 +339,4 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
         return new RoutingTable.Builder().add(rtBuilder).build();
     }
 
-}
\ No newline at end of file
+}
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdateTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdateTests.java
index d549bf6b531..8b153de708a 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdateTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdateTests.java
@@ -25,9 +25,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField;
 import org.elasticsearch.test.AbstractSerializingTestCase;
 import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig.Mode;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -93,7 +91,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
-        expectThrows(IllegalArgumentException.class, () -> createRandomized(datafeed.getId() + "_2").apply(datafeed));
+        expectThrows(IllegalArgumentException.class, () -> createRandomized(datafeed.getId() + "_2").apply(datafeed, null));
     }
 
     public void testApply_givenEmptyUpdate() {
         DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
-        DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed);
+        DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, null);
         assertThat(datafeed, equalTo(updatedDatafeed));
     }
@@ -127,7 +125,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java
-        public Optional<InputStream> next() throws IOException {
+        public Optional<InputStream> next() {
             if (streams.isEmpty()) {
                 hasNext = false;
                 return Optional.empty();
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java
index a6de545db92..7ed00a01f35 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java
@@ -15,6 +15,8 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
@@ -24,6 +26,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Before;
 import org.mockito.ArgumentCaptor;
 
@@ -117,7 +120,10 @@ public class ScrollDataExtractorTests extends ESTestCase {
 
     @Before
     public void setUpTests() {
+        ThreadPool threadPool = mock(ThreadPool.class);
+        when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
         client = mock(Client.class);
+        when(client.threadPool()).thenReturn(threadPool);
         capturedSearchRequests = new ArrayList<>();
         capturedContinueScrollIds = new ArrayList<>();
         jobId = "test-job";
@@ -269,7 +275,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(createErrorResponse());
 
         assertThat(extractor.hasNext(), is(true));
-        expectThrows(IOException.class, () -> extractor.next());
+        expectThrows(IOException.class, extractor::next);
     }
 
     public void testExtractionGivenContinueScrollResponseHasError() throws IOException {
@@ -288,7 +294,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(createErrorResponse());
 
         assertThat(extractor.hasNext(), is(true));
-        expectThrows(IOException.class, () -> extractor.next());
+        expectThrows(IOException.class, extractor::next);
     }
 
     public void testExtractionGivenInitSearchResponseHasShardFailures() throws IOException {
@@ -297,7 +303,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(createResponseWithShardFailures());
 
         assertThat(extractor.hasNext(), is(true));
-        expectThrows(IOException.class, () -> extractor.next());
+        expectThrows(IOException.class, extractor::next);
     }
 
     public void testExtractionGivenInitSearchResponseEncounteredUnavailableShards() throws IOException {
@@ -306,7 +312,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(createResponseWithUnavailableShards(1));
 
         assertThat(extractor.hasNext(), is(true));
-        IOException e = expectThrows(IOException.class, () -> extractor.next());
+        IOException e = expectThrows(IOException.class, extractor::next);
         assertThat(e.getMessage(), equalTo("[" + jobId + "] Search request encountered [1] unavailable shards"));
     }
 
@@ -333,7 +339,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         assertThat(output.isPresent(), is(true));
 
         // A second failure is not tolerated
         assertThat(extractor.hasNext(), is(true));
-        expectThrows(IOException.class, () -> extractor.next());
+        expectThrows(IOException.class, extractor::next);
     }
 
     public void testResetScollUsesLastResultTimestamp() throws IOException {
@@ -389,7 +395,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         assertEquals(new Long(1400L), extractor.getLastTimestamp());
 
         // A second failure is not tolerated
         assertThat(extractor.hasNext(), is(true));
-        expectThrows(SearchPhaseExecutionException.class, () -> extractor.next());
+        expectThrows(SearchPhaseExecutionException.class, extractor::next);
     }
 
     public void testSearchPhaseExecutionExceptionOnInitScroll() throws IOException {
@@ -398,7 +404,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         extractor.setNextResponse(createResponseWithShardFailures());
         extractor.setNextResponse(createResponseWithShardFailures());
 
-        expectThrows(IOException.class, () -> extractor.next());
+        expectThrows(IOException.class, extractor::next);
 
         List<String> capturedClearScrollIds = getCapturedClearScrollIds();
         assertThat(capturedClearScrollIds.isEmpty(), is(true));
@@ -412,8 +418,8 @@ public class ScrollDataExtractorTests extends ESTestCase {
                 "script2", new Script(ScriptType.INLINE, "painless", "return domainSplit('foo.com', params);", emptyMap()), false);
 
         List<SearchSourceBuilder.ScriptField> sFields = Arrays.asList(withoutSplit, withSplit);
-        ScrollDataExtractorContext context = new ScrollDataExtractorContext(jobId, extractedFields, indices,
-                types, query, sFields, scrollSize, 1000, 2000);
+        ScrollDataExtractorContext context = new ScrollDataExtractorContext(jobId, extractedFields, indices,
+                types, query, sFields, scrollSize, 1000, 2000, Collections.emptyMap());
 
         TestDataExtractor extractor = new TestDataExtractor(context);
 
@@ -460,7 +466,8 @@ public class ScrollDataExtractorTests extends ESTestCase {
     }
 
     private ScrollDataExtractorContext createContext(long start, long end) {
-        return new ScrollDataExtractorContext(jobId, extractedFields, indices, types, query, scriptFields, scrollSize, start, end);
+        return new ScrollDataExtractorContext(jobId, extractedFields, indices, types, query, scriptFields, scrollSize, start, end,
+                Collections.emptyMap());
     }
 
     private SearchResponse createEmptySearchResponse() {
@@ -475,9 +482,9 @@ public class ScrollDataExtractorTests extends ESTestCase {
         for (int i = 0; i < timestamps.size(); i++) {
             SearchHit hit = new SearchHit(randomInt());
             Map<String, DocumentField> fields = new HashMap<>();
-            fields.put(extractedFields.timeField(), new DocumentField("time", Arrays.asList(timestamps.get(i))));
-            fields.put("field_1", new DocumentField("field_1", Arrays.asList(field1Values.get(i))));
-            fields.put("field_2", new DocumentField("field_2", Arrays.asList(field2Values.get(i))));
+            fields.put(extractedFields.timeField(), new DocumentField("time", Collections.singletonList(timestamps.get(i))));
+            fields.put("field_1", new DocumentField("field_1", Collections.singletonList(field1Values.get(i))));
+            fields.put("field_2", new DocumentField("field_2", Collections.singletonList(field2Values.get(i))));
             hit.fields(fields);
             hits.add(hit);
         }
@@ -519,4 +526,4 @@ public class ScrollDataExtractorTests extends ESTestCase {
             return reader.lines().collect(Collectors.joining("\n"));
         }
     }
-}
\ No newline at end of file
+}
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java
index 81d64eca366..319ca213193 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java
@@ -16,6 +16,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.SecuritySettingsSource;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.notifications.Auditor;
 import org.elasticsearch.xpack.test.rest.XPackRestTestHelper;
 import org.junit.After;
 import org.junit.Before;
@@ -24,8 +25,10 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 import java.util.Locale;
 import java.util.stream.Collectors;
 
@@ -39,6 +42,8 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
             basicAuthHeaderValue("x_pack_rest_user", SecuritySettingsSource.TEST_PASSWORD_SECURE_STRING);
     private static final String BASIC_AUTH_VALUE_ML_ADMIN =
             basicAuthHeaderValue("ml_admin", SecuritySettingsSource.TEST_PASSWORD_SECURE_STRING);
+    private static final String BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS =
+            basicAuthHeaderValue("ml_admin_plus_data", SecuritySettingsSource.TEST_PASSWORD_SECURE_STRING);
 
     @Override
     protected Settings restClientSettings() {
@@ -50,25 +55,39 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
         return true;
     }
 
-    private void setupUser() throws IOException {
-        String password = new String(SecuritySettingsSource.TEST_PASSWORD_SECURE_STRING.getChars());
-
-        // This user has admin rights on machine learning, but (importantly for the tests) no
-        // rights on any of the data indexes
-        String user = "{"
-                + " \"password\" : \"" + password + "\","
-                + " \"roles\" : [ \"machine_learning_admin\" ]"
+    private void setupDataAccessRole(String index) throws IOException {
+        String json = "{"
+                + " \"indices\" : ["
+                + "   { \"names\": [\"" + index + "\"], \"privileges\": [\"read\"] }"
+                + " ]"
                 + "}";
 
-        client().performRequest("put", "_xpack/security/user/ml_admin", Collections.emptyMap(),
-                new StringEntity(user, ContentType.APPLICATION_JSON));
+        client().performRequest("put", "_xpack/security/role/test_data_access", Collections.emptyMap(),
+                new StringEntity(json, ContentType.APPLICATION_JSON));
+    }
+
+    private void setupUser(String user, List<String> roles) throws IOException {
+        String password = new String(SecuritySettingsSource.TEST_PASSWORD_SECURE_STRING.getChars());
+
+        String json = "{"
+                + " \"password\" : \"" + password + "\","
+                + " \"roles\" : [ " + roles.stream().map(unquoted -> "\"" + unquoted + "\"").collect(Collectors.joining(", ")) + " ]"
+                + "}";
+
+        client().performRequest("put", "_xpack/security/user/" + user, Collections.emptyMap(),
+                new StringEntity(json, ContentType.APPLICATION_JSON));
     }
 
     @Before
     public void setUpData() throws Exception {
-        setupUser();
+        setupDataAccessRole("network-data");
+        // This user has admin rights on machine learning, but (importantly for the tests) no rights
+        // on any of the data indexes
+        setupUser("ml_admin", Collections.singletonList("machine_learning_admin"));
+        // This user has admin rights on machine learning, and read access to the network-data index
+        setupUser("ml_admin_plus_data", Arrays.asList("machine_learning_admin", "test_data_access"));
         addAirlineData();
-        addNetworkData();
+        addNetworkData("network-data");
     }
 
     private void addAirlineData() throws IOException {
@@ -221,7 +240,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
         client().performRequest("post", "_refresh");
     }
 
-    private void addNetworkData() throws IOException {
+    private void addNetworkData(String index) throws IOException {
 
         // Create index with source = enabled, doc_values = enabled, stored = false + multi-field
         String mappings = "{"
@@ -241,19 +260,19 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
                 + " }"
                 + " }"
                 + "}";
-        client().performRequest("put", "network-data", Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON));
+        client().performRequest("put", index, Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON));
 
         String docTemplate = "{\"timestamp\":%d,\"host\":\"%s\",\"network_bytes_out\":%d}";
         Date date = new Date(1464739200735L);
         for (int i=0; i<120; i++) {
             long byteCount = randomNonNegativeLong();
             String jsonDoc = String.format(Locale.ROOT, docTemplate, date.getTime(), "hostA", byteCount);
-            client().performRequest("post", "network-data/doc", Collections.emptyMap(),
+            client().performRequest("post", index + "/doc", Collections.emptyMap(),
                     new StringEntity(jsonDoc, ContentType.APPLICATION_JSON));
 
             byteCount = randomNonNegativeLong();
             jsonDoc = String.format(Locale.ROOT, docTemplate, date.getTime(), "hostB", byteCount);
-            client().performRequest("post", "network-data/doc", Collections.emptyMap(),
+            client().performRequest("post", index + "/doc", Collections.emptyMap(),
                     new StringEntity(jsonDoc, ContentType.APPLICATION_JSON));
 
             date = new Date(date.getTime() + 10_000);
@@ -263,7 +282,6 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
         client().performRequest("post", "_refresh");
     }
 
-
     public void testLookbackOnlyWithMixedTypes() throws Exception {
         new LookbackOnlyTestHelper("test-lookback-only-with-mixed-types", "airline-data")
                 .setShouldSucceedProcessing(true).execute();
@@ -494,6 +512,52 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
         assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":240"));
     }
 
+    public void testLookbackWithoutPermissions() throws Exception {
+        String jobId = "permission-test-network-job";
+        String job = "{\"analysis_config\" :{\"bucket_span\":\"300s\","
+                + "\"summary_count_field_name\":\"doc_count\","
+                + "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"bytes-delta\",\"by_field_name\":\"hostname\"}]},"
+                + "\"data_description\" : {\"time_field\":\"timestamp\"}"
+                + "}";
+        client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
+                new StringEntity(job, ContentType.APPLICATION_JSON));
+
+        String datafeedId = "datafeed-" + jobId;
+        String aggregations =
+                "{\"hostname\": {\"terms\" : {\"field\": \"host.keyword\", \"size\":10},"
+                + "\"aggs\": {\"buckets\": {\"date_histogram\":{\"field\":\"timestamp\",\"interval\":\"5s\"},"
+                + "\"aggs\": {\"timestamp\":{\"max\":{\"field\":\"timestamp\"}},"
+                + "\"bytes-delta\":{\"derivative\":{\"buckets_path\":\"avg_bytes_out\"}},"
+                + "\"avg_bytes_out\":{\"avg\":{\"field\":\"network_bytes_out\"}} }}}}}";
+
+        // At the time we create the datafeed the user can access the network-data index that we have access to
+        new DatafeedBuilder(datafeedId, jobId, "network-data", "doc")
+                .setAggregations(aggregations)
+                .setChunkingTimespan("300s")
+                .setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS)
+                .build();
+
+        // Change the role so that the user can no longer access network-data
+        setupDataAccessRole("some-other-data");
+
+        openJob(client(), jobId);
+
+        startDatafeedAndWaitUntilStopped(datafeedId, BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS);
+        waitUntilJobIsClosed(jobId);
+        Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
+        String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
+        // We expect that no data made it through to the job
+        assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":0"));
+        assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":0"));
+
+        // There should be a notification saying that there was a problem extracting data
+        client().performRequest("post", "_refresh");
+        Response notificationsResponse = client().performRequest("get", Auditor.NOTIFICATIONS_INDEX + "/_search?q=job_id:" + jobId);
+        String notificationsResponseAsString = responseEntityToString(notificationsResponse);
+        assertThat(notificationsResponseAsString, containsString("\"message\":\"Datafeed is encountering errors extracting data: "
+                + "action [indices:data/read/search] is unauthorized for user [ml_admin_plus_data]\""));
+    }
+
     public void testLookbackWithPipelineBucketAgg() throws Exception {
         String jobId = "pipeline-bucket-agg-job";
         String job = "{\"analysis_config\" :{\"bucket_span\":\"1h\","
@@ -665,10 +729,14 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
             assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
         }
     }
-
     private void startDatafeedAndWaitUntilStopped(String datafeedId) throws Exception {
+        startDatafeedAndWaitUntilStopped(datafeedId, BASIC_AUTH_VALUE_SUPER_USER);
+    }
+
+    private void startDatafeedAndWaitUntilStopped(String datafeedId, String authHeader) throws Exception {
         Response startDatafeedRequest = client().performRequest("post",
-                MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z");
+                MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z",
+                new BasicHeader("Authorization", authHeader));
         assertThat(startDatafeedRequest.getStatusLine().getStatusCode(), equalTo(200));
         assertThat(responseEntityToString(startDatafeedRequest), equalTo("{\"started\":true}"));
         assertBusy(() -> {
@@ -763,9 +831,9 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
         }
 
         DatafeedBuilder setChunkingTimespan(String timespan) {
-                chunkingTimespan = timespan;
-                return this;
-            }
+            chunkingTimespan = timespan;
+            return this;
+        }
 
         Response build() throws IOException {
             String datafeedConfig = "{"
diff --git a/qa/smoke-test-ml-with-security/roles.yml b/qa/smoke-test-ml-with-security/roles.yml
index 4c798037c48..e47fe40a120 100644
--- a/qa/smoke-test-ml-with-security/roles.yml
+++ b/qa/smoke-test-ml-with-security/roles.yml
@@ -7,7 +7,7 @@ minimal:
     # Give all users involved in these tests access to the indices where the data to
    # be analyzed is stored, because the ML roles alone do not provide access to
    # non-ML indices
-    - names: [ 'airline-data', 'index-foo', 'unavailable-data' ]
+    - names: [ 'airline-data', 'index-*', 'unavailable-data', 'utopia' ]
      privileges:
        - indices:admin/create
        - indices:admin/refresh