Merge remote-tracking branch 'upstream/master'

Original commit: elastic/x-pack-elasticsearch@5b94f9472a
This commit is contained in:
lcawley 2017-12-15 11:07:02 -08:00
commit ab737982bb
14 changed files with 362 additions and 249 deletions

View File

@ -157,3 +157,17 @@ poorer precision worthwhile. If you want to view or change the aggregations
that are used in your job, refer to the `aggregations` property in your {dfeed}.
For more information, see {ref}/ml-datafeed-resource.html[Datafeed Resources].
[float]
=== Security Integration
When {security} is enabled, a {dfeed} stores the roles of the user who created
or updated the {dfeed} **at that time**. This means that if those roles are
updated then the {dfeed} will subsequently run with the new permissions associated
with the roles. However, if the user's roles are adjusted after creating or
updating the {dfeed} then the {dfeed} will continue to run with the permissions
associated with the original roles.
A way to update the roles stored within the {dfeed} without changing any other
settings is to submit an empty JSON document (`{}`) to the
{ref}/ml-update-datafeed.html[update {dfeed} API].

View File

@ -34,6 +34,17 @@ privileges to use this API. For more information, see
//<<privileges-list-cluster>>.
==== Security Integration
When {security} is enabled, the {dfeed} query will be previewed using the
credentials of the user calling the preview {dfeed} API. When the {dfeed}
is started it will run the query using the roles of the last user to
create or update it. If the two sets of roles differ then the preview may
not accurately reflect what the {dfeed} will return when started. To avoid
such problems, the same user that creates/updates the {dfeed} should preview
it to ensure it is returning the expected data.
==== Examples
The following example obtains a preview of the `datafeed-farequote` {dfeed}:

View File

@ -87,6 +87,13 @@ For more information, see
{xpack-ref}/security-privileges.html[Security Privileges].
//<<privileges-list-cluster>>.
==== Security Integration
When {security} is enabled, your {dfeed} will remember which roles the user who
created it had at the time of creation, and run the query using those same roles.
==== Examples
The following example creates the `datafeed-total-requests` {dfeed}:

View File

@ -81,6 +81,13 @@ For more information, see
//<<privileges-list-cluster>>.
==== Security Integration
When {security} is enabled, your {dfeed} will remember which roles the last
user to create or update it had at the time of creation/update, and run the query
using those same roles.
==== Examples
The following example starts the `datafeed-it-ops-kpi` {dfeed}:

View File

@ -82,6 +82,13 @@ For more information, see
{xpack-ref}/security-privileges.html[Security Privileges].
//<<privileges-list-cluster>>.
==== Security Integration
When {security} is enabled, your {dfeed} will remember which roles the user who
updated it had at the time of update, and run the query using those same roles.
==== Examples
The following example updates the query for the `datafeed-it-ops-kpi` {dfeed}

View File

@ -71,5 +71,5 @@ When a `_doc` field exists, if the field holds an object, it is extracted and in
as a single document. If the field holds an array of objects, each object is treated as
a document and the index action indexes all of them in a bulk.
An `_id` value can be added per document to dynamically set the ID of the indexed
document.
An `_index`, `_type` or `_id` value can be added per document to dynamically set the ID
of the indexed document.

View File

