Watcher: Store thread context headers in watch (elastic/x-pack-elasticsearch#2808)

In order to be able to execute a watch as the user, who stored the
watch, this commit stores certain headers of the thread context, that
was used when the watch was stored.

Upon loading the watch the headers are loaded and applied for the
following watcher execution features

* search transform
* search input
* index action

A special case is the execute watch API, which overrides the headers loaded
from the watch with the one of the current request, so that a user
cannot execute this watch with other privileges of the user who stored it.

Only the headers "es-security-runas-user", "_xpack_security_authentication" are
copied for now, as those are needed for our security features.

The headers are stored in watch status in the watch and are not returned by default,
when the GET Watch API is used. A search reveals those of course.

relates elastic/x-pack-elasticsearch#2201

Original commit: elastic/x-pack-elasticsearch@9803bd51c2
This commit is contained in:
Alexander Reelsen 2017-11-24 09:15:54 +01:00 committed by GitHub
parent 933c22dce6
commit 4fe9ac734b
36 changed files with 1434 additions and 154 deletions

View File

@ -205,7 +205,6 @@ import static org.elasticsearch.xpack.XPackSettings.HTTP_SSL_ENABLED;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.INTERNAL_INDEX_FORMAT;
import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.INTERNAL_SECURITY_INDEX;
public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin, ClusterPlugin, DiscoveryPlugin {

View File

@ -204,6 +204,10 @@ public class Watcher implements ActionPlugin {
public static final Setting<TimeValue> MAX_STOP_TIMEOUT_SETTING =
Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope);
// list of headers that will be stored when a watch is stored
public static final Set<String> HEADER_FILTERS =
new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication"));
public static final ScriptContext<SearchScript.Factory> SCRIPT_SEARCH_CONTEXT =
new ScriptContext<>("xpack", SearchScript.Factory.class);
// TODO: remove this context when each xpack script use case has their own contexts
@ -289,8 +293,8 @@ public class Watcher implements ActionPlugin {
final Map<String, ConditionFactory> parsers = new HashMap<>();
parsers.put(AlwaysCondition.TYPE, (c, id, p) -> AlwaysCondition.parse(id, p));
parsers.put(NeverCondition.TYPE, (c, id, p) -> NeverCondition.parse(id, p));
parsers.put(ArrayCompareCondition.TYPE, (c, id, p) -> ArrayCompareCondition.parse(c, id, p));
parsers.put(CompareCondition.TYPE, (c, id, p) -> CompareCondition.parse(c, id, p));
parsers.put(ArrayCompareCondition.TYPE, ArrayCompareCondition::parse);
parsers.put(CompareCondition.TYPE, CompareCondition::parse);
parsers.put(ScriptCondition.TYPE, (c, id, p) -> ScriptCondition.parse(scriptService, id, p));
final ConditionRegistry conditionRegistry = new ConditionRegistry(Collections.unmodifiableMap(parsers), clock);
@ -313,8 +317,7 @@ public class Watcher implements ActionPlugin {
// inputs
final Map<String, InputFactory> inputFactories = new HashMap<>();
inputFactories.put(SearchInput.TYPE,
new SearchInputFactory(settings, client, xContentRegistry, scriptService));
inputFactories.put(SearchInput.TYPE, new SearchInputFactory(settings, client, xContentRegistry, scriptService));
inputFactories.put(SimpleInput.TYPE, new SimpleInputFactory(settings));
inputFactories.put(HttpInput.TYPE, new HttpInputFactory(settings, httpClient, templateEngine, httpTemplateParser));
inputFactories.put(NoneInput.TYPE, new NoneInputFactory(settings));

View File

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ClientHelper.WATCHER_ORIGIN;
import static org.elasticsearch.xpack.ClientHelper.stashWithOrigin;
/**
* A helper class which decides if we should run via the xpack user and set watcher as origin or
* if we should use the run_as functionality by setting the correct headers
*/
public class WatcherClientHelper {
/**
* Execute a client operation and return the response, try to run with least privileges, when headers exist
*
* @param watch The watch in which context this method gets executed in
* @param client The client used to query
* @param supplier The action to run
* @param <T> The client response class this should return
* @return An instance of the response class
*/
public static <T extends ActionResponse> T execute(Watch watch, Client client, Supplier<T> supplier) {
// no headers, we will have to use the xpack internal user for our execution by specifying the watcher origin
if (watch.status().getHeaders().isEmpty()) {
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
return supplier.get();
}
} else {
try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
Map<String, String> filteredHeaders = watch.status().getHeaders().entrySet().stream()
.filter(e -> Watcher.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet());
return supplier.get();
}
}
}
}

View File

@ -15,9 +15,9 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.watcher.WatcherClientHelper;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
import org.elasticsearch.xpack.watcher.actions.ExecutableAction;
@ -31,12 +31,9 @@ import org.joda.time.DateTime;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.ClientHelper.WATCHER_ORIGIN;
import static org.elasticsearch.xpack.ClientHelper.stashWithOrigin;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
@ -94,7 +91,6 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
indexRequest.id(docId);
data = addTimestampToDocument(data, ctx.executionTime());
IndexResponse response;
BytesReference bytesReference;
try (XContentBuilder builder = jsonBuilder()) {
indexRequest.source(builder.prettyPrint().map(data));
@ -105,9 +101,8 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
XContentType.JSON));
}
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
response = client.index(indexRequest).get(indexDefaultTimeout.millis(), TimeUnit.MILLISECONDS);
}
IndexResponse response = WatcherClientHelper.execute(ctx.watch(), client,
() -> client.index(indexRequest).actionGet(indexDefaultTimeout));
try (XContentBuilder builder = jsonBuilder()) {
indexResponseToXContent(builder, response);
bytesReference = builder.bytes();
@ -140,8 +135,8 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
bulkRequest.add(indexRequest);
}
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
BulkResponse bulkResponse = client.bulk(bulkRequest).get(bulkDefaultTimeout.millis(), TimeUnit.MILLISECONDS);
BulkResponse bulkResponse = WatcherClientHelper.execute(ctx.watch(), client,
() -> client.bulk(bulkRequest).actionGet(bulkDefaultTimeout));
try (XContentBuilder jsonBuilder = jsonBuilder().startArray()) {
for (BulkItemResponse item : bulkResponse) {
itemResponseToXContent(jsonBuilder, item);
@ -159,7 +154,6 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
}
}
}
private Map<String, Object> addTimestampToDocument(Map<String, Object> data, DateTime executionTime) {
if (action.executionTimeField != null) {
@ -180,7 +174,7 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
return data instanceof HashMap ? data : new HashMap<>(data);
}
static void itemResponseToXContent(XContentBuilder builder, BulkItemResponse item) throws IOException {
private static void itemResponseToXContent(XContentBuilder builder, BulkItemResponse item) throws IOException {
if (item.isFailed()) {
builder.startObject()
.field("failed", item.isFailed())

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
public class TriggeredExecutionContext extends WatchExecutionContext {

View File

@ -11,7 +11,6 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;

View File

@ -6,19 +6,20 @@
package org.elasticsearch.xpack.watcher.input.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.watcher.WatcherClientHelper;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.input.ExecutableInput;
import org.elasticsearch.xpack.watcher.support.XContentFilterKeysUtils;
@ -28,8 +29,6 @@ import org.elasticsearch.xpack.watcher.watch.Payload;
import java.util.Map;
import static org.elasticsearch.xpack.ClientHelper.WATCHER_ORIGIN;
import static org.elasticsearch.xpack.ClientHelper.stashWithOrigin;
import static org.elasticsearch.xpack.watcher.input.search.SearchInput.TYPE;
/**
@ -43,8 +42,8 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
private final WatcherSearchTemplateService searchTemplateService;
private final TimeValue timeout;
public ExecutableSearchInput(SearchInput input, Logger logger, Client client,
WatcherSearchTemplateService searchTemplateService, TimeValue defaultTimeout) {
public ExecutableSearchInput(SearchInput input, Logger logger, Client client, WatcherSearchTemplateService searchTemplateService,
TimeValue defaultTimeout) {
super(input, logger);
this.client = client;
this.searchTemplateService = searchTemplateService;
@ -70,10 +69,9 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.watch().id(), request.getSearchSource().utf8ToString());
}
SearchResponse response;
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
response = client.search(searchTemplateService.toSearchRequest(request)).actionGet(timeout);
}
SearchRequest searchRequest = searchTemplateService.toSearchRequest(request);
final SearchResponse response = WatcherClientHelper.execute(ctx.watch(), client,
() -> client.search(searchRequest).actionGet(timeout));
if (logger.isDebugEnabled()) {
logger.debug("[{}] found [{}] hits", ctx.id(), response.getHits().getTotalHits());

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateServi
import java.io.IOException;
public class SearchInputFactory extends InputFactory<SearchInput, SearchInput.Result, ExecutableSearchInput> {
private final Client client;
private final TimeValue defaultTimeout;
private final WatcherSearchTemplateService searchTemplateService;

View File

@ -18,7 +18,7 @@ public class WatcherParams extends ToXContent.DelegatingMapParams {
public static final WatcherParams HIDE_SECRETS = WatcherParams.builder().hideSecrets(true).build();
static final String HIDE_SECRETS_KEY = "hide_secrets";
static final String COLLAPSE_ARRAYS_KEY = "collapse_arrays";
public static final String HIDE_HEADERS = "hide_headers";
static final String DEBUG_KEY = "debug";
public static boolean hideSecrets(ToXContent.Params params) {
@ -41,6 +41,10 @@ public class WatcherParams extends ToXContent.DelegatingMapParams {
return paramAsBoolean(DEBUG_KEY, false);
}
public boolean hideHeaders() {
return paramAsBoolean(HIDE_HEADERS, true);
}
public static WatcherParams wrap(ToXContent.Params params) {
return params instanceof WatcherParams ?
(WatcherParams) params :
@ -69,6 +73,11 @@ public class WatcherParams extends ToXContent.DelegatingMapParams {
return this;
}
public Builder hideHeaders(boolean hideHeaders) {
params.put(HIDE_HEADERS, String.valueOf(hideHeaders));
return this;
}
public Builder debug(boolean debug) {
params.put(DEBUG_KEY, String.valueOf(debug));
return this;

View File

@ -8,32 +8,29 @@ package org.elasticsearch.xpack.watcher.transform.search;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.xpack.watcher.WatcherClientHelper;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.watch.Payload;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.ClientHelper.WATCHER_ORIGIN;
import static org.elasticsearch.xpack.ClientHelper.stashWithOrigin;
import static org.elasticsearch.xpack.watcher.transform.search.SearchTransform.TYPE;
public class ExecutableSearchTransform extends ExecutableTransform<SearchTransform, SearchTransform.Result> {
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
protected final Client client;
protected final WatcherSearchTemplateService searchTemplateService;
protected final TimeValue timeout;
private final Client client;
private final WatcherSearchTemplateService searchTemplateService;
private final TimeValue timeout;
public ExecutableSearchTransform(SearchTransform transform, Logger logger, Client client,
WatcherSearchTemplateService searchTemplateService, TimeValue defaultTimeout) {
@ -51,11 +48,9 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
String renderedTemplate = searchTemplateService.renderTemplate(template, ctx, payload);
// We need to make a copy, so that we don't modify the original instance that we keep around in a watch:
request = new WatcherSearchTemplateRequest(transform.getRequest(), new BytesArray(renderedTemplate));
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
SearchResponse resp = client.search(searchTemplateService.toSearchRequest(request))
.get(timeout.millis(), TimeUnit.MILLISECONDS);
SearchRequest searchRequest = searchTemplateService.toSearchRequest(request);
SearchResponse resp = WatcherClientHelper.execute(ctx.watch(), client, () -> client.search(searchRequest).actionGet(timeout));
return new SearchTransform.Result(request, new Payload.XContent(resp));
}
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to execute [{}] transform for [{}]", TYPE, ctx.id()), e);
return new SearchTransform.Result(request, e);

View File

@ -19,12 +19,11 @@ import java.io.IOException;
public class SearchTransformFactory extends TransformFactory<SearchTransform, SearchTransform.Result, ExecutableSearchTransform> {
protected final Client client;
private final Client client;
private final TimeValue defaultTimeout;
private final WatcherSearchTemplateService searchTemplateService;
public SearchTransformFactory(Settings settings, Client client, NamedXContentRegistry xContentRegistry,
ScriptService scriptService) {
public SearchTransformFactory(Settings settings, Client client, NamedXContentRegistry xContentRegistry, ScriptService scriptService) {
super(Loggers.getLogger(ExecutableSearchTransform.class, settings));
this.client = client;
this.defaultTimeout = settings.getAsTime("xpack.watcher.transform.search.default_timeout", TimeValue.timeValueMinutes(1));

View File

@ -40,6 +40,7 @@ import org.joda.time.DateTime;
import java.io.IOException;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.ClientHelper.WATCHER_ORIGIN;
@ -109,6 +110,12 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
threadPool.executor(XPackPlugin.WATCHER).submit(() -> {
try {
// ensure that the headers from the incoming request are used instead those of the stored watch
// otherwise the watch would run as the user who stored the watch, but it needs to be run as the user who
// executes this request
Map<String, String> headers = new HashMap<>(threadPool.getThreadContext().getHeaders());
watch.status().setHeaders(headers);
String triggerType = watch.trigger().type();
TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData());

View File

@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.watch.Payload;
@ -28,6 +29,8 @@ import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
import java.time.Clock;
import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.ClientHelper.WATCHER_ORIGIN;
@ -58,8 +61,18 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
Watch watch = parser.parseWithSecrets(request.getId(), false, request.getSource(), now, request.xContentType());
watch.setState(request.isActive(), now);
// ensure we only filter for the allowed headers
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> Watcher.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
watch.status().setHeaders(filteredHeaders);
try (XContentBuilder builder = jsonBuilder()) {
Payload.XContent.Params params = WatcherParams.builder().hideSecrets(false).put(Watch.INCLUDE_STATUS_KEY, "true").build();
Payload.XContent.Params params = WatcherParams.builder()
.hideSecrets(false)
.hideHeaders(false)
.put(Watch.INCLUDE_STATUS_KEY, "true")
.build();
watch.toXContent(builder, params);
final BytesReference bytesReference = builder.bytes();

View File

@ -18,11 +18,13 @@ import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.actions.throttler.AckThrottler;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherXContentParser;
import org.joda.time.DateTime;
import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@ -46,6 +48,7 @@ public class WatchStatus implements ToXContentObject, Streamable {
@Nullable private DateTime lastChecked;
@Nullable private DateTime lastMetCondition;
@Nullable private long version;
@Nullable private Map<String, String> headers;
private Map<String, ActionStatus> actions;
// for serialization
@ -53,21 +56,18 @@ public class WatchStatus implements ToXContentObject, Streamable {
}
public WatchStatus(DateTime now, Map<String, ActionStatus> actions) {
this(-1, new State(true, now), null, null, null, actions);
}
public WatchStatus(WatchStatus other) {
this(other.version, other.state, other.executionState, other.lastChecked, other.lastMetCondition, other.actions);
this(-1, new State(true, now), null, null, null, actions, Collections.emptyMap());
}
private WatchStatus(long version, State state, ExecutionState executionState, DateTime lastChecked, DateTime lastMetCondition,
Map<String, ActionStatus> actions) {
Map<String, ActionStatus> actions, Map<String, String> headers) {
this.version = version;
this.lastChecked = lastChecked;
this.lastMetCondition = lastMetCondition;
this.actions = actions;
this.state = state;
this.executionState = executionState;
this.headers = headers;
}
public State state() {
@ -102,6 +102,14 @@ public class WatchStatus implements ToXContentObject, Streamable {
return executionState;
}
public Map<String, String> getHeaders() {
return headers;
}
public void setHeaders(Map<String, String> headers) {
this.headers = headers;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -202,6 +210,11 @@ public class WatchStatus implements ToXContentObject, Streamable {
if (executionState != null) {
out.writeString(executionState.id());
}
boolean statusHasHeaders = headers != null && headers.isEmpty() == false;
out.writeBoolean(statusHasHeaders);
if (statusHasHeaders) {
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
}
@Override
@ -220,6 +233,9 @@ public class WatchStatus implements ToXContentObject, Streamable {
if (executionStateExists) {
executionState = ExecutionState.resolve(in.readString());
}
if (in.readBoolean()) {
headers = in.readMap(StreamInput::readString, StreamInput::readString);
}
}
public static WatchStatus read(StreamInput in) throws IOException {
@ -250,6 +266,9 @@ public class WatchStatus implements ToXContentObject, Streamable {
if (executionState != null) {
builder.field(Field.EXECUTION_STATE.getPreferredName(), executionState.id());
}
if (headers != null && headers.isEmpty() == false && params.paramAsBoolean(WatcherParams.HIDE_HEADERS, true) == false) {
builder.field(Field.HEADERS.getPreferredName(), headers);
}
builder.field(Field.VERSION.getPreferredName(), version);
return builder.endObject();
}
@ -261,6 +280,7 @@ public class WatchStatus implements ToXContentObject, Streamable {
DateTime lastMetCondition = null;
Map<String, ActionStatus> actions = null;
long version = -1;
Map<String, String> headers = Collections.emptyMap();
String currentFieldName = null;
XContentParser.Token token;
@ -317,6 +337,10 @@ public class WatchStatus implements ToXContentObject, Streamable {
throw new ElasticsearchParseException("could not parse watch status for [{}]. expecting field [{}] to be an object, " +
"found [{}] instead", watchId, currentFieldName, token);
}
} else if (Field.HEADERS.match(currentFieldName)) {
if (token == XContentParser.Token.START_OBJECT) {
headers = parser.mapStrings();
}
}
}
@ -328,7 +352,7 @@ public class WatchStatus implements ToXContentObject, Streamable {
}
actions = actions == null ? emptyMap() : unmodifiableMap(actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions, headers);
}
public static class State implements ToXContentObject {
@ -387,5 +411,6 @@ public class WatchStatus implements ToXContentObject, Streamable {
ParseField ACTIONS = new ParseField("actions");
ParseField VERSION = new ParseField("version");
ParseField EXECUTION_STATE = new ParseField("execution_state");
ParseField HEADERS = new ParseField("headers");
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder;
import org.junit.Before;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME;
import static org.elasticsearch.xpack.ClientHelper.WATCHER_ORIGIN;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class WatcherClientHelperTests extends ESTestCase {
private Client client = mock(Client.class);
@Before
public void setupMocks() {
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
searchFuture.onResponse(new SearchResponse());
when(client.search(any())).thenReturn(searchFuture);
ThreadPool threadPool = mock(ThreadPool.class);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(client.threadPool()).thenReturn(threadPool);
}
public void testEmptyHeaders() {
WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock();
when(ctx.watch().status().getHeaders()).thenReturn(Collections.emptyMap());
assertExecutionWithOrigin(ctx);
}
public void testWithHeaders() {
WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock();
Map<String, String> watchStatusHeaders = MapBuilder.<String, String>newMapBuilder()
.put("es-security-runas-user", "anything")
.put("_xpack_security_authentication", "anything")
.map();
when(ctx.watch().status().getHeaders()).thenReturn(watchStatusHeaders);
assertRunAsExecution(ctx, headers -> {
assertThat(headers.keySet(), hasSize(2));
assertThat(headers, hasEntry("es-security-runas-user", "anything"));
assertThat(headers, hasEntry("_xpack_security_authentication", "anything"));
});
}
public void testFilteredHeaders() {
WatchExecutionContext ctx = new WatchExecutionContextMockBuilder("_id").buildMock();
Map<String, String> watchStatusHeaders = MapBuilder.<String, String>newMapBuilder()
.put(randomAlphaOfLength(10), "anything")
.map();
when(ctx.watch().status().getHeaders()).thenReturn(watchStatusHeaders);
assertRunAsExecution(ctx, headers -> {
assertThat(headers.keySet(), hasSize(0));
});
}
/**
* This method executes a search and checks if the thread context was enriched with the watcher origin
*/
private void assertExecutionWithOrigin(WatchExecutionContext ctx) {
WatcherClientHelper.execute(ctx.watch(), client, () -> {
Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME);
assertThat(origin, is(WATCHER_ORIGIN));
// check that headers are not set
Map<String, String> headers = client.threadPool().getThreadContext().getHeaders();
assertThat(headers, not(hasEntry("es-security-runas-user", "anything")));
assertThat(headers, not(hasEntry("_xpack_security_authentication", "anything")));
return client.search(new SearchRequest()).actionGet();
});
}
/**
* This method executes a search and ensures no stashed origin thread context was created, so that the regular node
* client was used, to emulate a run_as function
*/
public void assertRunAsExecution(WatchExecutionContext ctx, Consumer<Map<String, String>> consumer) {
WatcherClientHelper.execute(ctx.watch(), client, () -> {
Object origin = client.threadPool().getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME);
assertThat(origin, is(nullValue()));
Map<String, String> headers = client.threadPool().getThreadContext().getHeaders();
consumer.accept(headers);
return client.search(new SearchRequest()).actionGet();
});
}
}

View File

@ -40,6 +40,11 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest
return true;
}
@Override
protected boolean enableSecurity() {
return true;
}
public void testTimeThrottle() throws Exception {
String id = randomAlphaOfLength(20);
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch()
@ -68,12 +73,12 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest
public void testTimeThrottleDefaults() throws Exception {
String id = randomAlphaOfLength(30);
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch()
PutWatchResponse putWatchResponse = watcherClientWithWatcherUser().preparePutWatch()
.setId(id)
.setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(simpleInput())
.addAction("my-logging-action", indexAction("actions", "action")))
.addAction("my-logging-action", indexAction("my_watcher_index", "action")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));

View File

@ -5,25 +5,27 @@
*/
package org.elasticsearch.xpack.watcher.actions.email;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.http.MockResponse;
import org.elasticsearch.test.http.MockWebServer;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.Scheme;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.notification.email.EmailTemplate;
import org.elasticsearch.xpack.watcher.notification.email.attachment.DataAttachment;
import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentParser;
import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachments;
import org.elasticsearch.xpack.watcher.notification.email.attachment.HttpRequestAttachment;
import org.elasticsearch.xpack.watcher.notification.email.support.EmailServer;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.CompareCondition;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.junit.After;
@ -41,14 +43,12 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.xpack.watcher.notification.email.DataAttachment.JSON;
import static org.elasticsearch.xpack.watcher.notification.email.DataAttachment.YAML;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.emailAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput;
import static org.elasticsearch.xpack.watcher.notification.email.DataAttachment.JSON;
import static org.elasticsearch.xpack.watcher.notification.email.DataAttachment.YAML;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.allOf;
@ -151,7 +151,6 @@ public class EmailAttachmentTests extends AbstractWatcherIntegrationTestCase {
// Have a sample document in the index, the watch is going to evaluate
client().prepareIndex("idx", "type").setSource("field", "value").get();
refresh();
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(matchAllQuery()), "idx");
List<EmailAttachmentParser.EmailAttachment> attachments = new ArrayList<>();
@ -174,8 +173,8 @@ public class EmailAttachmentTests extends AbstractWatcherIntegrationTestCase {
EmailTemplate.Builder emailBuilder = EmailTemplate.builder().from("_from").to("_to").subject("Subject");
WatchSourceBuilder watchSourceBuilder = watchBuilder()
.trigger(schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)))
.input(searchInput(request))
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.input(noneInput())
.condition(AlwaysCondition.INSTANCE)
.addAction("_email", emailAction(emailBuilder).setAuthentication(EmailServer.USERNAME, EmailServer.PASSWORD.toCharArray())
.setAttachments(emailAttachments));
logger.info("TMP WATCHSOURCE {}", watchSourceBuilder.build().getBytes().utf8ToString());
@ -184,12 +183,13 @@ public class EmailAttachmentTests extends AbstractWatcherIntegrationTestCase {
.setSource(watchSourceBuilder)
.get();
if (timeWarped()) {
timeWarp().trigger("_test_id");
refresh();
}
assertWatchWithMinimumPerformedActionsCount("_test_id", 1);
SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*")
.setQuery(QueryBuilders.termQuery("watch_id", "_test_id"))
.execute().actionGet();
assertHitCount(searchResponse, 1);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waited too long for email to be received");

View File

@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -202,8 +201,7 @@ public class IndexActionTests extends ESIntegTestCase {
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis());
}
builder.endObject();
Client client = client();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client());
XContentParser parser = createParser(builder);
parser.nextToken();
@ -231,9 +229,7 @@ public class IndexActionTests extends ESIntegTestCase {
}
}
builder.endObject();
Client client = client();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client());
XContentParser parser = createParser(builder);
parser.nextToken();
try {
@ -253,8 +249,8 @@ public class IndexActionTests extends ESIntegTestCase {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(),
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
List<Map<String, Object>> docs = new ArrayList<>();
boolean addSuccessfulIndexedDoc = randomBoolean();
@ -276,8 +272,8 @@ public class IndexActionTests extends ESIntegTestCase {
public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(),
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithId = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap();
final DateTime executionTime = DateTime.now(UTC);

View File

@ -439,6 +439,22 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
return randomBoolean() ? new XPackClient(client).watcher() : new WatcherClient(client);
}
/**
* This watcher client needs to be used whenenver an index action is about to be called in a watch
* as otherwise there is no permission to index data with the default transport client user called admin
* This is important if the watch is executed, as the watch is run as the user who stored the watch
* when security is enabled
*/
protected WatcherClient watcherClientWithWatcherUser() {
if (securityEnabled) {
return watcherClient()
.filterWithHeader(Collections.singletonMap("Authorization",
basicAuthHeaderValue("watcher_test", new SecureString(SecuritySettings.TEST_PASSWORD.toCharArray()))));
} else {
return watcherClient();
}
}
protected IndexNameExpressionResolver indexNameExpressionResolver() {
return internalCluster().getInstance(IndexNameExpressionResolver.class);
}
@ -680,13 +696,15 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
"transport_client:" + TEST_PASSWORD_HASHED + "\n" +
TEST_USERNAME + ":" + TEST_PASSWORD_HASHED + "\n" +
"admin:" + TEST_PASSWORD_HASHED + "\n" +
"watcher_test:" + TEST_PASSWORD_HASHED + "\n" +
"monitor:" + TEST_PASSWORD_HASHED;
public static final String USER_ROLES =
"transport_client:transport_client\n" +
"test:test\n" +
"admin:admin\n" +
"monitor:monitor";
"monitor:monitor\n" +
"watcher_test:watcher_test,watcher_admin,watcher_user\n";
public static final String ROLES =
"test:\n" + // a user for the test infra.
@ -700,8 +718,15 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
"\n" +
"admin:\n" +
" cluster: [ 'manage' ]\n" +
"\n" +
"monitor:\n" +
" cluster: [ 'monitor' ]\n"
" cluster: [ 'monitor' ]\n" +
"\n" +
"watcher_test:\n" +
" cluster: [ 'manage_watcher', 'cluster:admin/xpack/watcher/watch/put' ]\n" +
" indices:\n" +
" - names: 'my_watcher_index'\n" +
" privileges: [ all ]\n"
;

View File

@ -12,6 +12,7 @@ import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import java.util.Collections;
@ -29,6 +30,9 @@ public class WatchExecutionContextMockBuilder {
public WatchExecutionContextMockBuilder(String watchId) {
ctx = mock(WatchExecutionContext.class);
watch = mock(Watch.class);
WatchStatus watchStatus = mock(WatchStatus.class);
when(watchStatus.getHeaders()).thenReturn(Collections.emptyMap());
when(watch.status()).thenReturn(watchStatus);
when(watch.id()).thenReturn(watchId);
when(ctx.watch()).thenReturn(watch);
payload(Collections.<String, Object>emptyMap());

View File

@ -15,29 +15,29 @@ import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.watcher.common.http.HttpClient;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.secret.Secret;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.watcher.notification.email.Authentication;
import org.elasticsearch.xpack.watcher.notification.email.EmailService;
import org.elasticsearch.xpack.watcher.notification.email.EmailTemplate;
import org.elasticsearch.xpack.watcher.notification.email.HtmlSanitizer;
import org.elasticsearch.xpack.watcher.notification.email.Profile;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.watcher.actions.email.EmailAction;
import org.elasticsearch.xpack.watcher.actions.email.ExecutableEmailAction;
import org.elasticsearch.xpack.watcher.actions.webhook.ExecutableWebhookAction;
import org.elasticsearch.xpack.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.xpack.watcher.common.http.HttpClient;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.secret.Secret;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.execution.Wid;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.notification.email.Authentication;
import org.elasticsearch.xpack.watcher.notification.email.EmailService;
import org.elasticsearch.xpack.watcher.notification.email.EmailTemplate;
import org.elasticsearch.xpack.watcher.notification.email.HtmlSanitizer;
import org.elasticsearch.xpack.watcher.notification.email.Profile;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;

View File

@ -260,8 +260,8 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
Wid wid = new Wid(watchId, now);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
bulkRequestBuilder.add(client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id
().value())
bulkRequestBuilder.add(client()
.prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch))
.setWaitForActiveShards(ActiveShardCount.ALL)
);

View File

@ -56,6 +56,7 @@ import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.getRandomSupportedSearchType;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
@ -76,10 +77,11 @@ public class SearchInputTests extends ESTestCase {
contexts.put(Watcher.SCRIPT_EXECUTABLE_CONTEXT.name, Watcher.SCRIPT_EXECUTABLE_CONTEXT);
scriptService = new ScriptService(Settings.EMPTY, engines, contexts);
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
}
public void testExecute() throws Exception {
@ -90,12 +92,16 @@ public class SearchInputTests extends ESTestCase {
searchFuture.onResponse(searchResponse);
when(client.search(requestCaptor.capture())).thenReturn(searchFuture);
ArgumentCaptor<Map> headersCaptor = ArgumentCaptor.forClass(Map.class);
when(client.filterWithHeader(headersCaptor.capture())).thenReturn(client);
SearchSourceBuilder searchSourceBuilder = searchSource().query(boolQuery().must(matchQuery("event_type", "a")));
WatcherSearchTemplateRequest request = WatcherTestUtils.templateRequest(searchSourceBuilder);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger,
client, watcherSearchTemplateService(), TimeValue.timeValueMinutes(1));
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple());
assertThat(result.status(), is(Input.Result.Status.SUCCESS));
@ -103,6 +109,7 @@ public class SearchInputTests extends ESTestCase {
assertThat(searchRequest.searchType(), is(request.getSearchType()));
assertThat(searchRequest.indicesOptions(), is(request.getIndicesOptions()));
assertThat(searchRequest.indices(), is(arrayContainingInAnyOrder(request.getIndices())));
assertThat(headersCaptor.getAllValues(), hasSize(0));
}
public void testDifferentSearchType() throws Exception {

View File

@ -9,11 +9,11 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction;
import org.elasticsearch.xpack.watcher.actions.logging.LoggingLevel;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.condition.CompareCondition;
import org.elasticsearch.xpack.watcher.execution.ActionExecutionMode;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse;
@ -26,13 +26,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
import static org.hamcrest.Matchers.equalTo;
@ -40,6 +37,7 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.joda.time.DateTimeZone.UTC;
public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase {
public void testWatchMetadata() throws Exception {
Map<String, Object> metadata = new HashMap<>();
metadata.put("foo", "bar");
@ -53,7 +51,7 @@ public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase {
watcherClient().preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(cron("0/5 * * * * ? *")))
.input(searchInput(templateRequest(searchSource().query(matchAllQuery()), "my-index")))
.input(noneInput())
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L))
.metadata(metadata))
.get();
@ -84,7 +82,7 @@ public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase {
watcherClient().preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(cron("0 0 0 1 1 ? 2050")))
.input(searchInput(templateRequest(searchSource().query(matchAllQuery()), "my-index")))
.input(noneInput())
.condition(AlwaysCondition.INSTANCE)
.addAction("testLogger", loggingAction)
.defaultThrottlePeriod(TimeValue.timeValueSeconds(0))

View File

@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.actions.put;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.clock.ClockMock;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.util.Collections;
import java.util.Map;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TransportPutWatchActionTests extends ESTestCase {
private TransportPutWatchAction action;
private Watch watch = new WatchExecutionContextMockBuilder("_id").buildMock().watch();
private ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
@Before
public void setupAction() throws Exception {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(threadContext);
TransportService transportService = mock(TransportService.class);
Watch.Parser parser = mock(Watch.Parser.class);
when(parser.parseWithSecrets(eq("_id"), eq(false), anyObject(), anyObject(), anyObject())).thenReturn(watch);
Client client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
// mock an index response that calls the listener
doAnswer(invocation -> {
IndexRequest request = (IndexRequest) invocation.getArguments()[1];
ActionListener<IndexResponse> listener = (ActionListener) invocation.getArguments()[2];
ShardId shardId = new ShardId(new Index(Watch.INDEX, "uuid"), 0);
listener.onResponse(new IndexResponse(shardId, request.type(), request.id(), 1, 1, 1, true));
return null;
}).when(client).execute(any(), any(), any());
action = new TransportPutWatchAction(Settings.EMPTY, transportService, threadPool,
new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(Settings.EMPTY), new ClockMock(),
new XPackLicenseState(), parser, client);
}
public void testHeadersAreFilteredWhenPuttingWatches() throws Exception {
// set up threadcontext with some arbitrary info
String headerName = randomFrom(Watcher.HEADER_FILTERS);
threadContext.putHeader(headerName, randomAlphaOfLength(10));
threadContext.putHeader(randomAlphaOfLength(10), "doesntmatter");
PutWatchRequest putWatchRequest = new PutWatchRequest();
putWatchRequest.setId("_id");
action.doExecute(putWatchRequest, ActionListener.wrap(r -> {}, e -> assertThat(e, is(nullValue()))));
ArgumentCaptor<Map> captor = ArgumentCaptor.forClass(Map.class);
verify(watch.status()).setHeaders(captor.capture());
Map<String, String> capturedHeaders = captor.getValue();
assertThat(capturedHeaders.keySet(), hasSize(1));
assertThat(capturedHeaders, hasKey(headerName));
}
}

View File

@ -5,14 +5,24 @@
*/
package org.elasticsearch.xpack.watcher.watch;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.actions.ActionStatus.AckStatus.State;
import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherParams;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.joda.time.DateTime.now;
public class WatchStatusTests extends ESTestCase {
@ -31,4 +41,34 @@ public class WatchStatusTests extends ESTestCase {
assertThat(status.actionStatus("foo").ackStatus().state(), is(State.AWAITS_SUCCESSFUL_EXECUTION));
}
public void testHeadersToXContent() throws Exception {
WatchStatus status = new WatchStatus(now(), Collections.emptyMap());
String key = randomAlphaOfLength(10);
String value = randomAlphaOfLength(10);
Map<String, String> headers = Collections.singletonMap(key, value);
status.setHeaders(headers);
// by default headers are hidden
try (XContentBuilder builder = jsonBuilder()) {
status.toXContent(builder, ToXContent.EMPTY_PARAMS);
try (XContentParser parser = createParser(builder)) {
Map<String, Object> fields = parser.map();
assertThat(fields, not(hasKey(WatchStatus.Field.HEADERS.getPreferredName())));
}
}
// but they are required when storing a watch
try (XContentBuilder builder = jsonBuilder()) {
status.toXContent(builder, WatcherParams.builder().hideHeaders(false).build());
try (XContentParser parser = createParser(builder)) {
parser.nextToken();
Map<String, Object> fields = parser.map();
assertThat(fields, hasKey(WatchStatus.Field.HEADERS.getPreferredName()));
assertThat(fields.get(WatchStatus.Field.HEADERS.getPreferredName()), instanceOf(Map.class));
Map<String, Object> extractedHeaders = (Map<String, Object>) fields.get(WatchStatus.Field.HEADERS.getPreferredName());
assertThat(extractedHeaders, is(headers));
}
}
}
}

View File

@ -25,21 +25,6 @@ import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.common.http.HttpClient;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuthFactory;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.watcher.notification.email.DataAttachment;
import org.elasticsearch.xpack.watcher.notification.email.EmailService;
import org.elasticsearch.xpack.watcher.notification.email.EmailTemplate;
import org.elasticsearch.xpack.watcher.notification.email.HtmlSanitizer;
import org.elasticsearch.xpack.watcher.notification.email.Profile;
import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachments;
import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentsParser;
import org.elasticsearch.xpack.watcher.watch.clock.ClockMock;
import org.elasticsearch.xpack.watcher.actions.ActionFactory;
import org.elasticsearch.xpack.watcher.actions.ActionRegistry;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
@ -57,6 +42,13 @@ import org.elasticsearch.xpack.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.xpack.watcher.actions.webhook.ExecutableWebhookAction;
import org.elasticsearch.xpack.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.xpack.watcher.actions.webhook.WebhookActionFactory;
import org.elasticsearch.xpack.watcher.common.http.HttpClient;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuthFactory;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.condition.AlwaysConditionTests;
import org.elasticsearch.xpack.watcher.condition.ArrayCompareCondition;
@ -78,6 +70,13 @@ import org.elasticsearch.xpack.watcher.input.search.SearchInputFactory;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInputFactory;
import org.elasticsearch.xpack.watcher.notification.email.DataAttachment;
import org.elasticsearch.xpack.watcher.notification.email.EmailService;
import org.elasticsearch.xpack.watcher.notification.email.EmailTemplate;
import org.elasticsearch.xpack.watcher.notification.email.HtmlSanitizer;
import org.elasticsearch.xpack.watcher.notification.email.Profile;
import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachments;
import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentsParser;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.test.MockTextTemplateEngine;
@ -112,6 +111,7 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.support.Month;
import org.elasticsearch.xpack.watcher.trigger.schedule.support.MonthTimes;
import org.elasticsearch.xpack.watcher.trigger.schedule.support.WeekTimes;
import org.elasticsearch.xpack.watcher.trigger.schedule.support.YearTimes;
import org.elasticsearch.xpack.watcher.watch.clock.ClockMock;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Before;
@ -147,6 +147,7 @@ import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Mockito.mock;
public class WatchTests extends ESTestCase {
private ScriptService scriptService;
private Client client;
private HttpClient httpClient;
@ -587,7 +588,8 @@ public class WatchTests extends ESTestCase {
IndexAction action = new IndexAction("_index", "_type", randomBoolean() ? "123" : null, null, timeout, timeZone);
list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(),
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30))));
new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30))));
}
if (randomBoolean()) {
HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000))

