Merge branch 'master' into index-lifecycle
This commit is contained in:
commit
f4fad07113
|
@ -9,7 +9,7 @@ When using the `has_child` query it is important to use the `PreBuiltTransportCl
|
|||
--------------------------------------------------
|
||||
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
|
||||
TransportClient client = new PreBuiltTransportClient(settings);
|
||||
client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9300)));
|
||||
client.addTransportAddress(new TransportAddress(new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9300)));
|
||||
--------------------------------------------------
|
||||
|
||||
Otherwise the parent-join module doesn't get loaded and the `has_child` query can't be used from the transport client.
|
||||
|
|
|
@ -544,20 +544,20 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
|
||||
static class AttributesKey {
|
||||
|
||||
final String[] attributes;
|
||||
final List<String> attributes;
|
||||
|
||||
AttributesKey(String[] attributes) {
|
||||
AttributesKey(List<String> attributes) {
|
||||
this.attributes = attributes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Arrays.hashCode(attributes);
|
||||
return attributes.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return obj instanceof AttributesKey && Arrays.equals(attributes, ((AttributesKey) obj).attributes);
|
||||
return obj instanceof AttributesKey && attributes.equals(((AttributesKey) obj).attributes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -621,11 +621,11 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
return Collections.unmodifiableList(to);
|
||||
}
|
||||
|
||||
public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes) {
|
||||
public ShardIterator preferAttributesActiveInitializingShardsIt(List<String> attributes, DiscoveryNodes nodes) {
|
||||
return preferAttributesActiveInitializingShardsIt(attributes, nodes, shuffler.nextSeed());
|
||||
}
|
||||
|
||||
public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes, int seed) {
|
||||
public ShardIterator preferAttributesActiveInitializingShardsIt(List<String> attributes, DiscoveryNodes nodes, int seed) {
|
||||
AttributesKey key = new AttributesKey(attributes);
|
||||
AttributesRoutings activeRoutings = getActiveAttribute(key, nodes);
|
||||
AttributesRoutings initializingRoutings = getInitializingAttribute(key, nodes);
|
||||
|
|
|
@ -39,6 +39,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -49,7 +50,7 @@ public class OperationRouting extends AbstractComponent {
|
|||
Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", true,
|
||||
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||
|
||||
private String[] awarenessAttributes;
|
||||
private List<String> awarenessAttributes;
|
||||
private boolean useAdaptiveReplicaSelection;
|
||||
|
||||
public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
|
||||
|
@ -65,7 +66,7 @@ public class OperationRouting extends AbstractComponent {
|
|||
this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection;
|
||||
}
|
||||
|
||||
private void setAwarenessAttributes(String[] awarenessAttributes) {
|
||||
private void setAwarenessAttributes(List<String> awarenessAttributes) {
|
||||
this.awarenessAttributes = awarenessAttributes;
|
||||
}
|
||||
|
||||
|
@ -139,7 +140,7 @@ public class OperationRouting extends AbstractComponent {
|
|||
@Nullable ResponseCollectorService collectorService,
|
||||
@Nullable Map<String, Long> nodeCounts) {
|
||||
if (preference == null || preference.isEmpty()) {
|
||||
if (awarenessAttributes.length == 0) {
|
||||
if (awarenessAttributes.isEmpty()) {
|
||||
if (useAdaptiveReplicaSelection) {
|
||||
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
|
||||
} else {
|
||||
|
@ -174,7 +175,7 @@ public class OperationRouting extends AbstractComponent {
|
|||
}
|
||||
// no more preference
|
||||
if (index == -1 || index == preference.length() - 1) {
|
||||
if (awarenessAttributes.length == 0) {
|
||||
if (awarenessAttributes.isEmpty()) {
|
||||
if (useAdaptiveReplicaSelection) {
|
||||
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
|
||||
} else {
|
||||
|
@ -218,7 +219,7 @@ public class OperationRouting extends AbstractComponent {
|
|||
// shard ID into the hash of the user-supplied preference key.
|
||||
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
|
||||
}
|
||||
if (awarenessAttributes.length == 0) {
|
||||
if (awarenessAttributes.isEmpty()) {
|
||||
return indexShard.activeInitializingShardsIt(routingHash);
|
||||
} else {
|
||||
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing.allocation.decider;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.carrotsearch.hppc.ObjectIntHashMap;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -34,6 +35,8 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
/**
|
||||
* This {@link AllocationDecider} controls shard allocation based on
|
||||
* {@code awareness} key-value pairs defined in the node configuration.
|
||||
|
@ -78,13 +81,13 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
|||
|
||||
public static final String NAME = "awareness";
|
||||
|
||||
public static final Setting<String[]> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING =
|
||||
new Setting<>("cluster.routing.allocation.awareness.attributes", "", s -> Strings.tokenizeToStringArray(s, ","), Property.Dynamic,
|
||||
public static final Setting<List<String>> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING =
|
||||
Setting.listSetting("cluster.routing.allocation.awareness.attributes", emptyList(), Function.identity(), Property.Dynamic,
|
||||
Property.NodeScope);
|
||||
public static final Setting<Settings> CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING =
|
||||
Setting.groupSetting("cluster.routing.allocation.awareness.force.", Property.Dynamic, Property.NodeScope);
|
||||
|
||||
private volatile String[] awarenessAttributes;
|
||||
private volatile List<String> awarenessAttributes;
|
||||
|
||||
private volatile Map<String, List<String>> forcedAwarenessAttributes;
|
||||
|
||||
|
@ -109,7 +112,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
|||
this.forcedAwarenessAttributes = forcedAwarenessAttributes;
|
||||
}
|
||||
|
||||
private void setAwarenessAttributes(String[] awarenessAttributes) {
|
||||
private void setAwarenessAttributes(List<String> awarenessAttributes) {
|
||||
this.awarenessAttributes = awarenessAttributes;
|
||||
}
|
||||
|
||||
|
@ -124,7 +127,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
|||
}
|
||||
|
||||
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
|
||||
if (awarenessAttributes.length == 0) {
|
||||
if (awarenessAttributes.isEmpty()) {
|
||||
return allocation.decision(Decision.YES, NAME,
|
||||
"allocation awareness is not enabled, set cluster setting [%s] to enable it",
|
||||
CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey());
|
||||
|
@ -138,7 +141,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
|
|||
return allocation.decision(Decision.NO, NAME,
|
||||
"node does not contain the awareness attribute [%s]; required attributes cluster setting [%s=%s]",
|
||||
awarenessAttribute, CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(),
|
||||
allocation.debugDecision() ? Strings.arrayToCommaDelimitedString(awarenessAttributes) : null);
|
||||
allocation.debugDecision() ? Strings.collectionToCommaDelimitedString(awarenessAttributes) : null);
|
||||
}
|
||||
|
||||
// build attr_value -> nodes map
|
||||
|
|
|
@ -24,11 +24,13 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class IndexShardRoutingTableTests extends ESTestCase {
|
||||
public void testEqualsAttributesKey() {
|
||||
String[] attr1 = {"a"};
|
||||
String[] attr2 = {"b"};
|
||||
List<String> attr1 = Arrays.asList("a");
|
||||
List<String> attr2 = Arrays.asList("b");
|
||||
IndexShardRoutingTable.AttributesKey attributesKey1 = new IndexShardRoutingTable.AttributesKey(attr1);
|
||||
IndexShardRoutingTable.AttributesKey attributesKey2 = new IndexShardRoutingTable.AttributesKey(attr1);
|
||||
IndexShardRoutingTable.AttributesKey attributesKey3 = new IndexShardRoutingTable.AttributesKey(attr2);
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
@ -50,7 +51,6 @@ import static java.util.Collections.singletonMap;
|
|||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -224,11 +224,16 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
public void testAttributePreferenceRouting() {
|
||||
AllocationService strategy = createAllocationService(Settings.builder()
|
||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
|
||||
.put("cluster.routing.allocation.awareness.attributes", "rack_id,zone")
|
||||
.build());
|
||||
Settings.Builder settings = Settings.builder()
|
||||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
|
||||
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always");
|
||||
if (randomBoolean()) {
|
||||
settings.put("cluster.routing.allocation.awareness.attributes", " rack_id, zone ");
|
||||
} else {
|
||||
settings.putList("cluster.routing.allocation.awareness.attributes", "rack_id", "zone");
|
||||
}
|
||||
|
||||
AllocationService strategy = createAllocationService(settings.build());
|
||||
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
||||
|
@ -258,7 +263,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
|
|||
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
|
||||
|
||||
// after all are started, check routing iteration
|
||||
ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes());
|
||||
ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes());
|
||||
ShardRouting shardRouting = shardIterator.nextOrNull();
|
||||
assertThat(shardRouting, notNullValue());
|
||||
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
|
||||
|
@ -266,7 +271,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
|
|||
assertThat(shardRouting, notNullValue());
|
||||
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
|
||||
|
||||
shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes());
|
||||
shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes());
|
||||
shardRouting = shardIterator.nextOrNull();
|
||||
assertThat(shardRouting, notNullValue());
|
||||
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
|
||||
|
|
|
@ -14,9 +14,15 @@ import org.elasticsearch.action.support.ContextPreservingActionListener;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.FilterClient;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
|
||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Utility class to help with the execution of requests made using a {@link Client} such that they
|
||||
|
@ -24,6 +30,12 @@ import java.util.function.Supplier;
|
|||
*/
|
||||
public final class ClientHelper {
|
||||
|
||||
/**
|
||||
* List of headers that are related to security
|
||||
*/
|
||||
public static final Set<String> SECURITY_HEADER_FILTERS = Sets.newHashSet(AuthenticationServiceField.RUN_AS_USER_HEADER,
|
||||
AuthenticationField.AUTHENTICATION_KEY);
|
||||
|
||||
public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
|
||||
public static final String SECURITY_ORIGIN = "security";
|
||||
public static final String WATCHER_ORIGIN = "watcher";
|
||||
|
@ -78,6 +90,82 @@ public final class ClientHelper {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a client operation and return the response, try to run an action
|
||||
* with least privileges, when headers exist
|
||||
*
|
||||
* @param headers
|
||||
* Request headers, ideally including security headers
|
||||
* @param origin
|
||||
* The origin to fall back to if there are no security headers
|
||||
* @param client
|
||||
* The client used to query
|
||||
* @param supplier
|
||||
* The action to run
|
||||
* @return An instance of the response class
|
||||
*/
|
||||
public static <T extends ActionResponse> T executeWithHeaders(Map<String, String> headers, String origin, Client client,
|
||||
Supplier<T> supplier) {
|
||||
Map<String, String> filteredHeaders = headers.entrySet().stream().filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
// no security headers, we will have to use the xpack internal user for
|
||||
// our execution by specifying the origin
|
||||
if (filteredHeaders.isEmpty()) {
|
||||
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), origin)) {
|
||||
return supplier.get();
|
||||
}
|
||||
} else {
|
||||
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
|
||||
client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet());
|
||||
return supplier.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a client operation asynchronously, try to run an action with
|
||||
* least privileges, when headers exist
|
||||
*
|
||||
* @param headers
|
||||
* Request headers, ideally including security headers
|
||||
* @param origin
|
||||
* The origin to fall back to if there are no security headers
|
||||
* @param action
|
||||
* The action to execute
|
||||
* @param request
|
||||
* The request object for the action
|
||||
* @param listener
|
||||
* The listener to call when the action is complete
|
||||
*/
|
||||
public static <Request extends ActionRequest, Response extends ActionResponse,
|
||||
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void executeWithHeadersAsync(
|
||||
Map<String, String> headers, String origin, Client client, Action<Request, Response, RequestBuilder> action, Request request,
|
||||
ActionListener<Response> listener) {
|
||||
|
||||
Map<String, String> filteredHeaders = headers.entrySet().stream().filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
final ThreadContext threadContext = client.threadPool().getThreadContext();
|
||||
|
||||
// No headers (e.g. security not installed/in use) so execute as origin
|
||||
if (filteredHeaders.isEmpty()) {
|
||||
ClientHelper.executeAsyncWithOrigin(client, origin, action, request, listener);
|
||||
} else {
|
||||
// Otherwise stash the context and copy in the saved headers before executing
|
||||
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
|
||||
try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) {
|
||||
client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
|
||||
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
|
||||
threadContext.copyHeaders(headers.entrySet());
|
||||
return storedContext;
|
||||
}
|
||||
|
||||
private static final class ClientWithOrigin extends FilterClient {
|
||||
|
||||
private final String origin;
|
||||
|
@ -98,5 +186,4 @@ public final class ClientHelper {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,71 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ml;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
|
||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A helper class for actions which decides if we should run via the _xpack user and set ML as origin
|
||||
* or if we should use the run_as functionality by setting the correct headers
|
||||
*/
|
||||
public class MlClientHelper {
|
||||
|
||||
/**
|
||||
* List of headers that are related to security
|
||||
*/
|
||||
public static final Set<String> SECURITY_HEADER_FILTERS = Sets.newHashSet(AuthenticationServiceField.RUN_AS_USER_HEADER,
|
||||
AuthenticationField.AUTHENTICATION_KEY);
|
||||
|
||||
/**
|
||||
* Execute a client operation and return the response, try to run a datafeed search with least privileges, when headers exist
|
||||
*
|
||||
* @param datafeedConfig The config for a datafeed
|
||||
* @param client The client used to query
|
||||
* @param supplier The action to run
|
||||
* @return An instance of the response class
|
||||
*/
|
||||
public static <T extends ActionResponse> T execute(DatafeedConfig datafeedConfig, Client client, Supplier<T> supplier) {
|
||||
return execute(datafeedConfig.getHeaders(), client, supplier);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a client operation and return the response, try to run an action with least privileges, when headers exist
|
||||
*
|
||||
* @param headers Request headers, ideally including security headers
|
||||
* @param client The client used to query
|
||||
* @param supplier The action to run
|
||||
* @return An instance of the response class
|
||||
*/
|
||||
public static <T extends ActionResponse> T execute(Map<String, String> headers, Client client, Supplier<T> supplier) {
|
||||
// no headers, we will have to use the xpack internal user for our execution by specifying the ml origin
|
||||
if (headers == null || headers.isEmpty()) {
|
||||
try (ThreadContext.StoredContext ignore = ClientHelper.stashWithOrigin(client.threadPool().getThreadContext(),
|
||||
ClientHelper.ML_ORIGIN)) {
|
||||
return supplier.get();
|
||||
}
|
||||
} else {
|
||||
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
|
||||
Map<String, String> filteredHeaders = headers.entrySet().stream()
|
||||
.filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet());
|
||||
return supplier.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -23,6 +23,9 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
||||
|
@ -35,8 +38,6 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
|||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.core.ml.utils.NameResolver;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
@ -303,7 +304,7 @@ public class MlMetadata implements MetaData.Custom {
|
|||
// Adjust the request, adding security headers from the current thread context
|
||||
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig);
|
||||
Map<String, String> headers = threadContext.getHeaders().entrySet().stream()
|
||||
.filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
builder.setHeaders(headers);
|
||||
datafeedConfig = builder.build();
|
||||
|
|
|
@ -21,7 +21,7 @@ import org.elasticsearch.index.query.AbstractQueryBuilder;
|
|||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.xpack.core.ml.MlClientHelper;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
|
@ -304,7 +304,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
|
|||
if (threadContext != null) {
|
||||
// Adjust the request, adding security headers from the current thread context
|
||||
Map<String, String> headers = threadContext.getHeaders().entrySet().stream()
|
||||
.filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
builder.setHeaders(headers);
|
||||
}
|
||||
|
|
|
@ -5,8 +5,6 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.watcher.support.xcontent;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
|
|
|
@ -9,15 +9,33 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
|
||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasEntry;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -97,7 +115,7 @@ public class ClientHelperTests extends ESTestCase {
|
|||
assertEquals(origin, threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME));
|
||||
assertNull(threadContext.getHeader(headerName));
|
||||
latch.countDown();
|
||||
((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null);
|
||||
((ActionListener<?>) invocationOnMock.getArguments()[2]).onResponse(null);
|
||||
return null;
|
||||
}).when(client).execute(anyObject(), anyObject(), anyObject());
|
||||
|
||||
|
@ -130,7 +148,7 @@ public class ClientHelperTests extends ESTestCase {
|
|||
assertEquals(origin, threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME));
|
||||
assertNull(threadContext.getHeader(headerName));
|
||||
latch.countDown();
|
||||
((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null);
|
||||
((ActionListener<?>) invocationOnMock.getArguments()[2]).onResponse(null);
|
||||
return null;
|
||||
}).when(client).execute(anyObject(), anyObject(), anyObject());
|
||||
|
||||
|
@ -139,4 +157,179 @@ public class ClientHelperTests extends ESTestCase {
|
|||
clientWithOrigin.execute(null, null, listener);
|
||||
latch.await();
|
||||
}
|
||||
|
||||
public void testExecuteWithHeadersAsyncNoHeaders() throws InterruptedException {
|
||||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
final Client client = mock(Client.class);
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final ActionListener<SearchResponse> listener = ActionListener.wrap(v -> {
|
||||
assertTrue(threadContext.getHeaders().isEmpty());
|
||||
latch.countDown();
|
||||
}, e -> fail(e.getMessage()));
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
assertTrue(threadContext.getHeaders().isEmpty());
|
||||
latch.countDown();
|
||||
((ActionListener<?>) invocationOnMock.getArguments()[2]).onResponse(null);
|
||||
return null;
|
||||
}).when(client).execute(anyObject(), anyObject(), anyObject());
|
||||
|
||||
SearchRequest request = new SearchRequest("foo");
|
||||
|
||||
String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN);
|
||||
ClientHelper.executeWithHeadersAsync(Collections.emptyMap(), originName, client, SearchAction.INSTANCE, request, listener);
|
||||
|
||||
latch.await();
|
||||
}
|
||||
|
||||
public void testExecuteWithHeadersAsyncWrongHeaders() throws InterruptedException {
|
||||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
final Client client = mock(Client.class);
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final ActionListener<SearchResponse> listener = ActionListener.wrap(v -> {
|
||||
assertTrue(threadContext.getHeaders().isEmpty());
|
||||
latch.countDown();
|
||||
}, e -> fail(e.getMessage()));
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
assertTrue(threadContext.getHeaders().isEmpty());
|
||||
latch.countDown();
|
||||
((ActionListener<?>) invocationOnMock.getArguments()[2]).onResponse(null);
|
||||
return null;
|
||||
}).when(client).execute(anyObject(), anyObject(), anyObject());
|
||||
|
||||
SearchRequest request = new SearchRequest("foo");
|
||||
Map<String, String> headers = new HashMap<>(1);
|
||||
headers.put("foo", "foo");
|
||||
headers.put("bar", "bar");
|
||||
|
||||
String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN);
|
||||
ClientHelper.executeWithHeadersAsync(headers, originName, client, SearchAction.INSTANCE, request, listener);
|
||||
|
||||
latch.await();
|
||||
}
|
||||
|
||||
public void testExecuteWithHeadersAsyncWithHeaders() throws Exception {
|
||||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
final Client client = mock(Client.class);
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final ActionListener<SearchResponse> listener = ActionListener.wrap(v -> {
|
||||
assertTrue(threadContext.getHeaders().isEmpty());
|
||||
latch.countDown();
|
||||
}, e -> fail(e.getMessage()));
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
assertThat(threadContext.getHeaders().size(), equalTo(2));
|
||||
assertThat(threadContext.getHeaders().get("es-security-runas-user"), equalTo("foo"));
|
||||
assertThat(threadContext.getHeaders().get("_xpack_security_authentication"), equalTo("bar"));
|
||||
latch.countDown();
|
||||
((ActionListener<?>) invocationOnMock.getArguments()[2]).onResponse(null);
|
||||
return null;
|
||||
}).when(client).execute(anyObject(), anyObject(), anyObject());
|
||||
|
||||
SearchRequest request = new SearchRequest("foo");
|
||||
Map<String, String> headers = new HashMap<>(1);
|
||||
headers.put("es-security-runas-user", "foo");
|
||||
headers.put("_xpack_security_authentication", "bar");
|
||||
|
||||
String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN);
|
||||
ClientHelper.executeWithHeadersAsync(headers, originName, client, SearchAction.INSTANCE, request, listener);
|
||||
|
||||
latch.await();
|
||||
}
|
||||
|
||||
public void testExecuteWithHeadersNoHeaders() {
|
||||
Client client = mock(Client.class);
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
|
||||
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
|
||||
searchFuture.onResponse(new SearchResponse());
|
||||
when(client.search(any())).thenReturn(searchFuture);
|
||||
assertExecutionWithOrigin(Collections.emptyMap(), client);
|
||||
}
|
||||
|
||||
public void testExecuteWithHeaders() {
|
||||
Client client = mock(Client.class);
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
|
||||
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
|
||||
searchFuture.onResponse(new SearchResponse());
|
||||
when(client.search(any())).thenReturn(searchFuture);
|
||||
Map<String, String> headers = MapBuilder.<String, String> newMapBuilder().put(AuthenticationField.AUTHENTICATION_KEY, "anything")
|
||||
.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything").map();
|
||||
|
||||
assertRunAsExecution(headers, h -> {
|
||||
assertThat(h.keySet(), hasSize(2));
|
||||
assertThat(h, hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything"));
|
||||
assertThat(h, hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything"));
|
||||
}, client);
|
||||
}
|
||||
|
||||
public void testExecuteWithHeadersNoSecurityHeaders() {
|
||||
Client client = mock(Client.class);
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
|
||||
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
|
||||
searchFuture.onResponse(new SearchResponse());
|
||||
when(client.search(any())).thenReturn(searchFuture);
|
||||
Map<String, String> unrelatedHeaders = MapBuilder.<String, String> newMapBuilder().put(randomAlphaOfLength(10), "anything").map();
|
||||
|
||||
assertExecutionWithOrigin(unrelatedHeaders, client);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method executes a search and checks if the thread context was
|
||||
* enriched with the ml origin
|
||||
*/
|
||||
private void assertExecutionWithOrigin(Map<String, String> storedHeaders, Client client) {
|
||||
String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN);
|
||||
ClientHelper.executeWithHeaders(storedHeaders, originName, client, () -> {
|
||||
Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME);
|
||||
assertThat(origin, is(originName));
|
||||
|
||||
// Check that headers are not set
|
||||
Map<String, String> headers = client.threadPool().getThreadContext().getHeaders();
|
||||
assertThat(headers, not(hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything")));
|
||||
assertThat(headers, not(hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything")));
|
||||
|
||||
return client.search(new SearchRequest()).actionGet();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* This method executes a search and ensures no stashed origin thread
|
||||
* context was created, so that the regular node client was used, to emulate
|
||||
* a run_as function
|
||||
*/
|
||||
public void assertRunAsExecution(Map<String, String> storedHeaders, Consumer<Map<String, String>> consumer, Client client) {
|
||||
String originName = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.ROLLUP_ORIGIN);
|
||||
ClientHelper.executeWithHeaders(storedHeaders, originName, client, () -> {
|
||||
Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME);
|
||||
assertThat(origin, is(nullValue()));
|
||||
|
||||
consumer.accept(client.threadPool().getThreadContext().getHeaders());
|
||||
return client.search(new SearchRequest()).actionGet();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,8 +16,8 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.MLMetadataField;
|
||||
import org.elasticsearch.xpack.core.ml.MlClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
|
||||
|
@ -64,7 +64,7 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
|
|||
|
||||
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeed);
|
||||
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
|
||||
.filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
previewDatafeed.setHeaders(headers);
|
||||
// NB: this is using the client from the transport layer, NOT the internal client.
|
||||
|
|
|
@ -14,7 +14,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.search.aggregations.Aggregation;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.xpack.core.ml.MlClientHelper;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
|
||||
|
||||
|
@ -112,7 +112,7 @@ class AggregationDataExtractor implements DataExtractor {
|
|||
}
|
||||
|
||||
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
|
||||
return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get);
|
||||
return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get);
|
||||
}
|
||||
|
||||
private SearchRequestBuilder buildSearchRequest() {
|
||||
|
|
|
@ -15,10 +15,10 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
|
|||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.metrics.max.Max;
|
||||
import org.elasticsearch.search.aggregations.metrics.min.Min;
|
||||
import org.elasticsearch.xpack.core.ml.MlClientHelper;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
|
||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
|
||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -135,7 +135,7 @@ public class ChunkedDataExtractor implements DataExtractor {
|
|||
}
|
||||
|
||||
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
|
||||
return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get);
|
||||
return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get);
|
||||
}
|
||||
|
||||
private Optional<InputStream> getNextStream() throws IOException {
|
||||
|
|
|
@ -20,7 +20,7 @@ import org.elasticsearch.script.Script;
|
|||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.fetch.StoredFieldsContext;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.xpack.core.ml.MlClientHelper;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
|
||||
import org.elasticsearch.xpack.ml.utils.DomainSplitFunction;
|
||||
|
@ -100,7 +100,7 @@ class ScrollDataExtractor implements DataExtractor {
|
|||
}
|
||||
|
||||
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
|
||||
return MlClientHelper.execute(context.headers, client, searchRequestBuilder::get);
|
||||
return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get);
|
||||
}
|
||||
|
||||
private SearchRequestBuilder buildSearchRequest(long start) {
|
||||
|
@ -211,7 +211,8 @@ class ScrollDataExtractor implements DataExtractor {
|
|||
}
|
||||
|
||||
protected SearchResponse executeSearchScrollRequest(String scrollId) {
|
||||
return MlClientHelper.execute(context.headers, client, () -> SearchScrollAction.INSTANCE.newRequestBuilder(client)
|
||||
return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client,
|
||||
() -> SearchScrollAction.INSTANCE.newRequestBuilder(client)
|
||||
.setScroll(SCROLL_TIMEOUT)
|
||||
.setScrollId(scrollId)
|
||||
.get());
|
||||
|
@ -226,7 +227,8 @@ class ScrollDataExtractor implements DataExtractor {
|
|||
if (scrollId != null) {
|
||||
ClearScrollRequest request = new ClearScrollRequest();
|
||||
request.addScrollId(scrollId);
|
||||
MlClientHelper.execute(context.headers, client, () -> client.execute(ClearScrollAction.INSTANCE, request).actionGet());
|
||||
ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client,
|
||||
() -> client.execute(ClearScrollAction.INSTANCE, request).actionGet());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,12 +12,12 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
|
|||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.xpack.core.ml.MlClientHelper;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
|
||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
|
||||
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -76,7 +76,7 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
|
|||
String[] requestFields = job.allInputFields().stream().map(f -> MlStrings.getParentField(f) + "*")
|
||||
.toArray(size -> new String[size]);
|
||||
fieldCapabilitiesRequest.fields(requestFields);
|
||||
MlClientHelper.<FieldCapabilitiesResponse>execute(datafeed, client, () -> {
|
||||
ClientHelper.<FieldCapabilitiesResponse> executeWithHeaders(datafeed.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
|
||||
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
|
||||
// This response gets discarded - the listener handles the real response
|
||||
return null;
|
||||
|
|
|
@ -1,118 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ml.MlClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
|
||||
import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
|
||||
import static org.hamcrest.Matchers.hasEntry;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class MlClientHelperTests extends ESTestCase {
|
||||
|
||||
private Client client = mock(Client.class);
|
||||
|
||||
@Before
|
||||
public void setupMocks() {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
|
||||
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
|
||||
searchFuture.onResponse(new SearchResponse());
|
||||
when(client.search(any())).thenReturn(searchFuture);
|
||||
}
|
||||
|
||||
public void testEmptyHeaders() {
|
||||
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo");
|
||||
builder.setIndices(Collections.singletonList("foo-index"));
|
||||
|
||||
assertExecutionWithOrigin(builder.build());
|
||||
}
|
||||
|
||||
public void testWithHeaders() {
|
||||
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo");
|
||||
builder.setIndices(Collections.singletonList("foo-index"));
|
||||
Map<String, String> headers = MapBuilder.<String, String>newMapBuilder()
|
||||
.put(AuthenticationField.AUTHENTICATION_KEY, "anything")
|
||||
.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything")
|
||||
.map();
|
||||
builder.setHeaders(headers);
|
||||
|
||||
assertRunAsExecution(builder.build(), h -> {
|
||||
assertThat(h.keySet(), hasSize(2));
|
||||
assertThat(h, hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything"));
|
||||
assertThat(h, hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything"));
|
||||
});
|
||||
}
|
||||
|
||||
public void testFilteredHeaders() {
|
||||
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed-foo", "foo");
|
||||
builder.setIndices(Collections.singletonList("foo-index"));
|
||||
Map<String, String> unrelatedHeaders = MapBuilder.<String, String>newMapBuilder()
|
||||
.put(randomAlphaOfLength(10), "anything")
|
||||
.map();
|
||||
builder.setHeaders(unrelatedHeaders);
|
||||
|
||||
assertRunAsExecution(builder.build(), h -> assertThat(h.keySet(), hasSize(0)));
|
||||
}
|
||||
|
||||
/**
|
||||
* This method executes a search and checks if the thread context was enriched with the ml origin
|
||||
*/
|
||||
private void assertExecutionWithOrigin(DatafeedConfig datafeedConfig) {
|
||||
MlClientHelper.execute(datafeedConfig, client, () -> {
|
||||
Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME);
|
||||
assertThat(origin, is(ML_ORIGIN));
|
||||
|
||||
// Check that headers are not set
|
||||
Map<String, String> headers = client.threadPool().getThreadContext().getHeaders();
|
||||
assertThat(headers, not(hasEntry(AuthenticationField.AUTHENTICATION_KEY, "anything")));
|
||||
assertThat(headers, not(hasEntry(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything")));
|
||||
|
||||
return client.search(new SearchRequest()).actionGet();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* This method executes a search and ensures no stashed origin thread context was created, so that the regular node
|
||||
* client was used, to emulate a run_as function
|
||||
*/
|
||||
public void assertRunAsExecution(DatafeedConfig datafeedConfig, Consumer<Map<String, String>> consumer) {
|
||||
MlClientHelper.execute(datafeedConfig, client, () -> {
|
||||
Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME);
|
||||
assertThat(origin, is(nullValue()));
|
||||
|
||||
consumer.accept(client.threadPool().getThreadContext().getHeaders());
|
||||
return client.search(new SearchRequest()).actionGet();
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,59 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.rollup.job;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
|
||||
import org.elasticsearch.xpack.rollup.Rollup;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
/**
|
||||
* Helper class to execute actions with authentication headers cached in the rollup job (if they exist, otherwise Origin)
|
||||
*/
|
||||
public class RollupClientHelper {
|
||||
|
||||
@SuppressWarnings("try")
|
||||
public static <Request extends ActionRequest, Response extends ActionResponse,
|
||||
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void executeAsync(
|
||||
Client client, RollupJob job, Action<Request, Response, RequestBuilder> action, Request request,
|
||||
ActionListener<Response> listener) {
|
||||
|
||||
Map<String, String> filteredHeaders = job.getHeaders().entrySet().stream()
|
||||
.filter(e -> Rollup.HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
final ThreadContext threadContext = client.threadPool().getThreadContext();
|
||||
|
||||
// No headers (e.g. security not installed/in use) so execute as rollup origin
|
||||
if (filteredHeaders.isEmpty()) {
|
||||
ClientHelper.executeAsyncWithOrigin(client, ClientHelper.ROLLUP_ORIGIN, action, request, listener);
|
||||
} else {
|
||||
// Otherwise stash the context and copy in the saved headers before executing
|
||||
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
|
||||
try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) {
|
||||
client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
|
||||
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
|
||||
threadContext.copyHeaders(headers.entrySet());
|
||||
return storedContext;
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
|||
import org.elasticsearch.persistent.PersistentTasksExecutor;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.rollup.RollupField;
|
||||
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
|
||||
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
|
||||
|
@ -103,12 +104,14 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|||
|
||||
@Override
|
||||
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
|
||||
RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, nextPhase);
|
||||
ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ROLLUP_ORIGIN, client, SearchAction.INSTANCE, request,
|
||||
nextPhase);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
|
||||
RollupClientHelper.executeAsync(client, job, BulkAction.INSTANCE, request, nextPhase);
|
||||
ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ROLLUP_ORIGIN, client, BulkAction.INSTANCE, request,
|
||||
nextPhase);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,133 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.rollup.job;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
|
||||
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
|
||||
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class RollupClientHelperTests extends ESTestCase {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testNoHeaders() throws InterruptedException {
|
||||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
final Client client = mock(Client.class);
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final ActionListener<SearchResponse> listener = ActionListener.wrap(v -> {
|
||||
assertTrue(threadContext.getHeaders().isEmpty());
|
||||
latch.countDown();
|
||||
}, e -> fail(e.getMessage()));
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
assertTrue(threadContext.getHeaders().isEmpty());
|
||||
latch.countDown();
|
||||
((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null);
|
||||
return null;
|
||||
}).when(client).execute(anyObject(), anyObject(), anyObject());
|
||||
|
||||
SearchRequest request = new SearchRequest("foo");
|
||||
|
||||
RollupJobConfig config = ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build();
|
||||
RollupJob job = new RollupJob(config, Collections.emptyMap());
|
||||
|
||||
RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, listener);
|
||||
|
||||
latch.await();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testWrongHeaders() throws InterruptedException {
|
||||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
final Client client = mock(Client.class);
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final ActionListener<SearchResponse> listener = ActionListener.wrap(v -> {
|
||||
assertTrue(threadContext.getHeaders().isEmpty());
|
||||
latch.countDown();
|
||||
}, e -> fail(e.getMessage()));
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
assertTrue(threadContext.getHeaders().isEmpty());
|
||||
latch.countDown();
|
||||
((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null);
|
||||
return null;
|
||||
}).when(client).execute(anyObject(), anyObject(), anyObject());
|
||||
|
||||
SearchRequest request = new SearchRequest("foo");
|
||||
Map<String, String> headers = new HashMap<>(1);
|
||||
headers.put("foo", "foo");
|
||||
headers.put("bar", "bar");
|
||||
RollupJobConfig config = ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build();
|
||||
RollupJob job = new RollupJob(config, headers);
|
||||
|
||||
RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, listener);
|
||||
|
||||
latch.await();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testWithHeaders() throws Exception {
|
||||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
final Client client = mock(Client.class);
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final ActionListener<SearchResponse> listener = ActionListener.wrap(v -> {
|
||||
assertTrue(threadContext.getHeaders().isEmpty());
|
||||
latch.countDown();
|
||||
}, e -> fail(e.getMessage()));
|
||||
|
||||
doAnswer(invocationOnMock -> {
|
||||
assertThat(threadContext.getHeaders().size(), equalTo(2));
|
||||
assertThat(threadContext.getHeaders().get("es-security-runas-user"), equalTo("foo"));
|
||||
assertThat(threadContext.getHeaders().get("_xpack_security_authentication"), equalTo("bar"));
|
||||
latch.countDown();
|
||||
((ActionListener)invocationOnMock.getArguments()[2]).onResponse(null);
|
||||
return null;
|
||||
}).when(client).execute(anyObject(), anyObject(), anyObject());
|
||||
|
||||
SearchRequest request = new SearchRequest("foo");
|
||||
Map<String, String> headers = new HashMap<>(1);
|
||||
headers.put("es-security-runas-user", "foo");
|
||||
headers.put("_xpack_security_authentication", "bar");
|
||||
RollupJobConfig config = ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build();
|
||||
RollupJob job = new RollupJob(config, headers);
|
||||
|
||||
RollupClientHelper.executeAsync(client, job, SearchAction.INSTANCE, request, listener);
|
||||
|
||||
latch.await();
|
||||
}
|
||||
|
||||
}
|
|
@ -203,10 +203,6 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
|
|||
public static final Setting<TimeValue> MAX_STOP_TIMEOUT_SETTING =
|
||||
Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope);
|
||||
|
||||
// list of headers that will be stored when a watch is stored
|
||||
public static final Set<String> HEADER_FILTERS =
|
||||
new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication"));
|
||||
|
||||
public static final ScriptContext<SearchScript.Factory> SCRIPT_SEARCH_CONTEXT =
|
||||
new ScriptContext<>("xpack", SearchScript.Factory.class);
|
||||
// TODO: remove this context when each xpack script use case has their own contexts
|
||||
|
|
|
@ -1,51 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.watcher;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
|
||||
|
||||
/**
|
||||
* A helper class which decides if we should run via the xpack user and set watcher as origin or
|
||||
* if we should use the run_as functionality by setting the correct headers
|
||||
*/
|
||||
public class WatcherClientHelper {
|
||||
|
||||
/**
|
||||
* Execute a client operation and return the response, try to run with least privileges, when headers exist
|
||||
*
|
||||
* @param watch The watch in which context this method gets executed in
|
||||
* @param client The client used to query
|
||||
* @param supplier The action to run
|
||||
* @param <T> The client response class this should return
|
||||
* @return An instance of the response class
|
||||
*/
|
||||
public static <T extends ActionResponse> T execute(Watch watch, Client client, Supplier<T> supplier) {
|
||||
// no headers, we will have to use the xpack internal user for our execution by specifying the watcher origin
|
||||
if (watch.status().getHeaders().isEmpty()) {
|
||||
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
|
||||
return supplier.get();
|
||||
}
|
||||
} else {
|
||||
try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
|
||||
Map<String, String> filteredHeaders = watch.status().getHeaders().entrySet().stream()
|
||||
.filter(e -> Watcher.HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet());
|
||||
return supplier.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.Action;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.Action.Result.Status;
|
||||
import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction;
|
||||
|
@ -24,7 +25,6 @@ import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
|
|||
import org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils;
|
||||
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.Payload;
|
||||
import org.elasticsearch.xpack.watcher.WatcherClientHelper;
|
||||
import org.elasticsearch.xpack.watcher.support.ArrayObjectIterator;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
|
@ -96,7 +96,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
|
|||
new XContentSource(indexRequest.source(), XContentType.JSON));
|
||||
}
|
||||
|
||||
IndexResponse response = WatcherClientHelper.execute(ctx.watch(), client,
|
||||
IndexResponse response = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, client,
|
||||
() -> client.index(indexRequest).actionGet(indexDefaultTimeout));
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
indexResponseToXContent(builder, response);
|
||||
|
@ -137,7 +137,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
|
|||
}
|
||||
bulkRequest.add(indexRequest);
|
||||
}
|
||||
BulkResponse bulkResponse = WatcherClientHelper.execute(ctx.watch(), client,
|
||||
BulkResponse bulkResponse = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, client,
|
||||
() -> client.bulk(bulkRequest).actionGet(bulkDefaultTimeout));
|
||||
try (XContentBuilder jsonBuilder = jsonBuilder().startArray()) {
|
||||
for (BulkItemResponse item : bulkResponse) {
|
||||
|
|
|
@ -20,10 +20,10 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
|
||||
import org.elasticsearch.xpack.core.watcher.input.ExecutableInput;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.Payload;
|
||||
import org.elasticsearch.xpack.watcher.WatcherClientHelper;
|
||||
import org.elasticsearch.xpack.watcher.support.XContentFilterKeysUtils;
|
||||
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
|
||||
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
|
||||
|
@ -71,8 +71,8 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
|
|||
}
|
||||
|
||||
SearchRequest searchRequest = searchTemplateService.toSearchRequest(request);
|
||||
final SearchResponse response = WatcherClientHelper.execute(ctx.watch(), client,
|
||||
() -> client.search(searchRequest).actionGet(timeout));
|
||||
final SearchResponse response = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN,
|
||||
client, () -> client.search(searchRequest).actionGet(timeout));
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}] found [{}] hits", ctx.id(), response.getHits().getTotalHits());
|
||||
|
|
|
@ -15,10 +15,10 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
|
||||
import org.elasticsearch.xpack.core.watcher.transform.ExecutableTransform;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.Payload;
|
||||
import org.elasticsearch.xpack.watcher.WatcherClientHelper;
|
||||
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
|
||||
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
|
||||
|
||||
|
@ -49,7 +49,8 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
|
|||
// We need to make a copy, so that we don't modify the original instance that we keep around in a watch:
|
||||
request = new WatcherSearchTemplateRequest(transform.getRequest(), new BytesArray(renderedTemplate));
|
||||
SearchRequest searchRequest = searchTemplateService.toSearchRequest(request);
|
||||
SearchResponse resp = WatcherClientHelper.execute(ctx.watch(), client, () -> client.search(searchRequest).actionGet(timeout));
|
||||
SearchResponse resp = ClientHelper.executeWithHeaders(ctx.watch().status().getHeaders(), ClientHelper.WATCHER_ORIGIN, client,
|
||||
() -> client.search(searchRequest).actionGet(timeout));
|
||||
return new SearchTransform.Result(request, new Payload.XContent(resp));
|
||||
} catch (Exception e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to execute [{}] transform for [{}]", TYPE, ctx.id()), e);
|
||||
|
|
|
@ -20,12 +20,12 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams;
|
||||
import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchAction;
|
||||
import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchRequest;
|
||||
import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchResponse;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||
import org.elasticsearch.xpack.watcher.Watcher;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
|
||||
import org.elasticsearch.xpack.watcher.watch.WatchParser;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -83,7 +83,7 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
|
|||
|
||||
// ensure we only filter for the allowed headers
|
||||
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
|
||||
.filter(e -> Watcher.HEADER_FILTERS.contains(e.getKey()))
|
||||
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
watch.status().setHeaders(filteredHeaders);
|
||||
|
||||
|
|
|
@ -1,119 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.watcher;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
|
||||
import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
|
||||
import static org.hamcrest.Matchers.hasEntry;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class WatcherClientHelperTests extends ESTestCase {
|
||||
|
||||
private Client client = mock(Client.class);
|
||||
|
||||
@Before
|
||||
public void setupMocks() {
|
||||
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
|
||||
searchFuture.onResponse(new SearchResponse());
|
||||
when(client.search(any())).thenReturn(searchFuture);
|
||||
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
}
|
||||
|
||||
public void testEmptyHeaders() {
|
||||
WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock();
|
||||
when(ctx.watch().status().getHeaders()).thenReturn(Collections.emptyMap());
|
||||
|
||||
assertExecutionWithOrigin(ctx);
|
||||
}
|
||||
|
||||
public void testWithHeaders() {
|
||||
WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock();
|
||||
Map<String, String> watchStatusHeaders = MapBuilder.<String, String>newMapBuilder()
|
||||
.put("es-security-runas-user", "anything")
|
||||
.put("_xpack_security_authentication", "anything")
|
||||
.map();
|
||||
when(ctx.watch().status().getHeaders()).thenReturn(watchStatusHeaders);
|
||||
|
||||
assertRunAsExecution(ctx, headers -> {
|
||||
assertThat(headers.keySet(), hasSize(2));
|
||||
assertThat(headers, hasEntry("es-security-runas-user", "anything"));
|
||||
assertThat(headers, hasEntry("_xpack_security_authentication", "anything"));
|
||||
});
|
||||
}
|
||||
|
||||
public void testFilteredHeaders() {
|
||||
WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock();
|
||||
Map<String, String> watchStatusHeaders = MapBuilder.<String, String>newMapBuilder()
|
||||
.put(randomAlphaOfLength(10), "anything")
|
||||
.map();
|
||||
when(ctx.watch().status().getHeaders()).thenReturn(watchStatusHeaders);
|
||||
|
||||
assertRunAsExecution(ctx, headers -> {
|
||||
assertThat(headers.keySet(), hasSize(0));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* This method executes a search and checks if the thread context was enriched with the watcher origin
|
||||
*/
|
||||
private void assertExecutionWithOrigin(WatchExecutionContext ctx) {
|
||||
WatcherClientHelper.execute(ctx.watch(), client, () -> {
|
||||
Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME);
|
||||
assertThat(origin, is(WATCHER_ORIGIN));
|
||||
|
||||
// check that headers are not set
|
||||
Map<String, String> headers = client.threadPool().getThreadContext().getHeaders();
|
||||
assertThat(headers, not(hasEntry("es-security-runas-user", "anything")));
|
||||
assertThat(headers, not(hasEntry("_xpack_security_authentication", "anything")));
|
||||
|
||||
return client.search(new SearchRequest()).actionGet();
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This method executes a search and ensures no stashed origin thread context was created, so that the regular node
|
||||
* client was used, to emulate a run_as function
|
||||
*/
|
||||
public void assertRunAsExecution(WatchExecutionContext ctx, Consumer<Map<String, String>> consumer) {
|
||||
WatcherClientHelper.execute(ctx.watch(), client, () -> {
|
||||
Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME);
|
||||
assertThat(origin, is(nullValue()));
|
||||
|
||||
Map<String, String> headers = client.threadPool().getThreadContext().getHeaders();
|
||||
consumer.accept(headers);
|
||||
return client.search(new SearchRequest()).actionGet();
|
||||
});
|
||||
|
||||
}
|
||||
}
|
|
@ -57,7 +57,7 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTestC
|
|||
|
||||
private MockWebServer webServer = new MockWebServer();
|
||||
private static Boolean encryptSensitiveData = null;
|
||||
private static byte[] encryptionKey = CryptoServiceTests.generateKey();
|
||||
private static final byte[] encryptionKey = CryptoServiceTests.generateKey();
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
|
@ -155,6 +155,14 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTestC
|
|||
assertThat(webServer.requests(), hasSize(1));
|
||||
assertThat(webServer.requests().get(0).getHeader("Authorization"),
|
||||
is(ApplicableBasicAuth.headerValue(USERNAME, PASSWORD.toCharArray())));
|
||||
|
||||
// now trigger the by the scheduler and make sure that the password is also correctly transmitted
|
||||
webServer.enqueue(new MockResponse().setResponseCode(200).setBody(
|
||||
BytesReference.bytes(jsonBuilder().startObject().field("key", "value").endObject()).utf8ToString()));
|
||||
timeWarp().trigger("_id");
|
||||
assertThat(webServer.requests(), hasSize(2));
|
||||
assertThat(webServer.requests().get(1).getHeader("Authorization"),
|
||||
is(ApplicableBasicAuth.headerValue(USERNAME, PASSWORD.toCharArray())));
|
||||
}
|
||||
|
||||
public void testWebhookAction() throws Exception {
|
||||
|
|
|
@ -11,7 +11,6 @@ import org.elasticsearch.action.index.IndexResponse;
|
|||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
@ -20,10 +19,10 @@ import org.elasticsearch.license.XPackLicenseState;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchRequest;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||
import org.elasticsearch.xpack.watcher.Watcher;
|
||||
import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder;
|
||||
import org.elasticsearch.xpack.watcher.watch.WatchParser;
|
||||
import org.junit.Before;
|
||||
|
@ -81,7 +80,7 @@ public class TransportPutWatchActionTests extends ESTestCase {
|
|||
|
||||
public void testHeadersAreFilteredWhenPuttingWatches() throws Exception {
|
||||
// set up threadcontext with some arbitrary info
|
||||
String headerName = randomFrom(Watcher.HEADER_FILTERS);
|
||||
String headerName = randomFrom(ClientHelper.SECURITY_HEADER_FILTERS);
|
||||
threadContext.putHeader(headerName, randomAlphaOfLength(10));
|
||||
threadContext.putHeader(randomAlphaOfLength(10), "doesntmatter");
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
|
|||
|
||||
@Override
|
||||
public void start(Collection<Watch> jobs) {
|
||||
jobs.forEach(this::add);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -87,6 +87,13 @@ setup:
|
|||
xpack.ml.close_job:
|
||||
job_id: mixed-cluster-job
|
||||
|
||||
# Wait for indices to be fully allocated before
|
||||
# killing the node
|
||||
- do:
|
||||
cluster.health:
|
||||
index: [".ml-state", ".ml-anomalies-shared"]
|
||||
wait_for_status: green
|
||||
|
||||
---
|
||||
"Test get job with rules":
|
||||
|
||||
|
|
|
@ -49,6 +49,13 @@
|
|||
job_id: old-cluster-job
|
||||
- match: { count: 1 }
|
||||
|
||||
# Wait for indices to be fully allocated before
|
||||
# killing the node
|
||||
- do:
|
||||
cluster.health:
|
||||
index: [".ml-state", ".ml-anomalies-shared"]
|
||||
wait_for_status: green
|
||||
|
||||
---
|
||||
"Put job on the old cluster with the default model memory limit and post some data":
|
||||
- do:
|
||||
|
@ -96,6 +103,13 @@
|
|||
job_id: no-model-memory-limit-job
|
||||
- match: { count: 201 }
|
||||
|
||||
# Wait for indices to be fully allocated before
|
||||
# killing the node
|
||||
- do:
|
||||
cluster.health:
|
||||
index: [".ml-state", ".ml-anomalies-shared"]
|
||||
wait_for_status: green
|
||||
|
||||
---
|
||||
"Put job with empty strings in the configuration":
|
||||
- do:
|
||||
|
|
Loading…
Reference in New Issue