Watcher: Replace YAML tests needing to wait for watch execution (elastic/x-pack-elasticsearch#3995)

In order to check for the REST tests if triggering of watches with
security enabled works as expected, we have to add a watch and wait for
its background execution. In the REST tests the only wait is to wait for
this with a timeout. If the timeout is reached but the watch has not
been executed yet, the test will fail.

This commit replaces the YAML with a java based REST test, so that
helper methods like assertBusy() can be used and waiting for a watch to
be executed now works as expected.

relates elastic/x-pack-elasticsearch#3753

Original commit: elastic/x-pack-elasticsearch@fc39636ef7
This commit is contained in:
Alexander Reelsen 2018-02-21 09:58:11 +01:00 committed by GitHub
parent 0bf354eb38
commit 90c360d9d0
2 changed files with 303 additions and 561 deletions

View File

@ -0,0 +1,303 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.smoketest;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase {
private static final String TEST_ADMIN_USERNAME = "test_admin";
private static final String TEST_ADMIN_PASSWORD = "x-pack-test-password";
@Before
public void startWatcher() throws Exception {
StringEntity entity = new StringEntity("{ \"value\" : \"15\" }", ContentType.APPLICATION_JSON);
assertOK(adminClient().performRequest("PUT", "my_test_index/doc/1", Collections.singletonMap("refresh", "true"), entity));
// delete the watcher history to not clutter with entries from other test
adminClient().performRequest("DELETE", ".watcher-history-*", Collections.emptyMap());
// create one document in this index, so we can test in the YAML tests, that the index cannot be accessed
Response resp = adminClient().performRequest("PUT", "/index_not_allowed_to_read/doc/1", Collections.emptyMap(),
new StringEntity("{\"foo\":\"bar\"}", ContentType.APPLICATION_JSON));
assertThat(resp.getStatusLine().getStatusCode(), is(201));
assertBusy(() -> {
try {
adminClient().performRequest("POST", "_xpack/watcher/_start");
for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) {
assertOK(adminClient().performRequest("HEAD", "_template/" + template));
}
Response statsResponse = adminClient().performRequest("GET", "_xpack/watcher/stats");
ObjectPath objectPath = ObjectPath.createFromResponse(statsResponse);
String state = objectPath.evaluate("stats.0.watcher_state");
assertThat(state, is("started"));
} catch (IOException e) {
throw new AssertionError(e);
}
});
}
@After
public void stopWatcher() throws Exception {
adminClient().performRequest("DELETE", "_xpack/watcher/watch/my_watch");
assertOK(adminClient().performRequest("DELETE", "my_test_index"));
assertBusy(() -> {
try {
adminClient().performRequest("POST", "_xpack/watcher/_stop", Collections.emptyMap());
Response statsResponse = adminClient().performRequest("GET", "_xpack/watcher/stats");
ObjectPath objectPath = ObjectPath.createFromResponse(statsResponse);
String state = objectPath.evaluate("stats.0.watcher_state");
assertThat(state, is("stopped"));
} catch (IOException e) {
throw new AssertionError(e);
}
});
}
@Override
protected Settings restClientSettings() {
String token = basicAuthHeaderValue("watcher_manager", new SecureString("x-pack-test-password".toCharArray()));
return Settings.builder()
.put(ThreadContext.PREFIX + ".Authorization", token)
.build();
}
@Override
protected Settings restAdminSettings() {
String token = basicAuthHeaderValue(TEST_ADMIN_USERNAME, new SecureString(TEST_ADMIN_PASSWORD.toCharArray()));
return Settings.builder()
.put(ThreadContext.PREFIX + ".Authorization", token)
.build();
}
public void testSearchInputHasPermissions() throws Exception {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("trigger").startObject("schedule").field("interval", "1s").endObject().endObject();
builder.startObject("input").startObject("search").startObject("request")
.startArray("indices").value("my_test_index").endArray()
.startObject("body").startObject("query").startObject("match_all").endObject().endObject().endObject()
.endObject().endObject().endObject();
builder.startObject("condition").startObject("compare").startObject("ctx.payload.hits.total").field("gte", 1)
.endObject().endObject().endObject();
builder.startObject("actions").startObject("logging").startObject("logging")
.field("text", "successfully ran my_watch to test for search inpput").endObject().endObject().endObject();
builder.endObject();
indexWatch("my_watch", builder);
}
// check history, after watch has fired
ObjectPath objectPath = getWatchHistoryEntry("my_watch");
String state = objectPath.evaluate("hits.hits.0._source.state");
assertThat(state, is("executed"));
boolean conditionMet = objectPath.evaluate("hits.hits.0._source.result.condition.met");
assertThat(conditionMet, is(true));
}
public void testSearchInputWithInsufficientPrivileges() throws Exception {
String indexName = "index_not_allowed_to_read";
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("trigger").startObject("schedule").field("interval", "1s").endObject().endObject();
builder.startObject("input").startObject("search").startObject("request")
.startArray("indices").value(indexName).endArray()
.startObject("body").startObject("query").startObject("match_all").endObject().endObject().endObject()
.endObject().endObject().endObject();
builder.startObject("condition").startObject("compare").startObject("ctx.payload.hits.total").field("gte", 1)
.endObject().endObject().endObject();
builder.startObject("actions").startObject("logging").startObject("logging")
.field("text", "this should never be logged").endObject().endObject().endObject();
builder.endObject();
indexWatch("my_watch", builder);
}
// check history, after watch has fired
ObjectPath objectPath = getWatchHistoryEntry("my_watch");
String state = objectPath.evaluate("hits.hits.0._source.state");
assertThat(state, is("execution_not_needed"));
boolean conditionMet = objectPath.evaluate("hits.hits.0._source.result.condition.met");
assertThat(conditionMet, is(false));
}
public void testSearchTransformHasPermissions() throws Exception {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("trigger").startObject("schedule").field("interval", "1s").endObject().endObject();
builder.startObject("input").startObject("simple").field("foo", "bar").endObject().endObject();
builder.startObject("transform").startObject("search").startObject("request")
.startArray("indices").value("my_test_index").endArray()
.startObject("body").startObject("query").startObject("match_all").endObject().endObject().endObject()
.endObject().endObject().endObject();
builder.startObject("actions").startObject("index").startObject("index")
.field("index", "my_test_index")
.field("doc_type", "doc")
.field("doc_id", "my-id")
.endObject().endObject().endObject();
builder.endObject();
indexWatch("my_watch", builder);
}
// check history, after watch has fired
ObjectPath objectPath = getWatchHistoryEntry("my_watch");
String state = objectPath.evaluate("hits.hits.0._source.state");
assertThat(state, is("executed"));
boolean conditionMet = objectPath.evaluate("hits.hits.0._source.result.condition.met");
assertThat(conditionMet, is(true));
ObjectPath getObjectPath = ObjectPath.createFromResponse(client().performRequest("GET", "my_test_index/doc/my-id"));
String value = getObjectPath.evaluate("_source.hits.hits.0._source.value");
assertThat(value, is("15"));
}
public void testSearchTransformInsufficientPermissions() throws Exception {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("trigger").startObject("schedule").field("interval", "1s").endObject().endObject();
builder.startObject("input").startObject("simple").field("foo", "bar").endObject().endObject();
builder.startObject("transform").startObject("search").startObject("request")
.startArray("indices").value("index_not_allowed_to_read").endArray()
.startObject("body").startObject("query").startObject("match_all").endObject().endObject().endObject()
.endObject().endObject().endObject();
builder.startObject("condition").startObject("compare").startObject("ctx.payload.hits.total").field("gte", 1)
.endObject().endObject().endObject();
builder.startObject("actions").startObject("index").startObject("index")
.field("index", "my_test_index")
.field("doc_type", "doc")
.field("doc_id", "some-id")
.endObject().endObject().endObject();
builder.endObject();
indexWatch("my_watch", builder);
}
getWatchHistoryEntry("my_watch");
Response response = adminClient().performRequest("GET", "my_test_index/doc/some-id",
Collections.singletonMap("ignore", "404"));
assertThat(response.getStatusLine().getStatusCode(), is(404));
}
public void testIndexActionHasPermissions() throws Exception {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("trigger").startObject("schedule").field("interval", "1s").endObject().endObject();
builder.startObject("input").startObject("simple").field("spam", "eggs").endObject().endObject();
builder.startObject("actions").startObject("index").startObject("index")
.field("index", "my_test_index")
.field("doc_type", "doc")
.field("doc_id", "my-id")
.endObject().endObject().endObject();
builder.endObject();
indexWatch("my_watch", builder);
}
ObjectPath objectPath = getWatchHistoryEntry("my_watch");
String state = objectPath.evaluate("hits.hits.0._source.state");
assertThat(state, is("executed"));
boolean conditionMet = objectPath.evaluate("hits.hits.0._source.result.condition.met");
assertThat(conditionMet, is(true));
ObjectPath getObjectPath = ObjectPath.createFromResponse(client().performRequest("GET", "my_test_index/doc/my-id"));
String spam = getObjectPath.evaluate("_source.spam");
assertThat(spam, is("eggs"));
}
public void testIndexActionInsufficientPrivileges() throws Exception {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("trigger").startObject("schedule").field("interval", "1s").endObject().endObject();
builder.startObject("input").startObject("simple").field("spam", "eggs").endObject().endObject();
builder.startObject("actions").startObject("index").startObject("index")
.field("index", "index_not_allowed_to_read")
.field("doc_type", "doc")
.field("doc_id", "my-id")
.endObject().endObject().endObject();
builder.endObject();
indexWatch("my_watch", builder);
}
ObjectPath objectPath = getWatchHistoryEntry("my_watch");
String state = objectPath.evaluate("hits.hits.0._source.state");
assertThat(state, is("executed"));
boolean conditionMet = objectPath.evaluate("hits.hits.0._source.result.condition.met");
assertThat(conditionMet, is(true));
Response response = adminClient().performRequest("GET", "index_not_allowed_to_read/doc/my-id",
Collections.singletonMap("ignore", "404"));
assertThat(response.getStatusLine().getStatusCode(), is(404));
}
private void indexWatch(String watchId, XContentBuilder builder) throws Exception {
StringEntity entity = new StringEntity(builder.string(), ContentType.APPLICATION_JSON);
Response response = client().performRequest("PUT", "_xpack/watcher/watch/my_watch", Collections.emptyMap(), entity);
assertOK(response);
Map<String, Object> responseMap = entityAsMap(response);
assertThat(responseMap, hasEntry("_id", watchId));
}
private ObjectPath getWatchHistoryEntry(String watchId) throws Exception {
final AtomicReference<ObjectPath> objectPathReference = new AtomicReference<>();
assertBusy(() -> {
client().performRequest("POST", ".watcher-history-*/_refresh");
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("query").startObject("bool").startArray("must");
builder.startObject().startObject("term").startObject("watch_id").field("value", watchId).endObject().endObject()
.endObject();
builder.endArray().endObject().endObject();
builder.startArray("sort").startObject().startObject("trigger_event.triggered_time").field("order", "desc").endObject()
.endObject().endArray();
builder.endObject();
StringEntity entity = new StringEntity(builder.string(), ContentType.APPLICATION_JSON);
Response response = client().performRequest("POST", ".watcher-history-*/_search", Collections.emptyMap(), entity);
ObjectPath objectPath = ObjectPath.createFromResponse(response);
int totalHits = objectPath.evaluate("hits.total");
assertThat(totalHits, is(greaterThanOrEqualTo(1)));
String watchid = objectPath.evaluate("hits.hits.0._source.watch_id");
assertThat(watchid, is(watchId));
objectPathReference.set(objectPath);
}
});
return objectPathReference.get();
}
}

View File

@ -1,561 +0,0 @@
---
setup:
- do:
cluster.health:
wait_for_status: yellow
# user watcher_user is allowed to write into this index
- do:
index:
index: my_test_index
type: type
id: 1
refresh: true
body: >
{
"value" : "15"
}
---
teardown:
- do:
xpack.watcher.delete_watch:
id: "my_watch"
ignore: 404
---
"Test watch search input is run as user who added the watch":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"throttle_period" : "1h",
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"search" : {
"request" : {
"indices" : [ "my_test_index" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"condition" : {
"compare" : {
"ctx.payload.hits.total" : {
"gte" : 1
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Successfully ran my_watch to test for search input"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
},
{
"term": {
"result.condition.met": {
"value": "true"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- match: { hits.hits.0._source.state: "executed" }
---
"Test watch search input does not work against index user is not allowed to read":
- do:
# by impersonating this request as powerless user we cannot query the my_test_index
# headers: { es-security-runas-user: powerless_user }
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"search" : {
"request" : {
"indices" : [ "index_not_allowed_to_read" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"condition" : {
"compare" : {
"ctx.payload.hits.total" : {
"gte" : 1
}
}
},
"actions": {
"logging": {
"logging": {
"text": "This message should never occur in the logs as the search above should have failed"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- match: { hits.hits.0._source.state: "execution_not_needed" }
---
"Test watch search transform is run as user who added the watch":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"transform" : {
"search" : {
"request" : {
"indices" : [ "my_test_index" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"actions": {
"index": {
"index": {
"index" : "my_test_index",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
},
{
"term": {
"result.condition.met": {
"value": "true"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- do:
get:
index: my_test_index
type: type
id: my-id
# this value is from the document in the my_text_index index, see the setup
- match: { _source.hits.hits.0._source.value: "15" }
---
"Test watch search transform does not work without permissions":
- skip:
version: "all"
reason: "AwaitsFix on https://github.com/elastic/x-pack-elasticsearch/issues/3753"
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"transform" : {
"search" : {
"request" : {
"indices" : [ "index_not_allowed_to_read" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"actions": {
"index": {
"index": {
"index" : "my_test_index",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
},
{
"term": {
"result.condition.met": {
"value": "true"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- do:
get:
index: my_test_index
type: type
id: my-id
- match: { _source.hits.total: 0 }
---
"Test watch index action requires permission to write to an index":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"actions": {
"index": {
"index": {
"index" : "my_test_index",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
},
{
"term": {
"result.condition.met": {
"value": "true"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- match: { hits.hits.0._source.state: "executed" }
- do:
get:
index: my_test_index
type: type
id: 1
- match: { _id: "1" }
---
# this is tricky to test, as we are not allowed to read the index...
"Test watch index action does not work without permissions":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "interval" : "3s" }
},
"input": {
"simple" : {
"foo" : "bar"
}
},
"actions": {
"index": {
"index": {
"index" : "index_not_allowed_to_read",
"doc_type" : "type",
"doc_id": "my-id"
}
}
}
}
- match: { _id: "my_watch" }
# Simulate sleeping, so that the watch triggers
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 5s
- match: { "timed_out": true }
- do:
indices.refresh:
index: .watcher-history-*
- do:
search:
index: .watcher-history-*
body: >
{
"query": {
"bool": {
"must" : [
{
"term": {
"watch_id": {
"value": "my_watch"
}
}
},
{
"term": {
"result.condition.met": {
"value": "true"
}
}
}
]
}
},
"sort": [
{
"trigger_event.triggered_time": {
"order": "desc"
}
}
]
}
- gte: { hits.total: 1 }
- match: { hits.hits.0._source.watch_id: "my_watch" }
- match: { hits.hits.0._source.state: "executed" }
- do:
get:
index: index_not_allowed_to_read
type: type
id: 1
catch: forbidden