diff --git a/docs/java-api/query-dsl/has-child-query.asciidoc b/docs/java-api/query-dsl/has-child-query.asciidoc index 300b32e1922..f47f3af487d 100644 --- a/docs/java-api/query-dsl/has-child-query.asciidoc +++ b/docs/java-api/query-dsl/has-child-query.asciidoc @@ -9,7 +9,7 @@ When using the `has_child` query it is important to use the `PreBuiltTransportCl -------------------------------------------------- Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build(); TransportClient client = new PreBuiltTransportClient(settings); -client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9300))); +client.addTransportAddress(new TransportAddress(new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9300))); -------------------------------------------------- Otherwise the parent-join module doesn't get loaded and the `has_child` query can't be used from the transport client. diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 193dfa4b2eb..136103f7d4a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -544,20 +544,20 @@ public class IndexShardRoutingTable implements Iterable { static class AttributesKey { - final String[] attributes; + final List attributes; - AttributesKey(String[] attributes) { + AttributesKey(List attributes) { this.attributes = attributes; } @Override public int hashCode() { - return Arrays.hashCode(attributes); + return attributes.hashCode(); } @Override public boolean equals(Object obj) { - return obj instanceof AttributesKey && Arrays.equals(attributes, ((AttributesKey) obj).attributes); + return obj instanceof AttributesKey && attributes.equals(((AttributesKey) obj).attributes); } } @@ -621,11 +621,11 @@ public class IndexShardRoutingTable implements Iterable { return Collections.unmodifiableList(to); } - public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes) { + public ShardIterator preferAttributesActiveInitializingShardsIt(List attributes, DiscoveryNodes nodes) { return preferAttributesActiveInitializingShardsIt(attributes, nodes, shuffler.nextSeed()); } - public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes, int seed) { + public ShardIterator preferAttributesActiveInitializingShardsIt(List attributes, DiscoveryNodes nodes, int seed) { AttributesKey key = new AttributesKey(attributes); AttributesRoutings activeRoutings = getActiveAttribute(key, nodes); AttributesRoutings initializingRoutings = getInitializingAttribute(key, nodes); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 005600ceb44..87655c06413 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -49,7 +50,7 @@ public class OperationRouting extends AbstractComponent { Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", true, Setting.Property.Dynamic, Setting.Property.NodeScope); - private String[] awarenessAttributes; + private List awarenessAttributes; private boolean useAdaptiveReplicaSelection; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { @@ -65,7 +66,7 @@ public class OperationRouting extends AbstractComponent { this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection; } - private void setAwarenessAttributes(String[] awarenessAttributes) { + private void setAwarenessAttributes(List awarenessAttributes) { this.awarenessAttributes = awarenessAttributes; } @@ -139,7 +140,7 @@ public class OperationRouting extends AbstractComponent { @Nullable ResponseCollectorService collectorService, @Nullable Map nodeCounts) { if (preference == null || preference.isEmpty()) { - if (awarenessAttributes.length == 0) { + if (awarenessAttributes.isEmpty()) { if (useAdaptiveReplicaSelection) { return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); } else { @@ -174,7 +175,7 @@ public class OperationRouting extends AbstractComponent { } // no more preference if (index == -1 || index == preference.length() - 1) { - if (awarenessAttributes.length == 0) { + if (awarenessAttributes.isEmpty()) { if (useAdaptiveReplicaSelection) { return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); } else { @@ -218,7 +219,7 @@ public class OperationRouting extends AbstractComponent { // shard ID into the hash of the user-supplied preference key. routingHash = 31 * routingHash + indexShard.shardId.hashCode(); } - if (awarenessAttributes.length == 0) { + if (awarenessAttributes.isEmpty()) { return indexShard.activeInitializingShardsIt(routingHash); } else { return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index e7e538ae371..6105c732d55 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation.decider; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import com.carrotsearch.hppc.ObjectIntHashMap; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -34,6 +35,8 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import static java.util.Collections.emptyList; + /** * This {@link AllocationDecider} controls shard allocation based on * {@code awareness} key-value pairs defined in the node configuration. @@ -78,13 +81,13 @@ public class AwarenessAllocationDecider extends AllocationDecider { public static final String NAME = "awareness"; - public static final Setting CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING = - new Setting<>("cluster.routing.allocation.awareness.attributes", "", s -> Strings.tokenizeToStringArray(s, ","), Property.Dynamic, + public static final Setting> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING = + Setting.listSetting("cluster.routing.allocation.awareness.attributes", emptyList(), Function.identity(), Property.Dynamic, Property.NodeScope); public static final Setting CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING = Setting.groupSetting("cluster.routing.allocation.awareness.force.", Property.Dynamic, Property.NodeScope); - private volatile String[] awarenessAttributes; + private volatile List awarenessAttributes; private volatile Map> forcedAwarenessAttributes; @@ -109,7 +112,7 @@ public class AwarenessAllocationDecider extends AllocationDecider { this.forcedAwarenessAttributes = forcedAwarenessAttributes; } - private void setAwarenessAttributes(String[] awarenessAttributes) { + private void setAwarenessAttributes(List awarenessAttributes) { this.awarenessAttributes = awarenessAttributes; } @@ -124,7 +127,7 @@ public class AwarenessAllocationDecider extends AllocationDecider { } private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) { - if (awarenessAttributes.length == 0) { + if (awarenessAttributes.isEmpty()) { return allocation.decision(Decision.YES, NAME, "allocation awareness is not enabled, set cluster setting [%s] to enable it", CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey()); @@ -138,7 +141,7 @@ public class AwarenessAllocationDecider extends AllocationDecider { return allocation.decision(Decision.NO, NAME, "node does not contain the awareness attribute [%s]; required attributes cluster setting [%s=%s]", awarenessAttribute, CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), - allocation.debugDecision() ? Strings.arrayToCommaDelimitedString(awarenessAttributes) : null); + allocation.debugDecision() ? Strings.collectionToCommaDelimitedString(awarenessAttributes) : null); } // build attr_value -> nodes map diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/IndexShardRoutingTableTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/IndexShardRoutingTableTests.java index 7823970ff46..659d6007036 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/IndexShardRoutingTableTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/IndexShardRoutingTableTests.java @@ -24,11 +24,13 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; public class IndexShardRoutingTableTests extends ESTestCase { public void testEqualsAttributesKey() { - String[] attr1 = {"a"}; - String[] attr2 = {"b"}; + List attr1 = Arrays.asList("a"); + List attr2 = Arrays.asList("b"); IndexShardRoutingTable.AttributesKey attributesKey1 = new IndexShardRoutingTable.AttributesKey(attr1); IndexShardRoutingTable.AttributesKey attributesKey2 = new IndexShardRoutingTable.AttributesKey(attr1); IndexShardRoutingTable.AttributesKey attributesKey3 = new IndexShardRoutingTable.AttributesKey(attr2); diff --git a/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java index 6fd11aa91dc..c48b745743e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -50,7 +51,6 @@ import static java.util.Collections.singletonMap; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -224,11 +224,16 @@ public class RoutingIteratorTests extends ESAllocationTestCase { } public void testAttributePreferenceRouting() { - AllocationService strategy = createAllocationService(Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 10) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put("cluster.routing.allocation.awareness.attributes", "rack_id,zone") - .build()); + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always"); + if (randomBoolean()) { + settings.put("cluster.routing.allocation.awareness.attributes", " rack_id, zone "); + } else { + settings.putList("cluster.routing.allocation.awareness.attributes", "rack_id", "zone"); + } + + AllocationService strategy = createAllocationService(settings.build()); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) @@ -258,7 +263,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase { clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); // after all are started, check routing iteration - ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes()); + ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes()); ShardRouting shardRouting = shardIterator.nextOrNull(); assertThat(shardRouting, notNullValue()); assertThat(shardRouting.currentNodeId(), equalTo("node1")); @@ -266,7 +271,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase { assertThat(shardRouting, notNullValue()); assertThat(shardRouting.currentNodeId(), equalTo("node2")); - shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes()); + shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes()); shardRouting = shardIterator.nextOrNull(); assertThat(shardRouting, notNullValue()); assertThat(shardRouting.currentNodeId(), equalTo("node1")); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java index d657d4df809..c73bb8576a7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java @@ -14,9 +14,15 @@ import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.client.FilterClient; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.xpack.core.security.authc.AuthenticationField; +import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; +import java.util.Map; +import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * Utility class to help with the execution of requests made using a {@link Client} such that they @@ -24,6 +30,12 @@ import java.util.function.Supplier; */ public final class ClientHelper { + /** + * List of headers that are related to security + */ + public static final Set SECURITY_HEADER_FILTERS = Sets.newHashSet(AuthenticationServiceField.RUN_AS_USER_HEADER, + AuthenticationField.AUTHENTICATION_KEY); + public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin"; public static final String SECURITY_ORIGIN = "security"; public static final String WATCHER_ORIGIN = "watcher"; @@ -78,6 +90,82 @@ public final class ClientHelper { } } + /** + * Execute a client operation and return the response, try to run an action + * with least privileges, when headers exist + * + * @param headers + * Request headers, ideally including security headers + * @param origin + * The origin to fall back to if there are no security headers + * @param client + * The client used to query + * @param supplier + * The action to run + * @return An instance of the response class + */ + public static T executeWithHeaders(Map headers, String origin, Client client, + Supplier supplier) { + Map filteredHeaders = headers.entrySet().stream().filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + // no security headers, we will have to use the xpack internal user for + // our execution by specifying the origin + if (filteredHeaders.isEmpty()) { + try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), origin)) { + return supplier.get(); + } + } else { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet()); + return supplier.get(); + } + } + } + + /** + * Execute a client operation asynchronously, try to run an action with + * least privileges, when headers exist + * + * @param headers + * Request headers, ideally including security headers + * @param origin + * The origin to fall back to if there are no security headers + * @param action + * The action to execute + * @param request + * The request object for the action + * @param listener + * The listener to call when the action is complete + */ + public static > void executeWithHeadersAsync( + Map headers, String origin, Client client, Action action, Request request, + ActionListener listener) { + + Map filteredHeaders = headers.entrySet().stream().filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + final ThreadContext threadContext = client.threadPool().getThreadContext(); + + // No headers (e.g. security not installed/in use) so execute as origin + if (filteredHeaders.isEmpty()) { + ClientHelper.executeAsyncWithOrigin(client, origin, action, request, listener); + } else { + // Otherwise stash the context and copy in the saved headers before executing + final Supplier supplier = threadContext.newRestorableContext(false); + try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) { + client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener)); + } + } + } + + private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map headers) { + final ThreadContext.StoredContext storedContext = threadContext.stashContext(); + threadContext.copyHeaders(headers.entrySet()); + return storedContext; + } + private static final class ClientWithOrigin extends FilterClient { private final String origin; @@ -98,5 +186,4 @@ public final class ClientHelper { } } } - } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlClientHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlClientHelper.java deleted file mode 100644 index a76c5c51e8d..00000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlClientHelper.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.ml; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.security.authc.AuthenticationField; -import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; - -import java.util.Map; -import java.util.Set; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -/** - * A helper class for actions which decides if we should run via the _xpack user and set ML as origin - * or if we should use the run_as functionality by setting the correct headers - */ -public class MlClientHelper { - - /** - * List of headers that are related to security - */ - public static final Set SECURITY_HEADER_FILTERS = Sets.newHashSet(AuthenticationServiceField.RUN_AS_USER_HEADER, - AuthenticationField.AUTHENTICATION_KEY); - - /** - * Execute a client operation and return the response, try to run a datafeed search with least privileges, when headers exist - * - * @param datafeedConfig The config for a datafeed - * @param client The client used to query - * @param supplier The action to run - * @return An instance of the response class - */ - public static T execute(DatafeedConfig datafeedConfig, Client client, Supplier supplier) { - return execute(datafeedConfig.getHeaders(), client, supplier); - } - - /** - * Execute a client operation and return the response, try to run an action with least privileges, when headers exist - * - * @param headers Request headers, ideally including security headers - * @param client The client used to query - * @param supplier The action to run - * @return An instance of the response class - */ - public static T execute(Map headers, Client client, Supplier supplier) { - // no headers, we will have to use the xpack internal user for our execution by specifying the ml origin - if (headers == null || headers.isEmpty()) { - try (ThreadContext.StoredContext ignore = ClientHelper.stashWithOrigin(client.threadPool().getThreadContext(), - ClientHelper.ML_ORIGIN)) { - return supplier.get(); - } - } else { - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { - Map filteredHeaders = headers.entrySet().stream() - .filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet()); - return supplier.get(); - } - } - } -} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index b09a7463ffd..b709e32946e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -23,6 +23,9 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; @@ -35,8 +38,6 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.NameResolver; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Collection; @@ -303,7 +304,7 @@ public class MlMetadata implements MetaData.Custom { // Adjust the request, adding security headers from the current thread context DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig); Map headers = threadContext.getHeaders().entrySet().stream() - .filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); builder.setHeaders(headers); datafeedConfig = builder.build(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index 6255be9f438..444532a7e3f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -21,7 +21,7 @@ import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xpack.core.ml.MlClientHelper; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -304,7 +304,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { if (threadContext != null) { // Adjust the request, adding security headers from the current thread context Map headers = threadContext.getHeaders().entrySet().stream() - .filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); builder.setHeaders(headers); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/xcontent/WatcherXContentParser.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/xcontent/WatcherXContentParser.java index 6b97512c237..c841d6c11ec 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/xcontent/WatcherXContentParser.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/xcontent/WatcherXContentParser.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.xpack.core.watcher.support.xcontent; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.xcontent.DeprecationHandler; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java index a243b8c995d..95361dbff42 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java @@ -9,15 +9,33 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.security.authc.AuthenticationField; +import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import static org.elasticsearch.xpack.core.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -97,7 +115,7 @@ public class ClientHelperTests extends ESTestCase { assertEquals(origin, threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); assertNull(threadContext.getHeader(headerName)); latch.countDown(); - ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(null); return null; }).when(client).execute(anyObject(), anyObject(), anyObject()); @@ -130,7 +148,7 @@ public class ClientHelperTests extends ESTestCase { assertEquals(origin, threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); assertNull(threadContext.getHeader(headerName)); latch.countDown(); - ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(null); return null; }).when(client).execute(anyObject(), anyObject(), anyObject()); @@ -139,4 +157,179 @@ public class ClientHelperTests extends ESTestCase { clientWithOrigin.execute(null, null, listener); latch.await(); } + + public void testExecuteWithHeadersAsyncNoHeaders() throws InterruptedException { + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final Client client = mock(Client.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); + + final CountDownLatch latch = new CountDownLatch(2); + final ActionListener listener = ActionListener.wrap(v -> { + assertTrue(threadContext.getHeaders().isEmpty()); + latch.countDown(); + }, e -> fail(e.getMessage())); + + doAnswer(invocationOnMock -> { + assertTrue(threadContext.getHeaders().isEmpty()); + latch.countDown(); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(null); + return null; + }).when(client).execute(anyObject(), anyObject(), anyObject()); + + SearchRequest request = new SearchRequest("foo"); + + String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN); + ClientHelper.executeWithHeadersAsync(Collections.emptyMap(), originName, client, SearchAction.INSTANCE, request, listener); + + latch.await(); + } + + public void testExecuteWithHeadersAsyncWrongHeaders() throws InterruptedException { + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final Client client = mock(Client.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); + + final CountDownLatch latch = new CountDownLatch(2); + final ActionListener listener = ActionListener.wrap(v -> { + assertTrue(threadContext.getHeaders().isEmpty()); + latch.countDown(); + }, e -> fail(e.getMessage())); + + doAnswer(invocationOnMock -> { + assertTrue(threadContext.getHeaders().isEmpty()); + latch.countDown(); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(null); + return null; + }).when(client).execute(anyObject(), anyObject(), anyObject()); + + SearchRequest request = new SearchRequest("foo"); + Map headers = new HashMap<>(1); + headers.put("foo", "foo"); + headers.put("bar", "bar"); + + String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN); + ClientHelper.executeWithHeadersAsync(headers, originName, client, SearchAction.INSTANCE, request, listener); + + latch.await(); + } + + public void testExecuteWithHeadersAsyncWithHeaders() throws Exception { + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final Client client = mock(Client.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); + + final CountDownLatch latch = new CountDownLatch(2); + final ActionListener listener = ActionListener.wrap(v -> { + assertTrue(threadContext.getHeaders().isEmpty()); + latch.countDown(); + }, e -> fail(e.getMessage())); + + doAnswer(invocationOnMock -> { + assertThat(threadContext.getHeaders().size(), equalTo(2)); + assertThat(threadContext.getHeaders().get("es-security-runas-user"), equalTo("foo")); + assertThat(threadContext.getHeaders().get("_xpack_security_authentication"), equalTo("bar")); + latch.countDown(); + ((ActionListener) invocationOnMock.getArguments()[2]).onResponse(null); + return null; + }).when(client).execute(anyObject(), anyObject(), anyObject()); + + SearchRequest request = new SearchRequest("foo"); + Map headers = new HashMap<>(1); + headers.put("es-security-runas-user", "foo"); + headers.put("_xpack_security_authentication", "bar"); + + String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN); + ClientHelper.executeWithHeadersAsync(headers, originName, client, SearchAction.INSTANCE, request, listener); + + latch.await(); + } + + public void testExecuteWithHeadersNoHeaders() { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); + + PlainActionFuture searchFuture = PlainActionFuture.newFuture(); + searchFuture.onResponse(new SearchResponse()); + when(client.search(any())).thenReturn(searchFuture); + assertExecutionWithOrigin(Collections.emptyMap(), client); + } + + public void testExecuteWithHeaders() { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); + + PlainActionFuture searchFuture = PlainActionFuture.newFuture(); + searchFuture.onResponse(new SearchResponse()); + when(client.search(any())).thenReturn(searchFuture); + Map headers = MapBuilder. newMapBuilder().put(AuthenticationField.AUTHENTICATION_KEY, "anything") + .put(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything").map(); + + assertRunAsExecution(headers, h -> { + assertThat(h.keySet(), hasSize(2)); + assertThat(h, hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything")); + assertThat(h, hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything")); + }, client); + } + + public void testExecuteWithHeadersNoSecurityHeaders() { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); + + PlainActionFuture searchFuture = PlainActionFuture.newFuture(); + searchFuture.onResponse(new SearchResponse()); + when(client.search(any())).thenReturn(searchFuture); + Map unrelatedHeaders = MapBuilder. newMapBuilder().put(randomAlphaOfLength(10), "anything").map(); + + assertExecutionWithOrigin(unrelatedHeaders, client); + } + + /** + * This method executes a search and checks if the thread context was + * enriched with the ml origin + */ + private void assertExecutionWithOrigin(Map storedHeaders, Client client) { + String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN); + ClientHelper.executeWithHeaders(storedHeaders, originName, client, () -> { + Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); + assertThat(origin, is(originName)); + + // Check that headers are not set + Map headers = client.threadPool().getThreadContext().getHeaders(); + assertThat(headers, not(hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything"))); + assertThat(headers, not(hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything"))); + + return client.search(new SearchRequest()).actionGet(); + }); + } + + /** + * This method executes a search and ensures no stashed origin thread + * context was created, so that the regular node client was used, to emulate + * a run_as function + */ + public void assertRunAsExecution(Map storedHeaders, Consumer> consumer, Client client) { + String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN); + ClientHelper.executeWithHeaders(storedHeaders, originName, client, () -> { + Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); + assertThat(origin, is(nullValue())); + + consumer.accept(client.threadPool().getThreadContext().getHeaders()); + return client.search(new SearchRequest()).actionGet(); + }); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 98ba2caa408..2ffb318dc4f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -16,8 +16,8 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.MLMetadataField; -import org.elasticsearch.xpack.core.ml.MlClientHelper; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; @@ -64,7 +64,7 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction headers = threadPool.getThreadContext().getHeaders().entrySet().stream() - .filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); previewDatafeed.setHeaders(headers); // NB: this is using the client from the transport layer, NOT the internal client. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index f9089b6bc17..d83865b751f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -14,7 +14,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.xpack.core.ml.MlClientHelper; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; @@ -112,7 +112,7 @@ class AggregationDataExtractor implements DataExtractor { } protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get); + return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); } private SearchRequestBuilder buildSearchRequest() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java index 61298f16abd..2e157c3d1e9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java @@ -15,10 +15,10 @@ import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.search.aggregations.metrics.min.Min; -import org.elasticsearch.xpack.core.ml.MlClientHelper; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; -import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import java.io.IOException; import java.io.InputStream; @@ -135,7 +135,7 @@ public class ChunkedDataExtractor implements DataExtractor { } protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get); + return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); } private Optional getNextStream() throws IOException { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index 57601406e71..24174730e2d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -20,7 +20,7 @@ import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.fetch.StoredFieldsContext; import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.xpack.core.ml.MlClientHelper; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.ml.utils.DomainSplitFunction; @@ -100,7 +100,7 @@ class ScrollDataExtractor implements DataExtractor { } protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get); + return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); } private SearchRequestBuilder buildSearchRequest(long start) { @@ -211,7 +211,8 @@ class ScrollDataExtractor implements DataExtractor { } protected SearchResponse executeSearchScrollRequest(String scrollId) { - return MlClientHelper.execute(context.headers, client, () -> SearchScrollAction.INSTANCE.newRequestBuilder(client) + return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, + () -> SearchScrollAction.INSTANCE.newRequestBuilder(client) .setScroll(SCROLL_TIMEOUT) .setScrollId(scrollId) .get()); @@ -226,7 +227,8 @@ class ScrollDataExtractor implements DataExtractor { if (scrollId != null) { ClearScrollRequest request = new ClearScrollRequest(); request.addScrollId(scrollId); - MlClientHelper.execute(context.headers, client, () -> client.execute(ClearScrollAction.INSTANCE, request).actionGet()); + ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, + () -> client.execute(ClearScrollAction.INSTANCE, request).actionGet()); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java index f4f359580db..2c6e0deaebd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java @@ -12,12 +12,12 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.client.Client; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.xpack.core.ml.MlClientHelper; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; -import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.MlStrings; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import java.util.Objects; @@ -76,7 +76,7 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory { String[] requestFields = job.allInputFields().stream().map(f -> MlStrings.getParentField(f) + "*") .toArray(size -> new String[size]); fieldCapabilitiesRequest.fields(requestFields); - MlClientHelper.execute(datafeed, client, () -> { + ClientHelper. executeWithHeaders(datafeed.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler); // This response gets discarded - the listener handles the real response return null; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java deleted file mode 100644 index 284e746e67d..00000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlClientHelperTests.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml; - -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlClientHelper; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.security.authc.AuthenticationField; -import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; -import org.junit.Before; - -import java.util.Collections; -import java.util.Map; -import java.util.function.Consumer; - -import static org.elasticsearch.xpack.core.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; -import static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class MlClientHelperTests extends ESTestCase { - - private Client client = mock(Client.class); - - @Before - public void setupMocks() { - ThreadPool threadPool = mock(ThreadPool.class); - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - when(threadPool.getThreadContext()).thenReturn(threadContext); - when(client.threadPool()).thenReturn(threadPool); - - PlainActionFuture searchFuture = PlainActionFuture.newFuture(); - searchFuture.onResponse(new SearchResponse()); - when(client.search(any())).thenReturn(searchFuture); - } - - public void testEmptyHeaders() { - DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo"); - builder.setIndices(Collections.singletonList("foo-index")); - - assertExecutionWithOrigin(builder.build()); - } - - public void testWithHeaders() { - DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo"); - builder.setIndices(Collections.singletonList("foo-index")); - Map headers = MapBuilder.newMapBuilder() - .put(AuthenticationField.AUTHENTICATION_KEY, "anything") - .put(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything") - .map(); - builder.setHeaders(headers); - - assertRunAsExecution(builder.build(), h -> { - assertThat(h.keySet(), hasSize(2)); - assertThat(h, hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything")); - assertThat(h, hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything")); - }); - } - - public void testFilteredHeaders() { - DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo"); - builder.setIndices(Collections.singletonList("foo-index")); - Map unrelatedHeaders = MapBuilder.newMapBuilder() - .put(randomAlphaOfLength(10), "anything") - .map(); - builder.setHeaders(unrelatedHeaders); - - assertRunAsExecution(builder.build(), h -> assertThat(h.keySet(), hasSize(0))); - } - - /** - * This method executes a search and checks if the thread context was enriched with the ml origin - */ - private void assertExecutionWithOrigin(DatafeedConfig datafeedConfig) { - MlClientHelper.execute(datafeedConfig, client, () -> { - Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); - assertThat(origin, is(ML_ORIGIN)); - - // Check that headers are not set - Map headers = client.threadPool().getThreadContext().getHeaders(); - assertThat(headers, not(hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything"))); - assertThat(headers, not(hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything"))); - - return client.search(new SearchRequest()).actionGet(); - }); - } - - /** - * This method executes a search and ensures no stashed origin thread context was created, so that the regular node - * client was used, to emulate a run_as function - */ - public void assertRunAsExecution(DatafeedConfig datafeedConfig, Consumer> consumer) { - MlClientHelper.execute(datafeedConfig, client, () -> { - Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); - assertThat(origin, is(nullValue())); - - consumer.accept(client.threadPool().getThreadContext().getHeaders()); - return client.search(new SearchRequest()).actionGet(); - }); - } -} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupClientHelper.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupClientHelper.java deleted file mode 100644 index 20e4ba120cd..00000000000 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupClientHelper.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.rollup.job; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.support.ContextPreservingActionListener; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.rollup.job.RollupJob; -import org.elasticsearch.xpack.rollup.Rollup; - -import java.util.Map; -import java.util.function.Supplier; -import java.util.stream.Collectors; - - -/** - * Helper class to execute actions with authentication headers cached in the rollup job (if they exist, otherwise Origin) - */ -public class RollupClientHelper { - - @SuppressWarnings("try") - public static > void executeAsync( - Client client, RollupJob job, Action action, Request request, - ActionListener listener) { - - Map filteredHeaders = job.getHeaders().entrySet().stream() - .filter(e -> Rollup.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - final ThreadContext threadContext = client.threadPool().getThreadContext(); - - // No headers (e.g. security not installed/in use) so execute as rollup origin - if (filteredHeaders.isEmpty()) { - ClientHelper.executeAsyncWithOrigin(client, ClientHelper.ROLLUP_ORIGIN, action, request, listener); - } else { - // Otherwise stash the context and copy in the saved headers before executing - final Supplier supplier = threadContext.newRestorableContext(false); - try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) { - client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener)); - } - } - } - - private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map headers) { - final ThreadContext.StoredContext storedContext = threadContext.stashContext(); - threadContext.copyHeaders(headers.entrySet()); - return storedContext; - } -} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index f357d579c82..425629c248c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -23,6 +23,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; @@ -103,12 +104,14 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE @Override protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { - RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, nextPhase); + ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ROLLUP_ORIGIN, client, SearchAction.INSTANCE, request, + nextPhase); } @Override protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { - RollupClientHelper.executeAsync(client, job, BulkAction.INSTANCE, request, nextPhase); + ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ROLLUP_ORIGIN, client, BulkAction.INSTANCE, request, + nextPhase); } @Override diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupClientHelperTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupClientHelperTests.java deleted file mode 100644 index b2d098d458e..00000000000 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupClientHelperTests.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.rollup.job; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.rollup.job.RollupJob; -import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; -import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; - -import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class RollupClientHelperTests extends ESTestCase { - - @SuppressWarnings("unchecked") - public void testNoHeaders() throws InterruptedException { - final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - final Client client = mock(Client.class); - final ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(threadContext); - - final CountDownLatch latch = new CountDownLatch(2); - final ActionListener listener = ActionListener.wrap(v -> { - assertTrue(threadContext.getHeaders().isEmpty()); - latch.countDown(); - }, e -> fail(e.getMessage())); - - doAnswer(invocationOnMock -> { - assertTrue(threadContext.getHeaders().isEmpty()); - latch.countDown(); - ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null); - return null; - }).when(client).execute(anyObject(), anyObject(), anyObject()); - - SearchRequest request = new SearchRequest("foo"); - - RollupJobConfig config = ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(); - RollupJob job = new RollupJob(config, Collections.emptyMap()); - - RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, listener); - - latch.await(); - } - - @SuppressWarnings("unchecked") - public void testWrongHeaders() throws InterruptedException { - final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - final Client client = mock(Client.class); - final ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(threadContext); - - final CountDownLatch latch = new CountDownLatch(2); - final ActionListener listener = ActionListener.wrap(v -> { - assertTrue(threadContext.getHeaders().isEmpty()); - latch.countDown(); - }, e -> fail(e.getMessage())); - - doAnswer(invocationOnMock -> { - assertTrue(threadContext.getHeaders().isEmpty()); - latch.countDown(); - ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null); - return null; - }).when(client).execute(anyObject(), anyObject(), anyObject()); - - SearchRequest request = new SearchRequest("foo"); - Map headers = new HashMap<>(1); - headers.put("foo", "foo"); - headers.put("bar", "bar"); - RollupJobConfig config = ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(); - RollupJob job = new RollupJob(config, headers); - - RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, listener); - - latch.await(); - } - - @SuppressWarnings("unchecked") - public void testWithHeaders() throws Exception { - final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - final Client client = mock(Client.class); - final ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(threadContext); - - final CountDownLatch latch = new CountDownLatch(2); - final ActionListener listener = ActionListener.wrap(v -> { - assertTrue(threadContext.getHeaders().isEmpty()); - latch.countDown(); - }, e -> fail(e.getMessage())); - - doAnswer(invocationOnMock -> { - assertThat(threadContext.getHeaders().size(), equalTo(2)); - assertThat(threadContext.getHeaders().get("es-security-runas-user"), equalTo("foo")); - assertThat(threadContext.getHeaders().get("_xpack_security_authentication"), equalTo("bar")); - latch.countDown(); - ((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null); - return null; - }).when(client).execute(anyObject(), anyObject(), anyObject()); - - SearchRequest request = new SearchRequest("foo"); - Map headers = new HashMap<>(1); - headers.put("es-security-runas-user", "foo"); - headers.put("_xpack_security_authentication", "bar"); - RollupJobConfig config = ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(); - RollupJob job = new RollupJob(config, headers); - - RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, listener); - - latch.await(); - } - -} diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 1bd1aa59f70..9a18b6c857d 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -203,10 +203,6 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin { public static final Setting MAX_STOP_TIMEOUT_SETTING = Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope); - // list of headers that will be stored when a watch is stored - public static final Set HEADER_FILTERS = - new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication")); - public static final ScriptContext SCRIPT_SEARCH_CONTEXT = new ScriptContext<>("xpack", SearchScript.Factory.class); // TODO: remove this context when each xpack script use case has their own contexts diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java deleted file mode 100644 index 1019f5a423e..00000000000 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.xpack.core.watcher.watch.Watch; - -import java.util.Map; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; - -/** - * A helper class which decides if we should run via the xpack user and set watcher as origin or - * if we should use the run_as functionality by setting the correct headers - */ -public class WatcherClientHelper { - - /** - * Execute a client operation and return the response, try to run with least privileges, when headers exist - * - * @param watch The watch in which context this method gets executed in - * @param client The client used to query - * @param supplier The action to run - * @param The client response class this should return - * @return An instance of the response class - */ - public static T execute(Watch watch, Client client, Supplier supplier) { - // no headers, we will have to use the xpack internal user for our execution by specifying the watcher origin - if (watch.status().getHeaders().isEmpty()) { - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { - return supplier.get(); - } - } else { - try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) { - Map filteredHeaders = watch.status().getHeaders().entrySet().stream() - .filter(e -> Watcher.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet()); - return supplier.get(); - } - } - } -} diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java index e49732f0cb5..a156e68a4b1 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.actions.Action; import org.elasticsearch.xpack.core.watcher.actions.Action.Result.Status; import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction; @@ -24,7 +25,6 @@ import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.core.watcher.watch.Payload; -import org.elasticsearch.xpack.watcher.WatcherClientHelper; import org.elasticsearch.xpack.watcher.support.ArrayObjectIterator; import org.joda.time.DateTime; @@ -96,7 +96,7 @@ public class ExecutableIndexAction extends ExecutableAction { new XContentSource(indexRequest.source(), XContentType.JSON)); } - IndexResponse response = WatcherClientHelper.execute(ctx.watch(), client, + IndexResponse response = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, client, () -> client.index(indexRequest).actionGet(indexDefaultTimeout)); try (XContentBuilder builder = jsonBuilder()) { indexResponseToXContent(builder, response); @@ -137,7 +137,7 @@ public class ExecutableIndexAction extends ExecutableAction { } bulkRequest.add(indexRequest); } - BulkResponse bulkResponse = WatcherClientHelper.execute(ctx.watch(), client, + BulkResponse bulkResponse = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, client, () -> client.bulk(bulkRequest).actionGet(bulkDefaultTimeout)); try (XContentBuilder jsonBuilder = jsonBuilder().startArray()) { for (BulkItemResponse item : bulkResponse) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/search/ExecutableSearchInput.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/search/ExecutableSearchInput.java index 83a4f1f85e7..4aced1b6c03 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/search/ExecutableSearchInput.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/search/ExecutableSearchInput.java @@ -20,10 +20,10 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.core.watcher.input.ExecutableInput; import org.elasticsearch.xpack.core.watcher.watch.Payload; -import org.elasticsearch.xpack.watcher.WatcherClientHelper; import org.elasticsearch.xpack.watcher.support.XContentFilterKeysUtils; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService; @@ -71,8 +71,8 @@ public class ExecutableSearchInput extends ExecutableInput client.search(searchRequest).actionGet(timeout)); + final SearchResponse response = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, + client, () -> client.search(searchRequest).actionGet(timeout)); if (logger.isDebugEnabled()) { logger.debug("[{}] found [{}] hits", ctx.id(), response.getHits().getTotalHits()); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transform/search/ExecutableSearchTransform.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transform/search/ExecutableSearchTransform.java index 03dbf88fb0d..1b408bc5e64 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transform/search/ExecutableSearchTransform.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transform/search/ExecutableSearchTransform.java @@ -15,10 +15,10 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.script.Script; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.core.watcher.transform.ExecutableTransform; import org.elasticsearch.xpack.core.watcher.watch.Payload; -import org.elasticsearch.xpack.watcher.WatcherClientHelper; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService; @@ -49,7 +49,8 @@ public class ExecutableSearchTransform extends ExecutableTransform client.search(searchRequest).actionGet(timeout)); + SearchResponse resp = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, client, + () -> client.search(searchRequest).actionGet(timeout)); return new SearchTransform.Result(request, new Payload.XContent(resp)); } catch (Exception e) { logger.error((Supplier) () -> new ParameterizedMessage("failed to execute [{}] transform for [{}]", TYPE, ctx.id()), e); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java index ba380ef8420..840e43e038e 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java @@ -20,12 +20,12 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchAction; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchResponse; import org.elasticsearch.xpack.core.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction; import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.joda.time.DateTime; @@ -83,7 +83,7 @@ public class TransportPutWatchAction extends WatcherTransportAction filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() - .filter(e -> Watcher.HEADER_FILTERS.contains(e.getKey())) + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); watch.status().setHeaders(filteredHeaders); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherClientHelperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherClientHelperTests.java deleted file mode 100644 index f1908ccefc2..00000000000 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherClientHelperTests.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher; - -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; -import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder; -import org.junit.Before; - -import java.util.Collections; -import java.util.Map; -import java.util.function.Consumer; - -import static org.elasticsearch.xpack.core.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME; -import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; -import static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class WatcherClientHelperTests extends ESTestCase { - - private Client client = mock(Client.class); - - @Before - public void setupMocks() { - PlainActionFuture searchFuture = PlainActionFuture.newFuture(); - searchFuture.onResponse(new SearchResponse()); - when(client.search(any())).thenReturn(searchFuture); - - ThreadPool threadPool = mock(ThreadPool.class); - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - when(threadPool.getThreadContext()).thenReturn(threadContext); - when(client.threadPool()).thenReturn(threadPool); - } - - public void testEmptyHeaders() { - WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock(); - when(ctx.watch().status().getHeaders()).thenReturn(Collections.emptyMap()); - - assertExecutionWithOrigin(ctx); - } - - public void testWithHeaders() { - WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock(); - Map watchStatusHeaders = MapBuilder.newMapBuilder() - .put("es-security-runas-user", "anything") - .put("_xpack_security_authentication", "anything") - .map(); - when(ctx.watch().status().getHeaders()).thenReturn(watchStatusHeaders); - - assertRunAsExecution(ctx, headers -> { - assertThat(headers.keySet(), hasSize(2)); - assertThat(headers, hasEntry("es-security-runas-user", "anything")); - assertThat(headers, hasEntry("_xpack_security_authentication", "anything")); - }); - } - - public void testFilteredHeaders() { - WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock(); - Map watchStatusHeaders = MapBuilder.newMapBuilder() - .put(randomAlphaOfLength(10), "anything") - .map(); - when(ctx.watch().status().getHeaders()).thenReturn(watchStatusHeaders); - - assertRunAsExecution(ctx, headers -> { - assertThat(headers.keySet(), hasSize(0)); - }); - } - - /** - * This method executes a search and checks if the thread context was enriched with the watcher origin - */ - private void assertExecutionWithOrigin(WatchExecutionContext ctx) { - WatcherClientHelper.execute(ctx.watch(), client, () -> { - Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); - assertThat(origin, is(WATCHER_ORIGIN)); - - // check that headers are not set - Map headers = client.threadPool().getThreadContext().getHeaders(); - assertThat(headers, not(hasEntry("es-security-runas-user", "anything"))); - assertThat(headers, not(hasEntry("_xpack_security_authentication", "anything"))); - - return client.search(new SearchRequest()).actionGet(); - }); - - } - - /** - * This method executes a search and ensures no stashed origin thread context was created, so that the regular node - * client was used, to emulate a run_as function - */ - public void assertRunAsExecution(WatchExecutionContext ctx, Consumer> consumer) { - WatcherClientHelper.execute(ctx.watch(), client, () -> { - Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME); - assertThat(origin, is(nullValue())); - - Map headers = client.threadPool().getThreadContext().getHeaders(); - consumer.accept(headers); - return client.search(new SearchRequest()).actionGet(); - }); - - } -} \ No newline at end of file diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java index b91acc1f969..07fb45936e9 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java @@ -57,7 +57,7 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTestC private MockWebServer webServer = new MockWebServer(); private static Boolean encryptSensitiveData = null; - private static byte[] encryptionKey = CryptoServiceTests.generateKey(); + private static final byte[] encryptionKey = CryptoServiceTests.generateKey(); @Before public void init() throws Exception { @@ -155,6 +155,14 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTestC assertThat(webServer.requests(), hasSize(1)); assertThat(webServer.requests().get(0).getHeader("Authorization"), is(ApplicableBasicAuth.headerValue(USERNAME, PASSWORD.toCharArray()))); + + // now trigger the by the scheduler and make sure that the password is also correctly transmitted + webServer.enqueue(new MockResponse().setResponseCode(200).setBody( + BytesReference.bytes(jsonBuilder().startObject().field("key", "value").endObject()).utf8ToString())); + timeWarp().trigger("_id"); + assertThat(webServer.requests(), hasSize(2)); + assertThat(webServer.requests().get(1).getHeader("Authorization"), + is(ApplicableBasicAuth.headerValue(USERNAME, PASSWORD.toCharArray()))); } public void testWebhookAction() throws Exception { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java index ffefe3bab74..4ed07804b71 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; @@ -20,10 +19,10 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchRequest; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import org.elasticsearch.xpack.core.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder; import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.junit.Before; @@ -81,7 +80,7 @@ public class TransportPutWatchActionTests extends ESTestCase { public void testHeadersAreFilteredWhenPuttingWatches() throws Exception { // set up threadcontext with some arbitrary info - String headerName = randomFrom(Watcher.HEADER_FILTERS); + String headerName = randomFrom(ClientHelper.SECURITY_HEADER_FILTERS); threadContext.putHeader(headerName, randomAlphaOfLength(10)); threadContext.putHeader(randomAlphaOfLength(10), "doesntmatter"); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java index 58f5c8f4a26..63f4f95ae21 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java @@ -53,6 +53,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { @Override public void start(Collection jobs) { + jobs.forEach(this::add); } @Override diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/30_ml_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/30_ml_jobs_crud.yml index 7bfbb5ad8c4..4458b517448 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/30_ml_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/30_ml_jobs_crud.yml @@ -87,6 +87,13 @@ setup: xpack.ml.close_job: job_id: mixed-cluster-job +# Wait for indices to be fully allocated before +# killing the node + - do: + cluster.health: + index: [".ml-state", ".ml-anomalies-shared"] + wait_for_status: green + --- "Test get job with rules": diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml index 61f39107a2d..da36da301c1 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/30_ml_jobs_crud.yml @@ -49,6 +49,13 @@ job_id: old-cluster-job - match: { count: 1 } +# Wait for indices to be fully allocated before +# killing the node + - do: + cluster.health: + index: [".ml-state", ".ml-anomalies-shared"] + wait_for_status: green + --- "Put job on the old cluster with the default model memory limit and post some data": - do: @@ -96,6 +103,13 @@ job_id: no-model-memory-limit-job - match: { count: 201 } +# Wait for indices to be fully allocated before +# killing the node + - do: + cluster.health: + index: [".ml-state", ".ml-anomalies-shared"] + wait_for_status: green + --- "Put job with empty strings in the configuration": - do: