From b7f30fc929e469bd247b51a89dca4ae5a3852f07 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 17 Jul 2020 10:24:58 -0400 Subject: [PATCH] [7.x] Adding new `require_alias` option to indexing requests (#58917) (#59769) * Adding new `require_alias` option to indexing requests (#58917) This commit adds the `require_alias` flag to requests that create new documents. This flag, when `true` prevents the request from automatically creating an index. Instead, the destination of the request MUST be an alias. When the flag is not set, or `false`, the behavior defaults to the `action.auto_create_index` settings. This is useful when an alias is required instead of a concrete index. closes https://github.com/elastic/elasticsearch/issues/55267 --- .../noop/action/bulk/RestNoopBulkAction.java | 3 +- docs/reference/docs/index_.asciidoc | 8 +- docs/reference/docs/update.asciidoc | 2 + docs/reference/rest-api/common-parms.asciidoc | 6 ++ .../test/ingest/270_set_processor.yml | 48 +++++++++++ .../index/reindex/ReindexValidator.java | 10 +++ .../index/reindex/RestReindexAction.java | 5 ++ .../resources/rest-api-spec/api/bulk.json | 4 + .../resources/rest-api-spec/api/index.json | 4 + .../resources/rest-api-spec/api/update.json | 4 + .../rest-api-spec/test/bulk/10_basic.yml | 83 +++++++++++++++++++ .../test/index/70_require_alias.yml | 29 +++++++ .../test/update/95_require_alias.yml | 35 ++++++++ .../elasticsearch/action/DocWriteRequest.java | 8 ++ .../action/bulk/BulkProcessor.java | 2 +- .../action/bulk/BulkRequest.java | 32 +++++-- .../action/bulk/BulkRequestParser.java | 27 ++++-- .../action/bulk/TransportBulkAction.java | 36 ++++++-- .../action/delete/DeleteRequest.java | 5 ++ .../action/index/IndexRequest.java | 20 +++++ .../action/index/IndexRequestBuilder.java | 8 ++ .../action/update/TransportUpdateAction.java | 11 +++ .../action/update/UpdateRequest.java | 19 +++++ .../index/reindex/ReindexRequest.java | 8 ++ .../rest/action/document/RestBulkAction.java | 4 +- .../rest/action/document/RestIndexAction.java | 2 + .../action/document/RestUpdateAction.java | 2 + .../action/bulk/BulkRequestParserTests.java | 53 +++++++++++- .../action/index/IndexRequestTests.java | 3 + .../action/MonitoringBulkRequest.java | 2 +- 30 files changed, 447 insertions(+), 36 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/index/70_require_alias.yml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/update/95_require_alias.yml diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java index 9eb4af6600a..3ed34616e23 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java @@ -70,6 +70,7 @@ public class RestNoopBulkAction extends BaseRestHandler { String defaultType = request.param("type"); String defaultRouting = request.param("routing"); String defaultPipeline = request.param("pipeline"); + Boolean defaultRequireAlias = request.paramAsBoolean("require_alias", null); String waitForActiveShards = request.param("wait_for_active_shards"); if (waitForActiveShards != null) { @@ -78,7 +79,7 @@ public class RestNoopBulkAction extends BaseRestHandler { bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting, - null, defaultPipeline, true, request.getXContentType()); + null, defaultPipeline, defaultRequireAlias, true, request.getXContentType()); // short circuit the call to the transport layer return channel -> { diff --git a/docs/reference/docs/index_.asciidoc b/docs/reference/docs/index_.asciidoc index d9f34638435..f93d463f1e3 100644 --- a/docs/reference/docs/index_.asciidoc +++ b/docs/reference/docs/index_.asciidoc @@ -25,9 +25,9 @@ and <>. `POST //_create/<_id>` -IMPORTANT: You cannot add new documents to a data stream using the -`PUT //_doc/<_id>` request format. To specify a document ID, use the -`PUT //_create/<_id>` format instead. See +IMPORTANT: You cannot add new documents to a data stream using the +`PUT //_doc/<_id>` request format. To specify a document ID, use the +`PUT //_create/<_id>` format instead. See <>. [[docs-index-api-path-params]] @@ -94,6 +94,8 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=version_type] include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=wait_for_active_shards] +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=require-alias] + [[docs-index-api-request-body]] ==== {api-request-body-title} diff --git a/docs/reference/docs/update.asciidoc b/docs/reference/docs/update.asciidoc index 7e7526983d3..899bb79ff7f 100644 --- a/docs/reference/docs/update.asciidoc +++ b/docs/reference/docs/update.asciidoc @@ -53,6 +53,8 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=if_primary_term] `lang`:: (Optional, string) The script language. Default: `painless`. +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=require-alias] + include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh] `retry_on_conflict`:: diff --git a/docs/reference/rest-api/common-parms.asciidoc b/docs/reference/rest-api/common-parms.asciidoc index eb23601d7ad..d3c62456b15 100644 --- a/docs/reference/rest-api/common-parms.asciidoc +++ b/docs/reference/rest-api/common-parms.asciidoc @@ -592,6 +592,12 @@ such as `1264`. A value of `-1` indicates {es} was unable to compute this number. end::memory[] +tag::require-alias[] +`require_alias`:: +(Optional, boolean) When true, this requires the destination to be an alias. +Defaults to false. +end::require-alias[] + tag::node-filter[] ``:: (Optional, string) diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/270_set_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/270_set_processor.yml index be60767aef6..bcd9e238646 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/270_set_processor.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/270_set_processor.yml @@ -53,3 +53,51 @@ teardown: index: test id: 2 - match: { _source.foo: "hello" } +--- +"Test set processor with index change and require_alias": + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "set" : { + "field" : "_index", + "value" : "new_require_alias_index" + } + } + ] + } + - match: { acknowledged: true } + - do: + catch: missing + index: + index: test_require_alias + pipeline: 1 + require_alias: true + body: { foo: bar } + + - do: + catch: missing + indices.get: + index: test_require_alias + - do: + catch: missing + indices.get: + index: new_require_alias_index + + - do: + indices.create: + index: backing_index + body: + mappings: {} + aliases: + new_require_alias_index: {} + + - do: + index: + index: test_require_alias + pipeline: 1 + require_alias: true + body: { foo: bar } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexValidator.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexValidator.java index 64e7fd31944..867a8f895c8 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexValidator.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexValidator.java @@ -27,6 +27,7 @@ import org.apache.lucene.util.automaton.CharacterRunAutomaton; import org.apache.lucene.util.automaton.MinimizationOperations; import org.apache.lucene.util.automaton.Operations; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.AutoCreateIndex; @@ -37,6 +38,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.util.List; @@ -114,6 +116,14 @@ class ReindexValidator { return; } String target = destination.index(); + if (destination.isRequireAlias() && (false == clusterState.getMetadata().hasAlias(target))) { + throw new IndexNotFoundException("[" + + DocWriteRequest.REQUIRE_ALIAS + + "] request flag is [true] and [" + + target + + "] is not an alias", + target); + } if (false == autoCreateIndex.shouldAutoCreate(target, clusterState)) { /* * If we're going to autocreate the index we don't need to resolve diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java index 1677fbd7c61..49e32cfeaaa 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.RestRequest; @@ -69,6 +70,10 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler extends IndicesRequest, Accountable { + // Flag set for disallowing index auto creation for an individual write request. + String REQUIRE_ALIAS = "require_alias"; + /** * Set the index for this request * @return the Request @@ -163,6 +166,11 @@ public interface DocWriteRequest extends IndicesRequest, Accountable { */ OpType opType(); + /** + * Should this request override specifically require the destination to be an alias? + * @return boolean flag, when true specifically requires an alias + */ + boolean isRequireAlias(); /** * Requested operation type to perform on the document */ diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 2a81393067f..6bdae23a8a3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -408,7 +408,7 @@ public class BulkProcessor implements Closeable { lock.lock(); try { ensureOpen(); - bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, + bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, null, true, xContentType); bulkRequestToExecute = newBulkRequestIfNeeded(); } finally { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 083d325aca8..825d189137b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -79,6 +79,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques private String globalRouting; private String globalIndex; private String globalType; + private Boolean globalRequireAlias; private long sizeInBytes = 0; @@ -258,7 +259,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques @Deprecated public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, XContentType xContentType) throws IOException { - return add(data, defaultIndex, defaultType, null, null, null, true, xContentType); + return add(data, defaultIndex, defaultType, null, null, null, null, true, xContentType); } /** @@ -266,7 +267,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques */ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, XContentType xContentType) throws IOException { - return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, true, xContentType); + return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, null, true, xContentType); } /** @@ -276,7 +277,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques @Deprecated public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, boolean allowExplicitIndex, XContentType xContentType) throws IOException { - return add(data, defaultIndex, defaultType, null, null, null, allowExplicitIndex, xContentType); + return add(data, defaultIndex, defaultType, null, null, null, null, allowExplicitIndex, xContentType); } /** @@ -284,7 +285,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques */ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex, XContentType xContentType) throws IOException { - return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, allowExplicitIndex, xContentType); + return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, null, allowExplicitIndex, xContentType); } public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @@ -292,7 +293,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques @Nullable String defaultPipeline, boolean allowExplicitIndex, XContentType xContentType) throws IOException { return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, defaultRouting, defaultFetchSourceContext, - defaultPipeline, allowExplicitIndex, xContentType); + defaultPipeline, null, allowExplicitIndex, xContentType); } /** @@ -301,11 +302,12 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques @Deprecated public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext, - @Nullable String defaultPipeline, boolean allowExplicitIndex, + @Nullable String defaultPipeline, @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex, XContentType xContentType) throws IOException { String routing = valueOrDefault(defaultRouting, globalRouting); String pipeline = valueOrDefault(defaultPipeline, globalPipeline); - new BulkRequestParser(true).parse(data, defaultIndex, defaultType, routing, defaultFetchSourceContext, pipeline, + Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias); + new BulkRequestParser(true).parse(data, defaultIndex, defaultType, routing, defaultFetchSourceContext, pipeline, requireAlias, allowExplicitIndex, xContentType, this::internalAdd, this::internalAdd, this::add); return this; } @@ -379,6 +381,15 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques return globalRouting; } + public Boolean requireAlias() { + return globalRequireAlias; + } + + public BulkRequest requireAlias(Boolean globalRequireAlias) { + this.globalRequireAlias = globalRequireAlias; + return this; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -434,6 +445,13 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques return value; } + private static Boolean valueOrDefault(Boolean value, Boolean globalDefault) { + if (Objects.isNull(value) && !Objects.isNull(globalDefault)) { + return globalDefault; + } + return value; + } + @Override public long ramBytesUsed() { return SHALLOW_SIZE + requests.stream().mapToLong(Accountable::ramBytesUsed).sum(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index 83ed533951e..1667a5767a7 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.bulk; import org.apache.logging.log4j.LogManager; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -66,6 +67,7 @@ public final class BulkRequestParser { private static final ParseField SOURCE = new ParseField("_source"); private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no"); private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term"); + private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS); private final boolean warnOnTypeUsage; @@ -112,27 +114,27 @@ public final class BulkRequestParser { public void parse( BytesReference data, @Nullable String defaultIndex, @Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext, - @Nullable String defaultPipeline, boolean allowExplicitIndex, + @Nullable String defaultPipeline, @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex, XContentType xContentType, Consumer indexRequestConsumer, Consumer updateRequestConsumer, Consumer deleteRequestConsumer) throws IOException { - parse(data, defaultIndex, null, defaultRouting, defaultFetchSourceContext, defaultPipeline, allowExplicitIndex, xContentType, - indexRequestConsumer, updateRequestConsumer, deleteRequestConsumer); + parse(data, defaultIndex, null, defaultRouting, defaultFetchSourceContext, defaultPipeline, + defaultRequireAlias, allowExplicitIndex, xContentType, indexRequestConsumer, updateRequestConsumer, deleteRequestConsumer); } /** * Parse the provided {@code data} assuming the provided default values. Index requests * will be passed to the {@code indexRequestConsumer}, update requests to the * {@code updateRequestConsumer} and delete requests to the {@code deleteRequestConsumer}. - * @deprecated Use {@link #parse(BytesReference, String, String, FetchSourceContext, String, boolean, XContentType, + * @deprecated Use {@link #parse(BytesReference, String, String, FetchSourceContext, String, Boolean, boolean, XContentType, * Consumer, Consumer, Consumer)} instead. */ @Deprecated public void parse( BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext, - @Nullable String defaultPipeline, boolean allowExplicitIndex, + @Nullable String defaultPipeline, @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex, XContentType xContentType, Consumer indexRequestConsumer, Consumer updateRequestConsumer, @@ -190,6 +192,7 @@ public final class BulkRequestParser { long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; int retryOnConflict = 0; String pipeline = defaultPipeline; + boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias; // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) // or START_OBJECT which will have another set of parameters @@ -206,7 +209,7 @@ public final class BulkRequestParser { throw new IllegalArgumentException("explicit index in bulk is not allowed"); } index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); - } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) { if (warnOnTypeUsage && typesDeprecationLogged == false) { deprecationLogger.deprecatedAndMaybeLog("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE); typesDeprecationLogged = true; @@ -232,6 +235,8 @@ public final class BulkRequestParser { pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); } else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) { fetchSourceContext = FetchSourceContext.fromXContent(parser); + } else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) { + requireAlias = parser.booleanValue(); } else { throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"); @@ -269,19 +274,22 @@ public final class BulkRequestParser { indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing) .version(version).versionType(versionType) .setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) - .source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType)); + .source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType) + .setRequireAlias(requireAlias)); } else { indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing) .version(version).versionType(versionType) .create("create".equals(opType)).setPipeline(pipeline) .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) - .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)); + .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) + .setRequireAlias(requireAlias)); } } else if ("create".equals(action)) { indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing) .version(version).versionType(versionType) .create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) - .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)); + .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) + .setRequireAlias(requireAlias)); } else if ("update".equals(action)) { if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { throw new IllegalArgumentException("Update requests do not support versioning. " + @@ -289,6 +297,7 @@ public final class BulkRequestParser { } UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict) .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) + .setRequireAlias(requireAlias) .routing(routing); // EMPTY is safe here because we never call namedObject try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index adbe82bc4e3..634df393cbe 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -223,28 +223,29 @@ public class TransportBulkAction extends HandledTransportAction indices = bulkRequest.requests.stream() - // delete requests should not attempt to create the index (if the index does not - // exists), unless an external versioning is used + final Map indices = bulkRequest.requests.stream() + // delete requests should not attempt to create the index (if the index does not + // exists), unless an external versioning is used .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE - || request.versionType() == VersionType.EXTERNAL - || request.versionType() == VersionType.EXTERNAL_GTE) - .map(DocWriteRequest::index) - .collect(Collectors.toSet()); + || request.versionType() == VersionType.EXTERNAL + || request.versionType() == VersionType.EXTERNAL_GTE) + .collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2)); /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create * that we'll use when we try to run the requests. */ final Map indicesThatCannotBeCreated = new HashMap<>(); Set autoCreateIndices = new HashSet<>(); ClusterState state = clusterService.state(); - for (String index : indices) { + for (Map.Entry indexAndFlag : indices.entrySet()) { boolean shouldAutoCreate; + final String index = indexAndFlag.getKey(); try { shouldAutoCreate = shouldAutoCreate(index, state); } catch (IndexNotFoundException e) { shouldAutoCreate = false; indicesThatCannotBeCreated.put(index, e); } - if (shouldAutoCreate) { + // We should only auto create if we are not requiring it to be an alias + if (shouldAutoCreate && (indexAndFlag.getValue() == false)) { autoCreateIndices.add(index); } } @@ -415,6 +416,9 @@ public class TransportBulkAction extends HandledTransportAction request, int idx, final Metadata metadata) { + if (request.isRequireAlias() && (metadata.hasAlias(request.index()) == false)) { + Exception exception = new IndexNotFoundException("[" + + DocWriteRequest.REQUIRE_ALIAS + + "] request flag is [true] and [" + + request.index() + + "] is not an alias", + request.index()); + addFailure(request, idx, exception); + return true; + } + return false; + } + private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, int idx, final ConcreteIndices concreteIndices, final Metadata metadata) { IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index()); diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index ae2849c39d8..275e72513e9 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -305,6 +305,11 @@ public class DeleteRequest extends ReplicatedWriteRequest return OpType.DELETE; } + @Override + public boolean isRequireAlias() { + return false; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index d3280a1adc9..55aa58ac88a 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -109,6 +109,8 @@ public class IndexRequest extends ReplicatedWriteRequest implement private boolean isPipelineResolved; + private boolean requireAlias; + /** * Value for {@link #getAutoGeneratedTimestamp()} if the document has an external * provided ID. @@ -162,6 +164,11 @@ public class IndexRequest extends ReplicatedWriteRequest implement ifSeqNo = UNASSIGNED_SEQ_NO; ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; } + if (in.getVersion().onOrAfter(Version.V_7_10_0)) { + requireAlias = in.readBoolean(); + } else { + requireAlias = false; + } } public IndexRequest() { @@ -760,6 +767,9 @@ public class IndexRequest extends ReplicatedWriteRequest implement "sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " + "Stream version [" + out.getVersion() + "]"); } + if (out.getVersion().onOrAfter(Version.V_7_10_0)) { + out.writeBoolean(requireAlias); + } } @Override @@ -807,4 +817,14 @@ public class IndexRequest extends ReplicatedWriteRequest implement public long ramBytesUsed() { return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.length()); } + + @Override + public boolean isRequireAlias() { + return requireAlias; + } + + public IndexRequest setRequireAlias(boolean requireAlias) { + this.requireAlias = requireAlias; + return this; + } } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java index 19074bbd92e..a7a7e556b7c 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java @@ -230,4 +230,12 @@ public class IndexRequestBuilder extends ReplicationRequestBuilder listener) { + if (request.isRequireAlias() && (clusterService.state().getMetadata().hasAlias(request.index()) == false)) { + throw new IndexNotFoundException("[" + + DocWriteRequest.REQUIRE_ALIAS + + "] request flag is [true] and [" + + request.index() + + "] is not an alias", request.index()); + } // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) { client.admin().indices().create(new CreateIndexRequest() diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 7fc440c6673..cccdb9e424c 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -126,6 +126,7 @@ public class UpdateRequest extends InstanceShardOperationRequest private boolean scriptedUpsert = false; private boolean docAsUpsert = false; private boolean detectNoop = true; + private boolean requireAlias = false; @Nullable private IndexRequest doc; @@ -176,6 +177,11 @@ public class UpdateRequest extends InstanceShardOperationRequest ifPrimaryTerm = in.readVLong(); detectNoop = in.readBoolean(); scriptedUpsert = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_7_10_0)) { + requireAlias = in.readBoolean(); + } else { + requireAlias = false; + } } public UpdateRequest(String index, String id) { @@ -877,6 +883,16 @@ public class UpdateRequest extends InstanceShardOperationRequest return this; } + @Override + public boolean isRequireAlias() { + return requireAlias; + } + + public UpdateRequest setRequireAlias(boolean requireAlias) { + this.requireAlias = requireAlias; + return this; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -948,6 +964,9 @@ public class UpdateRequest extends InstanceShardOperationRequest out.writeVLong(ifPrimaryTerm); out.writeBoolean(detectNoop); out.writeBoolean(scriptedUpsert); + if (out.getVersion().onOrAfter(Version.V_7_10_0)) { + out.writeBoolean(requireAlias); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java index bb7bbf277a3..7c7cff64000 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java @@ -257,6 +257,14 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel)); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java index 4c7c0dc8f7e..b34016bb0c7 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.document; import org.apache.logging.log4j.LogManager; import org.elasticsearch.Version; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; @@ -145,6 +146,7 @@ public class RestIndexAction extends BaseRestHandler { indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType())); indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo())); indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm())); + indexRequest.setRequireAlias(request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, indexRequest.isRequireAlias())); String sOpType = request.param("op_type"); String waitForActiveShards = request.param("wait_for_active_shards"); if (waitForActiveShards != null) { diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java index c652e097f03..d397a4e10cc 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.document; import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.update.UpdateRequest; @@ -94,6 +95,7 @@ public class RestUpdateAction extends BaseRestHandler { updateRequest.setIfSeqNo(request.paramAsLong("if_seq_no", updateRequest.ifSeqNo())); updateRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", updateRequest.ifPrimaryTerm())); + updateRequest.setRequireAlias(request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, updateRequest.isRequireAlias())); request.applyContentParser(parser -> { updateRequest.fromXContent(parser); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java index 73a29443629..75d52d3863b 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java @@ -42,10 +42,31 @@ public class BulkRequestParserTests extends ESTestCase { assertFalse(parsed.get()); assertEquals("foo", indexRequest.index()); assertEquals("bar", indexRequest.id()); + assertFalse(indexRequest.isRequireAlias()); parsed.set(true); }, req -> fail(), req -> fail()); assertTrue(parsed.get()); + + parser.parse(request, "foo", null, null, null, true, false, XContentType.JSON, + indexRequest -> { + assertTrue(indexRequest.isRequireAlias()); + }, + req -> fail(), req -> fail()); + + request = new BytesArray("{ \"index\":{ \"_id\": \"bar\", \"require_alias\": true } }\n{}\n"); + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, + indexRequest -> { + assertTrue(indexRequest.isRequireAlias()); + }, + req -> fail(), req -> fail()); + + request = new BytesArray("{ \"index\":{ \"_id\": \"bar\", \"require_alias\": false } }\n{}\n"); + parser.parse(request, "foo", null, null, null, true, false, XContentType.JSON, + indexRequest -> { + assertFalse(indexRequest.isRequireAlias()); + }, + req -> fail(), req -> fail()); } public void testDeleteRequest() throws IOException { @@ -73,13 +94,37 @@ public class BulkRequestParserTests extends ESTestCase { assertFalse(parsed.get()); assertEquals("foo", updateRequest.index()); assertEquals("bar", updateRequest.id()); + assertFalse(updateRequest.isRequireAlias()); parsed.set(true); }, req -> fail()); assertTrue(parsed.get()); + + parser.parse(request, "foo", null, null, null, true, false, XContentType.JSON, + req -> fail(), + updateRequest -> { + assertTrue(updateRequest.isRequireAlias()); + }, + req -> fail()); + + request = new BytesArray("{ \"update\":{ \"_id\": \"bar\", \"require_alias\": true } }\n{}\n"); + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, + req -> fail(), + updateRequest -> { + assertTrue(updateRequest.isRequireAlias()); + }, + req -> fail()); + + request = new BytesArray("{ \"update\":{ \"_id\": \"bar\", \"require_alias\": false } }\n{}\n"); + parser.parse(request, "foo", null, null, null, true, false, XContentType.JSON, + req -> fail(), + updateRequest -> { + assertFalse(updateRequest.isRequireAlias()); + }, + req -> fail()); } - public void testBarfOnLackOfTrailingNewline() throws IOException { + public void testBarfOnLackOfTrailingNewline() { BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}"); BulkRequestParser parser = new BulkRequestParser(randomBoolean()); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, @@ -88,10 +133,10 @@ public class BulkRequestParserTests extends ESTestCase { assertEquals("The bulk request must be terminated by a newline [\\n]", e.getMessage()); } - public void testFailOnExplicitIndex() throws IOException { + public void testFailOnExplicitIndex() { BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n"); BulkRequestParser parser = new BulkRequestParser(randomBoolean()); - + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), req -> fail())); @@ -121,7 +166,7 @@ public class BulkRequestParserTests extends ESTestCase { + "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\" } }\n{}\n"); BulkRequestParser parser = new BulkRequestParser(randomBoolean()); final List indexRequests = new ArrayList<>(); - parser.parse(request, null, null, null, null, true, XContentType.JSON, + parser.parse(request, null, null, null, null, null, true, XContentType.JSON, indexRequests::add, req -> fail(), req -> fail()); assertThat(indexRequests, Matchers.hasSize(2)); diff --git a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index aa906331943..6bf58b93a04 100644 --- a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -157,7 +157,9 @@ public class IndexRequestTests extends ESTestCase { public void testIndexRequestXContentSerialization() throws IOException { IndexRequest indexRequest = new IndexRequest("foo").id("1"); + boolean isRequireAlias = randomBoolean(); indexRequest.source("{}", XContentType.JSON); + indexRequest.setRequireAlias(isRequireAlias); assertEquals(XContentType.JSON, indexRequest.getContentType()); BytesStreamOutput out = new BytesStreamOutput(); @@ -166,6 +168,7 @@ public class IndexRequestTests extends ESTestCase { IndexRequest serialized = new IndexRequest(in); assertEquals(XContentType.JSON, serialized.getContentType()); assertEquals(new BytesArray("{}"), serialized.source()); + assertEquals(isRequireAlias, serialized.isRequireAlias()); } // reindex makes use of index requests without a source so this needs to be handled diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java index 5c1d700343f..1c71e0b163a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java @@ -79,7 +79,7 @@ public class MonitoringBulkRequest extends ActionRequest { final long intervalMillis) throws IOException { // MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest - new BulkRequestParser(false).parse(content, null, null, null, null, true, xContentType, + new BulkRequestParser(false).parse(content, null, null, null, null, null, true, xContentType, indexRequest -> { // we no longer accept non-timestamped indexes from Kibana, LS, or Beats because we do not use the data // and it was duplicated anyway; by simply dropping it, we allow BWC for older clients that still send it