Always validate that only a create op is allowed in bulk api for data streams (#62820)

Backport #62766 to 7.x branch.

The bulk api cache the resolved concrete indices when resolving the user provided
index name into the actual index name. The validation that prevents write ops other
than create from being executed in a data stream was only performed if the result
wasn't cached. In case of cached resolvings, the validation never occurs.

The validation would be skipped for all bulk items for a data stream after a create
operation for that same data stream. This commit ensures that the validation is always
performed for all bulk items (whether the concrete index resolution has been cached or
not cached).

Closes #62762
This commit is contained in:
Martijn van Groningen 2020-09-23 16:27:54 +02:00 committed by GitHub
parent f8bc5a3e6b
commit 0baefc8ddc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 139 additions and 2 deletions

View File

@ -446,6 +446,18 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
try {
// The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether
// an operation is allowed in index into a data stream, but this isn't done when resolve call is cached, so
// the validation needs to be performed here too.
IndexAbstraction indexAbstraction = clusterState.getMetadata().getIndicesLookup().get(concreteIndex.getName());
if (indexAbstraction.getParentDataStream() != null &&
// avoid valid cases when directly indexing into a backing index
// (for example when directly indexing into .ds-logs-foobar-000001)
concreteIndex.getName().equals(docWriteRequest.index()) == false &&
docWriteRequest.opType() != DocWriteRequest.OpType.CREATE) {
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
}
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:

View File

@ -284,6 +284,118 @@ public class DataStreamIT extends ESIntegTestCase {
BulkResponse bulkItemResponses = client().bulk(bulkRequest).actionGet();
assertThat(bulkItemResponses.getItems()[0].getIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1)));
}
{
// TODO: remove when fixing the bug when an index matching a backing index name is created before the data stream is created
createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName + "-baz");
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
BulkRequest bulkRequest = new BulkRequest().add(
new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON),
new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON).create(true),
new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON),
new UpdateRequest(dataStreamName, "_id").doc("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON),
new DeleteRequest(dataStreamName, "_id"),
new IndexRequest(dataStreamName + "-baz").source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON).create(true),
new DeleteRequest(dataStreamName + "-baz", "_id"),
new IndexRequest(dataStreamName + "-baz").source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON),
new IndexRequest(dataStreamName + "-baz").source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON).create(true),
// Non create ops directly against backing indices are allowed:
new DeleteRequest(DataStream.getDefaultBackingIndexName(dataStreamName + "-baz", 1), "_id"),
new IndexRequest(DataStream.getDefaultBackingIndexName(dataStreamName + "-baz", 1)).source(
"{\"@timestamp\": \"2020-12-12\"}",
XContentType.JSON
).id("_id").setIfSeqNo(1).setIfPrimaryTerm(1)
);
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat(bulkResponse.getItems(), arrayWithSize(11));
{
assertThat(bulkResponse.getItems()[0].getFailure(), notNullValue());
assertThat(bulkResponse.getItems()[0].getResponse(), nullValue());
assertThat(
bulkResponse.getItems()[0].getFailure().getMessage(),
containsString("only write ops with an op_type of create are allowed in data streams")
);
}
{
assertThat(bulkResponse.getItems()[1].getFailure(), nullValue());
assertThat(bulkResponse.getItems()[1].getResponse(), notNullValue());
assertThat(bulkResponse.getItems()[1].getIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1)));
}
{
assertThat(bulkResponse.getItems()[2].getFailure(), notNullValue());
assertThat(bulkResponse.getItems()[2].getResponse(), nullValue());
assertThat(
bulkResponse.getItems()[2].getFailure().getMessage(),
containsString("only write ops with an op_type of create are allowed in data streams")
);
}
{
assertThat(bulkResponse.getItems()[3].getFailure(), notNullValue());
assertThat(bulkResponse.getItems()[3].getResponse(), nullValue());
assertThat(
bulkResponse.getItems()[3].getFailure().getMessage(),
containsString("only write ops with an op_type of create are allowed in data streams")
);
}
{
assertThat(bulkResponse.getItems()[4].getFailure(), notNullValue());
assertThat(bulkResponse.getItems()[4].getResponse(), nullValue());
assertThat(
bulkResponse.getItems()[4].getFailure().getMessage(),
containsString("only write ops with an op_type of create are allowed in data streams")
);
}
{
assertThat(bulkResponse.getItems()[5].getFailure(), nullValue());
assertThat(bulkResponse.getItems()[5].getResponse(), notNullValue());
assertThat(
bulkResponse.getItems()[5].getIndex(),
equalTo(DataStream.getDefaultBackingIndexName(dataStreamName + "-baz", 1))
);
}
{
assertThat(bulkResponse.getItems()[6].getFailure(), notNullValue());
assertThat(bulkResponse.getItems()[6].getResponse(), nullValue());
assertThat(
bulkResponse.getItems()[6].getFailure().getMessage(),
containsString("only write ops with an op_type of create are allowed in data streams")
);
}
{
assertThat(bulkResponse.getItems()[7].getFailure(), notNullValue());
assertThat(bulkResponse.getItems()[7].getResponse(), nullValue());
assertThat(
bulkResponse.getItems()[7].getFailure().getMessage(),
containsString("only write ops with an op_type of create are allowed in data streams")
);
}
{
assertThat(bulkResponse.getItems()[8].getFailure(), nullValue());
assertThat(bulkResponse.getItems()[8].getResponse(), notNullValue());
assertThat(
bulkResponse.getItems()[8].getIndex(),
equalTo(DataStream.getDefaultBackingIndexName(dataStreamName + "-baz", 1))
);
}
{
assertThat(bulkResponse.getItems()[9].getFailure(), nullValue());
assertThat(bulkResponse.getItems()[9].getResponse(), notNullValue());
assertThat(
bulkResponse.getItems()[9].getIndex(),
equalTo(DataStream.getDefaultBackingIndexName(dataStreamName + "-baz", 1))
);
}
{
assertThat(bulkResponse.getItems()[10].getResponse(), nullValue());
assertThat(bulkResponse.getItems()[10].getFailure(), notNullValue());
assertThat(bulkResponse.getItems()[10].status(), equalTo(RestStatus.CONFLICT));
assertThat(
bulkResponse.getItems()[10].getIndex(),
equalTo(DataStream.getDefaultBackingIndexName(dataStreamName + "-baz", 1))
);
}
}
}
/**

View File

@ -200,8 +200,8 @@
---
"Non append-only writes into a data stream":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
version: " - 7.99.99"
reason: "set to 7.9.3 after backporting #62766"
features: allowed_warnings
- do:
@ -232,12 +232,25 @@
_index: logs-foobar
_id: "1"
- '@timestamp': '2020-12-12'
- index:
_index: logs-foobar
_id: "1"
- '@timestamp': '2020-12-12'
- delete:
_index: logs-foobar
_id: 10
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: illegal_argument_exception }
- match: { items.0.index.error.reason: "only write ops with an op_type of create are allowed in data streams" }
- match: { items.1.create.result: created }
- match: { items.1.create._index: .ds-logs-foobar-000001 }
- match: { items.2.index.status: 400 }
- match: { items.2.index.error.type: illegal_argument_exception }
- match: { items.2.index.error.reason: "only write ops with an op_type of create are allowed in data streams" }
- match: { items.3.delete.status: 400 }
- match: { items.3.delete.error.type: illegal_argument_exception }
- match: { items.3.delete.error.reason: "only write ops with an op_type of create are allowed in data streams" }
- do:
indices.delete_data_stream: