[7.x] Adding new `require_alias` option to indexing requests (#58917) (#59769)

* Adding new `require_alias` option to indexing requests (#58917)

This commit adds the `require_alias` flag to requests that create new documents.

This flag, when `true` prevents the request from automatically creating an index. Instead, the destination of the request MUST be an alias.

When the flag is not set, or `false`, the behavior defaults to the `action.auto_create_index` settings.

This is useful when an alias is required instead of a concrete index.

closes https://github.com/elastic/elasticsearch/issues/55267
This commit is contained in:
Benjamin Trent 2020-07-17 10:24:58 -04:00 committed by GitHub
parent 65f6fb8e94
commit b7f30fc929
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 447 additions and 36 deletions

View File

@ -70,6 +70,7 @@ public class RestNoopBulkAction extends BaseRestHandler {
String defaultType = request.param("type");
String defaultRouting = request.param("routing");
String defaultPipeline = request.param("pipeline");
Boolean defaultRequireAlias = request.paramAsBoolean("require_alias", null);
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
@ -78,7 +79,7 @@ public class RestNoopBulkAction extends BaseRestHandler {
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting,
null, defaultPipeline, true, request.getXContentType());
null, defaultPipeline, defaultRequireAlias, true, request.getXContentType());
// short circuit the call to the transport layer
return channel -> {

View File

@ -25,9 +25,9 @@ and <<update-delete-docs-in-a-backing-index>>.
`POST /<target>/_create/<_id>`
IMPORTANT: You cannot add new documents to a data stream using the
`PUT /<target>/_doc/<_id>` request format. To specify a document ID, use the
`PUT /<target>/_create/<_id>` format instead. See
IMPORTANT: You cannot add new documents to a data stream using the
`PUT /<target>/_doc/<_id>` request format. To specify a document ID, use the
`PUT /<target>/_create/<_id>` format instead. See
<<add-documents-to-a-data-stream>>.
[[docs-index-api-path-params]]
@ -94,6 +94,8 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=version_type]
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=wait_for_active_shards]
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=require-alias]
[[docs-index-api-request-body]]
==== {api-request-body-title}

View File

@ -53,6 +53,8 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=if_primary_term]
`lang`::
(Optional, string) The script language. Default: `painless`.
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=require-alias]
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh]
`retry_on_conflict`::

View File

@ -592,6 +592,12 @@ such as `1264`.
A value of `-1` indicates {es} was unable to compute this number.
end::memory[]
tag::require-alias[]
`require_alias`::
(Optional, boolean) When true, this requires the destination to be an alias.
Defaults to false.
end::require-alias[]
tag::node-filter[]
`<node_filter>`::
(Optional, string)

View File

@ -53,3 +53,51 @@ teardown:
index: test
id: 2
- match: { _source.foo: "hello" }
---
"Test set processor with index change and require_alias":
- do:
ingest.put_pipeline:
id: "1"
body: >
{
"processors": [
{
"set" : {
"field" : "_index",
"value" : "new_require_alias_index"
}
}
]
}
- match: { acknowledged: true }
- do:
catch: missing
index:
index: test_require_alias
pipeline: 1
require_alias: true
body: { foo: bar }
- do:
catch: missing
indices.get:
index: test_require_alias
- do:
catch: missing
indices.get:
index: new_require_alias_index
- do:
indices.create:
index: backing_index
body:
mappings: {}
aliases:
new_require_alias_index: {}
- do:
index:
index: test_require_alias
pipeline: 1
require_alias: true
body: { foo: bar }

View File

