Improve error message for non append-only writes that target data stream (#60874)
Backport of #60809 to 7.x branch. Closes #60581
This commit is contained in:
parent
6b2ddf4453
commit
64bb082f9b
|
@ -89,6 +89,7 @@ import java.util.function.LongSupplier;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
|
||||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
|
||||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
|
||||
|
@ -595,7 +596,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
if (concreteIndex == null) {
|
||||
try {
|
||||
concreteIndex = concreteIndices.resolveIfAbsent(request);
|
||||
} catch (IndexClosedException | IndexNotFoundException ex) {
|
||||
} catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) {
|
||||
addFailure(request, idx, ex);
|
||||
return true;
|
||||
}
|
||||
|
@ -641,8 +642,16 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
Index concreteIndex = indices.get(request.index());
|
||||
if (concreteIndex == null) {
|
||||
boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE;
|
||||
concreteIndex = indexNameExpressionResolver.concreteWriteIndex(state, request.indicesOptions(), request.indices()[0],
|
||||
false, includeDataStreams);
|
||||
try {
|
||||
concreteIndex = indexNameExpressionResolver.concreteWriteIndex(state, request.indicesOptions(),
|
||||
request.indices()[0], false, includeDataStreams);
|
||||
} catch (IndexNotFoundException e) {
|
||||
if (includeDataStreams == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
|
||||
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
indices.put(request.index(), concreteIndex);
|
||||
}
|
||||
return concreteIndex;
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -50,6 +51,8 @@ import org.elasticsearch.transport.TransportService;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
|
||||
|
||||
public abstract class TransportInstanceSingleOperationAction<
|
||||
Request extends InstanceShardOperationRequest<Request>,
|
||||
Response extends ActionResponse
|
||||
|
@ -143,7 +146,15 @@ public abstract class TransportInstanceSingleOperationAction<
|
|||
throw blockException;
|
||||
}
|
||||
}
|
||||
request.concreteIndex(indexNameExpressionResolver.concreteWriteIndex(clusterState, request).getName());
|
||||
try {
|
||||
request.concreteIndex(indexNameExpressionResolver.concreteWriteIndex(clusterState, request).getName());
|
||||
} catch (IndexNotFoundException e) {
|
||||
if (request.includeDataStreams() == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
|
||||
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
resolveRequest(clusterState, request);
|
||||
blockException = checkRequestBlock(clusterState, request);
|
||||
if (blockException != null) {
|
||||
|
|
|
@ -60,6 +60,8 @@ import java.util.stream.StreamSupport;
|
|||
|
||||
public class IndexNameExpressionResolver {
|
||||
|
||||
public static final String EXCLUDED_DATA_STREAMS_KEY = "es.excluded_ds";
|
||||
|
||||
private final DateMathExpressionResolver dateMathExpressionResolver = new DateMathExpressionResolver();
|
||||
private final WildcardExpressionResolver wildcardExpressionResolver = new WildcardExpressionResolver();
|
||||
private final List<ExpressionResolver> expressionResolvers =
|
||||
|
@ -208,6 +210,7 @@ public class IndexNameExpressionResolver {
|
|||
}
|
||||
}
|
||||
|
||||
boolean excludedDataStreams = false;
|
||||
final Set<Index> concreteIndices = new HashSet<>(expressions.size());
|
||||
for (String expression : expressions) {
|
||||
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(expression);
|
||||
|
@ -232,6 +235,7 @@ public class IndexNameExpressionResolver {
|
|||
}
|
||||
} else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM &&
|
||||
context.includeDataStreams() == false) {
|
||||
excludedDataStreams = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -273,6 +277,10 @@ public class IndexNameExpressionResolver {
|
|||
if (options.allowNoIndices() == false && concreteIndices.isEmpty()) {
|
||||
IndexNotFoundException infe = new IndexNotFoundException((String)null);
|
||||
infe.setResources("index_expression", indexExpressions);
|
||||
if (excludedDataStreams) {
|
||||
// Allows callers to handle IndexNotFoundException differently based on whether data streams were excluded.
|
||||
infe.addMetadata(EXCLUDED_DATA_STREAMS_KEY, "true");
|
||||
}
|
||||
throw infe;
|
||||
}
|
||||
return concreteIndices.toArray(new Index[concreteIndices.size()]);
|
||||
|
|
|
@ -200,3 +200,50 @@
|
|||
indices.delete_data_stream:
|
||||
name: simple-data-stream1
|
||||
- is_true: acknowledged
|
||||
|
||||
---
|
||||
"Non append-only writes into a data stream":
|
||||
- skip:
|
||||
version: " - 7.8.99"
|
||||
reason: "data streams only supported in 7.9+"
|
||||
features: allowed_warnings
|
||||
|
||||
- do:
|
||||
allowed_warnings:
|
||||
- "index template [my-template1] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
|
||||
indices.put_index_template:
|
||||
name: my-template1
|
||||
body:
|
||||
index_patterns: [logs-*]
|
||||
data_stream: {}
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
index:
|
||||
index: logs-foobar
|
||||
id: "1"
|
||||
body:
|
||||
'@timestamp': '2020-12-12'
|
||||
|
||||
- do:
|
||||
bulk:
|
||||
body:
|
||||
- index:
|
||||
_index: logs-foobar
|
||||
_id: "1"
|
||||
- '@timestamp': '2020-12-12'
|
||||
- create:
|
||||
_index: logs-foobar
|
||||
_id: "1"
|
||||
- '@timestamp': '2020-12-12'
|
||||
- match: { errors: true }
|
||||
- match: { items.0.index.status: 400 }
|
||||
- match: { items.0.index.error.type: illegal_argument_exception }
|
||||
- match: { items.0.index.error.reason: "only write ops with an op_type of create are allowed in data streams" }
|
||||
- match: { items.1.create.result: created }
|
||||
- match: { items.1.create._index: .ds-logs-foobar-000001 }
|
||||
|
||||
- do:
|
||||
indices.delete_data_stream:
|
||||
name: logs-foobar
|
||||
- is_true: acknowledged
|
||||
|
|
|
@ -230,13 +230,19 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
);
|
||||
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
|
||||
assertThat(bulkResponse.getItems(), arrayWithSize(1));
|
||||
assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("no such index [null]"));
|
||||
assertThat(
|
||||
bulkResponse.getItems()[0].getFailure().getMessage(),
|
||||
containsString("only write ops with an op_type of create are allowed in data streams")
|
||||
);
|
||||
}
|
||||
{
|
||||
BulkRequest bulkRequest = new BulkRequest().add(new DeleteRequest(dataStreamName, "_id"));
|
||||
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
|
||||
assertThat(bulkResponse.getItems(), arrayWithSize(1));
|
||||
assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("no such index [null]"));
|
||||
assertThat(
|
||||
bulkResponse.getItems()[0].getFailure().getMessage(),
|
||||
containsString("only write ops with an op_type of create are allowed in data streams")
|
||||
);
|
||||
}
|
||||
{
|
||||
BulkRequest bulkRequest = new BulkRequest().add(
|
||||
|
@ -244,22 +250,25 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
);
|
||||
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
|
||||
assertThat(bulkResponse.getItems(), arrayWithSize(1));
|
||||
assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("no such index [null]"));
|
||||
assertThat(
|
||||
bulkResponse.getItems()[0].getFailure().getMessage(),
|
||||
containsString("only write ops with an op_type of create are allowed in data streams")
|
||||
);
|
||||
}
|
||||
{
|
||||
IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON);
|
||||
Exception e = expectThrows(IndexNotFoundException.class, () -> client().index(indexRequest).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("no such index [null]"));
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams"));
|
||||
}
|
||||
{
|
||||
UpdateRequest updateRequest = new UpdateRequest(dataStreamName, "_id").doc("{}", XContentType.JSON);
|
||||
Exception e = expectThrows(IndexNotFoundException.class, () -> client().update(updateRequest).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("no such index [null]"));
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> client().update(updateRequest).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams"));
|
||||
}
|
||||
{
|
||||
DeleteRequest deleteRequest = new DeleteRequest(dataStreamName, "_id");
|
||||
Exception e = expectThrows(IndexNotFoundException.class, () -> client().delete(deleteRequest).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("no such index [null]"));
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> client().delete(deleteRequest).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams"));
|
||||
}
|
||||
{
|
||||
IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)
|
||||
|
|
Loading…
Reference in New Issue