View File

@ -37,7 +37,7 @@ teardown:
"actions": {
"indexme" : {
"index" : {
"index" : "my-index",
"index" : "my_test_index",
"doc_type" : "my-type",
"doc_id": "my-id"
}

View File

@ -37,7 +37,7 @@ teardown:
}
},
"index" : {
"index" : "my-index",
"index" : "my_test_index",
"doc_type" : "my-type",
"doc_id": "my-id"
}

View File

@ -32,7 +32,7 @@ teardown:
"actions": {
"indexme" : {
"index" : {
"index" : "my-index",
"index" : "my_test_index",
"doc_type" : "my-type",
"doc_id": "my-id"
}
@ -86,7 +86,7 @@ teardown:
"actions": {
"indexme" : {
"index" : {
"index" : "my-index",
"index" : "my_test_index",
"doc_type" : "my-type",
"doc_id": "my-id"
}
@ -129,7 +129,7 @@ teardown:
"actions": {
"indexme" : {
"index" : {
"index" : "my-index",
"index" : "my_test_index",
"doc_type" : "my-type",
"doc_id": "my-id"
}

View File

@ -48,7 +48,7 @@ setup:
"actions": {
"indexme" : {
"index" : {
"index" : "my-index",
"index" : "my_test_index",
"doc_type" : "my-type",
"doc_id": "my-id"
}
@ -122,7 +122,7 @@ setup:
"actions": {
"indexme" : {
"index" : {
"index" : "my-index",
"index" : "my_test_index",
"doc_type" : "my-type",
"doc_id": "my-id"
}
@ -182,7 +182,7 @@ setup:
"actions": {
"indexme" : {
"index" : {
"index" : "my-index",
"index" : "my_test_index",
"doc_type" : "my-type",
"doc_id": "my-id"
}

View File

@ -36,7 +36,7 @@ teardown:
"actions": {
"test_index": {
"index": {
"index": "test",
"index": "my_test_index",
"doc_type": "test2",
"doc_id": "test_id1"
}
@ -86,7 +86,7 @@ teardown:
"actions": {
"test_index": {
"index": {
"index": "test",
"index": "my_test_index",
"doc_type": "test2"
}
}
@ -143,7 +143,7 @@ teardown:
"actions": {
"test_index": {
"index": {
"index": "test",
"index": "my_test_index",
"doc_type": "test2"
}
}
@ -202,7 +202,7 @@ teardown:
"actions": {
"test_index": {
"index": {
"index": "test",
"index": "my_test_index",
"doc_type": "test2"
}
}

View File

@ -7,6 +7,9 @@ package org.elasticsearch.smoketest;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -18,8 +21,7 @@ import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Collections;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@ -43,12 +45,20 @@ public class SmokeTestWatcherWithSecurityClientYamlTestSuiteIT extends ESClientY
@Before
public void startWatcher() throws Exception {
final List<String> watcherTemplates = Arrays.asList(WatcherIndexTemplateRegistry.TEMPLATE_NAMES);
// delete the watcher history to not clutter with entries from other test
getAdminExecutionContext().callApi("indices.delete", Collections.singletonMap("index", ".watcher-history-*"),
emptyList(), emptyMap());
// create one document in this index, so we can test in the YAML tests, that the index cannot be accessed
Response resp = adminClient().performRequest("PUT", "/index_not_allowed_to_read/doc/1", Collections.emptyMap(),
new StringEntity("{\"foo\":\"bar\"}", ContentType.APPLICATION_JSON));
assertThat(resp.getStatusLine().getStatusCode(), is(201));
assertBusy(() -> {
try {
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
for (String template : watcherTemplates) {
for (String template : WatcherIndexTemplateRegistry.TEMPLATE_NAMES) {
ClientYamlTestResponse templateExistsResponse = getAdminExecutionContext().callApi("indices.exists_template",
singletonMap("name", template), emptyList(), emptyMap());
assertThat(templateExistsResponse.getStatusCode(), is(200));

View File

@ -0,0 +1,330 @@
---
setup:
- do:
cluster.health:
wait_for_status: yellow
# user watcher_user is allowed to write into this index
- do:
index:
index: my_test_index
type: type
id: 1
refresh: true
body: >
{
"value" : "15"
}
---
teardown:
- do:
xpack.watcher.delete_watch:
id: "my_watch"
ignore: 404
---
"Test watch search input is run as user who added the watch":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"search" : {
"request" : {
"indices" : [ "my_test_index" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"condition" : {
"compare" : {
"ctx.payload.hits.total" : {
"gte" : 1
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Successfully ran my_watch to test for search input"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { _id: "my_watch" }
- is_false: watch.status.headers
- do:
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
---
"Test watch search input does not work against index user is not allowed to read":
- do:
# by impersonating this request as powerless user we cannot query the my_test_index
# headers: { es-security-runas-user: powerless_user }
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"search" : {
"request" : {
"indices" : [ "index_not_allowed_to_read" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"condition" : {
"compare" : {
"ctx.payload.hits.total" : {
"gte" : 1
}
}
},
"actions": {
"logging": {
"logging": {
"text": "This message should never occur in the logs as the search above should not have returned any hits"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { _id: "my_watch" }
- is_false: watch.status.headers
- do:
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
# because we are not allowed to read the index, there wont be any data
- match: { watch_record.state: "execution_not_needed" }
---
"Test watch search transform is run as user who added the watch":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"transform" : {
"search" : {
"request" : {
"indices" : [ "my_test_index" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"actions": {
"index": {
"index": {
"index" : "my_test_index",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- do:
get:
index: my_test_index
type: type
id: my-id
# this value is from the document in the my_text_index index, see the setup
- match: { _source.hits.hits.0._source.value: "15" }
---
"Test watch search transform does not work without permissions":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"transform" : {
"search" : {
"request" : {
"indices" : [ "index_not_allowed_to_read" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"actions": {
"index": {
"index": {
"index" : "my_test_index",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- do:
get:
index: my_test_index
type: type
id: my-id
- match: { _source.hits.total: 0 }
---
"Test watch index action requires permission to write to an index":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"actions": {
"index": {
"index": {
"index" : "my_test_index",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { _id: "my_watch" }
- is_false: watch.status.headers
- do:
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
- do:
get:
index: my_test_index
type: type
id: 1
- match: { _id: "1" }
---
"Test watch index action does not work without permissions":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"actions": {
"index": {
"index": {
"index" : "index_not_allowed_to_read",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { _id: "my_watch" }
- is_false: watch.status.headers
- do:
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
- do:
get:
index: index_not_allowed_to_read
type: type
id: 1
catch: forbidden

View File

@ -0,0 +1,558 @@
---
setup:
- do:
cluster.health:
wait_for_status: yellow
# user watcher_user is allowed to write into this index
- do:
index:
index: my_test_index
type: type
id: 1
refresh: true
body: >
{
"value" : "15"
}
---
teardown:
- do:
xpack.watcher.delete_watch:
id: "my_watch"
ignore: 404
---
"Test watch search input is run as user who added the watch":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"throttle_period" : "1h",
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"search" : {
"request" : {
"indices" : [ "my_test_index" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"condition" : {
"compare" : {
"ctx.payload.hits.total" : {
"gte" : 1
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Successfully ran my_watch to test for search input"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
},
{
"term": {
"result.condition.met": {
"value": "true"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- match: { hits.hits.0._source.state: "executed" }
---
"Test watch search input does not work against index user is not allowed to read":
- do:
# by impersonating this request as powerless user we cannot query the my_test_index
# headers: { es-security-runas-user: powerless_user }
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"search" : {
"request" : {
"indices" : [ "index_not_allowed_to_read" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"condition" : {
"compare" : {
"ctx.payload.hits.total" : {
"gte" : 1
}
}
},
"actions": {
"logging": {
"logging": {
"text": "This message should never occur in the logs as the search above should have failed"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- match: { hits.hits.0._source.state: "execution_not_needed" }
---
"Test watch search transform is run as user who added the watch":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"transform" : {
"search" : {
"request" : {
"indices" : [ "my_test_index" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"actions": {
"index": {
"index": {
"index" : "my_test_index",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
},
{
"term": {
"result.condition.met": {
"value": "true"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- do:
get:
index: my_test_index
type: type
id: my-id
# this value is from the document in the my_text_index index, see the setup
- match: { _source.hits.hits.0._source.value: "15" }
---
"Test watch search transform does not work without permissions":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"transform" : {
"search" : {
"request" : {
"indices" : [ "index_not_allowed_to_read" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"actions": {
"index": {
"index": {
"index" : "my_test_index",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
},
{
"term": {
"result.condition.met": {
"value": "true"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- do:
get:
index: my_test_index
type: type
id: my-id
- match: { _source.hits.total: 0 }
---
"Test watch index action requires permission to write to an index":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"actions": {
"index": {
"index": {
"index" : "my_test_index",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
},
{
"term": {
"result.condition.met": {
"value": "true"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- match: { hits.hits.0._source.state: "executed" }
- do:
get:
index: my_test_index
type: type
id: 1
- match: { _id: "1" }
---
# this is tricky to test, as we are not allowed to read the index...
"Test watch index action does not work without permissions":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"actions": {
"index": {
"index": {
"index" : "index_not_allowed_to_read",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
},
{
"term": {
"result.condition.met": {
"value": "true"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- match: { hits.hits.0._source.state: "executed" }
- do:
get:
index: index_not_allowed_to_read
type: type
id: 1
catch: forbidden