@ -27,6 +27,7 @@ import org.apache.lucene.util.automaton.CharacterRunAutomaton;
import org.apache.lucene.util.automaton.MinimizationOperations;
import org.apache.lucene.util.automaton.Operations;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.AutoCreateIndex;
@ -37,6 +38,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.util.List;
@ -114,6 +116,14 @@ class ReindexValidator {
return;
}
String target = destination.index();
if (destination.isRequireAlias() && (false == clusterState.getMetadata().hasAlias(target))) {
throw new IndexNotFoundException("["
+ DocWriteRequest.REQUIRE_ALIAS
+ "] request flag is [true] and ["
+ target
+ "] is not an alias",
target);
}
if (false == autoCreateIndex.shouldAutoCreate(target, clusterState)) {
/*
* If we're going to autocreate the index we don't need to resolve

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestRequest;
@ -69,6 +70,10 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
if (request.hasParam("scroll")) {
internal.setScroll(parseTimeValue(request.param("scroll"), "scroll"));
}
if (request.hasParam(DocWriteRequest.REQUIRE_ALIAS)) {
internal.setRequireAlias(request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false));
}
return internal;
}
}

View File

@ -87,6 +87,10 @@
"pipeline":{
"type":"string",
"description":"The pipeline id to preprocess incoming documents with"
},
"require_alias": {
"type": "boolean",
"description": "Sets require_alias for all incoming documents. Defaults to unset (false)"
}
},
"body":{

View File

@ -139,6 +139,10 @@
"pipeline":{
"type":"string",
"description":"The pipeline id to preprocess incoming documents with"
},
"require_alias": {
"type": "boolean",
"description": "When true, requires destination to be an alias. Default is false"
}
},
"body":{

View File

@ -99,6 +99,10 @@
"if_primary_term":{
"type":"number",
"description":"only perform the update operation if the last operation that has changed the document has the specified primary term"
},
"require_alias": {
"type": "boolean",
"description": "When true, requires destination is an alias. Default is false"
}
},
"body":{

View File

@ -122,3 +122,86 @@
{"index": {"_index": "test_index", "_id": "test_id"}}
{"f1": "v1", "f2": 42}
{}
---
"When setting require_alias flag per request":
- skip:
version: " - 7.9.99"
reason: "require_alias flag was added in version 7.10"
- do:
indices.create:
index: backing_index
body:
mappings: {}
aliases:
test_require_alias: {}
- do:
bulk:
refresh: true
body:
- index:
_index: new_index_not_created
require_alias: true
- f: 1
- index:
_index: new_index_created
- f: 2
- index:
_index: test_require_alias
require_alias: true
- f: 3
- create:
_index: test_require_alias
- f: 4
- match: { errors: true }
- match: { items.0.index.status: 404 }
- match: { items.0.index.error.type: index_not_found_exception }
- match: { items.0.index.error.reason: "no such index [new_index_not_created] and [require_alias] request flag is [true] and [new_index_not_created] is not an alias" }
- match: { items.1.index.result: created }
- match: { items.2.index.result: created }
- match: { items.3.create.result: created }
- do:
catch: missing
indices.get:
index: new_index_not_created
---
"When setting require_alias flag":
- skip:
version: " - 7.9.99"
reason: "require_alias flag was added in version 7.10"
- do:
indices.create:
index: backing_index
body:
mappings: {}
aliases:
test_require_alias: {}
- do:
bulk:
refresh: true
require_alias: true
body:
- index:
_index: new_index_not_created
- f: 1
- index:
_index: new_index_created
require_alias: false
- f: 2
- index:
_index: test_require_alias
- f: 3
- match: { errors: true }
- match: { items.0.index.status: 404 }
- match: { items.0.index.error.type: index_not_found_exception }
- match: { items.0.index.error.reason: "no such index [new_index_not_created] and [require_alias] request flag is [true] and [new_index_not_created] is not an alias" }
- match: { items.1.index.result: created }
- match: { items.2.index.result: created }
- do:
catch: missing
indices.get:
index: new_index_not_created

View File

@ -0,0 +1,29 @@
---
"Set require_alias flag":
- skip:
version: " - 7.9.99"
reason: "require_alias flag added in 7.10"
- do:
catch: missing
index:
index: test_require_alias
require_alias: true
body: { foo: bar }
- do:
catch: missing
indices.get:
index: test_require_alias
- do:
indices.create:
index: backing_index
body:
mappings: {}
aliases:
test_require_alias: {}
- do:
index:
index: test_require_alias
require_alias: true
body: { foo: bar }

View File

@ -0,0 +1,35 @@
---
"Set require_alias flag":
- skip:
version: " - 7.9.99"
reason: "require_alias flag added in 7.10"
- do:
catch: missing
update:
index: test_require_alias
id: 1
require_alias: true
body:
doc: { foo: bar, count: 1 }
doc_as_upsert: true
- do:
catch: missing
indices.get:
index: test_require_alias
- do:
indices.create:
index: backing_index
body:
mappings: {}
aliases:
test_require_alias: {}
- do:
update:
index: test_require_alias
id: 1
require_alias: true
body:
doc: { foo: bar, count: 1 }
doc_as_upsert: true

View File

@ -43,6 +43,9 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
*/
public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
// Flag set for disallowing index auto creation for an individual write request.
String REQUIRE_ALIAS = "require_alias";
/**
* Set the index for this request
* @return the Request
@ -163,6 +166,11 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
*/
OpType opType();
/**
* Should this request override specifically require the destination to be an alias?
* @return boolean flag, when true specifically requires an alias
*/
boolean isRequireAlias();
/**
* Requested operation type to perform on the document
*/

View File

@ -408,7 +408,7 @@ public class BulkProcessor implements Closeable {
lock.lock();
try {
ensureOpen();
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline,
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, null,
true, xContentType);
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {

View File

@ -79,6 +79,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private String globalRouting;
private String globalIndex;
private String globalType;
private Boolean globalRequireAlias;
private long sizeInBytes = 0;
@ -258,7 +259,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
@Deprecated
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
XContentType xContentType) throws IOException {
return add(data, defaultIndex, defaultType, null, null, null, true, xContentType);
return add(data, defaultIndex, defaultType, null, null, null, null, true, xContentType);
}
/**
@ -266,7 +267,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex,
XContentType xContentType) throws IOException {
return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, true, xContentType);
return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, null, true, xContentType);
}
/**
@ -276,7 +277,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
@Deprecated
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, boolean allowExplicitIndex,
XContentType xContentType) throws IOException {
return add(data, defaultIndex, defaultType, null, null, null, allowExplicitIndex, xContentType);
return add(data, defaultIndex, defaultType, null, null, null, null, allowExplicitIndex, xContentType);
}
/**
@ -284,7 +285,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex,
XContentType xContentType) throws IOException {
return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, allowExplicitIndex, xContentType);
return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, null, allowExplicitIndex, xContentType);
}
public BulkRequest add(BytesReference data, @Nullable String defaultIndex,
@ -292,7 +293,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
@Nullable String defaultPipeline, boolean allowExplicitIndex,
XContentType xContentType) throws IOException {
return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, defaultRouting, defaultFetchSourceContext,
defaultPipeline, allowExplicitIndex, xContentType);
defaultPipeline, null, allowExplicitIndex, xContentType);
}
/**
@ -301,11 +302,12 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
@Deprecated
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline, boolean allowExplicitIndex,
@Nullable String defaultPipeline, @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex,
XContentType xContentType) throws IOException {
String routing = valueOrDefault(defaultRouting, globalRouting);
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
new BulkRequestParser(true).parse(data, defaultIndex, defaultType, routing, defaultFetchSourceContext, pipeline,
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
new BulkRequestParser(true).parse(data, defaultIndex, defaultType, routing, defaultFetchSourceContext, pipeline, requireAlias,
allowExplicitIndex, xContentType, this::internalAdd, this::internalAdd, this::add);
return this;
}
@ -379,6 +381,15 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
return globalRouting;
}
public Boolean requireAlias() {
return globalRequireAlias;
}
public BulkRequest requireAlias(Boolean globalRequireAlias) {
this.globalRequireAlias = globalRequireAlias;
return this;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@ -434,6 +445,13 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
return value;
}
private static Boolean valueOrDefault(Boolean value, Boolean globalDefault) {
if (Objects.isNull(value) && !Objects.isNull(globalDefault)) {
return globalDefault;
}
return value;
}
@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + requests.stream().mapToLong(Accountable::ramBytesUsed).sum();

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.bulk;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
@ -66,6 +67,7 @@ public final class BulkRequestParser {
private static final ParseField SOURCE = new ParseField("_source");
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);
private final boolean warnOnTypeUsage;
@ -112,27 +114,27 @@ public final class BulkRequestParser {
public void parse(
BytesReference data, @Nullable String defaultIndex,
@Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline, boolean allowExplicitIndex,
@Nullable String defaultPipeline, @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex,
XContentType xContentType,
Consumer<IndexRequest> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer) throws IOException {
parse(data, defaultIndex, null, defaultRouting, defaultFetchSourceContext, defaultPipeline, allowExplicitIndex, xContentType,
indexRequestConsumer, updateRequestConsumer, deleteRequestConsumer);
parse(data, defaultIndex, null, defaultRouting, defaultFetchSourceContext, defaultPipeline,
defaultRequireAlias, allowExplicitIndex, xContentType, indexRequestConsumer, updateRequestConsumer, deleteRequestConsumer);
}
/**
* Parse the provided {@code data} assuming the provided default values. Index requests
* will be passed to the {@code indexRequestConsumer}, update requests to the
* {@code updateRequestConsumer} and delete requests to the {@code deleteRequestConsumer}.
* @deprecated Use {@link #parse(BytesReference, String, String, FetchSourceContext, String, boolean, XContentType,
* @deprecated Use {@link #parse(BytesReference, String, String, FetchSourceContext, String, Boolean, boolean, XContentType,
* Consumer, Consumer, Consumer)} instead.
*/
@Deprecated
public void parse(
BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline, boolean allowExplicitIndex,
@Nullable String defaultPipeline, @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex,
XContentType xContentType,
Consumer<IndexRequest> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
@ -190,6 +192,7 @@ public final class BulkRequestParser {
long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
int retryOnConflict = 0;
String pipeline = defaultPipeline;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
@ -206,7 +209,7 @@ public final class BulkRequestParser {
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (warnOnTypeUsage && typesDeprecationLogged == false) {
deprecationLogger.deprecatedAndMaybeLog("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE);
typesDeprecationLogged = true;
@ -232,6 +235,8 @@ public final class BulkRequestParser {
pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
requireAlias = parser.booleanValue();
} else {
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter ["
+ currentFieldName + "]");
@ -269,19 +274,22 @@ public final class BulkRequestParser {
indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing)
.version(version).versionType(versionType)
.setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType));
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType)
.setRequireAlias(requireAlias));
} else {
indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing)
.version(version).versionType(versionType)
.create("create".equals(opType)).setPipeline(pipeline)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType));
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias));
}
} else if ("create".equals(action)) {
indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing)
.version(version).versionType(versionType)
.create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType));
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias));
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new IllegalArgumentException("Update requests do not support versioning. " +
@ -289,6 +297,7 @@ public final class BulkRequestParser {
}
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.setRequireAlias(requireAlias)
.routing(routing);
// EMPTY is safe here because we never call namedObject
try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput();

View File

@ -223,28 +223,29 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
if (needToCheck()) {
// Attempt to create all the indices that we're going to need during the bulk before we start.
// Step 1: collect all the indices in the request
final Set<String> indices = bulkRequest.requests.stream()
// delete requests should not attempt to create the index (if the index does not
// exists), unless an external versioning is used
final Map<String, Boolean> indices = bulkRequest.requests.stream()
// delete requests should not attempt to create the index (if the index does not
// exists), unless an external versioning is used
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE)
.map(DocWriteRequest::index)
.collect(Collectors.toSet());
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE)
.collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2));
/* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
* that we'll use when we try to run the requests. */
final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
Set<String> autoCreateIndices = new HashSet<>();
ClusterState state = clusterService.state();
for (String index : indices) {
for (Map.Entry<String, Boolean> indexAndFlag : indices.entrySet()) {
boolean shouldAutoCreate;
final String index = indexAndFlag.getKey();
try {
shouldAutoCreate = shouldAutoCreate(index, state);
} catch (IndexNotFoundException e) {
shouldAutoCreate = false;
indicesThatCannotBeCreated.put(index, e);
}
if (shouldAutoCreate) {
// We should only auto create if we are not requiring it to be an alias
if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
autoCreateIndices.add(index);
}
}
@ -415,6 +416,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
if (docWriteRequest == null) {
continue;
}
if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
continue;
}
if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metadata)) {
continue;
}
@ -566,6 +570,20 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
});
}
private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest<?> request, int idx, final Metadata metadata) {
if (request.isRequireAlias() && (metadata.hasAlias(request.index()) == false)) {
Exception exception = new IndexNotFoundException("["
+ DocWriteRequest.REQUIRE_ALIAS
+ "] request flag is [true] and ["
+ request.index()
+ "] is not an alias",
request.index());
addFailure(request, idx, exception);
return true;
}
return false;
}
private boolean addFailureIfIndexIsUnavailable(DocWriteRequest<?> request, int idx, final ConcreteIndices concreteIndices,
final Metadata metadata) {
IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index());

