This prohibits the use of a custom _routing when the index/bulk requests are targetting a data stream. Using a custom _routing when targetting a backing index is still permitted. Relates to #53100 (cherry picked from commit ece6b7a318a8bd3a010499189f31fc5e3a012d4f) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
3ba16e0f39
commit
f7dc09340b
|
@ -659,6 +659,57 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
assertThat(getSettingsResponse.getSetting(backingIndex2, "index.number_of_replicas"), equalTo("0"));
|
||||
}
|
||||
|
||||
public void testIndexDocsWithCustomRoutingTargetingDataStreamIsNotAllowed() throws Exception {
|
||||
putComposableIndexTemplate("id1", "@timestamp", List.of("logs-foo*"));
|
||||
|
||||
// Index doc that triggers creation of a data stream
|
||||
String dataStream = "logs-foobar";
|
||||
IndexRequest indexRequest = new IndexRequest(dataStream).source("{}", XContentType.JSON).opType(DocWriteRequest.OpType.CREATE);
|
||||
IndexResponse indexResponse = client().index(indexRequest).actionGet();
|
||||
assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStream, 1)));
|
||||
|
||||
// Index doc with custom routing that targets the data stream
|
||||
IndexRequest indexRequestWithRouting =
|
||||
new IndexRequest(dataStream).source("@timestamp", System.currentTimeMillis()).opType(DocWriteRequest.OpType.CREATE)
|
||||
.routing("custom");
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> client().index(indexRequestWithRouting).actionGet());
|
||||
assertThat(exception.getMessage(), is("index request targeting data stream [logs-foobar] specifies a custom routing. target the " +
|
||||
"backing indices directly or remove the custom routing."));
|
||||
|
||||
// Bulk indexing with custom routing targeting the data stream is also prohibited
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
bulkRequest.add(new IndexRequest(dataStream)
|
||||
.opType(DocWriteRequest.OpType.CREATE)
|
||||
.routing("bulk-request-routing")
|
||||
.source("{}", XContentType.JSON));
|
||||
}
|
||||
|
||||
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
|
||||
for (BulkItemResponse responseItem : bulkResponse.getItems()) {
|
||||
assertThat(responseItem.getFailure(), notNullValue());
|
||||
assertThat(responseItem.getFailureMessage(), is("java.lang.IllegalArgumentException: index request targeting data stream " +
|
||||
"[logs-foobar] specifies a custom routing. target the backing indices directly or remove the custom routing."));
|
||||
}
|
||||
}
|
||||
|
||||
public void testIndexDocsWithCustomRoutingTargetingBackingIndex() throws Exception {
|
||||
putComposableIndexTemplate("id1", "@timestamp", List.of("logs-foo*"));
|
||||
|
||||
// Index doc that triggers creation of a data stream
|
||||
IndexRequest indexRequest = new IndexRequest("logs-foobar").source("{}", XContentType.JSON).opType(DocWriteRequest.OpType.CREATE);
|
||||
IndexResponse indexResponse = client().index(indexRequest).actionGet();
|
||||
assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1)));
|
||||
|
||||
// Index doc with custom routing that targets the backing index
|
||||
IndexRequest indexRequestWithRouting = new IndexRequest(DataStream.getDefaultBackingIndexName("logs-foobar", 1L))
|
||||
.source("@timestamp", System.currentTimeMillis()).opType(DocWriteRequest.OpType.INDEX).routing("custom")
|
||||
.id(indexResponse.getId()).setIfPrimaryTerm(indexResponse.getPrimaryTerm()).setIfSeqNo(indexResponse.getSeqNo());
|
||||
IndexResponse response = client().index(indexRequestWithRouting).actionGet();
|
||||
assertThat(response.getIndex(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1)));
|
||||
}
|
||||
|
||||
private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) {
|
||||
assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date"));
|
||||
}
|
||||
|
|
|
@ -313,6 +313,22 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
}
|
||||
}
|
||||
|
||||
static void prohibitCustomRoutingOnDataStream(DocWriteRequest<?> writeRequest, Metadata metadata) {
|
||||
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(writeRequest.index());
|
||||
if (indexAbstraction == null) {
|
||||
return;
|
||||
}
|
||||
if (indexAbstraction.getType() != IndexAbstraction.Type.DATA_STREAM) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (writeRequest.routing() != null) {
|
||||
IndexAbstraction.DataStream dataStream = (IndexAbstraction.DataStream) indexAbstraction;
|
||||
throw new IllegalArgumentException("index request targeting data stream [" + dataStream.getName() + "] specifies a custom " +
|
||||
"routing. target the backing indices directly or remove the custom routing.");
|
||||
}
|
||||
}
|
||||
|
||||
static boolean resolvePipelines(final DocWriteRequest<?> originalRequest, final IndexRequest indexRequest, final Metadata metadata) {
|
||||
if (indexRequest.isPipelineResolved() == false) {
|
||||
final String requestPipeline = indexRequest.getPipeline();
|
||||
|
@ -497,6 +513,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
case CREATE:
|
||||
case INDEX:
|
||||
prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
|
||||
prohibitCustomRoutingOnDataStream(docWriteRequest, metadata);
|
||||
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
|
||||
final IndexMetadata indexMetadata = metadata.index(concreteIndex);
|
||||
MappingMetadata mappingMd = indexMetadata.mappingOrDefault();
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.junit.Before;
|
|||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.action.bulk.TransportBulkAction.prohibitCustomRoutingOnDataStream;
|
||||
import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamServiceTests.createDataStream;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -342,4 +343,23 @@ public class TransportBulkActionTests extends ESTestCase {
|
|||
TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
|
||||
}
|
||||
|
||||
public void testProhibitCustomRoutingOnDataStream() throws Exception {
|
||||
String dataStreamName = "logs-foobar";
|
||||
ClusterState clusterState = createDataStream(dataStreamName);
|
||||
Metadata metadata = clusterState.metadata();
|
||||
|
||||
// custom routing requests against the data stream are prohibited
|
||||
DocWriteRequest<?> writeRequestAgainstDataStream = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.INDEX)
|
||||
.routing("custom");
|
||||
IllegalArgumentException exception =
|
||||
expectThrows(IllegalArgumentException.class, () -> prohibitCustomRoutingOnDataStream(writeRequestAgainstDataStream, metadata));
|
||||
assertThat(exception.getMessage(), is("index request targeting data stream [logs-foobar] specifies a custom routing. target the " +
|
||||
"backing indices directly or remove the custom routing."));
|
||||
|
||||
// test custom routing is allowed when the index request targets the backing index
|
||||
DocWriteRequest<?> writeRequestAgainstIndex =
|
||||
new IndexRequest(DataStream.getDefaultBackingIndexName(dataStreamName, 1L)).opType(DocWriteRequest.OpType.INDEX)
|
||||
.routing("custom");
|
||||
prohibitCustomRoutingOnDataStream(writeRequestAgainstIndex, metadata);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue