diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java index 8ece311e042..cd661c61be3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/DataStreamIT.java @@ -659,6 +659,57 @@ public class DataStreamIT extends ESIntegTestCase { assertThat(getSettingsResponse.getSetting(backingIndex2, "index.number_of_replicas"), equalTo("0")); } + public void testIndexDocsWithCustomRoutingTargetingDataStreamIsNotAllowed() throws Exception { + putComposableIndexTemplate("id1", "@timestamp", List.of("logs-foo*")); + + // Index doc that triggers creation of a data stream + String dataStream = "logs-foobar"; + IndexRequest indexRequest = new IndexRequest(dataStream).source("{}", XContentType.JSON).opType(DocWriteRequest.OpType.CREATE); + IndexResponse indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStream, 1))); + + // Index doc with custom routing that targets the data stream + IndexRequest indexRequestWithRouting = + new IndexRequest(dataStream).source("@timestamp", System.currentTimeMillis()).opType(DocWriteRequest.OpType.CREATE) + .routing("custom"); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, + () -> client().index(indexRequestWithRouting).actionGet()); + assertThat(exception.getMessage(), is("index request targeting data stream [logs-foobar] specifies a custom routing. target the " + + "backing indices directly or remove the custom routing.")); + + // Bulk indexing with custom routing targeting the data stream is also prohibited + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 10; i++) { + bulkRequest.add(new IndexRequest(dataStream) + .opType(DocWriteRequest.OpType.CREATE) + .routing("bulk-request-routing") + .source("{}", XContentType.JSON)); + } + + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + for (BulkItemResponse responseItem : bulkResponse.getItems()) { + assertThat(responseItem.getFailure(), notNullValue()); + assertThat(responseItem.getFailureMessage(), is("java.lang.IllegalArgumentException: index request targeting data stream " + + "[logs-foobar] specifies a custom routing. target the backing indices directly or remove the custom routing.")); + } + } + + public void testIndexDocsWithCustomRoutingTargetingBackingIndex() throws Exception { + putComposableIndexTemplate("id1", "@timestamp", List.of("logs-foo*")); + + // Index doc that triggers creation of a data stream + IndexRequest indexRequest = new IndexRequest("logs-foobar").source("{}", XContentType.JSON).opType(DocWriteRequest.OpType.CREATE); + IndexResponse indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1))); + + // Index doc with custom routing that targets the backing index + IndexRequest indexRequestWithRouting = new IndexRequest(DataStream.getDefaultBackingIndexName("logs-foobar", 1L)) + .source("@timestamp", System.currentTimeMillis()).opType(DocWriteRequest.OpType.INDEX).routing("custom") + .id(indexResponse.getId()).setIfPrimaryTerm(indexResponse.getPrimaryTerm()).setIfSeqNo(indexResponse.getSeqNo()); + IndexResponse response = client().index(indexRequestWithRouting).actionGet(); + assertThat(response.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1))); + } + private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) { assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date")); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index fe21ec34084..e0fa03f2dce 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -313,6 +313,22 @@ public class TransportBulkAction extends HandledTransportAction writeRequest, Metadata metadata) { + IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(writeRequest.index()); + if (indexAbstraction == null) { + return; + } + if (indexAbstraction.getType() != IndexAbstraction.Type.DATA_STREAM) { + return; + } + + if (writeRequest.routing() != null) { + IndexAbstraction.DataStream dataStream = (IndexAbstraction.DataStream) indexAbstraction; + throw new IllegalArgumentException("index request targeting data stream [" + dataStream.getName() + "] specifies a custom " + + "routing. target the backing indices directly or remove the custom routing."); + } + } + static boolean resolvePipelines(final DocWriteRequest originalRequest, final IndexRequest indexRequest, final Metadata metadata) { if (indexRequest.isPipelineResolved() == false) { final String requestPipeline = indexRequest.getPipeline(); @@ -497,6 +513,7 @@ public class TransportBulkAction extends HandledTransportAction writeRequestAgainstDataStream = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.INDEX) + .routing("custom"); + IllegalArgumentException exception = + expectThrows(IllegalArgumentException.class, () -> prohibitCustomRoutingOnDataStream(writeRequestAgainstDataStream, metadata)); + assertThat(exception.getMessage(), is("index request targeting data stream [logs-foobar] specifies a custom routing. target the " + + "backing indices directly or remove the custom routing.")); + + // test custom routing is allowed when the index request targets the backing index + DocWriteRequest writeRequestAgainstIndex = + new IndexRequest(DataStream.getDefaultBackingIndexName(dataStreamName, 1L)).opType(DocWriteRequest.OpType.INDEX) + .routing("custom"); + prohibitCustomRoutingOnDataStream(writeRequestAgainstIndex, metadata); + } }