View File

@ -305,6 +305,11 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
return OpType.DELETE;
}
@Override
public boolean isRequireAlias() {
return false;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -109,6 +109,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private boolean isPipelineResolved;
private boolean requireAlias;
/**
* Value for {@link #getAutoGeneratedTimestamp()} if the document has an external
* provided ID.
@ -162,6 +164,11 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
ifSeqNo = UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
requireAlias = in.readBoolean();
} else {
requireAlias = false;
}
}
public IndexRequest() {
@ -760,6 +767,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
"Stream version [" + out.getVersion() + "]");
}
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
out.writeBoolean(requireAlias);
}
}
@Override
@ -807,4 +817,14 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
public long ramBytesUsed() {
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.length());
}
@Override
public boolean isRequireAlias() {
return requireAlias;
}
public IndexRequest setRequireAlias(boolean requireAlias) {
this.requireAlias = requireAlias;
return this;
}
}

View File

@ -230,4 +230,12 @@ public class IndexRequestBuilder extends ReplicationRequestBuilder<IndexRequest,
request.setPipeline(pipeline);
return this;
}
/**
* Sets the require_alias flag
*/
public IndexRequestBuilder setRequireAlias(boolean requireAlias) {
request.setRequireAlias(requireAlias);
return this;
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.update;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -48,6 +49,7 @@ import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
@ -71,6 +73,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
private final UpdateHelper updateHelper;
private final IndicesService indicesService;
private final NodeClient client;
private final ClusterService clusterService;
@Inject
public TransportUpdateAction(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
@ -83,6 +86,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
this.indicesService = indicesService;
this.autoCreateIndex = autoCreateIndex;
this.client = client;
this.clusterService = clusterService;
}
@Override
@ -115,6 +119,13 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
@Override
protected void doExecute(Task task, final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
if (request.isRequireAlias() && (clusterService.state().getMetadata().hasAlias(request.index()) == false)) {
throw new IndexNotFoundException("["
+ DocWriteRequest.REQUIRE_ALIAS
+ "] request flag is [true] and ["
+ request.index()
+ "] is not an alias", request.index());
}
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
client.admin().indices().create(new CreateIndexRequest()

View File

@ -126,6 +126,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private boolean scriptedUpsert = false;
private boolean docAsUpsert = false;
private boolean detectNoop = true;
private boolean requireAlias = false;
@Nullable
private IndexRequest doc;
@ -176,6 +177,11 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
ifPrimaryTerm = in.readVLong();
detectNoop = in.readBoolean();
scriptedUpsert = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
requireAlias = in.readBoolean();
} else {
requireAlias = false;
}
}
public UpdateRequest(String index, String id) {
@ -877,6 +883,16 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
return this;
}
@Override
public boolean isRequireAlias() {
return requireAlias;
}
public UpdateRequest setRequireAlias(boolean requireAlias) {
this.requireAlias = requireAlias;
return this;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -948,6 +964,9 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
out.writeVLong(ifPrimaryTerm);
out.writeBoolean(detectNoop);
out.writeBoolean(scriptedUpsert);
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
out.writeBoolean(requireAlias);
}
}
@Override

View File

@ -257,6 +257,14 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
return this;
}
/**
* Sets the require_alias request flag on the destination index
*/
public ReindexRequest setRequireAlias(boolean requireAlias) {
this.getDestination().setRequireAlias(requireAlias);
return this;
}
/**
* Gets the target for this reindex request in the for of an {@link IndexRequest}
*/

View File

@ -20,6 +20,7 @@
package org.elasticsearch.rest.action.document;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActiveShardCount;
@ -96,10 +97,11 @@ public class RestBulkAction extends BaseRestHandler {
if (waitForActiveShards != null) {
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting,
defaultFetchSourceContext, defaultPipeline, allowExplicitIndex, request.getXContentType());
defaultFetchSourceContext, defaultPipeline, defaultRequireAlias, allowExplicitIndex, request.getXContentType());
return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.document;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
@ -145,6 +146,7 @@ public class RestIndexAction extends BaseRestHandler {
indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
indexRequest.setRequireAlias(request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, indexRequest.isRequireAlias()));
String sOpType = request.param("op_type");
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.document;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.update.UpdateRequest;
@ -94,6 +95,7 @@ public class RestUpdateAction extends BaseRestHandler {
updateRequest.setIfSeqNo(request.paramAsLong("if_seq_no", updateRequest.ifSeqNo()));
updateRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", updateRequest.ifPrimaryTerm()));
updateRequest.setRequireAlias(request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, updateRequest.isRequireAlias()));
request.applyContentParser(parser -> {
updateRequest.fromXContent(parser);

View File

@ -42,10 +42,31 @@ public class BulkRequestParserTests extends ESTestCase {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
assertFalse(indexRequest.isRequireAlias());
parsed.set(true);
},
req -> fail(), req -> fail());
assertTrue(parsed.get());
parser.parse(request, "foo", null, null, null, true, false, XContentType.JSON,
indexRequest -> {
assertTrue(indexRequest.isRequireAlias());
},
req -> fail(), req -> fail());
request = new BytesArray("{ \"index\":{ \"_id\": \"bar\", \"require_alias\": true } }\n{}\n");
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
indexRequest -> {
assertTrue(indexRequest.isRequireAlias());
},
req -> fail(), req -> fail());
request = new BytesArray("{ \"index\":{ \"_id\": \"bar\", \"require_alias\": false } }\n{}\n");
parser.parse(request, "foo", null, null, null, true, false, XContentType.JSON,
indexRequest -> {
assertFalse(indexRequest.isRequireAlias());
},
req -> fail(), req -> fail());
}
public void testDeleteRequest() throws IOException {
@ -73,13 +94,37 @@ public class BulkRequestParserTests extends ESTestCase {
assertFalse(parsed.get());
assertEquals("foo", updateRequest.index());
assertEquals("bar", updateRequest.id());
assertFalse(updateRequest.isRequireAlias());
parsed.set(true);
},
req -> fail());
assertTrue(parsed.get());
parser.parse(request, "foo", null, null, null, true, false, XContentType.JSON,
req -> fail(),
updateRequest -> {
assertTrue(updateRequest.isRequireAlias());
},
req -> fail());
request = new BytesArray("{ \"update\":{ \"_id\": \"bar\", \"require_alias\": true } }\n{}\n");
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
req -> fail(),
updateRequest -> {
assertTrue(updateRequest.isRequireAlias());
},
req -> fail());
request = new BytesArray("{ \"update\":{ \"_id\": \"bar\", \"require_alias\": false } }\n{}\n");
parser.parse(request, "foo", null, null, null, true, false, XContentType.JSON,
req -> fail(),
updateRequest -> {
assertFalse(updateRequest.isRequireAlias());
},
req -> fail());
}
public void testBarfOnLackOfTrailingNewline() throws IOException {
public void testBarfOnLackOfTrailingNewline() {
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
@ -88,10 +133,10 @@ public class BulkRequestParserTests extends ESTestCase {
assertEquals("The bulk request must be terminated by a newline [\\n]", e.getMessage());
}
public void testFailOnExplicitIndex() throws IOException {
public void testFailOnExplicitIndex() {
BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON,
req -> fail(), req -> fail(), req -> fail()));
@ -121,7 +166,7 @@ public class BulkRequestParserTests extends ESTestCase {
+ "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final List<IndexRequest> indexRequests = new ArrayList<>();
parser.parse(request, null, null, null, null, true, XContentType.JSON,
parser.parse(request, null, null, null, null, null, true, XContentType.JSON,
indexRequests::add,
req -> fail(), req -> fail());
assertThat(indexRequests, Matchers.hasSize(2));

View File

@ -157,7 +157,9 @@ public class IndexRequestTests extends ESTestCase {
public void testIndexRequestXContentSerialization() throws IOException {
IndexRequest indexRequest = new IndexRequest("foo").id("1");
boolean isRequireAlias = randomBoolean();
indexRequest.source("{}", XContentType.JSON);
indexRequest.setRequireAlias(isRequireAlias);
assertEquals(XContentType.JSON, indexRequest.getContentType());
BytesStreamOutput out = new BytesStreamOutput();
@ -166,6 +168,7 @@ public class IndexRequestTests extends ESTestCase {
IndexRequest serialized = new IndexRequest(in);
assertEquals(XContentType.JSON, serialized.getContentType());
assertEquals(new BytesArray("{}"), serialized.source());
assertEquals(isRequireAlias, serialized.isRequireAlias());
}
// reindex makes use of index requests without a source so this needs to be handled

View File

@ -79,7 +79,7 @@ public class MonitoringBulkRequest extends ActionRequest {
final long intervalMillis) throws IOException {
// MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest
new BulkRequestParser(false).parse(content, null, null, null, null, true, xContentType,
new BulkRequestParser(false).parse(content, null, null, null, null, null, true, xContentType,
indexRequest -> {
// we no longer accept non-timestamped indexes from Kibana, LS, or Beats because we do not use the data
// and it was duplicated anyway; by simply dropping it, we allow BWC for older clients that still send it