Prohibit append-only writes targeting backing indices directly. (#58025)
Backport of #57788 to 7.x branch. Append-only writes can only target the corresponding data stream. Relates to #53100
This commit is contained in:
parent
93b693527a
commit
f4199f2ee0
|
@ -200,3 +200,59 @@ setup:
|
|||
catch: missing
|
||||
indices.get:
|
||||
index: ".ds-simple-data-stream1-000001"
|
||||
|
||||
---
|
||||
"append-only writes to backing indices prohobited":
|
||||
- skip:
|
||||
version: " - 7.9.99"
|
||||
reason: "enable in 7.9+ when backported"
|
||||
features: allowed_warnings
|
||||
|
||||
- do:
|
||||
allowed_warnings:
|
||||
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
|
||||
indices.put_index_template:
|
||||
name: generic_logs_template
|
||||
body:
|
||||
index_patterns: logs-*
|
||||
data_stream:
|
||||
timestamp_field: timestamp
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: logs-foobar
|
||||
body: { foo: bar }
|
||||
- match: { _index: .ds-logs-foobar-000001 }
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
index:
|
||||
index: .ds-logs-foobar-000001
|
||||
body: { foo: bar }
|
||||
|
||||
- do:
|
||||
bulk:
|
||||
body:
|
||||
- create:
|
||||
_index: .ds-logs-foobar-000001
|
||||
- foo: bar
|
||||
- index:
|
||||
_index: .ds-logs-foobar-000001
|
||||
- foo: bar
|
||||
- create:
|
||||
_index: logs-foobar
|
||||
- foo: bar
|
||||
- match: { errors: true }
|
||||
- match: { items.0.create.status: 400 }
|
||||
- match: { items.0.create.error.type: illegal_argument_exception }
|
||||
- match: { items.0.create.error.reason: "index request with op_type=create targeting backing indices is disallowed, target corresponding data stream [logs-foobar] instead" }
|
||||
- match: { items.1.index.status: 400 }
|
||||
- match: { items.1.index.error.type: illegal_argument_exception }
|
||||
- match: { items.1.index.error.reason: "index request with op_type=index and no if_primary_term and if_seq_no set targeting backing indices is disallowed, target corresponding data stream [logs-foobar] instead" }
|
||||
- match: { items.2.create.result: created }
|
||||
- match: { items.2.create._index: .ds-logs-foobar-000001 }
|
||||
|
||||
- do:
|
||||
indices.delete_data_stream:
|
||||
name: logs-foobar
|
||||
- is_true: acknowledged
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.IndexAbstraction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
|
@ -90,6 +91,8 @@ import java.util.function.LongSupplier;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
|
||||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||
|
||||
/**
|
||||
* Groups bulk request items by shard, optionally creating non-existent indices and
|
||||
|
@ -276,6 +279,40 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
}
|
||||
}
|
||||
|
||||
static void prohibitAppendWritesInBackingIndices(DocWriteRequest<?> writeRequest, Metadata metadata) {
|
||||
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(writeRequest.index());
|
||||
if (indexAbstraction == null) {
|
||||
return;
|
||||
}
|
||||
if (indexAbstraction.getType() != IndexAbstraction.Type.CONCRETE_INDEX) {
|
||||
return;
|
||||
}
|
||||
if (indexAbstraction.getParentDataStream() == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream();
|
||||
|
||||
// At this point with write op is targeting a backing index of a data stream directly,
|
||||
// so checking if write op is append-only and if so fail.
|
||||
// (Updates and deletes are allowed to target a backing index)
|
||||
|
||||
DocWriteRequest.OpType opType = writeRequest.opType();
|
||||
// CREATE op_type is considered append-only and
|
||||
// INDEX op_type is considered append-only when no if_primary_term and if_seq_no is specified.
|
||||
// (the latter maybe an update, but at this stage we can't determine that. In order to determine
|
||||
// that an engine level change is needed and for now this check is sufficient.)
|
||||
if (opType == DocWriteRequest.OpType.CREATE) {
|
||||
throw new IllegalArgumentException("index request with op_type=create targeting backing indices is disallowed, " +
|
||||
"target corresponding data stream [" + dataStream.getName() + "] instead");
|
||||
}
|
||||
if (opType == DocWriteRequest.OpType.INDEX && writeRequest.ifPrimaryTerm() == UNASSIGNED_PRIMARY_TERM &&
|
||||
writeRequest.ifSeqNo() == UNASSIGNED_SEQ_NO) {
|
||||
throw new IllegalArgumentException("index request with op_type=index and no if_primary_term and if_seq_no set " +
|
||||
"targeting backing indices is disallowed, target corresponding data stream [" + dataStream.getName() + "] instead");
|
||||
}
|
||||
}
|
||||
|
||||
static boolean resolvePipelines(final DocWriteRequest<?> originalRequest, final IndexRequest indexRequest, final Metadata metadata) {
|
||||
if (indexRequest.isPipelineResolved() == false) {
|
||||
final String requestPipeline = indexRequest.getPipeline();
|
||||
|
@ -459,6 +496,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
switch (docWriteRequest.opType()) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
|
||||
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
|
||||
final IndexMetadata indexMetadata = metadata.index(concreteIndex);
|
||||
MappingMetadata mappingMd = indexMetadata.mappingOrDefault();
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
|
@ -28,7 +29,9 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetadata;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
|
@ -53,6 +56,7 @@ import org.junit.Before;
|
|||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamServiceTests.createDataStream;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
@ -290,4 +294,52 @@ public class TransportBulkActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testProhibitAppendWritesInBackingIndices() throws Exception {
|
||||
String dataStreamName = "logs-foobar";
|
||||
ClusterState clusterState = createDataStream(dataStreamName);
|
||||
Metadata metadata = clusterState.metadata();
|
||||
|
||||
// Testing create op against backing index fails:
|
||||
String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
|
||||
IndexRequest invalidRequest1 = new IndexRequest(backingIndexName).opType(DocWriteRequest.OpType.CREATE);
|
||||
Exception e = expectThrows(IllegalArgumentException.class,
|
||||
() -> TransportBulkAction.prohibitAppendWritesInBackingIndices(invalidRequest1, metadata));
|
||||
assertThat(e.getMessage(), equalTo("index request with op_type=create targeting backing indices is disallowed, " +
|
||||
"target corresponding data stream [logs-foobar] instead"));
|
||||
|
||||
// Testing index op against backing index fails:
|
||||
IndexRequest invalidRequest2 = new IndexRequest(backingIndexName).opType(DocWriteRequest.OpType.INDEX);
|
||||
e = expectThrows(IllegalArgumentException.class,
|
||||
() -> TransportBulkAction.prohibitAppendWritesInBackingIndices(invalidRequest2, metadata));
|
||||
assertThat(e.getMessage(), equalTo("index request with op_type=index and no if_primary_term and if_seq_no set " +
|
||||
"targeting backing indices is disallowed, target corresponding data stream [logs-foobar] instead"));
|
||||
|
||||
// Testing valid writes ops against a backing index:
|
||||
DocWriteRequest<?> validRequest = new IndexRequest(backingIndexName).opType(DocWriteRequest.OpType.INDEX)
|
||||
.setIfSeqNo(1).setIfPrimaryTerm(1);
|
||||
TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
|
||||
validRequest = new DeleteRequest(backingIndexName);
|
||||
TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
|
||||
validRequest = new UpdateRequest(backingIndexName, "_id");
|
||||
TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
|
||||
|
||||
// Testing append only write via ds name
|
||||
validRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
|
||||
TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
|
||||
|
||||
validRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.INDEX);
|
||||
TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
|
||||
|
||||
// Append only for a backing index that doesn't exist is allowed:
|
||||
validRequest = new IndexRequest(DataStream.getDefaultBackingIndexName("logs-barbaz", 1))
|
||||
.opType(DocWriteRequest.OpType.CREATE);
|
||||
TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
|
||||
|
||||
// Some other index names:
|
||||
validRequest = new IndexRequest("my-index").opType(DocWriteRequest.OpType.CREATE);
|
||||
TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
|
||||
validRequest = new IndexRequest("foobar").opType(DocWriteRequest.OpType.CREATE);
|
||||
TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -134,6 +134,18 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
|
|||
equalTo("matching index template [template] for data stream [my-data-stream] has no data stream template"));
|
||||
}
|
||||
|
||||
public static ClusterState createDataStream(final String dataStreamName) throws Exception {
|
||||
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
|
||||
ComposableIndexTemplate template = new ComposableIndexTemplate(Collections.singletonList(dataStreamName + "*"),
|
||||
null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate("@timestamp"));
|
||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.metadata(Metadata.builder().put("template", template).build())
|
||||
.build();
|
||||
MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest req =
|
||||
new MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
|
||||
return MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req);
|
||||
}
|
||||
|
||||
private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
|
||||
MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
|
||||
when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))
|
||||
|
|
Loading…
Reference in New Issue