Remove the index type from internal watcher indexes (#39761) (#39853)

This commit removes the "doc" type from watcher internal indexes.
The template still carries the "_doc" type since that is needed for
the internal representation.

This impacts the .watches, .triggered-watches, and .watch-history indexes.

External consumers do not need any changes since all external calls
go through the _watcher API, and should not interact with the the .index directly.

Relates #38637
This commit is contained in:
Jake Landis 2019-03-08 12:46:36 -06:00 committed by GitHub
parent 3c7fafd0cc
commit e0abc3ce96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 177 additions and 173 deletions

View File

@ -8,5 +8,4 @@ package org.elasticsearch.xpack.core.watcher.execution;
public final class TriggeredWatchStoreField {
public static final String INDEX_NAME = ".triggered_watches";
public static final String DOC_TYPE = "doc";
}

View File

@ -26,7 +26,6 @@ public class Watch implements ToXContentObject {
public static final String INCLUDE_STATUS_KEY = "include_status";
public static final String INDEX = ".watches";
public static final String DOC_TYPE = "doc";
private final String id;
private final Trigger trigger;

View File

@ -9,7 +9,7 @@
"index.priority": 900
},
"mappings": {
"doc": {
"_doc": {
"dynamic" : "strict",
"properties": {
"trigger_event": {

View File

@ -9,7 +9,7 @@
"index.format": 6
},
"mappings": {
"doc": {
"_doc": {
"_meta": {
"watcher-history-version": "${xpack.watcher.template.version}"
},

View File

@ -9,7 +9,7 @@
"index.priority": 800
},
"mappings": {
"doc": {
"_doc": {
"dynamic" : "strict",
"properties": {
"status": {

View File

@ -101,7 +101,7 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
*/
@Override
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
if (isWatchDocument(shardId.getIndexName(), operation.type())) {
if (isWatchDocument(shardId.getIndexName())) {
ZonedDateTime now = Instant.ofEpochMilli(clock.millis()).atZone(ZoneOffset.UTC);
try {
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON,
@ -150,7 +150,7 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
*/
@Override
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
if (isWatchDocument(shardId.getIndexName(), index.type())) {
if (isWatchDocument(shardId.getIndexName())) {
logger.debug(() -> new ParameterizedMessage("removing watch [{}] from trigger", index.id()), ex);
triggerService.remove(index.id());
}
@ -166,7 +166,7 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
*/
@Override
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
if (isWatchDocument(shardId.getIndexName(), delete.type())) {
if (isWatchDocument(shardId.getIndexName())) {
triggerService.remove(delete.id());
}
@ -177,11 +177,10 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
* Check if a supplied index and document matches the current configuration for watcher
*
* @param index The index to check for
* @param docType The document type
* @return true if this is a watch in the active watcher index, false otherwise
*/
private boolean isWatchDocument(String index, String docType) {
return configuration.isIndexAndActive(index) && docType.equals(Watch.DOC_TYPE);
private boolean isWatchDocument(String index) {
return configuration.isIndexAndActive(index);
}
/**

View File

@ -34,10 +34,18 @@ public final class ActionBuilders {
return EmailAction.builder(email);
}
/**
* Types are deprecated and should not be used. use {@link #indexAction(String)}
*/
@Deprecated
public static IndexAction.Builder indexAction(String index, String type) {
return IndexAction.builder(index, type);
}
public static IndexAction.Builder indexAction(String index) {
return IndexAction.builder(index);
}
public static JiraAction.Builder jiraAction(String account, MapBuilder<String, Object> fields) {
return jiraAction(account, fields.immutableMap());
}

View File

@ -29,7 +29,7 @@ public class IndexAction implements Action {
public static final String TYPE = "index";
@Nullable final String docType;
@Nullable @Deprecated final String docType;
@Nullable final String index;
@Nullable final String docId;
@Nullable final String executionTimeField;
@ -40,6 +40,15 @@ public class IndexAction implements Action {
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(IndexAction.class));
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in a watcher index action is deprecated.";
public IndexAction(@Nullable String index, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
this(index, null, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
/**
* Document types are deprecated, use constructor without docType
*/
@Deprecated
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
@ -188,10 +197,18 @@ public class IndexAction implements Action {
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
/**
* Document types are deprecated, use {@link #builder(java.lang.String)}
*/
@Deprecated
public static Builder builder(String index, String docType) {
return new Builder(index, docType);
}
public static Builder builder(String index) {
return new Builder(index);
}
public static class Result extends Action.Result {
private final XContentSource response;
@ -278,11 +295,20 @@ public class IndexAction implements Action {
ZoneId dynamicNameTimeZone;
RefreshPolicy refreshPolicy;
/**
* Document types are deprecated and should not be used. Use: {@link Builder#Builder(java.lang.String)}
*/
@Deprecated
private Builder(String index, String docType) {
this.index = index;
this.docType = docType;
}
private Builder(String index) {
this.index = index;
this.docType = null;
}
public Builder setDocId(String docId) {
this.docId = docId;
return this;

View File

@ -352,7 +352,7 @@ public class ExecutionService {
.field(WatchField.STATUS.getPreferredName(), watch.status(), params)
.endObject();
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, watch.id());
updateRequest.doc(source);
updateRequest.setIfSeqNo(watch.getSourceSeqNo());
updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm());
@ -501,7 +501,7 @@ public class ExecutionService {
*/
private GetResponse getWatch(String id) {
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, id).preference(Preference.LOCAL.type()).realtime(true);
GetRequest getRequest = new GetRequest(Watch.INDEX, id).preference(Preference.LOCAL.type()).realtime(true);
PlainActionFuture<GetResponse> future = PlainActionFuture.newFuture();
client.get(getRequest, future);
return future.actionGet();

View File

@ -96,8 +96,7 @@ public class TriggeredWatchStore {
private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatches) throws IOException {
BulkRequest request = new BulkRequest();
for (TriggeredWatch triggeredWatch : triggeredWatches) {
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE,
triggeredWatch.id().value());
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME).id(triggeredWatch.id().value());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
triggeredWatch.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder);
@ -115,7 +114,7 @@ public class TriggeredWatchStore {
* @param wid The ID os the triggered watch id
*/
public void delete(Wid wid) {
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, wid.value());
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, wid.value());
bulkProcessor.add(request);
}

View File

@ -28,8 +28,6 @@ import static org.elasticsearch.xpack.core.watcher.support.Exceptions.ioExceptio
public class HistoryStore {
public static final String DOC_TYPE = "doc";
private static final Logger logger = LogManager.getLogger(HistoryStore.class);
private final BulkProcessor bulkProcessor;
@ -47,7 +45,7 @@ public class HistoryStore {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()).source(builder);
IndexRequest request = new IndexRequest(index).id(watchRecord.id().value()).source(builder);
request.opType(IndexRequest.OpType.CREATE);
bulkProcessor.add(request);
} catch (IOException ioe) {
@ -64,7 +62,7 @@ public class HistoryStore {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()).source(builder);
IndexRequest request = new IndexRequest(index).id(watchRecord.id().value()).source(builder);
bulkProcessor.add(request);
} catch (IOException ioe) {
final WatchRecord wr = watchRecord;

View File

@ -75,7 +75,7 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished",
RestStatus.CONFLICT, request.getWatchId()));
} else {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
@ -99,7 +99,7 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
return;
}
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, request.getWatchId());
// this may reject this action, but prevents concurrent updates from a watch execution
updateRequest.setIfSeqNo(getResponse.getSeqNo());
updateRequest.setIfPrimaryTerm(getResponse.getPrimaryTerm());

View File

@ -61,7 +61,7 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
protected void doExecute(ActivateWatchRequest request, ActionListener<ActivateWatchResponse> listener) {
try {
ZonedDateTime now = clock.instant().atZone(ZoneOffset.UTC);
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, request.getWatchId());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
XContentBuilder builder = activateWatchBuilder(request.isActivate(), now);
updateRequest.doc(builder);
@ -72,7 +72,7 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap(updateResponse -> {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,

View File

@ -42,7 +42,7 @@ public class TransportDeleteWatchAction extends HandledTransportAction<DeleteWat
@Override
protected void doExecute(Task task, DeleteWatchRequest request, ActionListener<DeleteWatchResponse> listener) {
DeleteRequest deleteRequest = new DeleteRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
DeleteRequest deleteRequest = new DeleteRequest(Watch.INDEX, request.getId());
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, deleteRequest,
ActionListener.<DeleteResponse>wrap(deleteResponse -> {

View File

@ -81,7 +81,7 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
@Override
protected void doExecute(ExecuteWatchRequest request, ActionListener<ExecuteWatchResponse> listener) {
if (request.getId() != null) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,

View File

@ -52,7 +52,7 @@ public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequ
@Override
protected void doExecute(GetWatchRequest request, ActionListener<GetWatchResponse> listener) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,

View File

@ -94,7 +94,7 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
watch.toXContent(builder, DEFAULT_PARAMS);
if (isUpdate) {
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, request.getId());
if (request.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
updateRequest.setIfSeqNo(request.getIfSeqNo());
updateRequest.setIfPrimaryTerm(request.getIfPrimaryTerm());
@ -112,7 +112,7 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
}, listener::onFailure),
client::update);
} else {
IndexRequest indexRequest = new IndexRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
IndexRequest indexRequest = new IndexRequest(Watch.INDEX).id(request.getId());
indexRequest.source(builder);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, indexRequest,

View File

@ -34,7 +34,7 @@ public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCas
createIndex(watchResultsIndex);
stopWatcher();
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName, Watch.DOC_TYPE);
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName);
startWatcher();
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()

View File

@ -95,20 +95,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
listener.setConfiguration(new Configuration(Watch.INDEX, map));
}
//
// tests for document level operations
//
public void testPreIndexCheckType() throws Exception {
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(operation.type()).thenReturn(randomAlphaOfLength(10));
Engine.Index index = listener.preIndex(shardId, operation);
assertThat(index, is(operation));
verifyZeroInteractions(parser);
}
public void testPreIndexCheckIndex() throws Exception {
when(operation.type()).thenReturn(Watch.DOC_TYPE);
when(shardId.getIndexName()).thenReturn(randomAlphaOfLength(10));
Engine.Index index = listener.preIndex(shardId, operation);
@ -118,7 +105,6 @@ public class WatcherIndexingListenerTests extends ESTestCase {
public void testPreIndexCheckActive() throws Exception {
listener.setConfiguration(INACTIVE);
when(operation.type()).thenReturn(Watch.DOC_TYPE);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
Engine.Index index = listener.preIndex(shardId, operation);
@ -127,7 +113,6 @@ public class WatcherIndexingListenerTests extends ESTestCase {
}
public void testPreIndex() throws Exception {
when(operation.type()).thenReturn(Watch.DOC_TYPE);
when(operation.id()).thenReturn(randomAlphaOfLength(10));
when(operation.source()).thenReturn(BytesArray.EMPTY);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
@ -161,7 +146,6 @@ public class WatcherIndexingListenerTests extends ESTestCase {
Watch watch = mockWatch(id, watchActive, isNewWatch);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(operation.type()).thenReturn(Watch.DOC_TYPE);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);
for (int idx = 0; idx < totalShardCount; idx++) {
@ -202,7 +186,6 @@ public class WatcherIndexingListenerTests extends ESTestCase {
}
public void testPreIndexCheckParsingException() throws Exception {
when(operation.type()).thenReturn(Watch.DOC_TYPE);
String id = randomAlphaOfLength(10);
when(operation.id()).thenReturn(id);
when(operation.source()).thenReturn(BytesArray.EMPTY);
@ -218,7 +201,6 @@ public class WatcherIndexingListenerTests extends ESTestCase {
public void testPostIndexRemoveTriggerOnException() throws Exception {
when(operation.id()).thenReturn("_id");
when(operation.type()).thenReturn(Watch.DOC_TYPE);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
listener.postIndex(shardId, operation, new ElasticsearchParseException("whatever"));
@ -227,7 +209,6 @@ public class WatcherIndexingListenerTests extends ESTestCase {
public void testPostIndexDontInvokeForOtherDocuments() throws Exception {
when(operation.id()).thenReturn("_id");
when(operation.type()).thenReturn(Watch.DOC_TYPE);
when(shardId.getIndexName()).thenReturn("anything");
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);
@ -250,18 +231,8 @@ public class WatcherIndexingListenerTests extends ESTestCase {
verifyZeroInteractions(triggerService);
}
public void testPreDeleteCheckType() throws Exception {
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(delete.type()).thenReturn(randomAlphaOfLength(10));
listener.preDelete(shardId, delete);
verifyZeroInteractions(triggerService);
}
public void testPreDelete() throws Exception {
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(delete.type()).thenReturn(Watch.DOC_TYPE);
when(delete.id()).thenReturn("_id");
listener.preDelete(shardId, delete);

View File

@ -68,7 +68,7 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest
.setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(simpleInput())
.addAction("my-logging-action", indexAction("my_watcher_index", "action")))
.addAction("my-logging-action", indexAction("my_watcher_index")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));

View File

@ -301,7 +301,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase {
new IntervalSchedule.Interval(60, IntervalSchedule.Interval.Unit.MINUTES))))
.defaultThrottlePeriod(throttlePeriod)
.addAction("logging", loggingAction("test out"))
.addAction("failing_hook", indexAction("foo", "bar").setExecutionTimeField("@timestamp")))
.addAction("failing_hook", indexAction("foo").setExecutionTimeField("@timestamp")))
.get();
refresh(Watch.INDEX);

View File

@ -147,7 +147,7 @@ public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase
String host = publishAddress.address().getHostString();
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder(host, publishAddress.getPort())
.path(new TextTemplate("/%3Clogstash-%7Bnow%2Fd%7D%3E/log/1"))
.path(new TextTemplate("/%3Clogstash-%7Bnow%2Fd%7D%3E/_doc/1"))
.body(new TextTemplate("{\"foo\":\"bar\"}"))
.putHeader("Content-Type", new TextTemplate("application/json"))
.method(HttpMethod.PUT);
@ -162,7 +162,7 @@ public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase
watcherClient().prepareExecuteWatch("_id").get();
GetResponse response = client().prepareGet("<logstash-{now/d}>", "log", "1").get();
GetResponse response = client().prepareGet().setIndex("<logstash-{now/d}>").setId("1").get();
assertExists(response);
}
}

View File

@ -80,6 +80,7 @@ import java.util.List;
import java.util.Map;
import static java.util.Collections.singleton;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@ -214,7 +215,7 @@ public class TriggeredWatchStoreTests extends ESTestCase {
when(searchResponse1.getSuccessfulShards()).thenReturn(1);
when(searchResponse1.getTotalShards()).thenReturn(1);
BytesArray source = new BytesArray("{}");
SearchHit hit = new SearchHit(0, "first_foo", new Text(TriggeredWatchStoreField.DOC_TYPE), null);
SearchHit hit = new SearchHit(0, "first_foo", new Text(SINGLE_MAPPING_NAME), null);
hit.version(1L);
hit.shard(new SearchShardTarget("_node_id", new ShardId(index, 0), null, OriginalIndices.NONE));
hit.sourceRef(source);
@ -228,7 +229,7 @@ public class TriggeredWatchStoreTests extends ESTestCase {
}).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
// First return a scroll response with a single hit and then with no hits
hit = new SearchHit(0, "second_foo", new Text(TriggeredWatchStoreField.DOC_TYPE), null);
hit = new SearchHit(0, "second_foo", new Text(SINGLE_MAPPING_NAME), null);
hit.version(1L);
hit.shard(new SearchShardTarget("_node_id", new ShardId(index, 0), null, OriginalIndices.NONE));
hit.sourceRef(source);

View File

@ -91,7 +91,7 @@ public class HistoryStoreTests extends ESTestCase {
ActionListener<BulkResponse> listener = (ActionListener<BulkResponse>) invocation.getArguments()[2];
IndexRequest indexRequest = (IndexRequest) request.requests().get(0);
if (indexRequest.id().equals(wid.value()) && indexRequest.type().equals(HistoryStore.DOC_TYPE) &&
if (indexRequest.id().equals(wid.value()) &&
indexRequest.opType() == OpType.CREATE && indexRequest.index().equals(index)) {
listener.onResponse(new BulkResponse(new BulkItemResponse[]{ new BulkItemResponse(1, OpType.CREATE, indexResponse) }, 1));
} else {

View File

@ -31,6 +31,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -158,8 +159,8 @@ public class HistoryTemplateHttpMappingsTests extends AbstractWatcherIntegration
Iterator<ImmutableOpenMap<String, MappingMetaData>> iterator = mappingsResponse.getMappings().valuesIt();
while (iterator.hasNext()) {
ImmutableOpenMap<String, MappingMetaData> mapping = iterator.next();
assertThat(mapping.containsKey("doc"), is(true));
Map<String, Object> docMapping = mapping.get("doc").getSourceAsMap();
assertThat(mapping.containsKey(SINGLE_MAPPING_NAME), is(true));
Map<String, Object> docMapping = mapping.get(SINGLE_MAPPING_NAME).getSourceAsMap();
if (abortAtInput) {
Boolean enabled = ObjectPath.eval("properties.result.properties.input.properties.error.enabled", docMapping);
indexed.add(enabled);

View File

@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
@ -30,11 +31,10 @@ public class HistoryTemplateIndexActionMappingsTests extends AbstractWatcherInte
public void testIndexActionFields() throws Exception {
String index = "the-index";
String type = "the-type";
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(interval("5m")))
.addAction("index", indexAction(index, type)))
.addAction("index", indexAction(index)))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
@ -66,7 +66,7 @@ public class HistoryTemplateIndexActionMappingsTests extends AbstractWatcherInte
terms = aggs.get("index_action_types");
assertThat(terms, notNullValue());
assertThat(terms.getBuckets().size(), is(1));
assertThat(terms.getBucketByKey(type), notNullValue());
assertThat(terms.getBucketByKey(type).getDocCount(), is(1L));
assertThat(terms.getBucketByKey(SINGLE_MAPPING_NAME), notNullValue());
assertThat(terms.getBucketByKey(SINGLE_MAPPING_NAME).getDocCount(), is(1L));
}
}

View File

@ -19,6 +19,7 @@ import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import java.util.Map;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
@ -52,7 +53,7 @@ public class HistoryTemplateTimeMappingsTests extends AbstractWatcherIntegration
if (!metadatas.key.startsWith(HistoryStoreField.INDEX_PREFIX)) {
continue;
}
MappingMetaData metadata = metadatas.value.get("doc");
MappingMetaData metadata = metadatas.value.get(SINGLE_MAPPING_NAME);
assertThat(metadata, notNullValue());
try {
Map<String, Object> source = metadata.getSourceAsMap();

View File

@ -15,6 +15,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
@ -40,9 +41,9 @@ public class HistoryTemplateTransformMappingsTests extends AbstractWatcherIntegr
.endObject()));
client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(client().prepareIndex("idx", "doc", "1")
.add(client().prepareIndex().setIndex("idx").setId("1")
.setSource(jsonBuilder().startObject().field("name", "first").field("foo", "bar").endObject()))
.add(client().prepareIndex("idx", "doc", "2")
.add(client().prepareIndex().setIndex("idx").setId("2")
.setSource(jsonBuilder().startObject().field("name", "second")
.startObject("foo").field("what", "ever").endObject().endObject()))
.get();
@ -74,13 +75,13 @@ public class HistoryTemplateTransformMappingsTests extends AbstractWatcherIntegr
GetFieldMappingsResponse response = client().admin().indices()
.prepareGetFieldMappings(".watcher-history*")
.setFields("result.actions.transform.payload")
.setTypes("doc")
.setTypes(SINGLE_MAPPING_NAME)
.includeDefaults(true)
.get();
// time might have rolled over to a new day, thus we need to check that this field exists only in one of the history indices
List<Boolean> payloadNulls = response.mappings().values().stream()
.map(map -> map.get("doc"))
.map(map -> map.get(SINGLE_MAPPING_NAME))
.map(map -> map.get("result.actions.transform.payload"))
.filter(Objects::nonNull)
.map(GetFieldMappingsResponse.FieldMappingMetaData::isNull)

View File

@ -57,7 +57,7 @@ public class ChainIntegrationTests extends AbstractWatcherIntegrationTestCase {
public void testChainedInputsAreWorking() throws Exception {
String index = "the-most-awesome-index-ever";
createIndex(index);
client().prepareIndex(index, "type", "id").setSource("{}", XContentType.JSON).setRefreshPolicy(IMMEDIATE).get();
client().prepareIndex().setIndex(index).setId("id").setSource("{}", XContentType.JSON).setRefreshPolicy(IMMEDIATE).get();
InetSocketAddress address = internalCluster().httpAddresses()[0];
HttpInput.Builder httpInputBuilder = httpInput(HttpRequestTemplate.builder(address.getHostString(), address.getPort())
@ -72,7 +72,7 @@ public class ChainIntegrationTests extends AbstractWatcherIntegrationTestCase {
.setSource(watchBuilder()
.trigger(schedule(interval(5, SECONDS)))
.input(chainedInputBuilder)
.addAction("indexAction", indexAction("my-index", "my-type")))
.addAction("indexAction", indexAction("my-index")))
.get();
timeWarp().trigger("_name");
@ -84,7 +84,7 @@ public class ChainIntegrationTests extends AbstractWatcherIntegrationTestCase {
public void assertWatchExecuted() {
try {
refresh();
SearchResponse searchResponse = client().prepareSearch("my-index").setTypes("my-type").get();
SearchResponse searchResponse = client().prepareSearch("my-index").get();
assertHitCount(searchResponse, 1);
assertThat(searchResponse.getHits().getAt(0).getSourceAsString(), containsString("the-most-awesome-index-ever"));
} catch (IndexNotFoundException e) {

View File

@ -95,7 +95,7 @@ public class EmailSecretsIntegrationTests extends AbstractWatcherIntegrationTest
.get();
// verifying the email password is stored encrypted in the index
GetResponse response = client().prepareGet(Watch.INDEX, Watch.DOC_TYPE, "_id").get();
GetResponse response = client().prepareGet().setIndex(Watch.INDEX).setId("_id").get();
assertThat(response, notNullValue());
assertThat(response.getId(), is("_id"));
Map<String, Object> source = response.getSource();

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState;
@ -53,7 +54,6 @@ import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStats
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.notification.email.Authentication;
import org.elasticsearch.xpack.watcher.notification.email.Email;
import org.elasticsearch.xpack.watcher.notification.email.EmailService;
@ -235,7 +235,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
// Now replace it with a randomly named index
watchIndexName = randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT);
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, watchIndexName, Watch.DOC_TYPE);
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, watchIndexName);
logger.info("set alias for .watches index to [{}]", watchIndexName);
} else {
@ -259,8 +259,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
// Now replace it with a randomly-named index
triggeredWatchIndexName = randomValueOtherThan(watchIndexName,
() -> randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT));
replaceWatcherIndexWithRandomlyNamedIndex(TriggeredWatchStoreField.INDEX_NAME, triggeredWatchIndexName,
TriggeredWatchStoreField.DOC_TYPE);
replaceWatcherIndexWithRandomlyNamedIndex(TriggeredWatchStoreField.INDEX_NAME, triggeredWatchIndexName);
logger.info("set alias for .triggered-watches index to [{}]", triggeredWatchIndexName);
} else {
triggeredWatchIndexName = TriggeredWatchStoreField.INDEX_NAME;
@ -274,9 +273,9 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
}
}
public void replaceWatcherIndexWithRandomlyNamedIndex(String originalIndexOrAlias, String to, String docType) {
public void replaceWatcherIndexWithRandomlyNamedIndex(String originalIndexOrAlias, String to) {
GetIndexResponse index = client().admin().indices().prepareGetIndex().setIndices(originalIndexOrAlias).get();
MappingMetaData mapping = index.getMappings().get(index.getIndices()[0]).get(docType);
MappingMetaData mapping = index.getMappings().get(index.getIndices()[0]).get(MapperService.SINGLE_MAPPING_NAME);
Settings settings = index.getSettings().get(index.getIndices()[0]);
Settings.Builder newSettings = Settings.builder().put(settings);
@ -286,7 +285,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
newSettings.remove("index.version.created");
CreateIndexResponse createIndexResponse = client().admin().indices().prepareCreate(to)
.addMapping(docType, mapping.sourceAsMap())
.addMapping(MapperService.SINGLE_MAPPING_NAME, mapping.sourceAsMap())
.setSettings(newSettings)
.get();
assertTrue(createIndexResponse.isAcknowledged());
@ -315,22 +314,18 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
return false;
}
protected long docCount(String index, String type, QueryBuilder query) {
protected long docCount(String index, QueryBuilder query) {
refresh();
return docCount(index, type, SearchSourceBuilder.searchSource().query(query));
return docCount(index, SearchSourceBuilder.searchSource().query(query));
}
protected long watchRecordCount(QueryBuilder query) {
refresh();
return docCount(HistoryStoreField.INDEX_PREFIX_WITH_TEMPLATE + "*",
HistoryStore.DOC_TYPE, SearchSourceBuilder.searchSource().query(query));
return docCount(HistoryStoreField.INDEX_PREFIX_WITH_TEMPLATE + "*", SearchSourceBuilder.searchSource().query(query));
}
protected long docCount(String index, String type, SearchSourceBuilder source) {
protected long docCount(String index, SearchSourceBuilder source) {
SearchRequestBuilder builder = client().prepareSearch(index).setSource(source).setSize(0);
if (type != null) {
builder.setTypes(type);
}
return builder.get().getHits().getTotalHits().value;
}
@ -407,7 +402,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
protected SearchResponse searchWatchRecords(Consumer<SearchRequestBuilder> requestBuilderCallback) {
SearchRequestBuilder builder =
client().prepareSearch(HistoryStoreField.INDEX_PREFIX_WITH_TEMPLATE + "*").setTypes(HistoryStore.DOC_TYPE);
client().prepareSearch(HistoryStoreField.INDEX_PREFIX_WITH_TEMPLATE + "*");
requestBuilderCallback.accept(builder);
return builder.get();
}

View File

@ -74,7 +74,7 @@ public class WatcherExecutorServiceBenchmark {
public static void main(String[] args) throws Exception {
start();
client.admin().indices().prepareCreate("test").get();
client.prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
client.prepareIndex().setIndex("test").setId("1").setSource("{}", XContentType.JSON).get();
int numAlerts = 1000;
for (int i = 0; i < numAlerts; i++) {
@ -129,7 +129,7 @@ public class WatcherExecutorServiceBenchmark {
.input(searchInput(templateRequest(new SearchSourceBuilder(), "test"))
.extractKeys("hits.total.value"))
.condition(new ScriptCondition(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "1 == 1", emptyMap())))
.addAction("_id", indexAction("index", "type")).buildAsBytes(XContentType.JSON), XContentType.JSON);
.addAction("_id", indexAction("index")).buildAsBytes(XContentType.JSON), XContentType.JSON);
putAlertRequest.setId(name);
watcherClient.putWatch(putAlertRequest).actionGet();
}

View File

@ -110,12 +110,12 @@ public class WatcherScheduleEngineBenchmark {
client.admin().indices().prepareDelete("_all").get();
client.admin().indices().prepareCreate("test").get();
client.prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
client.prepareIndex().setIndex("test").setId("1").setSource("{}", XContentType.JSON).get();
System.out.println("===============> indexing [" + numWatches + "] watches");
for (int i = 0; i < numWatches; i++) {
final String id = "_id_" + i;
client.prepareIndex(Watch.INDEX, Watch.DOC_TYPE, id)
client.prepareIndex().setIndex(Watch.INDEX).setId(id)
.setSource(new WatchSourceBuilder()
.trigger(schedule(interval(interval + "s")))
.input(searchInput(templateRequest(new SearchSourceBuilder(), "test")))

View File

@ -160,7 +160,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
// In watch store we fail parsing if an watch contains undefined fields.
}
try {
client().prepareIndex(Watch.INDEX, Watch.DOC_TYPE, "_name")
client().prepareIndex().setIndex(Watch.INDEX).setId("_name")
.setSource(watchSource)
.get();
fail();
@ -177,7 +177,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
WatchSourceBuilder source = watchBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(searchRequest))
.addAction("_id", indexAction("idx", "action"));
.addAction("_id", indexAction("idx"));
watcherClient().preparePutWatch("_name")
.setSource(source.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L)))

View File

@ -24,7 +24,6 @@ import org.elasticsearch.xpack.core.watcher.watch.WatchField;
import org.elasticsearch.xpack.watcher.condition.CompareCondition;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatch;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
@ -62,7 +61,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
}
public void testLoadMalformedWatchRecord() throws Exception {
client().prepareIndex(Watch.INDEX, Watch.DOC_TYPE, "_id")
client().prepareIndex().setIndex(Watch.INDEX).setId("_id")
.setSource(jsonBuilder().startObject()
.startObject(WatchField.TRIGGER.getPreferredName())
.startObject("schedule")
@ -80,7 +79,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
ExecutableCondition condition = InternalAlwaysCondition.INSTANCE;
String index = HistoryStoreField.getHistoryIndexNameForTime(now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
client().prepareIndex().setIndex(index).setId(wid.value())
.setSource(jsonBuilder().startObject()
.startObject(WatchRecord.TRIGGER_EVENT.getPreferredName())
.field(event.type(), event)
@ -98,7 +97,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
// unknown condition:
wid = new Wid("_id", now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
client().prepareIndex().setIndex(index).setId(wid.value())
.setSource(jsonBuilder().startObject()
.startObject(WatchRecord.TRIGGER_EVENT.getPreferredName())
.field(event.type(), event)
@ -116,7 +115,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
// unknown trigger:
wid = new Wid("_id", now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
client().prepareIndex().setIndex(index).setId(wid.value())
.setSource(jsonBuilder().startObject()
.startObject(WatchRecord.TRIGGER_EVENT.getPreferredName())
.startObject("unknown").endObject()
@ -151,7 +150,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numWatches; i++) {
bulkRequestBuilder.add(
client().prepareIndex(Watch.INDEX, Watch.DOC_TYPE, "_id" + i)
client().prepareIndex().setIndex(Watch.INDEX).setId("_id" + i)
.setSource(watchBuilder()
.trigger(schedule(cron("0 0/5 * * * ? 2050")))
.input(searchInput(request))
@ -173,7 +172,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
public void testMixedTriggeredWatchLoading() throws Exception {
createIndex("output");
client().prepareIndex("my-index", "foo", "bar")
client().prepareIndex().setIndex("my-index").setId("bar")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource("field", "value").get();
@ -190,7 +189,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
.trigger(schedule(cron("0/5 * * * * ? 2050")))
.input(searchInput(request))
.condition(InternalAlwaysCondition.INSTANCE)
.addAction("_id", indexAction("output", "test"))
.addAction("_id", indexAction("output"))
.defaultThrottlePeriod(TimeValue.timeValueMillis(0))
).get();
}
@ -207,10 +206,9 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
Wid wid = new Wid(watchId, now);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
bulkRequestBuilder.add(
client().prepareIndex(
TriggeredWatchStoreField.INDEX_NAME,
TriggeredWatchStoreField.DOC_TYPE,
triggeredWatch.id().value())
client().prepareIndex()
.setIndex(TriggeredWatchStoreField.INDEX_NAME)
.setId(triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch))
.request());
}
@ -224,7 +222,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29846")
public void testTriggeredWatchLoading() throws Exception {
createIndex("output");
client().prepareIndex("my-index", "foo", "bar")
client().prepareIndex().setIndex("my-index").setId("bar")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource("field", "value").get();
@ -237,7 +235,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
.trigger(schedule(cron("0/5 * * * * ? 2050")))
.input(searchInput(request))
.condition(InternalAlwaysCondition.INSTANCE)
.addAction("_id", indexAction("output", "test"))
.addAction("_id", indexAction("output"))
.defaultThrottlePeriod(TimeValue.timeValueMillis(0))
).get();
@ -252,7 +250,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
Wid wid = new Wid(watchId, now);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
bulkRequestBuilder.add(client()
.prepareIndex(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, triggeredWatch.id().value())
.prepareIndex().setIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch))
.setWaitForActiveShards(ActiveShardCount.ALL)
);
@ -324,13 +322,13 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
Wid wid = new Wid(watchId, triggeredTime);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
bulkRequestBuilder.add(
client().prepareIndex(TriggeredWatchStoreField.INDEX_NAME,
TriggeredWatchStoreField.DOC_TYPE, triggeredWatch.id().value()).setSource(jsonBuilder().value(triggeredWatch))
client().prepareIndex().setIndex(TriggeredWatchStoreField.INDEX_NAME)
.setId(triggeredWatch.id().value()).setSource(jsonBuilder().value(triggeredWatch))
);
String id = internalCluster().getInstance(ClusterService.class).localNode().getId();
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, "executed", id);
bulkRequestBuilder.add(client().prepareIndex(watchRecordIndex, HistoryStore.DOC_TYPE, watchRecord.id().value())
bulkRequestBuilder.add(client().prepareIndex().setIndex(watchRecordIndex).setId(watchRecord.id().value())
.setSource(jsonBuilder().value(watchRecord))
);
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import java.util.Locale;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -98,16 +99,19 @@ public class HistoryIntegrationTests extends AbstractWatcherIntegrationTestCase
assertHitCount(searchResponse, 1);
// as fields with dots are allowed in 5.0 again, the mapping must be checked in addition
GetMappingsResponse response = client().admin().indices().prepareGetMappings(".watcher-history*").addTypes("doc").get();
byte[] bytes = response.getMappings().values().iterator().next().value.get("doc").source().uncompressed();
GetMappingsResponse response = client().admin().indices().prepareGetMappings(".watcher-history*")
.addTypes(SINGLE_MAPPING_NAME).get();
byte[] bytes = response.getMappings().values().iterator().next().value.get(SINGLE_MAPPING_NAME).source().uncompressed();
XContentSource source = new XContentSource(new BytesArray(bytes), XContentType.JSON);
// lets make sure the body fields are disabled
if (useChained) {
String chainedPath = "doc.properties.result.properties.input.properties.chain.properties.chained.properties.search" +
String chainedPath = SINGLE_MAPPING_NAME +
".properties.result.properties.input.properties.chain.properties.chained.properties.search" +
".properties.request.properties.body.enabled";
assertThat(source.getValue(chainedPath), is(false));
} else {
String path = "doc.properties.result.properties.input.properties.search.properties.request.properties.body.enabled";
String path = SINGLE_MAPPING_NAME +
".properties.result.properties.input.properties.search.properties.request.properties.body.enabled";
assertThat(source.getValue(path), is(false));
}
}
@ -136,16 +140,18 @@ public class HistoryIntegrationTests extends AbstractWatcherIntegrationTestCase
assertHitCount(searchResponse, 1);
// as fields with dots are allowed in 5.0 again, the mapping must be checked in addition
GetMappingsResponse response = client().admin().indices().prepareGetMappings(".watcher-history*").addTypes("doc").get();
byte[] bytes = response.getMappings().values().iterator().next().value.get("doc").source().uncompressed();
GetMappingsResponse response = client().admin().indices().prepareGetMappings(".watcher-history*")
.addTypes(SINGLE_MAPPING_NAME).get();
byte[] bytes = response.getMappings().values().iterator().next().value.get(SINGLE_MAPPING_NAME).source().uncompressed();
XContentSource source = new XContentSource(new BytesArray(bytes), XContentType.JSON);
// lets make sure the body fields are disabled
if (useChained) {
String path = "doc.properties.result.properties.input.properties.chain.properties.chained.properties.payload.enabled";
String path = SINGLE_MAPPING_NAME +
".properties.result.properties.input.properties.chain.properties.chained.properties.payload.enabled";
assertThat(source.getValue(path), is(false));
} else {
String path = "doc.properties.result.properties.input.properties.payload.enabled";
String path = SINGLE_MAPPING_NAME + ".properties.result.properties.input.properties.payload.enabled";
assertThat(source.getValue(path), is(false));
}
}
@ -192,12 +198,13 @@ public class HistoryIntegrationTests extends AbstractWatcherIntegrationTestCase
assertThat(lastExecutionSuccesful, is(actionStatus.lastExecution().successful()));
// also ensure that the status field is disabled in the watch history
GetMappingsResponse response = client().admin().indices().prepareGetMappings(".watcher-history*").addTypes("doc").get();
byte[] bytes = response.getMappings().values().iterator().next().value.get("doc").source().uncompressed();
GetMappingsResponse response = client().admin().indices().prepareGetMappings(".watcher-history*")
.addTypes(SINGLE_MAPPING_NAME).get();
byte[] bytes = response.getMappings().values().iterator().next().value.get(SINGLE_MAPPING_NAME).source().uncompressed();
XContentSource mappingSource = new XContentSource(new BytesArray(bytes), XContentType.JSON);
assertThat(mappingSource.getValue("doc.properties.status.enabled"), is(false));
assertThat(mappingSource.getValue("doc.properties.status.properties.status"), is(nullValue()));
assertThat(mappingSource.getValue("doc.properties.status.properties.status.properties.active"), is(nullValue()));
assertThat(mappingSource.getValue(SINGLE_MAPPING_NAME + ".properties.status.enabled"), is(false));
assertThat(mappingSource.getValue(SINGLE_MAPPING_NAME + ".properties.status.properties.status"), is(nullValue()));
assertThat(mappingSource.getValue(SINGLE_MAPPING_NAME + ".properties.status.properties.status.properties.active"), is(nullValue()));
}

View File

@ -101,7 +101,7 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTestC
// verifying the basic auth password is stored encrypted in the index when security
// is enabled, and when it's not enabled, it's stored in plain text
GetResponse response = client().prepareGet(Watch.INDEX, Watch.DOC_TYPE, "_id").get();
GetResponse response = client().prepareGet().setIndex(Watch.INDEX).setId("_id").get();
assertThat(response, notNullValue());
assertThat(response.getId(), is("_id"));
Map<String, Object> source = response.getSource();
@ -180,7 +180,7 @@ public class HttpSecretsIntegrationTests extends AbstractWatcherIntegrationTestC
// verifying the basic auth password is stored encrypted in the index when security
// is enabled, when it's not enabled, the the passowrd should be stored in plain text
GetResponse response = client().prepareGet(Watch.INDEX, Watch.DOC_TYPE, "_id").get();
GetResponse response = client().prepareGet().setIndex(Watch.INDEX).setId("_id").get();
assertThat(response, notNullValue());
assertThat(response.getId(), is("_id"));
Map<String, Object> source = response.getSource();

View File

@ -54,7 +54,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
@Before
public void indexTestDocument() {
IndexResponse eventIndexResponse = client().prepareIndex("events", "event", id)
IndexResponse eventIndexResponse = client().prepareIndex().setIndex("events").setId(id)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource("level", "error")
.get();
@ -69,8 +69,8 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
.input(searchInput(templateRequest(searchSource(), "events")))
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.transform(searchTransform(templateRequest(searchSource(), "events")))
.addAction("_a1", indexAction("actions1", "doc"))
.addAction("_a2", indexAction("actions2", "doc"))
.addAction("_a1", indexAction("actions1"))
.addAction("_a2", indexAction("actions2"))
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
.get();
@ -83,8 +83,8 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
assertThat(ackResponse.getStatus().actionStatus("_a2").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKABLE));
refresh();
long a1CountAfterAck = docCount("actions1", "doc", matchAllQuery());
long a2CountAfterAck = docCount("actions2", "doc", matchAllQuery());
long a1CountAfterAck = docCount("actions1", matchAllQuery());
long a2CountAfterAck = docCount("actions2", matchAllQuery());
assertThat(a1CountAfterAck, greaterThan(0L));
assertThat(a2CountAfterAck, greaterThan(0L));
@ -93,15 +93,15 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
refresh();
// There shouldn't be more a1 actions in the index after we ack the watch, even though the watch was triggered
long a1CountAfterPostAckFires = docCount("actions1", "doc", matchAllQuery());
long a1CountAfterPostAckFires = docCount("actions1", matchAllQuery());
assertThat(a1CountAfterPostAckFires, equalTo(a1CountAfterAck));
// There should be more a2 actions in the index after we ack the watch
long a2CountAfterPostAckFires = docCount("actions2", "doc", matchAllQuery());
long a2CountAfterPostAckFires = docCount("actions2", matchAllQuery());
assertThat(a2CountAfterPostAckFires, greaterThan(a2CountAfterAck));
// Now delete the event and the ack states should change to AWAITS_EXECUTION
DeleteResponse response = client().prepareDelete("events", "event", id).get();
DeleteResponse response = client().prepareDelete().setIndex("events").setId(id).get();
assertEquals(DocWriteResponse.Result.DELETED, response.getResult());
refresh();
@ -117,7 +117,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(),
is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
long throttledCount = docCount(HistoryStoreField.INDEX_PREFIX_WITH_TEMPLATE + "*", null,
long throttledCount = docCount(HistoryStoreField.INDEX_PREFIX_WITH_TEMPLATE + "*",
matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.ACKNOWLEDGED.id()));
assertThat(throttledCount, greaterThan(0L));
}
@ -130,8 +130,8 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
.input(searchInput(templateRequest(searchSource(), "events")))
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.transform(searchTransform(templateRequest(searchSource(), "events")))
.addAction("_a1", indexAction("actions1", "doc"))
.addAction("_a2", indexAction("actions2", "doc"))
.addAction("_a1", indexAction("actions1"))
.addAction("_a2", indexAction("actions2"))
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
.get();
@ -152,8 +152,8 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
assertThat(ackResponse.getStatus().actionStatus("_a2").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKED));
refresh();
long a1CountAfterAck = docCount("actions1", "doc", matchAllQuery());
long a2CountAfterAck = docCount("actions2", "doc", matchAllQuery());
long a1CountAfterAck = docCount("actions1", matchAllQuery());
long a2CountAfterAck = docCount("actions2", matchAllQuery());
assertThat(a1CountAfterAck, greaterThanOrEqualTo((long) 1));
assertThat(a2CountAfterAck, greaterThanOrEqualTo((long) 1));
@ -162,15 +162,15 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
refresh();
// There shouldn't be more a1 actions in the index after we ack the watch, even though the watch was triggered
long a1CountAfterPostAckFires = docCount("actions1", "doc", matchAllQuery());
long a1CountAfterPostAckFires = docCount("actions1", matchAllQuery());
assertThat(a1CountAfterPostAckFires, equalTo(a1CountAfterAck));
// There shouldn't be more a2 actions in the index after we ack the watch, even though the watch was triggered
long a2CountAfterPostAckFires = docCount("actions2", "doc", matchAllQuery());
long a2CountAfterPostAckFires = docCount("actions2", matchAllQuery());
assertThat(a2CountAfterPostAckFires, equalTo(a2CountAfterAck));
// Now delete the event and the ack states should change to AWAITS_EXECUTION
DeleteResponse response = client().prepareDelete("events", "event", id).get();
DeleteResponse response = client().prepareDelete().setIndex("events").setId(id).get();
assertEquals(DocWriteResponse.Result.DELETED, response.getResult());
refresh();
@ -186,7 +186,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(),
is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
long throttledCount = docCount(HistoryStoreField.INDEX_PREFIX_WITH_TEMPLATE + "*", null,
long throttledCount = docCount(HistoryStoreField.INDEX_PREFIX_WITH_TEMPLATE + "*",
matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.ACKNOWLEDGED.id()));
assertThat(throttledCount, greaterThan(0L));
}
@ -199,7 +199,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
.input(searchInput(templateRequest(searchSource(), "events")))
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GT, 0L))
.transform(searchTransform(templateRequest(searchSource(), "events")))
.addAction("_id", indexAction("actions", "action")))
.addAction("_id", indexAction("actions")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
assertThat(watcherClient().prepareWatcherStats().get().getWatchesCount(), is(1L));
@ -211,7 +211,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
assertThat(ackResponse.getStatus().actionStatus("_id").ackStatus().state(), is(ActionStatus.AckStatus.State.ACKED));
refresh("actions");
long countAfterAck = client().prepareSearch("actions").setTypes("action").setQuery(matchAllQuery()).get()
long countAfterAck = client().prepareSearch("actions").setQuery(matchAllQuery()).get()
.getHits().getTotalHits().value;
assertThat(countAfterAck, greaterThanOrEqualTo(1L));
@ -221,7 +221,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
assertThat(watchResponse.getStatus().actionStatus("_id").ackStatus().state(), Matchers.equalTo(ActionStatus.AckStatus.State.ACKED));
refresh();
GetResponse getResponse = client().get(new GetRequest(Watch.INDEX, Watch.DOC_TYPE, "_name")).actionGet();
GetResponse getResponse = client().get(new GetRequest(Watch.INDEX, "_name")).actionGet();
Watch indexedWatch = watchParser().parse("_name", true, getResponse.getSourceAsBytesRef(), XContentType.JSON,
getResponse.getSeqNo(), getResponse.getPrimaryTerm());
assertThat(watchResponse.getStatus().actionStatus("_id").ackStatus().state(),
@ -231,7 +231,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
refresh("actions");
// There shouldn't be more actions in the index after we ack the watch, even though the watch was triggered
long countAfterPostAckFires = docCount("actions", "action", matchAllQuery());
long countAfterPostAckFires = docCount("actions", matchAllQuery());
assertThat(countAfterPostAckFires, equalTo(countAfterAck));
}

View File

@ -121,7 +121,7 @@ public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCas
.trigger(schedule(interval("5s")))
.input(simpleInput(MapBuilder.<String, Object>newMapBuilder().put("key1", 10).put("key2", 10)))
.transform(scriptTransform(script))
.addAction("_id", indexAction("output1", "type")))
.addAction("_id", indexAction("output1")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
// put a watch that has a action level transform:
@ -129,7 +129,7 @@ public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCas
.setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(simpleInput(MapBuilder.<String, Object>newMapBuilder().put("key1", 10).put("key2", 10)))
.addAction("_id", scriptTransform(script), indexAction("output2", "type")))
.addAction("_id", scriptTransform(script), indexAction("output2")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
@ -177,7 +177,7 @@ public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCas
.setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(searchInput(inputRequest))
.addAction("_id", searchTransform(transformRequest), indexAction("output2", "result"))
.addAction("_id", searchTransform(transformRequest), indexAction("output2"))
).get();
assertThat(putWatchResponse.isCreated(), is(true));
@ -210,7 +210,7 @@ public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCas
.trigger(schedule(interval("5s")))
.input(simpleInput(MapBuilder.<String, Object>newMapBuilder().put("key1", 10).put("key2", 10)))
.transform(chainTransform(scriptTransform(script1), scriptTransform(script2)))
.addAction("_id", indexAction("output1", "type")))
.addAction("_id", indexAction("output1")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
// put a watch that has a action level transform:
@ -219,7 +219,7 @@ public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCas
.trigger(schedule(interval("5s")))
.input(simpleInput(MapBuilder.<String, Object>newMapBuilder().put("key1", 10).put("key2", 10)))
.addAction("_id", chainTransform(scriptTransform(script1), scriptTransform(script2)),
indexAction("output2", "type")))
indexAction("output2")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));

View File

@ -55,7 +55,7 @@ public class ActivateWatchTests extends AbstractWatcherIntegrationTestCase {
.setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(simpleInput("foo", "bar"))
.addAction("_a1", indexAction("actions", "action1"))
.addAction("_a1", indexAction("actions"))
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
.get();
@ -86,13 +86,13 @@ public class ActivateWatchTests extends AbstractWatcherIntegrationTestCase {
logger.info("Ensured no more watches are being executed");
refresh();
long count1 = docCount(".watcher-history*", "doc", matchAllQuery());
long count1 = docCount(".watcher-history*", matchAllQuery());
logger.info("Sleeping for 5 seconds, watch history count [{}]", count1);
Thread.sleep(5000);
refresh();
long count2 = docCount(".watcher-history*", "doc", matchAllQuery());
long count2 = docCount(".watcher-history*", matchAllQuery());
assertThat(count2, is(count1));
@ -110,7 +110,7 @@ public class ActivateWatchTests extends AbstractWatcherIntegrationTestCase {
logger.info("Sleeping for another five seconds, ensuring that watch is executed");
Thread.sleep(5000);
refresh();
long count3 = docCount(".watcher-history*", "doc", matchAllQuery());
long count3 = docCount(".watcher-history*", matchAllQuery());
assertThat(count3, greaterThan(count1));
}
@ -122,7 +122,7 @@ public class ActivateWatchTests extends AbstractWatcherIntegrationTestCase {
.setSource(watchBuilder()
.trigger(schedule(cron("0 0 0 1 1 ? 2050"))) // some time in 2050
.input(simpleInput("foo", "bar"))
.addAction("_a1", indexAction("actions", "action1"))
.addAction("_a1", indexAction("actions"))
.defaultThrottlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
.get();
@ -132,7 +132,7 @@ public class ActivateWatchTests extends AbstractWatcherIntegrationTestCase {
assertThat(getWatchResponse, notNullValue());
assertThat(getWatchResponse.getStatus().state().isActive(), is(true));
GetResponse getResponse = client().prepareGet(".watches", "doc", "_id").get();
GetResponse getResponse = client().prepareGet().setIndex(".watches").setId("_id").get();
XContentSource source = new XContentSource(getResponse.getSourceAsBytesRef(), XContentType.JSON);
Set<String> filters = Sets.newHashSet(
@ -152,7 +152,7 @@ public class ActivateWatchTests extends AbstractWatcherIntegrationTestCase {
source.toXContent(builder, ToXContent.EMPTY_PARAMS);
// now that we filtered out the watch status state, lets put it back in
IndexResponse indexResponse = client().prepareIndex(".watches", "doc", "_id")
IndexResponse indexResponse = client().prepareIndex().setIndex(".watches").setId("_id")
.setSource(BytesReference.bytes(builder), XContentType.JSON)
.get();
assertThat(indexResponse.getId(), is("_id"));

View File

@ -19,6 +19,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
@ -69,8 +70,8 @@ public class TransportAckWatchActionTests extends ESTestCase {
String watchId = "my_watch_id";
doAnswer(invocation -> {
ActionListener<GetResponse> listener = (ActionListener<GetResponse>) invocation.getArguments()[1];
listener.onResponse(new GetResponse(new GetResult(Watch.INDEX, Watch.DOC_TYPE, watchId, UNASSIGNED_SEQ_NO, 0, -1, false,
BytesArray.EMPTY, Collections.emptyMap())));
listener.onResponse(new GetResponse(new GetResult(Watch.INDEX, MapperService.SINGLE_MAPPING_NAME, watchId, UNASSIGNED_SEQ_NO,
0, -1, false, BytesArray.EMPTY, Collections.emptyMap())));
return null;
}).when(client).get(anyObject(), anyObject());

View File

@ -48,7 +48,7 @@ public class WatchStatusIntegrationTests extends AbstractWatcherIntegrationTestC
assertThat(getWatchResponse.getSource(), notNullValue());
assertThat(getWatchResponse.getStatus().lastChecked(), is(notNullValue()));
GetResponse getResponse = client().prepareGet(".watches", "doc", "_name").get();
GetResponse getResponse = client().prepareGet().setIndex(".watches").setId("_name").get();
getResponse.getSource();
XContentSource source = new XContentSource(getResponse.getSourceAsBytesRef(), XContentType.JSON);