From f4199f2ee0672ee566cccf995f0722b922549d27 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 12 Jun 2020 13:17:55 +0200 Subject: [PATCH] Prohibit append-only writes targeting backing indices directly. (#58025) Backport of #57788 to 7.x branch. Append-only writes can only target the corresponding data stream. Relates to #53100 --- .../test/indices.data_stream/10_basic.yml | 56 +++++++++++++++++++ .../action/bulk/TransportBulkAction.java | 38 +++++++++++++ .../action/bulk/TransportBulkActionTests.java | 52 +++++++++++++++++ .../MetadataCreateDataStreamServiceTests.java | 12 ++++ 4 files changed, 158 insertions(+) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 3e51f8866ee..5650e816ba2 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -200,3 +200,59 @@ setup: catch: missing indices.get: index: ".ds-simple-data-stream1-000001" + +--- +"append-only writes to backing indices prohobited": + - skip: + version: " - 7.9.99" + reason: "enable in 7.9+ when backported" + features: allowed_warnings + + - do: + allowed_warnings: + - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation" + indices.put_index_template: + name: generic_logs_template + body: + index_patterns: logs-* + data_stream: + timestamp_field: timestamp + + - do: + index: + index: logs-foobar + body: { foo: bar } + - match: { _index: .ds-logs-foobar-000001 } + + - do: + catch: bad_request + index: + index: .ds-logs-foobar-000001 + body: { foo: bar } + + - do: + bulk: + body: + - create: + _index: .ds-logs-foobar-000001 + - foo: bar + - index: + _index: .ds-logs-foobar-000001 + - foo: bar + - create: + _index: logs-foobar + - foo: bar + - match: { errors: true } + - match: { items.0.create.status: 400 } + - match: { items.0.create.error.type: illegal_argument_exception } + - match: { items.0.create.error.reason: "index request with op_type=create targeting backing indices is disallowed, target corresponding data stream [logs-foobar] instead" } + - match: { items.1.index.status: 400 } + - match: { items.1.index.error.type: illegal_argument_exception } + - match: { items.1.index.error.reason: "index request with op_type=index and no if_primary_term and if_seq_no set targeting backing indices is disallowed, target corresponding data stream [logs-foobar] instead" } + - match: { items.2.create.result: created } + - match: { items.2.create._index: .ds-logs-foobar-000001 } + + - do: + indices.delete_data_stream: + name: logs-foobar + - is_true: acknowledged diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 870fe6846e5..fe21ec34084 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -49,6 +49,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -90,6 +91,8 @@ import java.util.function.LongSupplier; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; /** * Groups bulk request items by shard, optionally creating non-existent indices and @@ -276,6 +279,40 @@ public class TransportBulkAction extends HandledTransportAction writeRequest, Metadata metadata) { + IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(writeRequest.index()); + if (indexAbstraction == null) { + return; + } + if (indexAbstraction.getType() != IndexAbstraction.Type.CONCRETE_INDEX) { + return; + } + if (indexAbstraction.getParentDataStream() == null) { + return; + } + + DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream(); + + // At this point with write op is targeting a backing index of a data stream directly, + // so checking if write op is append-only and if so fail. + // (Updates and deletes are allowed to target a backing index) + + DocWriteRequest.OpType opType = writeRequest.opType(); + // CREATE op_type is considered append-only and + // INDEX op_type is considered append-only when no if_primary_term and if_seq_no is specified. + // (the latter maybe an update, but at this stage we can't determine that. In order to determine + // that an engine level change is needed and for now this check is sufficient.) + if (opType == DocWriteRequest.OpType.CREATE) { + throw new IllegalArgumentException("index request with op_type=create targeting backing indices is disallowed, " + + "target corresponding data stream [" + dataStream.getName() + "] instead"); + } + if (opType == DocWriteRequest.OpType.INDEX && writeRequest.ifPrimaryTerm() == UNASSIGNED_PRIMARY_TERM && + writeRequest.ifSeqNo() == UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("index request with op_type=index and no if_primary_term and if_seq_no set " + + "targeting backing indices is disallowed, target corresponding data stream [" + dataStream.getName() + "] instead"); + } + } + static boolean resolvePipelines(final DocWriteRequest originalRequest, final IndexRequest indexRequest, final Metadata metadata) { if (indexRequest.isPipelineResolved() == false) { final String requestPipeline = indexRequest.getPipeline(); @@ -459,6 +496,7 @@ public class TransportBulkAction extends HandledTransportAction TransportBulkAction.prohibitAppendWritesInBackingIndices(invalidRequest1, metadata)); + assertThat(e.getMessage(), equalTo("index request with op_type=create targeting backing indices is disallowed, " + + "target corresponding data stream [logs-foobar] instead")); + + // Testing index op against backing index fails: + IndexRequest invalidRequest2 = new IndexRequest(backingIndexName).opType(DocWriteRequest.OpType.INDEX); + e = expectThrows(IllegalArgumentException.class, + () -> TransportBulkAction.prohibitAppendWritesInBackingIndices(invalidRequest2, metadata)); + assertThat(e.getMessage(), equalTo("index request with op_type=index and no if_primary_term and if_seq_no set " + + "targeting backing indices is disallowed, target corresponding data stream [logs-foobar] instead")); + + // Testing valid writes ops against a backing index: + DocWriteRequest validRequest = new IndexRequest(backingIndexName).opType(DocWriteRequest.OpType.INDEX) + .setIfSeqNo(1).setIfPrimaryTerm(1); + TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata); + validRequest = new DeleteRequest(backingIndexName); + TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata); + validRequest = new UpdateRequest(backingIndexName, "_id"); + TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata); + + // Testing append only write via ds name + validRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE); + TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata); + + validRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.INDEX); + TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata); + + // Append only for a backing index that doesn't exist is allowed: + validRequest = new IndexRequest(DataStream.getDefaultBackingIndexName("logs-barbaz", 1)) + .opType(DocWriteRequest.OpType.CREATE); + TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata); + + // Some other index names: + validRequest = new IndexRequest("my-index").opType(DocWriteRequest.OpType.CREATE); + TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata); + validRequest = new IndexRequest("foobar").opType(DocWriteRequest.OpType.CREATE); + TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata); + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java index 4cbf49d9874..94786f99f4d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java @@ -134,6 +134,18 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase { equalTo("matching index template [template] for data stream [my-data-stream] has no data stream template")); } + public static ClusterState createDataStream(final String dataStreamName) throws Exception { + final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService(); + ComposableIndexTemplate template = new ComposableIndexTemplate(Collections.singletonList(dataStreamName + "*"), + null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate("@timestamp")); + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metadata(Metadata.builder().put("template", template).build()) + .build(); + MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest req = + new MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO); + return MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req); + } + private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception { MetadataCreateIndexService s = mock(MetadataCreateIndexService.class); when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))