From 64bb082f9be570f5fdffa356c152c4432da85038 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 10 Aug 2020 13:18:59 +0200 Subject: [PATCH] Improve error message for non append-only writes that target data stream (#60874) Backport of #60809 to 7.x branch. Closes #60581 --- .../action/bulk/TransportBulkAction.java | 15 ++++-- ...ransportInstanceSingleOperationAction.java | 13 ++++- .../metadata/IndexNameExpressionResolver.java | 8 ++++ .../test/data-streams/20_unsupported_apis.yml | 47 +++++++++++++++++++ .../datastreams/DataStreamIT.java | 27 +++++++---- 5 files changed, 97 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 634df393cbe..72e674c2e56 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -89,6 +89,7 @@ import java.util.function.LongSupplier; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; +import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -595,7 +596,7 @@ public class TransportBulkAction extends HandledTransportAction, Response extends ActionResponse @@ -143,7 +146,15 @@ public abstract class TransportInstanceSingleOperationAction< throw blockException; } } - request.concreteIndex(indexNameExpressionResolver.concreteWriteIndex(clusterState, request).getName()); + try { + request.concreteIndex(indexNameExpressionResolver.concreteWriteIndex(clusterState, request).getName()); + } catch (IndexNotFoundException e) { + if (request.includeDataStreams() == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) { + throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams"); + } else { + throw e; + } + } resolveRequest(clusterState, request); blockException = checkRequestBlock(clusterState, request); if (blockException != null) { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java index f3396a78f0b..09b8a15482d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java @@ -60,6 +60,8 @@ import java.util.stream.StreamSupport; public class IndexNameExpressionResolver { + public static final String EXCLUDED_DATA_STREAMS_KEY = "es.excluded_ds"; + private final DateMathExpressionResolver dateMathExpressionResolver = new DateMathExpressionResolver(); private final WildcardExpressionResolver wildcardExpressionResolver = new WildcardExpressionResolver(); private final List expressionResolvers = @@ -208,6 +210,7 @@ public class IndexNameExpressionResolver { } } + boolean excludedDataStreams = false; final Set concreteIndices = new HashSet<>(expressions.size()); for (String expression : expressions) { IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(expression); @@ -232,6 +235,7 @@ public class IndexNameExpressionResolver { } } else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM && context.includeDataStreams() == false) { + excludedDataStreams = true; continue; } @@ -273,6 +277,10 @@ public class IndexNameExpressionResolver { if (options.allowNoIndices() == false && concreteIndices.isEmpty()) { IndexNotFoundException infe = new IndexNotFoundException((String)null); infe.setResources("index_expression", indexExpressions); + if (excludedDataStreams) { + // Allows callers to handle IndexNotFoundException differently based on whether data streams were excluded. + infe.addMetadata(EXCLUDED_DATA_STREAMS_KEY, "true"); + } throw infe; } return concreteIndices.toArray(new Index[concreteIndices.size()]); diff --git a/x-pack/plugin/data-streams/qa/rest/src/test/resources/rest-api-spec/test/data-streams/20_unsupported_apis.yml b/x-pack/plugin/data-streams/qa/rest/src/test/resources/rest-api-spec/test/data-streams/20_unsupported_apis.yml index 03cbb1bae8c..2fc77ee7368 100644 --- a/x-pack/plugin/data-streams/qa/rest/src/test/resources/rest-api-spec/test/data-streams/20_unsupported_apis.yml +++ b/x-pack/plugin/data-streams/qa/rest/src/test/resources/rest-api-spec/test/data-streams/20_unsupported_apis.yml @@ -200,3 +200,50 @@ indices.delete_data_stream: name: simple-data-stream1 - is_true: acknowledged + +--- +"Non append-only writes into a data stream": + - skip: + version: " - 7.8.99" + reason: "data streams only supported in 7.9+" + features: allowed_warnings + + - do: + allowed_warnings: + - "index template [my-template1] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation" + indices.put_index_template: + name: my-template1 + body: + index_patterns: [logs-*] + data_stream: {} + + - do: + catch: bad_request + index: + index: logs-foobar + id: "1" + body: + '@timestamp': '2020-12-12' + + - do: + bulk: + body: + - index: + _index: logs-foobar + _id: "1" + - '@timestamp': '2020-12-12' + - create: + _index: logs-foobar + _id: "1" + - '@timestamp': '2020-12-12' + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: illegal_argument_exception } + - match: { items.0.index.error.reason: "only write ops with an op_type of create are allowed in data streams" } + - match: { items.1.create.result: created } + - match: { items.1.create._index: .ds-logs-foobar-000001 } + + - do: + indices.delete_data_stream: + name: logs-foobar + - is_true: acknowledged diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 004e8efea34..3361d790a21 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -230,13 +230,19 @@ public class DataStreamIT extends ESIntegTestCase { ); BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); assertThat(bulkResponse.getItems(), arrayWithSize(1)); - assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("no such index [null]")); + assertThat( + bulkResponse.getItems()[0].getFailure().getMessage(), + containsString("only write ops with an op_type of create are allowed in data streams") + ); } { BulkRequest bulkRequest = new BulkRequest().add(new DeleteRequest(dataStreamName, "_id")); BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); assertThat(bulkResponse.getItems(), arrayWithSize(1)); - assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("no such index [null]")); + assertThat( + bulkResponse.getItems()[0].getFailure().getMessage(), + containsString("only write ops with an op_type of create are allowed in data streams") + ); } { BulkRequest bulkRequest = new BulkRequest().add( @@ -244,22 +250,25 @@ public class DataStreamIT extends ESIntegTestCase { ); BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); assertThat(bulkResponse.getItems(), arrayWithSize(1)); - assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("no such index [null]")); + assertThat( + bulkResponse.getItems()[0].getFailure().getMessage(), + containsString("only write ops with an op_type of create are allowed in data streams") + ); } { IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON); - Exception e = expectThrows(IndexNotFoundException.class, () -> client().index(indexRequest).actionGet()); - assertThat(e.getMessage(), equalTo("no such index [null]")); + Exception e = expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet()); + assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams")); } { UpdateRequest updateRequest = new UpdateRequest(dataStreamName, "_id").doc("{}", XContentType.JSON); - Exception e = expectThrows(IndexNotFoundException.class, () -> client().update(updateRequest).actionGet()); - assertThat(e.getMessage(), equalTo("no such index [null]")); + Exception e = expectThrows(IllegalArgumentException.class, () -> client().update(updateRequest).actionGet()); + assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams")); } { DeleteRequest deleteRequest = new DeleteRequest(dataStreamName, "_id"); - Exception e = expectThrows(IndexNotFoundException.class, () -> client().delete(deleteRequest).actionGet()); - assertThat(e.getMessage(), equalTo("no such index [null]")); + Exception e = expectThrows(IllegalArgumentException.class, () -> client().delete(deleteRequest).actionGet()); + assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams")); } { IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)