@ -49,7 +49,7 @@ public class XPackLicenseState {
"Machine learning APIs are disabled"
});
messages.put(XPackPlugin.LOGSTASH, new String[] {
"Logstash specific APIs are disabled. You can continue to manage and poll stored configurations"
"Logstash will continue to poll centrally-managed pipelines"
});
messages.put(XPackPlugin.DEPRECATION, new String[] {
"Deprecation APIs are disabled"
@ -201,8 +201,7 @@ public class XPackLicenseState {
case STANDARD:
case GOLD:
case PLATINUM:
return new String[] { "Logstash specific APIs will be disabled, but you can continue to manage " +
"and poll stored configurations" };
return new String[] { "Logstash will no longer poll for centrally-managed pipelines" };
}
break;
}

View File

@ -38,6 +38,8 @@ import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
private static final String INDEX_FIELD = "_index";
private static final String TYPE_FIELD = "_type";
private static final String ID_FIELD = "_id";
private final Client client;
@ -71,24 +73,13 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
}
String docId = action.docId;
// prevent double-setting id
if (data.containsKey(ID_FIELD)) {
if (docId != null) {
throw illegalState("could not execute action [{}] of watch [{}]. " +
"[ctx.payload.{}] or [ctx.payload._doc.{}] were set with [doc_id]. Only set [{}] or [doc_id]",
actionId, ctx.watch().id(), ID_FIELD, ID_FIELD, ID_FIELD);
}
if (data.containsKey(INDEX_FIELD) || data.containsKey(TYPE_FIELD) || data.containsKey(ID_FIELD)) {
data = mutableMap(data);
docId = data.remove(ID_FIELD).toString();
}
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.type(action.docType);
indexRequest.id(docId);
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index));
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",data, TYPE_FIELD, action.docType));
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId));
data = addTimestampToDocument(data, ctx.executionTime());
BytesReference bytesReference;
@ -97,8 +88,8 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
}
if (ctx.simulateAction(actionId)) {
return new IndexAction.Simulated(indexRequest.index(), action.docType, docId, new XContentSource(indexRequest.source(),
XContentType.JSON));
return new IndexAction.Simulated(indexRequest.index(), indexRequest.type(), indexRequest.id(),
new XContentSource(indexRequest.source(), XContentType.JSON));
}
IndexResponse response = WatcherClientHelper.execute(ctx.watch(), client,
@ -121,14 +112,17 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
throw illegalState("could not execute action [{}] of watch [{}]. failed to index payload data. " +
"[_data] field must either hold a Map or an List/Array of Maps", actionId, ctx.watch().id());
}
Map<String, Object> doc = (Map<String, Object>) item;
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(action.index);
indexRequest.type(action.docType);
if (doc.containsKey(ID_FIELD)) {
if (doc.containsKey(INDEX_FIELD) || doc.containsKey(TYPE_FIELD) || doc.containsKey(ID_FIELD)) {
doc = mutableMap(doc);
indexRequest.id(doc.remove(ID_FIELD).toString());
}
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", doc, INDEX_FIELD, action.index));
indexRequest.type(getField(actionId, ctx.id().watchId(), "type",doc, TYPE_FIELD, action.docType));
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",doc, ID_FIELD, action.docId));
doc = addTimestampToDocument(doc, ctx.executionTime());
try (XContentBuilder builder = jsonBuilder()) {
indexRequest.source(builder.prettyPrint().map(doc));
@ -163,6 +157,24 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
return data;
}
/**
* Extracts the specified field out of data map, or alternative falls back to the action value
*/
private String getField(String actionId, String watchId, String name, Map<String, Object> data, String fieldName, String defaultValue) {
Object obj = data.remove(fieldName);
if (obj != null) {
if (defaultValue != null) {
throw illegalState("could not execute action [{}] of watch [{}]. " +
"[ctx.payload.{}] or [ctx.payload._doc.{}] were set together with action [{}] field. Only set one of them",
actionId, watchId, fieldName, fieldName, name);
} else {
return obj.toString();
}
}
return defaultValue;
}
/**
* Guarantees that the {@code data} is mutable for any code that needs to modify the {@linkplain Map} before using it (e.g., from
* singleton, immutable {@code Map}s).

View File

@ -25,14 +25,14 @@ public class IndexAction implements Action {
public static final String TYPE = "index";
final String index;
final String docType;
@Nullable final String docType;
@Nullable final String index;
@Nullable final String docId;
@Nullable final String executionTimeField;
@Nullable final TimeValue timeout;
@Nullable final DateTimeZone dynamicNameTimeZone;
public IndexAction(String index, String docType, @Nullable String docId,
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable DateTimeZone dynamicNameTimeZone) {
this.index = index;
@ -89,8 +89,12 @@ public class IndexAction implements Action {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (index != null) {
builder.field(Field.INDEX.getPreferredName(), index);
}
if (docType != null) {
builder.field(Field.DOC_TYPE.getPreferredName(), docType);
}
if (docId != null) {
builder.field(Field.DOC_ID.getPreferredName(), docId);
}
@ -144,12 +148,7 @@ public class IndexAction implements Action {
// Parser for human specified timeouts and 2.x compatibility
timeout = WatcherDateTimeUtils.parseTimeValue(parser, Field.TIMEOUT_HUMAN.toString());
} else if (Field.DYNAMIC_NAME_TIMEZONE.match(currentFieldName)) {
if (token == XContentParser.Token.VALUE_STRING) {
dynamicNameTimeZone = DateTimeZone.forID(parser.text());
} else {
throw new ElasticsearchParseException("could not parse [{}] action for watch [{}]. failed to parse [{}]. must be " +
"a string value (e.g. 'UTC' or '+01:00').", TYPE, watchId, currentFieldName);
}
} else {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. unexpected string field [{}]", TYPE,
watchId, actionId, currentFieldName);
@ -160,16 +159,6 @@ public class IndexAction implements Action {
}
}
if (index == null) {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId,
actionId, Field.INDEX.getPreferredName());
}
if (docType == null) {
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. missing required [{}] field", TYPE, watchId,
actionId, Field.DOC_TYPE.getPreferredName());
}
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone);
}

View File

@ -61,6 +61,13 @@
}
]
}
},
{
"range": {
"timestamp": {
"gte": "now-2m"
}
}
}
]
}

View File

@ -54,6 +54,13 @@
}
]
}
},
{
"range": {
"timestamp": {
"gte": "now-2m"
}
}
}
]
}

View File

@ -51,6 +51,13 @@
"term": {
"type": "cluster_stats"
}
},
{
"range": {
"timestamp": {
"gte": "now-2m"
}
}
}
]
}

View File

@ -5,27 +5,35 @@
*/
package org.elasticsearch.xpack.watcher.actions.index;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.joda.time.DateTime;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
import java.util.Arrays;
@ -37,161 +45,36 @@ import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class IndexActionTests extends ESIntegTestCase {
public class IndexActionTests extends ESTestCase {
public void testIndexActionExecuteSingleDoc() throws Exception {
boolean customId = randomBoolean();
boolean docIdAsParam = customId && randomBoolean();
String docId = randomAlphaOfLength(5);
String timestampField = randomFrom("@timestamp", null);
boolean customTimestampField = timestampField != null;
private final Client client = mock(Client.class);
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
DateTime executionTime = DateTime.now(UTC);
Payload payload;
if (customId && docIdAsParam == false) {
// intentionally immutable because the other side needs to cut out _id
payload = new Payload.Simple("_doc", MapBuilder.newMapBuilder().put("foo", "bar").put("_id", docId).immutableMap());
} else {
payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", singletonMap("foo", "bar"));
}
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload);
Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.class));
IndexAction.Result successResult = (IndexAction.Result) result;
XContentSource response = successResult.response();
assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE));
assertThat(response.getValue("version"), equalTo((Object) 1));
assertThat(response.getValue("type").toString(), equalTo("test-type"));
assertThat(response.getValue("index").toString(), equalTo("test-index"));
refresh(); //Manually refresh to make sure data is available
SearchRequestBuilder searchRequestbuilder = client().prepareSearch("test-index")
.setTypes("test-type")
.setSource(searchSource().query(matchAllQuery()));
if (customTimestampField) {
searchRequestbuilder.addAggregation(terms("timestamps").field(timestampField));
}
SearchResponse searchResponse = searchRequestbuilder.get();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
SearchHit hit = searchResponse.getHits().getAt(0);
if (customId) {
assertThat(hit.getId(), is(docId));
}
if (customTimestampField) {
assertThat(hit.getSourceAsMap().size(), is(2));
assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) "bar"));
assertThat(hit.getSourceAsMap(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
Terms terms = searchResponse.getAggregations().get("timestamps");
assertThat(terms, notNullValue());
assertThat(terms.getBuckets(), hasSize(1));
assertThat(terms.getBuckets().get(0).getKeyAsNumber().longValue(), is(executionTime.getMillis()));
assertThat(terms.getBuckets().get(0).getDocCount(), is(1L));
} else {
assertThat(hit.getSourceAsMap().size(), is(1));
assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) "bar"));
}
}
public void testIndexActionExecuteMultiDoc() throws Exception {
String timestampField = randomFrom("@timestamp", null);
boolean customTimestampField = "@timestamp".equals(timestampField);
assertAcked(prepareCreate("test-index")
.addMapping("test-type", "foo", "type=keyword"));
List<Map> idList = Arrays.asList(
MapBuilder.newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap(),
MapBuilder.newMapBuilder().put("foo", "bar1").put("_id", "1").map()
);
Object list = randomFrom(
new Map[] { singletonMap("foo", "bar"), singletonMap("foo", "bar1") },
Arrays.asList(singletonMap("foo", "bar"), singletonMap("foo", "bar1")),
unmodifiableSet(newHashSet(singletonMap("foo", "bar"), singletonMap("foo", "bar1"))),
idList
);
boolean customId = list == idList;
IndexAction action = new IndexAction("test-index", "test-type", null, timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(), TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
DateTime executionTime = DateTime.now(UTC);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("watch_id", executionTime, new Payload.Simple("_doc", list));
Action.Result result = executable.execute("watch_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.class));
IndexAction.Result successResult = (IndexAction.Result) result;
XContentSource response = successResult.response();
assertThat(successResult.toString(), response.getValue("0.created"), equalTo((Object)Boolean.TRUE));
assertThat(successResult.toString(), response.getValue("0.version"), equalTo((Object) 1));
assertThat(successResult.toString(), response.getValue("0.type").toString(), equalTo("test-type"));
assertThat(successResult.toString(), response.getValue("0.index").toString(), equalTo("test-index"));
assertThat(successResult.toString(), response.getValue("1.created"), equalTo((Object)Boolean.TRUE));
assertThat(successResult.toString(), response.getValue("1.version"), equalTo((Object) 1));
assertThat(successResult.toString(), response.getValue("1.type").toString(), equalTo("test-type"));
assertThat(successResult.toString(), response.getValue("1.index").toString(), equalTo("test-index"));
refresh(); //Manually refresh to make sure data is available
SearchResponse searchResponse = client().prepareSearch("test-index")
.setTypes("test-type")
.setSource(searchSource().sort("foo", SortOrder.ASC)
.query(matchAllQuery()))
.get();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(2L));
final int fields = customTimestampField ? 2 : 1;
for (int i = 0; i < 2; ++i) {
final SearchHit hit = searchResponse.getHits().getAt(i);
final String value = "bar" + (i != 0 ? i : "");
assertThat(hit.getSourceAsMap().size(), is(fields));
if (customId) {
assertThat(hit.getId(), is(Integer.toString(i)));
}
if (customTimestampField) {
assertThat(hit.getSourceAsMap(), hasEntry(timestampField, (Object) WatcherDateTimeUtils.formatDate(executionTime)));
}
assertThat(hit.getSourceAsMap(), hasEntry("foo", (Object) value));
}
@Before
public void setupClient() {
ThreadPool threadPool = mock(ThreadPool.class);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(client.threadPool()).thenReturn(threadPool);
}
public void testParser() throws Exception {
String timestampField = randomBoolean() ? "@timestamp" : null;
XContentBuilder builder = jsonBuilder();
builder.startObject();
boolean includeIndex = randomBoolean();
if (includeIndex) {
builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index");
}
builder.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type");
if (timestampField != null) {
builder.field(IndexAction.Field.EXECUTION_TIME_FIELD.getPreferredName(), timestampField);
@ -201,14 +84,16 @@ public class IndexActionTests extends ESIntegTestCase {
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis());
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client());
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
XContentParser parser = createParser(builder);
parser.nextToken();
ExecutableIndexAction executable = actionParser.parseExecutable(randomAlphaOfLength(5), randomAlphaOfLength(3), parser);
assertThat(executable.action().docType, equalTo("test-type"));
if (includeIndex) {
assertThat(executable.action().index, equalTo("test-index"));
}
if (timestampField != null) {
assertThat(executable.action().executionTimeField, equalTo(timestampField));
}
@ -216,63 +101,40 @@ public class IndexActionTests extends ESIntegTestCase {
}
public void testParserFailure() throws Exception {
XContentBuilder builder = jsonBuilder();
boolean useIndex = randomBoolean();
boolean useType = randomBoolean();
builder.startObject();
{
if (useIndex) {
builder.field(IndexAction.Field.INDEX.getPreferredName(), "test-index");
// wrong type for field
expectParseFailure(jsonBuilder()
.startObject()
.field(IndexAction.Field.DOC_TYPE.getPreferredName(), 1234)
.endObject());
expectParseFailure(jsonBuilder()
.startObject()
.field(IndexAction.Field.TIMEOUT.getPreferredName(), "1234")
.endObject());
// unknown field
expectParseFailure(jsonBuilder()
.startObject()
.field("unknown", "whatever")
.endObject());
expectParseFailure(jsonBuilder()
.startObject()
.field("unknown", 1234)
.endObject());
}
if (useType) {
builder.field(IndexAction.Field.DOC_TYPE.getPreferredName(), "test-type");
}
}
builder.endObject();
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client());
private void expectParseFailure(XContentBuilder builder) throws Exception {
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
XContentParser parser = createParser(builder);
parser.nextToken();
try {
actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser);
if (!(useIndex && useType)) {
fail();
}
} catch (ElasticsearchParseException iae) {
assertThat(useIndex && useType, equalTo(false));
}
}
// https://github.com/elastic/x-pack/issues/4416
public void testIndexingWithWrongMappingReturnsFailureResult() throws Exception {
// index a document to set the mapping of the foo field to a boolean
client().prepareIndex("test-index", "test-type", "_id").setSource("foo", true)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(),
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
List<Map<String, Object>> docs = new ArrayList<>();
boolean addSuccessfulIndexedDoc = randomBoolean();
if (addSuccessfulIndexedDoc) {
docs.add(Collections.singletonMap("foo", randomBoolean()));
}
docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar")));
Payload payload = new Payload.Simple(Collections.singletonMap("_doc", docs));
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", DateTime.now(UTC), payload);
Action.Result result = executable.execute("_id", ctx, payload);
if (addSuccessfulIndexedDoc) {
assertThat(result.status(), is(Status.PARTIAL_FAILURE));
} else {
assertThat(result.status(), is(Status.FAILURE));
}
expectThrows(ElasticsearchParseException.class, () ->
actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
}
public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
final IndexAction action = new IndexAction("test-index", "test-type", "123", null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client(),
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithId = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar").put("_id", "0").immutableMap();
final DateTime executionTime = DateTime.now(UTC);
@ -301,4 +163,184 @@ public class IndexActionTests extends ESIntegTestCase {
executable.execute("_id", ctx, ctx.payload());
});
}
public void testThatIndexTypeIdDynamically() throws Exception {
boolean configureIndexDynamically = randomBoolean();
boolean configureTypeDynamically = randomBoolean();
boolean configureIdDynamically = (configureTypeDynamically == false && configureIndexDynamically == false) || randomBoolean();
MapBuilder<String, Object> builder = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar");
if (configureIdDynamically) {
builder.put("_id", "my_dynamic_id");
}
if (configureTypeDynamically) {
builder.put("_type", "my_dynamic_type");
}
if (configureIndexDynamically) {
builder.put("_index", "my_dynamic_index");
}
final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index",
configureTypeDynamically ? null : "my_type",
configureIdDynamically ? null : "my_id",
null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", new Payload.Simple(builder.immutableMap()));
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
PlainActionFuture<IndexResponse> listener = PlainActionFuture.newFuture();
listener.onResponse(new IndexResponse(new ShardId(new Index("foo", "bar"), 0), "whatever", "whatever", 1, 1, 1, true));
when(client.index(captor.capture())).thenReturn(listener);
Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), is(Status.SUCCESS));
assertThat(captor.getAllValues(), hasSize(1));
assertThat(captor.getValue().index(), is(configureIndexDynamically ? "my_dynamic_index" : "my_index"));
assertThat(captor.getValue().type(), is(configureTypeDynamically ? "my_dynamic_type" : "my_type"));
assertThat(captor.getValue().id(), is(configureIdDynamically ? "my_dynamic_id" : "my_id"));
}
public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception {
final IndexAction action = new IndexAction(null, "my-type", null, null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithIndex = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar")
.put("_index", "my-index").immutableMap();
final Map<String, Object> docWithOtherIndex = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar")
.put("_index", "my-other-index").immutableMap();
final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id",
new Payload.Simple("_doc", Arrays.asList(docWithIndex, docWithOtherIndex)));
ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class);
PlainActionFuture<BulkResponse> listener = PlainActionFuture.newFuture();
IndexResponse indexResponse = new IndexResponse(new ShardId(new Index("foo", "bar"), 0), "whatever", "whatever", 1, 1, 1, true);
BulkItemResponse response = new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, indexResponse);
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{response}, 1);
listener.onResponse(bulkResponse);
when(client.bulk(captor.capture())).thenReturn(listener);
Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), is(Status.SUCCESS));
assertThat(captor.getAllValues(), hasSize(1));
assertThat(captor.getValue().requests(), hasSize(2));
assertThat(captor.getValue().requests().get(0).type(), is("my-type"));
assertThat(captor.getValue().requests().get(0).index(), is("my-index"));
assertThat(captor.getValue().requests().get(1).type(), is("my-type"));
assertThat(captor.getValue().requests().get(1).index(), is("my-other-index"));
}
public void testConfigureIndexInMapAndAction() {
String fieldName = randomFrom("_index", "_type");
final IndexAction action = new IndexAction(fieldName.equals("_index") ? "my_index" : null,
fieldName.equals("_type") ? "my_type" : null,
null,null, null, null);
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
final Map<String, Object> docWithIndex = MapBuilder.<String, Object>newMapBuilder().put("foo", "bar")
.put(fieldName, "my-value").immutableMap();
final WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id",
new Payload.Simple("_doc", Collections.singletonList(docWithIndex)));
IllegalStateException e = expectThrows(IllegalStateException.class, () -> executable.execute("_id", ctx, ctx.payload()));
assertThat(e.getMessage(), startsWith("could not execute action [_id] of watch [_id]. [ctx.payload." +
fieldName + "] or [ctx.payload._doc." + fieldName + "]"));
}
public void testIndexActionExecuteSingleDoc() throws Exception {
boolean customId = randomBoolean();
boolean docIdAsParam = customId && randomBoolean();
String docId = randomAlphaOfLength(5);
String timestampField = randomFrom("@timestamp", null);
IndexAction action = new IndexAction("test-index", "test-type", docIdAsParam ? docId : null, timestampField, null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(30));
DateTime executionTime = DateTime.now(UTC);
Payload payload;
if (customId && docIdAsParam == false) {
// intentionally immutable because the other side needs to cut out _id
payload = new Payload.Simple("_doc", MapBuilder.newMapBuilder().put("foo", "bar").put("_id", docId).immutableMap());
} else {
payload = randomBoolean() ? new Payload.Simple("foo", "bar") : new Payload.Simple("_doc", singletonMap("foo", "bar"));
}
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", executionTime, payload);
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
PlainActionFuture<IndexResponse> listener = PlainActionFuture.newFuture();
listener.onResponse(new IndexResponse(new ShardId(new Index("test-index", "uuid"), 0), "test-type", docId, 1, 1, 1, true));
when(client.index(captor.capture())).thenReturn(listener);
Action.Result result = executable.execute("_id", ctx, ctx.payload());
assertThat(result.status(), equalTo(Status.SUCCESS));
assertThat(result, instanceOf(IndexAction.Result.class));
IndexAction.Result successResult = (IndexAction.Result) result;
XContentSource response = successResult.response();
assertThat(response.getValue("created"), equalTo((Object)Boolean.TRUE));
assertThat(response.getValue("version"), equalTo((Object) 1));
assertThat(response.getValue("type").toString(), equalTo("test-type"));
assertThat(response.getValue("index").toString(), equalTo("test-index"));
assertThat(captor.getAllValues(), hasSize(1));
IndexRequest indexRequest = captor.getValue();
assertThat(indexRequest.sourceAsMap(), is(hasEntry("foo", "bar")));
if (customId) {
assertThat(indexRequest.id(), is(docId));
}
if (timestampField != null) {
assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(2)));
assertThat(indexRequest.sourceAsMap(), hasEntry(timestampField, executionTime.toString()));
} else {
assertThat(indexRequest.sourceAsMap().keySet(), is(hasSize(1)));
}
}
public void testFailureResult() throws Exception {
IndexAction action = new IndexAction("test-index", "test-type", null, "@timestamp", null, null);
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
// should the result resemble a failure or a partial failure
boolean isPartialFailure = randomBoolean();
List<Map<String, Object>> docs = new ArrayList<>();
docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar")));
docs.add(Collections.singletonMap("foo", Collections.singletonMap("foo", "bar")));
Payload payload = new Payload.Simple(Collections.singletonMap("_doc", docs));
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", DateTime.now(UTC), payload);
ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class);
PlainActionFuture<BulkResponse> listener = PlainActionFuture.newFuture();
BulkItemResponse.Failure failure = new BulkItemResponse.Failure("test-index", "test-type", "anything",
new ElasticsearchException("anything"));
BulkItemResponse firstResponse = new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, failure);
BulkItemResponse secondResponse;
if (isPartialFailure) {
ShardId shardId = new ShardId(new Index("foo", "bar"), 0);
IndexResponse indexResponse = new IndexResponse(shardId, "whatever", "whatever", 1, 1, 1, true);
secondResponse = new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, indexResponse);
} else {
secondResponse = new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, failure);
}
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{firstResponse, secondResponse}, 1);
listener.onResponse(bulkResponse);
when(client.bulk(captor.capture())).thenReturn(listener);
Action.Result result = executable.execute("_id", ctx, payload);
if (isPartialFailure) {
assertThat(result.status(), is(Status.PARTIAL_FAILURE));
} else {
assertThat(result.status(), is(Status.FAILURE));
}
}
}

View File

@ -15,6 +15,8 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.XPackSingleNodeTestCase;
import org.elasticsearch.xpack.watcher.WatcherService;
import org.elasticsearch.xpack.watcher.WatcherState;
import org.elasticsearch.xpack.watcher.actions.hipchat.HipChatAction;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
@ -127,6 +129,8 @@ public class HipChatServiceTests extends XPackSingleNodeTestCase {
}
public void testWatchWithHipChatAction() throws Exception {
assertBusy(() -> assertThat(getInstanceFromNode(WatcherService.class).state(), is(WatcherState.STARTED)));
HipChatAccount.Profile profile = randomFrom(HipChatAccount.Profile.values());
HipChatMessage.Color color = randomFrom(HipChatMessage.Color.values());
String account;