Support Data Streams in OpenSearch (#690)
This commit adds support for data streams by adding a DataStreamFieldMapper, and making timestamp field name configurable. Backwards compatibility is supported. Signed-off-by: Ketan Verma <ketan9495@gmail.com>
This commit is contained in:
parent
c2e816ecf6
commit
ce12097095
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.action.admin.indices.datastream;
|
||||
|
||||
import org.opensearch.common.collect.List;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
public class DataStreamIndexTemplateIT extends DataStreamTestCase {
|
||||
|
||||
public void testCreateDataStreamIndexTemplate() throws Exception {
|
||||
// Without the data stream metadata field mapper, data_stream would have been an unknown field in
|
||||
// the index template and would have thrown an error.
|
||||
createIndexTemplate(
|
||||
"demo-template",
|
||||
"{" +
|
||||
"\"index_patterns\": [ \"logs-*\" ]," +
|
||||
"\"data_stream\": { }" +
|
||||
"}"
|
||||
);
|
||||
|
||||
// Data stream index template with a custom timestamp field name.
|
||||
createIndexTemplate(
|
||||
"demo-template",
|
||||
"{" +
|
||||
"\"index_patterns\": [ \"logs-*\" ]," +
|
||||
"\"data_stream\": {" +
|
||||
"\"timestamp_field\": { \"name\": \"created_at\" }" +
|
||||
"}" +
|
||||
"}"
|
||||
);
|
||||
}
|
||||
|
||||
public void testDeleteIndexTemplate() throws Exception {
|
||||
createDataStreamIndexTemplate("demo-template", List.of("logs-*"));
|
||||
createDataStream("logs-demo");
|
||||
|
||||
// Index template deletion should fail if there is a data stream using it.
|
||||
ExecutionException exception = expectThrows(ExecutionException.class, () -> deleteIndexTemplate("demo-template"));
|
||||
assertThat(
|
||||
exception.getMessage(),
|
||||
containsString("unable to remove composable templates [demo-template] as they are in use by a data streams")
|
||||
);
|
||||
|
||||
// Index template can be deleted when all matching data streams are also deleted first.
|
||||
deleteDataStreams("logs-demo");
|
||||
deleteIndexTemplate("demo-template");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.action.admin.indices.datastream;
|
||||
|
||||
import org.opensearch.action.admin.indices.rollover.RolloverResponse;
|
||||
import org.opensearch.cluster.metadata.DataStream;
|
||||
import org.opensearch.index.Index;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class DataStreamRolloverIT extends DataStreamTestCase {
|
||||
|
||||
public void testDataStreamRollover() throws Exception {
|
||||
createDataStreamIndexTemplate("demo-template", Collections.singletonList("logs-*"));
|
||||
createDataStream("logs-demo");
|
||||
|
||||
DataStream dataStream;
|
||||
GetDataStreamAction.Response.DataStreamInfo dataStreamInfo;
|
||||
GetDataStreamAction.Response response;
|
||||
|
||||
// Data stream before a rollover.
|
||||
response = getDataStreams("logs-demo");
|
||||
dataStreamInfo = response.getDataStreams().get(0);
|
||||
assertThat(dataStreamInfo.getIndexTemplate(), equalTo("demo-template"));
|
||||
dataStream = dataStreamInfo.getDataStream();
|
||||
assertThat(dataStream.getGeneration(), equalTo(1L));
|
||||
assertThat(dataStream.getIndices().size(), equalTo(1));
|
||||
assertThat(dataStream.getTimeStampField(), equalTo(new DataStream.TimestampField("@timestamp")));
|
||||
assertThat(
|
||||
dataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList()),
|
||||
containsInAnyOrder(".ds-logs-demo-000001")
|
||||
);
|
||||
|
||||
// Perform a rollover.
|
||||
RolloverResponse rolloverResponse = rolloverDataStream("logs-demo");
|
||||
assertThat(rolloverResponse.getOldIndex(), equalTo(".ds-logs-demo-000001"));
|
||||
assertThat(rolloverResponse.getNewIndex(), equalTo(".ds-logs-demo-000002"));
|
||||
|
||||
// Data stream after a rollover.
|
||||
response = getDataStreams("logs-demo");
|
||||
dataStreamInfo = response.getDataStreams().get(0);
|
||||
assertThat(dataStreamInfo.getIndexTemplate(), equalTo("demo-template"));
|
||||
dataStream = dataStreamInfo.getDataStream();
|
||||
assertThat(dataStream.getGeneration(), equalTo(2L));
|
||||
assertThat(dataStream.getIndices().size(), equalTo(2));
|
||||
assertThat(dataStream.getTimeStampField(), equalTo(new DataStream.TimestampField("@timestamp")));
|
||||
assertThat(
|
||||
dataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList()),
|
||||
containsInAnyOrder(".ds-logs-demo-000001", ".ds-logs-demo-000002")
|
||||
);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.action.admin.indices.datastream;
|
||||
|
||||
import org.opensearch.action.admin.indices.rollover.RolloverRequest;
|
||||
import org.opensearch.action.admin.indices.rollover.RolloverResponse;
|
||||
import org.opensearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
|
||||
import org.opensearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
|
||||
import org.opensearch.action.support.master.AcknowledgedResponse;
|
||||
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
|
||||
import org.opensearch.cluster.metadata.DataStream;
|
||||
import org.opensearch.cluster.metadata.Template;
|
||||
import org.opensearch.common.bytes.BytesArray;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.common.xcontent.XContentHelper;
|
||||
import org.opensearch.common.xcontent.XContentParser;
|
||||
import org.opensearch.common.xcontent.XContentType;
|
||||
import org.opensearch.test.OpenSearchIntegTestCase;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
|
||||
import static org.opensearch.test.OpenSearchIntegTestCase.Scope;
|
||||
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 2)
|
||||
public class DataStreamTestCase extends OpenSearchIntegTestCase {
|
||||
|
||||
public AcknowledgedResponse createDataStream(String name) throws Exception {
|
||||
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(name);
|
||||
AcknowledgedResponse response = client().admin().indices().createDataStream(request).get();
|
||||
assertThat(response.isAcknowledged(), is(true));
|
||||
return response;
|
||||
}
|
||||
|
||||
public AcknowledgedResponse deleteDataStreams(String... names) throws Exception {
|
||||
DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(names);
|
||||
AcknowledgedResponse response = client().admin().indices().deleteDataStream(request).get();
|
||||
assertThat(response.isAcknowledged(), is(true));
|
||||
return response;
|
||||
}
|
||||
|
||||
public GetDataStreamAction.Response getDataStreams(String... names) throws Exception {
|
||||
GetDataStreamAction.Request request = new GetDataStreamAction.Request(names);
|
||||
return client().admin().indices().getDataStreams(request).get();
|
||||
}
|
||||
|
||||
public List<String> getDataStreamsNames(String... names) throws Exception {
|
||||
return getDataStreams(names)
|
||||
.getDataStreams()
|
||||
.stream()
|
||||
.map(dsInfo -> dsInfo.getDataStream().getName())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public DataStreamsStatsAction.Response getDataStreamsStats(String... names) throws Exception {
|
||||
DataStreamsStatsAction.Request request = new DataStreamsStatsAction.Request();
|
||||
request.indices(names);
|
||||
return client().execute(DataStreamsStatsAction.INSTANCE, request).get();
|
||||
}
|
||||
|
||||
public RolloverResponse rolloverDataStream(String name) throws Exception {
|
||||
RolloverRequest request = new RolloverRequest(name, null);
|
||||
RolloverResponse response = client().admin().indices().rolloverIndex(request).get();
|
||||
assertThat(response.isAcknowledged(), is(true));
|
||||
assertThat(response.isRolledOver(), is(true));
|
||||
return response;
|
||||
}
|
||||
|
||||
public AcknowledgedResponse createDataStreamIndexTemplate(String name, List<String> indexPatterns) throws Exception {
|
||||
return createDataStreamIndexTemplate(name, indexPatterns, "@timestamp");
|
||||
}
|
||||
|
||||
public AcknowledgedResponse createDataStreamIndexTemplate(String name,
|
||||
List<String> indexPatterns,
|
||||
String timestampFieldName) throws Exception {
|
||||
ComposableIndexTemplate template = new ComposableIndexTemplate(
|
||||
indexPatterns,
|
||||
new Template(
|
||||
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1).build(),
|
||||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new ComposableIndexTemplate.DataStreamTemplate(new DataStream.TimestampField(timestampFieldName))
|
||||
);
|
||||
|
||||
return createIndexTemplate(name, template);
|
||||
}
|
||||
|
||||
public AcknowledgedResponse createIndexTemplate(String name, String jsonContent) throws Exception {
|
||||
XContentParser parser = XContentHelper.createParser(
|
||||
xContentRegistry(),
|
||||
null,
|
||||
new BytesArray(jsonContent),
|
||||
XContentType.JSON
|
||||
);
|
||||
|
||||
return createIndexTemplate(name, ComposableIndexTemplate.parse(parser));
|
||||
}
|
||||
|
||||
private AcknowledgedResponse createIndexTemplate(String name, ComposableIndexTemplate template) throws Exception {
|
||||
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(name);
|
||||
request.indexTemplate(template);
|
||||
AcknowledgedResponse response = client().execute(PutComposableIndexTemplateAction.INSTANCE, request).get();
|
||||
assertThat(response.isAcknowledged(), is(true));
|
||||
return response;
|
||||
}
|
||||
|
||||
public AcknowledgedResponse deleteIndexTemplate(String name) throws Exception {
|
||||
DeleteComposableIndexTemplateAction.Request request = new DeleteComposableIndexTemplateAction.Request(name);
|
||||
AcknowledgedResponse response = client().execute(DeleteComposableIndexTemplateAction.INSTANCE, request).get();
|
||||
assertThat(response.isAcknowledged(), is(true));
|
||||
return response;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,182 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.action.admin.indices.datastream;
|
||||
|
||||
import org.opensearch.action.DocWriteRequest;
|
||||
import org.opensearch.action.admin.indices.datastream.DataStreamsStatsAction.DataStreamStats;
|
||||
import org.opensearch.action.index.IndexRequest;
|
||||
import org.opensearch.action.index.IndexResponse;
|
||||
import org.opensearch.cluster.metadata.DataStream;
|
||||
import org.opensearch.common.collect.List;
|
||||
import org.opensearch.common.xcontent.XContentFactory;
|
||||
import org.opensearch.common.xcontent.XContentType;
|
||||
import org.opensearch.rest.RestStatus;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
public class DataStreamUsageIT extends DataStreamTestCase {
|
||||
|
||||
public void testDataStreamCrudAPIs() throws Exception {
|
||||
// Data stream creation without a matching index template should fail.
|
||||
ExecutionException exception = expectThrows(
|
||||
ExecutionException.class,
|
||||
() -> createDataStream("test-data-stream")
|
||||
);
|
||||
assertThat(exception.getMessage(), containsString("no matching index template found for data stream"));
|
||||
|
||||
// Create an index template for data streams.
|
||||
createDataStreamIndexTemplate("data-stream-template", List.of("logs-*", "metrics-*", "events"));
|
||||
|
||||
// Create multiple data streams matching the above index pattern.
|
||||
createDataStream("logs-dev");
|
||||
createDataStream("logs-prod");
|
||||
createDataStream("metrics-prod");
|
||||
createDataStream("events");
|
||||
ensureGreen();
|
||||
|
||||
// Get all data streams.
|
||||
assertThat(getDataStreamsNames(), containsInAnyOrder("logs-dev", "logs-prod", "metrics-prod", "events"));
|
||||
assertThat(getDataStreamsNames("*"), containsInAnyOrder("logs-dev", "logs-prod", "metrics-prod", "events"));
|
||||
|
||||
// Get data streams with and without wildcards.
|
||||
assertThat(getDataStreamsNames("logs-*", "events"), containsInAnyOrder("logs-dev", "logs-prod", "events"));
|
||||
|
||||
// Get data stream by name.
|
||||
GetDataStreamAction.Response response = getDataStreams("logs-prod");
|
||||
assertThat(response.getDataStreams().size(), equalTo(1));
|
||||
DataStream dataStream = response.getDataStreams().get(0).getDataStream();
|
||||
assertThat(dataStream.getName(), equalTo("logs-prod"));
|
||||
assertThat(dataStream.getIndices().size(), equalTo(1));
|
||||
assertThat(dataStream.getGeneration(), equalTo(1L));
|
||||
assertThat(dataStream.getTimeStampField(), equalTo(new DataStream.TimestampField("@timestamp")));
|
||||
|
||||
// Get data stream stats.
|
||||
DataStreamsStatsAction.Response stats = getDataStreamsStats("*");
|
||||
assertThat(stats.getTotalShards(), equalTo(16)); // 4 data streams, 1 backing index per stream, 2 shards, 1 replica
|
||||
assertThat(stats.getSuccessfulShards(), equalTo(16));
|
||||
assertThat(stats.getBackingIndices(), equalTo(4));
|
||||
assertThat(stats.getTotalStoreSize().getBytes(), greaterThan(0L));
|
||||
assertThat(stats.getDataStreams().length, equalTo(4));
|
||||
assertThat(
|
||||
Arrays.stream(stats.getDataStreams()).map(DataStreamStats::getDataStream).collect(Collectors.toList()),
|
||||
containsInAnyOrder("logs-dev", "logs-prod", "metrics-prod", "events")
|
||||
);
|
||||
|
||||
// Delete multiple data streams at once; with and without wildcards.
|
||||
deleteDataStreams("logs-*", "events");
|
||||
deleteDataStreams("metrics-prod");
|
||||
assertThat(getDataStreamsNames("*").size(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testDataStreamIndexDocumentsDefaultTimestampField() throws Exception {
|
||||
assertDataStreamIndexDocuments("@timestamp");
|
||||
}
|
||||
|
||||
public void testDataStreamIndexDocumentsCustomTimestampField() throws Exception {
|
||||
assertDataStreamIndexDocuments("timestamp_" + randomAlphaOfLength(5));
|
||||
}
|
||||
|
||||
public void assertDataStreamIndexDocuments(String timestampFieldName) throws Exception {
|
||||
createDataStreamIndexTemplate("demo-template", List.of("logs-*"), timestampFieldName);
|
||||
createDataStream("logs-demo");
|
||||
|
||||
Exception exception;
|
||||
|
||||
// Only op_type=create requests should be allowed.
|
||||
exception = expectThrows(Exception.class, () -> index(
|
||||
new IndexRequest("logs-demo")
|
||||
.id("doc-1")
|
||||
.source("{}", XContentType.JSON)
|
||||
));
|
||||
assertThat(
|
||||
exception.getMessage(),
|
||||
containsString("only write ops with an op_type of create are allowed in data streams")
|
||||
);
|
||||
|
||||
// Documents must contain a valid timestamp field.
|
||||
exception = expectThrows(Exception.class, () -> index(
|
||||
new IndexRequest("logs-demo")
|
||||
.id("doc-1")
|
||||
.source("{}", XContentType.JSON)
|
||||
.opType(DocWriteRequest.OpType.CREATE)
|
||||
));
|
||||
assertThat(
|
||||
exception.getMessage(),
|
||||
containsString("documents must contain a single-valued timestamp field '" + timestampFieldName + "' of date type")
|
||||
);
|
||||
|
||||
// The timestamp field cannot have multiple values.
|
||||
exception = expectThrows(Exception.class, () -> index(
|
||||
new IndexRequest("logs-demo")
|
||||
.id("doc-1")
|
||||
.opType(DocWriteRequest.OpType.CREATE)
|
||||
.source(
|
||||
XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.array(timestampFieldName, "2020-12-06T11:04:05.000Z", "2020-12-07T11:04:05.000Z")
|
||||
.field("message", "User registration successful")
|
||||
.endObject()
|
||||
)
|
||||
));
|
||||
assertThat(
|
||||
exception.getMessage(),
|
||||
containsString("documents must contain a single-valued timestamp field '" + timestampFieldName + "' of date type")
|
||||
);
|
||||
|
||||
// Successful case.
|
||||
IndexResponse response = index(
|
||||
new IndexRequest("logs-demo")
|
||||
.id("doc-1")
|
||||
.opType(DocWriteRequest.OpType.CREATE)
|
||||
.source(
|
||||
XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.field(timestampFieldName, "2020-12-06T11:04:05.000Z")
|
||||
.field("message", "User registration successful")
|
||||
.endObject()
|
||||
)
|
||||
);
|
||||
assertThat(response.status(), equalTo(RestStatus.CREATED));
|
||||
assertThat(response.getId(), equalTo("doc-1"));
|
||||
assertThat(response.getIndex(), equalTo(".ds-logs-demo-000001"));
|
||||
|
||||
// Perform a rollover and ingest more documents.
|
||||
rolloverDataStream("logs-demo");
|
||||
response = index(
|
||||
new IndexRequest("logs-demo")
|
||||
.id("doc-2")
|
||||
.opType(DocWriteRequest.OpType.CREATE)
|
||||
.source(
|
||||
XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.field(timestampFieldName, "2020-12-06T11:04:05.000Z")
|
||||
.field("message", "User registration successful")
|
||||
.endObject()
|
||||
)
|
||||
);
|
||||
assertThat(response.status(), equalTo(RestStatus.CREATED));
|
||||
assertThat(response.getId(), equalTo("doc-2"));
|
||||
assertThat(response.getIndex(), equalTo(".ds-logs-demo-000002"));
|
||||
}
|
||||
|
||||
private IndexResponse index(IndexRequest request) throws Exception {
|
||||
return client().index(request).get();
|
||||
}
|
||||
|
||||
}
|
|
@ -34,23 +34,14 @@ package org.opensearch.indices.template;
|
|||
|
||||
import org.opensearch.action.admin.indices.template.put.PutComponentTemplateAction;
|
||||
import org.opensearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
|
||||
|
||||
import org.opensearch.cluster.metadata.ComponentTemplate;
|
||||
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
|
||||
import org.opensearch.cluster.metadata.Template;
|
||||
import org.opensearch.common.bytes.BytesArray;
|
||||
import org.opensearch.common.collect.List;
|
||||
import org.opensearch.common.compress.CompressedXContent;
|
||||
import org.opensearch.common.xcontent.XContentHelper;
|
||||
import org.opensearch.common.xcontent.XContentParser;
|
||||
import org.opensearch.common.xcontent.XContentType;
|
||||
import org.opensearch.test.OpenSearchIntegTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class ComposableTemplateIT extends OpenSearchIntegTestCase {
|
||||
|
||||
// See: https://github.com/elastic/elasticsearch/issues/58643
|
||||
|
@ -105,23 +96,4 @@ public class ComposableTemplateIT extends OpenSearchIntegTestCase {
|
|||
client().execute(PutComposableIndexTemplateAction.INSTANCE,
|
||||
new PutComposableIndexTemplateAction.Request("my-it").indexTemplate(cit2)).get();
|
||||
}
|
||||
|
||||
public void testUsageOfDataStreamFails() throws IOException {
|
||||
// Exception that would happen if a unknown field is provided in a composable template:
|
||||
// The thrown exception will be used to compare against the exception that is thrown when providing
|
||||
// a composable template with a data stream definition.
|
||||
String content = "{\"index_patterns\":[\"logs-*-*\"],\"my_field\":\"bla\"}";
|
||||
XContentParser parser =
|
||||
XContentHelper.createParser(xContentRegistry(), null, new BytesArray(content), XContentType.JSON);
|
||||
Exception expectedException = expectThrows(Exception.class, () -> ComposableIndexTemplate.parse(parser));
|
||||
|
||||
ComposableIndexTemplate template = new ComposableIndexTemplate(List.of("logs-*-*"), null, null, null, null,
|
||||
null, new ComposableIndexTemplate.DataStreamTemplate());
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> client().execute(PutComposableIndexTemplateAction.INSTANCE,
|
||||
new PutComposableIndexTemplateAction.Request("my-it").indexTemplate(template)).actionGet());
|
||||
Exception actualException = (Exception) e.getCause();
|
||||
assertThat(actualException.getMessage(),
|
||||
equalTo(expectedException.getMessage().replace("[1:32] ", "").replace("my_field", "data_stream")));
|
||||
assertThat(actualException.getMessage(), equalTo("[index_template] unknown field [data_stream]"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,8 +33,10 @@
|
|||
package org.opensearch.cluster.metadata;
|
||||
|
||||
import org.opensearch.LegacyESVersion;
|
||||
import org.opensearch.Version;
|
||||
import org.opensearch.cluster.AbstractDiffable;
|
||||
import org.opensearch.cluster.Diff;
|
||||
import org.opensearch.cluster.metadata.DataStream.TimestampField;
|
||||
import org.opensearch.common.Nullable;
|
||||
import org.opensearch.common.ParseField;
|
||||
import org.opensearch.common.Strings;
|
||||
|
@ -42,10 +44,10 @@ import org.opensearch.common.io.stream.StreamInput;
|
|||
import org.opensearch.common.io.stream.StreamOutput;
|
||||
import org.opensearch.common.io.stream.Writeable;
|
||||
import org.opensearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.opensearch.common.xcontent.ObjectParser;
|
||||
import org.opensearch.common.xcontent.ToXContentObject;
|
||||
import org.opensearch.common.xcontent.XContentBuilder;
|
||||
import org.opensearch.common.xcontent.XContentParser;
|
||||
import org.opensearch.index.mapper.DataStreamFieldMapper;
|
||||
import org.opensearch.index.mapper.MapperService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -55,7 +57,8 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.opensearch.cluster.metadata.DataStream.TimestampField.FIXED_TIMESTAMP_FIELD;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.opensearch.common.collect.Map.of;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -264,51 +267,84 @@ public class ComposableIndexTemplate extends AbstractDiffable<ComposableIndexTem
|
|||
|
||||
public static class DataStreamTemplate implements Writeable, ToXContentObject {
|
||||
|
||||
private static final ObjectParser<DataStreamTemplate, Void> PARSER = new ObjectParser<>(
|
||||
private static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
|
||||
|
||||
private static final ConstructingObjectParser<DataStreamTemplate, Void> PARSER = new ConstructingObjectParser<>(
|
||||
"data_stream_template",
|
||||
DataStreamTemplate::new
|
||||
true,
|
||||
args -> new DataStreamTemplate((TimestampField) args[0])
|
||||
);
|
||||
|
||||
static {
|
||||
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), TimestampField.PARSER, TIMESTAMP_FIELD_FIELD);
|
||||
}
|
||||
|
||||
private final TimestampField timestampField;
|
||||
|
||||
public DataStreamTemplate() {
|
||||
this(DataStreamFieldMapper.Defaults.TIMESTAMP_FIELD);
|
||||
}
|
||||
|
||||
public String getTimestampField() {
|
||||
return FIXED_TIMESTAMP_FIELD;
|
||||
public DataStreamTemplate(TimestampField timestampField) {
|
||||
this.timestampField = timestampField;
|
||||
}
|
||||
|
||||
DataStreamTemplate(StreamInput in) {
|
||||
this();
|
||||
public DataStreamTemplate(StreamInput in) throws IOException {
|
||||
if (in.getVersion().onOrAfter(Version.V_1_0_0)) {
|
||||
this.timestampField = in.readOptionalWriteable(TimestampField::new);
|
||||
} else {
|
||||
this.timestampField = DataStreamFieldMapper.Defaults.TIMESTAMP_FIELD;
|
||||
}
|
||||
}
|
||||
|
||||
public TimestampField getTimestampField() {
|
||||
return timestampField == null ? DataStreamFieldMapper.Defaults.TIMESTAMP_FIELD : timestampField;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a mapping snippet for a backing index with `_data_stream_timestamp` meta field mapper properly configured.
|
||||
*/
|
||||
public Map<String, Object> getDataStreamMappingSnippet() {
|
||||
// _data_stream_timestamp meta fields default to @timestamp:
|
||||
return singletonMap(MapperService.SINGLE_MAPPING_NAME, singletonMap("_data_stream_timestamp",
|
||||
singletonMap("enabled", true)));
|
||||
return singletonMap(
|
||||
MapperService.SINGLE_MAPPING_NAME, singletonMap(
|
||||
"_data_stream_timestamp", unmodifiableMap(of(
|
||||
"enabled", true,
|
||||
"timestamp_field", getTimestampField().toMap()
|
||||
))
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
if (out.getVersion().onOrAfter(Version.V_1_0_0)) {
|
||||
out.writeOptionalWriteable(timestampField);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
return builder
|
||||
.startObject()
|
||||
.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), getTimestampField())
|
||||
.endObject();
|
||||
}
|
||||
|
||||
public static DataStreamTemplate fromXContent(XContentParser parser) {
|
||||
return PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
return o != null && getClass() == o.getClass();
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
DataStreamTemplate that = (DataStreamTemplate) o;
|
||||
return Objects.equals(timestampField, that.timestampField);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return DataStreamTemplate.class.hashCode();
|
||||
return Objects.hash(timestampField);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
|
||||
|
@ -219,12 +220,10 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
|
|||
|
||||
public static final class TimestampField implements Writeable, ToXContentObject {
|
||||
|
||||
public static final String FIXED_TIMESTAMP_FIELD = "@timestamp";
|
||||
|
||||
static ParseField NAME_FIELD = new ParseField("name");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static final ConstructingObjectParser<TimestampField, Void> PARSER = new ConstructingObjectParser<>(
|
||||
public static final ConstructingObjectParser<TimestampField, Void> PARSER = new ConstructingObjectParser<>(
|
||||
"timestamp_field",
|
||||
args -> new TimestampField((String) args[0])
|
||||
);
|
||||
|
@ -236,14 +235,11 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
|
|||
private final String name;
|
||||
|
||||
public TimestampField(String name) {
|
||||
if (FIXED_TIMESTAMP_FIELD.equals(name) == false) {
|
||||
throw new IllegalArgumentException("unexpected timestamp field [" + name + "]");
|
||||
}
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public TimestampField(StreamInput in) throws IOException {
|
||||
this(in.readString());
|
||||
this.name = in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -259,6 +255,10 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
|
|||
return builder;
|
||||
}
|
||||
|
||||
public Map<String, Object> toMap() {
|
||||
return Collections.singletonMap(NAME_FIELD.getPreferredName(), name);
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
|
|
@ -175,7 +175,7 @@ public class MetadataCreateDataStreamService {
|
|||
assert firstBackingIndex != null;
|
||||
assert firstBackingIndex.mapping() != null : "no mapping found for backing index [" + firstBackingIndexName + "]";
|
||||
|
||||
String fieldName = template.getDataStreamTemplate().getTimestampField();
|
||||
String fieldName = template.getDataStreamTemplate().getTimestampField().getName();
|
||||
DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName);
|
||||
DataStream newDataStream = new DataStream(request.name, timestampField,
|
||||
Collections.singletonList(firstBackingIndex.getIndex()));
|
||||
|
@ -197,7 +197,7 @@ public class MetadataCreateDataStreamService {
|
|||
return composableIndexTemplate;
|
||||
}
|
||||
|
||||
public static void validateTimestampFieldMapping(String timestampFieldName, MapperService mapperService) throws IOException {
|
||||
public static void validateTimestampFieldMapping(MapperService mapperService) throws IOException {
|
||||
MetadataFieldMapper fieldMapper =
|
||||
(MetadataFieldMapper) mapperService.documentMapper().mappers().getMapper("_data_stream_timestamp");
|
||||
assert fieldMapper != null : "[_data_stream_timestamp] meta field mapper must exist";
|
||||
|
|
|
@ -1000,7 +1000,7 @@ public class MetadataCreateIndexService {
|
|||
indexService.getIndexSortSupplier().get();
|
||||
}
|
||||
if (request.dataStreamName() != null) {
|
||||
MetadataCreateDataStreamService.validateTimestampFieldMapping("@timestamp", mapperService);
|
||||
MetadataCreateDataStreamService.validateTimestampFieldMapping(mapperService);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -63,7 +63,6 @@ import org.opensearch.common.util.set.Sets;
|
|||
import org.opensearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.opensearch.common.xcontent.XContentBuilder;
|
||||
import org.opensearch.common.xcontent.XContentFactory;
|
||||
import org.opensearch.common.xcontent.XContentParseException;
|
||||
import org.opensearch.common.xcontent.XContentType;
|
||||
import org.opensearch.index.Index;
|
||||
import org.opensearch.index.IndexService;
|
||||
|
@ -101,16 +100,6 @@ import static org.opensearch.indices.cluster.IndicesClusterStateService.Allocate
|
|||
*/
|
||||
public class MetadataIndexTemplateService {
|
||||
|
||||
public static final String DEFAULT_TIMESTAMP_FIELD = "@timestamp";
|
||||
public static final String DEFAULT_TIMESTAMP_MAPPING = "{\n" +
|
||||
" \"_doc\": {\n" +
|
||||
" \"properties\": {\n" +
|
||||
" \"@timestamp\": {\n" +
|
||||
" \"type\": \"date\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
private static final Logger logger = LogManager.getLogger(MetadataIndexTemplateService.class);
|
||||
private final ClusterService clusterService;
|
||||
private final AliasValidator aliasValidator;
|
||||
|
@ -959,15 +948,16 @@ public class MetadataIndexTemplateService {
|
|||
.map(Template::mappings)
|
||||
.ifPresent(mappings::add);
|
||||
if (template.getDataStreamTemplate() != null && indexName.startsWith(DataStream.BACKING_INDEX_PREFIX)) {
|
||||
// add a default mapping for the `@timestamp` field, at the lowest precedence, to make bootstrapping data streams more
|
||||
// add a default mapping for the timestamp field, at the lowest precedence, to make bootstrapping data streams more
|
||||
// straightforward as all backing indices are required to have a timestamp field
|
||||
mappings.add(0, new CompressedXContent(DEFAULT_TIMESTAMP_MAPPING));
|
||||
String timestampFieldName = template.getDataStreamTemplate().getTimestampField().getName();
|
||||
mappings.add(0, new CompressedXContent(getTimestampFieldMapping(timestampFieldName)));
|
||||
}
|
||||
|
||||
// Only include _timestamp mapping snippet if creating backing index.
|
||||
// Only include timestamp mapping snippet if creating backing index.
|
||||
if (indexName.startsWith(DataStream.BACKING_INDEX_PREFIX)) {
|
||||
// Only if template has data stream definition this should be added and
|
||||
// adding this template last, since _timestamp field should have highest precedence:
|
||||
// adding this template last, since timestamp field should have highest precedence:
|
||||
Optional.ofNullable(template.getDataStreamTemplate())
|
||||
.map(ComposableIndexTemplate.DataStreamTemplate::getDataStreamMappingSnippet)
|
||||
.map(mapping -> {
|
||||
|
@ -983,6 +973,22 @@ public class MetadataIndexTemplateService {
|
|||
return Collections.unmodifiableList(mappings);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default mapping snippet for the timestamp field by configuring it as a 'date' type.
|
||||
* This is added at the lowest precedence to allow users to override this mapping.
|
||||
*/
|
||||
private static String getTimestampFieldMapping(String timestampFieldName) {
|
||||
return "{\n" +
|
||||
" \"_doc\": {\n" +
|
||||
" \"properties\": {\n" +
|
||||
" \"" + timestampFieldName + "\": {\n" +
|
||||
" \"type\": \"date\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve index settings for the given list of v1 templates, templates are apply in reverse
|
||||
* order since they should be provided in order of priority/order
|
||||
|
@ -1129,16 +1135,6 @@ public class MetadataIndexTemplateService {
|
|||
String indexName = DataStream.BACKING_INDEX_PREFIX + temporaryIndexName;
|
||||
// Parse mappings to ensure they are valid after being composed
|
||||
|
||||
if (template.getDataStreamTemplate() != null) {
|
||||
// If there is no _data_stream meta field mapper and a data stream should be created then
|
||||
// fail as if the data_stream field can't be parsed:
|
||||
if (tempIndexService.mapperService().isMetadataField("_data_stream_timestamp") == false) {
|
||||
// Fail like a parsing expection, since we will be moving data_stream template out of server module and
|
||||
// then we would fail with the same error message, like we do here.
|
||||
throw new XContentParseException("[index_template] unknown field [data_stream]");
|
||||
}
|
||||
}
|
||||
|
||||
List<CompressedXContent> mappings = collectMappings(stateWithIndex, templateName, indexName);
|
||||
try {
|
||||
MapperService mapperService = tempIndexService.mapperService();
|
||||
|
@ -1147,8 +1143,7 @@ public class MetadataIndexTemplateService {
|
|||
}
|
||||
|
||||
if (template.getDataStreamTemplate() != null) {
|
||||
String tsFieldName = template.getDataStreamTemplate().getTimestampField();
|
||||
validateTimestampFieldMapping(tsFieldName, mapperService);
|
||||
validateTimestampFieldMapping(mapperService);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("invalid composite mappings for [" + templateName + "]", e);
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.index.mapper;
|
||||
|
||||
import org.apache.lucene.index.DocValuesType;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.opensearch.cluster.metadata.DataStream.TimestampField;
|
||||
import org.opensearch.index.mapper.ParseContext.Document;
|
||||
import org.opensearch.index.query.QueryShardContext;
|
||||
import org.opensearch.search.lookup.SearchLookup;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class DataStreamFieldMapper extends MetadataFieldMapper {
|
||||
|
||||
public static final String NAME = "_data_stream_timestamp";
|
||||
public static final String CONTENT_TYPE = "_data_stream_timestamp";
|
||||
|
||||
public static final class Defaults {
|
||||
public static final boolean ENABLED = false;
|
||||
public static final TimestampField TIMESTAMP_FIELD = new TimestampField("@timestamp");
|
||||
}
|
||||
|
||||
public static final class Builder extends MetadataFieldMapper.Builder {
|
||||
final Parameter<Boolean> enabledParam = Parameter.boolParam(
|
||||
"enabled",
|
||||
false,
|
||||
mapper -> toType(mapper).enabled,
|
||||
Defaults.ENABLED
|
||||
);
|
||||
|
||||
final Parameter<TimestampField> timestampFieldParam = new Parameter<>(
|
||||
"timestamp_field",
|
||||
false,
|
||||
() -> Defaults.TIMESTAMP_FIELD,
|
||||
(n, c, o) -> new TimestampField((String) ((Map<?, ?>) o).get("name")),
|
||||
mapper -> toType(mapper).timestampField
|
||||
);
|
||||
|
||||
protected Builder() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Parameter<?>> getParameters() {
|
||||
return Collections.unmodifiableList(Arrays.asList(
|
||||
enabledParam,
|
||||
timestampFieldParam
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetadataFieldMapper build(BuilderContext context) {
|
||||
return new DataStreamFieldMapper(enabledParam.getValue(), timestampFieldParam.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
public static final class DataStreamFieldType extends MappedFieldType {
|
||||
public static final DataStreamFieldType INSTANCE = new DataStreamFieldType();
|
||||
|
||||
private DataStreamFieldType() {
|
||||
super(NAME, false, false, false, TextSearchInfo.NONE, Collections.emptyMap());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueFetcher valueFetcher(MapperService mapperService, SearchLookup searchLookup, String format) {
|
||||
throw new UnsupportedOperationException("Cannot fetch values for internal field [" + typeName() + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String typeName() {
|
||||
return CONTENT_TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query termQuery(Object value, QueryShardContext context) {
|
||||
throw new UnsupportedOperationException("Cannot run term query on internal field [" + typeName() + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query existsQuery(QueryShardContext context) {
|
||||
throw new UnsupportedOperationException("Cannot run exists query on internal field [" + typeName() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
public static final TypeParser PARSER = new ConfigurableTypeParser(
|
||||
context -> new DataStreamFieldMapper(Defaults.ENABLED, Defaults.TIMESTAMP_FIELD),
|
||||
context -> new Builder()
|
||||
);
|
||||
|
||||
@Override
|
||||
public ParametrizedFieldMapper.Builder getMergeBuilder() {
|
||||
return new Builder().init(this);
|
||||
}
|
||||
|
||||
private static DataStreamFieldMapper toType(FieldMapper in) {
|
||||
return (DataStreamFieldMapper) in;
|
||||
}
|
||||
|
||||
private final boolean enabled;
|
||||
private final TimestampField timestampField;
|
||||
|
||||
protected DataStreamFieldMapper(boolean enabled, TimestampField timestampField) {
|
||||
super(DataStreamFieldType.INSTANCE);
|
||||
this.enabled = enabled;
|
||||
this.timestampField = timestampField;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String contentType() {
|
||||
return CONTENT_TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postParse(ParseContext context) throws IOException {
|
||||
// If _data_stream_timestamp metadata mapping is disabled, then skip all the remaining checks.
|
||||
if (enabled == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
// It is expected that the timestamp field will be parsed by the DateFieldMapper during the parseCreateField step.
|
||||
// The parsed field will be added to the document as:
|
||||
// 1. LongPoint (indexed = true; an indexed long field to allow fast range filters on the timestamp field value)
|
||||
// 2. SortedNumericDocValuesField (hasDocValues = true; allows sorting, aggregations and access to the timestamp field value)
|
||||
|
||||
Document document = context.doc();
|
||||
IndexableField[] fields = document.getFields(timestampField.getName());
|
||||
|
||||
// Documents must contain exactly one value for the timestamp field.
|
||||
long numTimestampValues = Arrays.stream(fields)
|
||||
.filter(field -> field.fieldType().docValuesType() == DocValuesType.SORTED_NUMERIC)
|
||||
.count();
|
||||
|
||||
if (numTimestampValues != 1) {
|
||||
throw new IllegalArgumentException(
|
||||
"documents must contain a single-valued timestamp field '" + timestampField.getName() + "' of date type"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -45,6 +45,7 @@ import org.opensearch.common.xcontent.NamedXContentRegistry;
|
|||
import org.opensearch.index.mapper.BinaryFieldMapper;
|
||||
import org.opensearch.index.mapper.BooleanFieldMapper;
|
||||
import org.opensearch.index.mapper.CompletionFieldMapper;
|
||||
import org.opensearch.index.mapper.DataStreamFieldMapper;
|
||||
import org.opensearch.index.mapper.DateFieldMapper;
|
||||
import org.opensearch.index.mapper.FieldAliasMapper;
|
||||
import org.opensearch.index.mapper.FieldNamesFieldMapper;
|
||||
|
@ -171,6 +172,7 @@ public class IndicesModule extends AbstractModule {
|
|||
builtInMetadataMappers.put(IdFieldMapper.NAME, IdFieldMapper.PARSER);
|
||||
builtInMetadataMappers.put(RoutingFieldMapper.NAME, RoutingFieldMapper.PARSER);
|
||||
builtInMetadataMappers.put(IndexFieldMapper.NAME, IndexFieldMapper.PARSER);
|
||||
builtInMetadataMappers.put(DataStreamFieldMapper.NAME, DataStreamFieldMapper.PARSER);
|
||||
builtInMetadataMappers.put(SourceFieldMapper.NAME, SourceFieldMapper.PARSER);
|
||||
builtInMetadataMappers.put(TypeFieldMapper.NAME, TypeFieldMapper.PARSER);
|
||||
builtInMetadataMappers.put(VersionFieldMapper.NAME, VersionFieldMapper.PARSER);
|
||||
|
|
|
@ -63,12 +63,12 @@ public class AutoCreateActionTests extends OpenSearchTestCase {
|
|||
CreateIndexRequest request = new CreateIndexRequest("logs-foobar");
|
||||
DataStreamTemplate result = AutoCreateAction.resolveAutoCreateDataStream(request, metadata);
|
||||
assertThat(result, notNullValue());
|
||||
assertThat(result.getTimestampField(), equalTo("@timestamp"));
|
||||
assertThat(result.getTimestampField().getName(), equalTo("@timestamp"));
|
||||
|
||||
request = new CreateIndexRequest("logs-barbaz");
|
||||
result = AutoCreateAction.resolveAutoCreateDataStream(request, metadata);
|
||||
assertThat(result, notNullValue());
|
||||
assertThat(result.getTimestampField(), equalTo("@timestamp"));
|
||||
assertThat(result.getTimestampField().getName(), equalTo("@timestamp"));
|
||||
|
||||
// An index that matches with a template without a data steam definition
|
||||
request = new CreateIndexRequest("legacy-logs-foobaz");
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.cluster.metadata;
|
||||
|
||||
import org.opensearch.Version;
|
||||
import org.opensearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate;
|
||||
import org.opensearch.common.io.stream.BytesStreamOutput;
|
||||
import org.opensearch.common.io.stream.StreamInput;
|
||||
import org.opensearch.common.io.stream.Writeable;
|
||||
import org.opensearch.common.xcontent.XContentParser;
|
||||
import org.opensearch.test.AbstractSerializingTestCase;
|
||||
import org.opensearch.test.VersionUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class DataStreamTemplateTests extends AbstractSerializingTestCase<DataStreamTemplate> {
|
||||
|
||||
@Override
|
||||
protected DataStreamTemplate doParseInstance(XContentParser parser) throws IOException {
|
||||
return DataStreamTemplate.fromXContent(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<DataStreamTemplate> instanceReader() {
|
||||
return DataStreamTemplate::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DataStreamTemplate createTestInstance() {
|
||||
return new DataStreamTemplate(new DataStream.TimestampField("timestamp_" + randomAlphaOfLength(5)));
|
||||
}
|
||||
|
||||
public void testBackwardCompatibleSerialization() throws Exception {
|
||||
Version version = VersionUtils.getPreviousVersion(Version.V_1_0_0);
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
out.setVersion(version);
|
||||
|
||||
DataStreamTemplate outTemplate = new DataStreamTemplate();
|
||||
outTemplate.writeTo(out);
|
||||
assertThat(out.size(), equalTo(0));
|
||||
|
||||
StreamInput in = out.bytes().streamInput();
|
||||
in.setVersion(version);
|
||||
DataStreamTemplate inTemplate = new DataStreamTemplate(in);
|
||||
|
||||
assertThat(inTemplate, equalTo(outTemplate));
|
||||
}
|
||||
|
||||
}
|
|
@ -32,7 +32,6 @@
|
|||
|
||||
package org.opensearch.cluster.metadata;
|
||||
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.opensearch.Version;
|
||||
import org.opensearch.action.ActionListener;
|
||||
import org.opensearch.action.admin.indices.alias.Alias;
|
||||
|
@ -52,29 +51,17 @@ import org.opensearch.common.xcontent.XContentParser;
|
|||
import org.opensearch.common.xcontent.XContentType;
|
||||
import org.opensearch.env.Environment;
|
||||
import org.opensearch.index.Index;
|
||||
import org.opensearch.index.mapper.FieldMapper;
|
||||
import org.opensearch.index.mapper.MappedFieldType;
|
||||
import org.opensearch.index.mapper.Mapper;
|
||||
import org.opensearch.index.mapper.MapperParsingException;
|
||||
import org.opensearch.index.mapper.MapperService;
|
||||
import org.opensearch.index.mapper.MetadataFieldMapper;
|
||||
import org.opensearch.index.mapper.ParametrizedFieldMapper;
|
||||
import org.opensearch.index.mapper.TextSearchInfo;
|
||||
import org.opensearch.index.mapper.ValueFetcher;
|
||||
import org.opensearch.index.query.QueryShardContext;
|
||||
import org.opensearch.indices.IndexTemplateMissingException;
|
||||
import org.opensearch.indices.IndicesService;
|
||||
import org.opensearch.indices.InvalidIndexTemplateException;
|
||||
import org.opensearch.indices.SystemIndices;
|
||||
import org.opensearch.plugins.MapperPlugin;
|
||||
import org.opensearch.plugins.Plugin;
|
||||
import org.opensearch.search.lookup.SearchLookup;
|
||||
import org.opensearch.test.OpenSearchSingleNodeTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -88,10 +75,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
|
||||
import static org.opensearch.common.settings.Settings.builder;
|
||||
import static org.opensearch.index.mapper.ParametrizedFieldMapper.Parameter;
|
||||
import static org.opensearch.indices.ShardLimitValidatorTests.createTestShardLimitService;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.containsStringIgnoringCase;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
|
@ -102,14 +85,12 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
|
|||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.matchesRegex;
|
||||
import static org.opensearch.common.settings.Settings.builder;
|
||||
import static org.opensearch.index.mapper.DataStreamFieldMapper.Defaults.TIMESTAMP_FIELD;
|
||||
import static org.opensearch.indices.ShardLimitValidatorTests.createTestShardLimitService;
|
||||
|
||||
public class MetadataIndexTemplateServiceTests extends OpenSearchSingleNodeTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return Collections.singletonList(DummyPlugin.class);
|
||||
}
|
||||
|
||||
public void testIndexTemplateInvalidNumberOfShards() {
|
||||
PutRequest request = new PutRequest("test", "test_shards");
|
||||
request.patterns(singletonList("test_shards*"));
|
||||
|
@ -886,7 +867,7 @@ public class MetadataIndexTemplateServiceTests extends OpenSearchSingleNodeTestC
|
|||
|
||||
Map<String, Object> firstParsedMapping = org.opensearch.common.collect.Map.of("_doc",
|
||||
org.opensearch.common.collect.Map.of("properties",
|
||||
org.opensearch.common.collect.Map.of(DEFAULT_TIMESTAMP_FIELD,
|
||||
org.opensearch.common.collect.Map.of(TIMESTAMP_FIELD.getName(),
|
||||
org.opensearch.common.collect.Map.of("type", "date"))));
|
||||
assertThat(parsedMappings.get(0), equalTo(firstParsedMapping));
|
||||
|
||||
|
@ -1024,13 +1005,13 @@ public class MetadataIndexTemplateServiceTests extends OpenSearchSingleNodeTestC
|
|||
|
||||
Map<String, Object> firstMapping = org.opensearch.common.collect.Map.of("_doc",
|
||||
org.opensearch.common.collect.Map.of("properties",
|
||||
org.opensearch.common.collect.Map.of(DEFAULT_TIMESTAMP_FIELD,
|
||||
org.opensearch.common.collect.Map.of(TIMESTAMP_FIELD.getName(),
|
||||
org.opensearch.common.collect.Map.of("type", "date"))));
|
||||
assertThat(parsedMappings.get(0), equalTo(firstMapping));
|
||||
|
||||
Map<String, Object> secondMapping = org.opensearch.common.collect.Map.of("_doc",
|
||||
org.opensearch.common.collect.Map.of("properties",
|
||||
org.opensearch.common.collect.Map.of(DEFAULT_TIMESTAMP_FIELD,
|
||||
org.opensearch.common.collect.Map.of(TIMESTAMP_FIELD.getName(),
|
||||
org.opensearch.common.collect.Map.of("type", "date_nanos"))));
|
||||
assertThat(parsedMappings.get(1), equalTo(secondMapping));
|
||||
}
|
||||
|
@ -1068,13 +1049,13 @@ public class MetadataIndexTemplateServiceTests extends OpenSearchSingleNodeTestC
|
|||
.collect(Collectors.toList());
|
||||
Map<String, Object> firstMapping = org.opensearch.common.collect.Map.of("_doc",
|
||||
org.opensearch.common.collect.Map.of("properties",
|
||||
org.opensearch.common.collect.Map.of(DEFAULT_TIMESTAMP_FIELD,
|
||||
org.opensearch.common.collect.Map.of(TIMESTAMP_FIELD.getName(),
|
||||
org.opensearch.common.collect.Map.of("type", "date"))));
|
||||
assertThat(parsedMappings.get(0), equalTo(firstMapping));
|
||||
|
||||
Map<String, Object> secondMapping = org.opensearch.common.collect.Map.of("_doc",
|
||||
org.opensearch.common.collect.Map.of("properties",
|
||||
org.opensearch.common.collect.Map.of(DEFAULT_TIMESTAMP_FIELD,
|
||||
org.opensearch.common.collect.Map.of(TIMESTAMP_FIELD.getName(),
|
||||
org.opensearch.common.collect.Map.of("type", "date_nanos"))));
|
||||
assertThat(parsedMappings.get(1), equalTo(secondMapping));
|
||||
}
|
||||
|
@ -1558,80 +1539,4 @@ public class MetadataIndexTemplateServiceTests extends OpenSearchSingleNodeTestC
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Composable index template with data_stream definition need _timestamp meta field mapper,
|
||||
// this is a dummy impl, so that tests don't fail with the fact that the _timestamp field can't be found.
|
||||
// (tests using this dummy impl doesn't test the _timestamp validation, but need it to tests other functionality)
|
||||
public static class DummyPlugin extends Plugin implements MapperPlugin {
|
||||
|
||||
@Override
|
||||
public Map<String, MetadataFieldMapper.TypeParser> getMetadataMappers() {
|
||||
return Collections.singletonMap("_data_stream_timestamp", new MetadataFieldMapper.ConfigurableTypeParser(
|
||||
c -> new MetadataTimestampFieldMapper(false),
|
||||
c -> new MetadataTimestampFieldBuilder())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private static MetadataTimestampFieldMapper toType(FieldMapper in) {
|
||||
return (MetadataTimestampFieldMapper) in;
|
||||
}
|
||||
|
||||
public static class MetadataTimestampFieldBuilder extends MetadataFieldMapper.Builder {
|
||||
|
||||
private final Parameter<Boolean> enabled = Parameter.boolParam("enabled", true, m -> toType(m).enabled, false);
|
||||
|
||||
protected MetadataTimestampFieldBuilder() {
|
||||
super("_data_stream_timestamp");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<ParametrizedFieldMapper.Parameter<?>> getParameters() {
|
||||
return Collections.singletonList(enabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetadataFieldMapper build(Mapper.BuilderContext context) {
|
||||
return new MetadataTimestampFieldMapper(enabled.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
public static class MetadataTimestampFieldMapper extends MetadataFieldMapper {
|
||||
final boolean enabled;
|
||||
|
||||
public MetadataTimestampFieldMapper(boolean enabled) {
|
||||
super(new MappedFieldType("_data_stream_timestamp", false, false, false, TextSearchInfo.NONE, Collections.emptyMap()) {
|
||||
@Override
|
||||
public ValueFetcher valueFetcher(MapperService mapperService, SearchLookup searchLookup, String format) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String typeName() {
|
||||
return "_data_stream_timestamp";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query termQuery(Object value, QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query existsQuery(QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
});
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ParametrizedFieldMapper.Builder getMergeBuilder() {
|
||||
return new MetadataTimestampFieldBuilder().init(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String contentType() {
|
||||
return "_data_stream_timestamp";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.index.mapper;
|
||||
|
||||
import org.opensearch.common.Strings;
|
||||
import org.opensearch.common.bytes.BytesReference;
|
||||
import org.opensearch.common.compress.CompressedXContent;
|
||||
import org.opensearch.common.xcontent.XContentFactory;
|
||||
import org.opensearch.common.xcontent.XContentType;
|
||||
import org.opensearch.test.OpenSearchSingleNodeTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class DataStreamFieldMapperTests extends OpenSearchSingleNodeTestCase {
|
||||
|
||||
public void testDefaultTimestampField() throws Exception {
|
||||
String mapping = Strings.toString(XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.startObject("_doc")
|
||||
.startObject("_data_stream_timestamp")
|
||||
.field("enabled", true)
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject());
|
||||
|
||||
assertDataStreamFieldMapper(mapping, "@timestamp");
|
||||
}
|
||||
|
||||
public void testCustomTimestampField() throws Exception {
|
||||
String timestampFieldName = "timestamp_" + randomAlphaOfLength(5);
|
||||
|
||||
String mapping = Strings.toString(XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.startObject("_doc")
|
||||
.startObject("_data_stream_timestamp")
|
||||
.field("enabled", true)
|
||||
.startObject("timestamp_field")
|
||||
.field("name", timestampFieldName)
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject());
|
||||
|
||||
assertDataStreamFieldMapper(mapping, timestampFieldName);
|
||||
}
|
||||
|
||||
public void testDeeplyNestedCustomTimestampField() throws Exception {
|
||||
String mapping = Strings.toString(XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.startObject("_doc")
|
||||
.startObject("_data_stream_timestamp")
|
||||
.field("enabled", true)
|
||||
.startObject("timestamp_field")
|
||||
.field("name", "event.meta.created_at")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject());
|
||||
|
||||
DocumentMapper mapper = createIndex("test")
|
||||
.mapperService()
|
||||
.merge("_doc", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE);
|
||||
|
||||
ParsedDocument doc = mapper.parse(new SourceToParse("test", "_doc", "1", BytesReference.bytes(
|
||||
XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.startObject("event")
|
||||
.startObject("meta")
|
||||
.field("created_at", "2020-12-06T11:04:05.000Z")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
), XContentType.JSON));
|
||||
assertThat(doc.rootDoc().getFields("event.meta.created_at").length, equalTo(2));
|
||||
|
||||
MapperException exception = expectThrows(MapperException.class, () -> {
|
||||
mapper.parse(new SourceToParse("test", "_doc", "3", BytesReference.bytes(
|
||||
XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.startObject("event")
|
||||
.startObject("meta")
|
||||
.array("created_at", "2020-12-06T11:04:05.000Z", "2020-12-07T11:04:05.000Z")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject()
|
||||
), XContentType.JSON));
|
||||
});
|
||||
assertThat(
|
||||
exception.getCause().getMessage(),
|
||||
containsString("documents must contain a single-valued timestamp field 'event.meta.created_at' of date type")
|
||||
);
|
||||
}
|
||||
|
||||
private void assertDataStreamFieldMapper(String mapping, String timestampFieldName) throws Exception {
|
||||
DocumentMapper mapper = createIndex("test")
|
||||
.mapperService()
|
||||
.merge("_doc", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE);
|
||||
|
||||
// Success case - document has timestamp field correctly populated.
|
||||
ParsedDocument doc = mapper.parse(new SourceToParse("test", "_doc", "1", BytesReference.bytes(
|
||||
XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.field(timestampFieldName, "2020-12-06T11:04:05.000Z")
|
||||
.endObject()
|
||||
), XContentType.JSON));
|
||||
|
||||
// A valid timestamp field will be parsed as LongPoint and SortedNumericDocValuesField.
|
||||
assertThat(doc.rootDoc().getFields(timestampFieldName).length, equalTo(2));
|
||||
|
||||
MapperException exception;
|
||||
|
||||
// Failure case - document doesn't have a valid timestamp field.
|
||||
exception = expectThrows(MapperException.class, () -> {
|
||||
mapper.parse(new SourceToParse("test", "_doc", "2", BytesReference.bytes(
|
||||
XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.field("invalid-field-name", "2020-12-06T11:04:05.000Z")
|
||||
.endObject()
|
||||
), XContentType.JSON));
|
||||
});
|
||||
assertThat(
|
||||
exception.getCause().getMessage(),
|
||||
containsString("documents must contain a single-valued timestamp field '" + timestampFieldName + "' of date type")
|
||||
);
|
||||
|
||||
// Failure case - document contains multiple values for the timestamp field.
|
||||
exception = expectThrows(MapperException.class, () -> {
|
||||
mapper.parse(new SourceToParse("test", "_doc", "3", BytesReference.bytes(
|
||||
XContentFactory
|
||||
.jsonBuilder()
|
||||
.startObject()
|
||||
.array(timestampFieldName, "2020-12-06T11:04:05.000Z", "2020-12-07T11:04:05.000Z")
|
||||
.endObject()
|
||||
), XContentType.JSON));
|
||||
});
|
||||
assertThat(
|
||||
exception.getCause().getMessage(),
|
||||
containsString("documents must contain a single-valued timestamp field '" + timestampFieldName + "' of date type")
|
||||
);
|
||||
}
|
||||
|
||||
}
|
|
@ -330,7 +330,7 @@ public class MapperServiceTests extends OpenSearchSingleNodeTestCase {
|
|||
}
|
||||
|
||||
public void testFieldNameLengthLimit() throws Throwable {
|
||||
int maxFieldNameLength = randomIntBetween(15, 20);
|
||||
int maxFieldNameLength = randomIntBetween(25, 30);
|
||||
String testString = new String(new char[maxFieldNameLength + 1]).replace("\0", "a");
|
||||
Settings settings = Settings.builder().put(MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING.getKey(), maxFieldNameLength)
|
||||
.build();
|
||||
|
@ -389,7 +389,7 @@ public class MapperServiceTests extends OpenSearchSingleNodeTestCase {
|
|||
}
|
||||
|
||||
public void testAliasFieldNameLengthLimit() throws Throwable {
|
||||
int maxFieldNameLength = randomIntBetween(15, 20);
|
||||
int maxFieldNameLength = randomIntBetween(25, 30);
|
||||
String testString = new String(new char[maxFieldNameLength + 1]).replace("\0", "a");
|
||||
Settings settings = Settings.builder().put(MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING.getKey(), maxFieldNameLength)
|
||||
.build();
|
||||
|
|
|
@ -35,6 +35,7 @@ package org.opensearch.indices;
|
|||
import org.opensearch.LegacyESVersion;
|
||||
import org.opensearch.Version;
|
||||
import org.opensearch.index.mapper.AllFieldMapper;
|
||||
import org.opensearch.index.mapper.DataStreamFieldMapper;
|
||||
import org.opensearch.index.mapper.FieldNamesFieldMapper;
|
||||
import org.opensearch.index.mapper.IdFieldMapper;
|
||||
import org.opensearch.index.mapper.IgnoredFieldMapper;
|
||||
|
@ -90,12 +91,12 @@ public class IndicesModuleTests extends OpenSearchTestCase {
|
|||
});
|
||||
|
||||
private static String[] EXPECTED_METADATA_FIELDS = new String[]{IgnoredFieldMapper.NAME, IdFieldMapper.NAME,
|
||||
RoutingFieldMapper.NAME, IndexFieldMapper.NAME, SourceFieldMapper.NAME, TypeFieldMapper.NAME,
|
||||
VersionFieldMapper.NAME, SeqNoFieldMapper.NAME, FieldNamesFieldMapper.NAME};
|
||||
RoutingFieldMapper.NAME, IndexFieldMapper.NAME, DataStreamFieldMapper.NAME, SourceFieldMapper.NAME,
|
||||
TypeFieldMapper.NAME, VersionFieldMapper.NAME, SeqNoFieldMapper.NAME, FieldNamesFieldMapper.NAME};
|
||||
|
||||
private static String[] EXPECTED_METADATA_FIELDS_6x = new String[]{AllFieldMapper.NAME, IgnoredFieldMapper.NAME,
|
||||
IdFieldMapper.NAME, RoutingFieldMapper.NAME, IndexFieldMapper.NAME, SourceFieldMapper.NAME, TypeFieldMapper.NAME,
|
||||
VersionFieldMapper.NAME, SeqNoFieldMapper.NAME, FieldNamesFieldMapper.NAME};
|
||||
IdFieldMapper.NAME, RoutingFieldMapper.NAME, IndexFieldMapper.NAME, DataStreamFieldMapper.NAME, SourceFieldMapper.NAME,
|
||||
TypeFieldMapper.NAME, VersionFieldMapper.NAME, SeqNoFieldMapper.NAME, FieldNamesFieldMapper.NAME};
|
||||
|
||||
public void testBuiltinMappers() {
|
||||
IndicesModule module = new IndicesModule(Collections.emptyList());
|
||||
|
|
|
@ -81,19 +81,14 @@ public final class DataStreamTestHelper {
|
|||
}
|
||||
|
||||
public static String generateMapping(String timestampFieldName) {
|
||||
return "{\n" +
|
||||
" \"properties\": {\n" +
|
||||
" \"" + timestampFieldName + "\": {\n" +
|
||||
" \"type\": \"date\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
return generateMapping(timestampFieldName, "date");
|
||||
}
|
||||
|
||||
public static String generateMapping(String timestampFieldName, String type) {
|
||||
return "{\n" +
|
||||
" \"_data_stream_timestamp\": {\n" +
|
||||
" \"enabled\": true\n" +
|
||||
" \"enabled\": true,\n" +
|
||||
" \"timestamp_field\": { \"name\": \"" + timestampFieldName + "\" }" +
|
||||
" }," +
|
||||
" \"properties\": {\n" +
|
||||
" \"" + timestampFieldName + "\": {\n" +
|
||||
|
|
Loading…
Reference in New Issue