* [ML][Data Frame] Add version and create_time to transform config (#43384) * [ML][Data Frame] Add version and create_time to transform config * s/transform_version/version s/Date/Instant * fixing getter/setter for version * adjusting for backport
This commit is contained in:
parent
e4fd0ce730
commit
f4b75d6d14
|
@ -19,16 +19,20 @@
|
|||
|
||||
package org.elasticsearch.client.dataframe.transforms;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig;
|
||||
import org.elasticsearch.client.dataframe.transforms.util.TimeUtil;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
|
||||
|
@ -40,6 +44,8 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
|||
public static final ParseField SOURCE = new ParseField("source");
|
||||
public static final ParseField DEST = new ParseField("dest");
|
||||
public static final ParseField DESCRIPTION = new ParseField("description");
|
||||
public static final ParseField VERSION = new ParseField("version");
|
||||
public static final ParseField CREATE_TIME = new ParseField("create_time");
|
||||
// types of transforms
|
||||
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
|
||||
|
||||
|
@ -48,6 +54,8 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
|||
private final DestConfig dest;
|
||||
private final PivotConfig pivotConfig;
|
||||
private final String description;
|
||||
private final Version transformVersion;
|
||||
private final Instant createTime;
|
||||
|
||||
public static final ConstructingObjectParser<DataFrameTransformConfig, Void> PARSER =
|
||||
new ConstructingObjectParser<>("data_frame_transform", true,
|
||||
|
@ -57,7 +65,9 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
|||
DestConfig dest = (DestConfig) args[2];
|
||||
PivotConfig pivotConfig = (PivotConfig) args[3];
|
||||
String description = (String)args[4];
|
||||
return new DataFrameTransformConfig(id, source, dest, pivotConfig, description);
|
||||
Instant createTime = (Instant)args[5];
|
||||
String transformVersion = (String)args[6];
|
||||
return new DataFrameTransformConfig(id, source, dest, pivotConfig, description, createTime, transformVersion);
|
||||
});
|
||||
|
||||
static {
|
||||
|
@ -66,6 +76,9 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
|||
PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST);
|
||||
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
|
||||
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
|
||||
PARSER.declareField(optionalConstructorArg(),
|
||||
p -> TimeUtil.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE);
|
||||
PARSER.declareString(optionalConstructorArg(), VERSION);
|
||||
}
|
||||
|
||||
public static DataFrameTransformConfig fromXContent(final XContentParser parser) {
|
||||
|
@ -84,19 +97,23 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
|||
* @return A DataFrameTransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
|
||||
*/
|
||||
public static DataFrameTransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
|
||||
return new DataFrameTransformConfig(null, source, null, pivotConfig, null);
|
||||
return new DataFrameTransformConfig(null, source, null, pivotConfig, null, null, null);
|
||||
}
|
||||
|
||||
DataFrameTransformConfig(final String id,
|
||||
final SourceConfig source,
|
||||
final DestConfig dest,
|
||||
final PivotConfig pivotConfig,
|
||||
final String description) {
|
||||
final String description,
|
||||
final Instant createTime,
|
||||
final String version) {
|
||||
this.id = id;
|
||||
this.source = source;
|
||||
this.dest = dest;
|
||||
this.pivotConfig = pivotConfig;
|
||||
this.description = description;
|
||||
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
|
||||
this.transformVersion = version == null ? null : Version.fromString(version);
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
|
@ -115,6 +132,14 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
|||
return pivotConfig;
|
||||
}
|
||||
|
||||
public Version getVersion() {
|
||||
return transformVersion;
|
||||
}
|
||||
|
||||
public Instant getCreateTime() {
|
||||
return createTime;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public String getDescription() {
|
||||
return description;
|
||||
|
@ -138,6 +163,12 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
|||
if (description != null) {
|
||||
builder.field(DESCRIPTION.getPreferredName(), description);
|
||||
}
|
||||
if (createTime != null) {
|
||||
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
|
||||
}
|
||||
if (transformVersion != null) {
|
||||
builder.field(VERSION.getPreferredName(), transformVersion);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -155,15 +186,17 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
|||
final DataFrameTransformConfig that = (DataFrameTransformConfig) other;
|
||||
|
||||
return Objects.equals(this.id, that.id)
|
||||
&& Objects.equals(this.source, that.source)
|
||||
&& Objects.equals(this.dest, that.dest)
|
||||
&& Objects.equals(this.description, that.description)
|
||||
&& Objects.equals(this.pivotConfig, that.pivotConfig);
|
||||
&& Objects.equals(this.source, that.source)
|
||||
&& Objects.equals(this.dest, that.dest)
|
||||
&& Objects.equals(this.description, that.description)
|
||||
&& Objects.equals(this.transformVersion, that.transformVersion)
|
||||
&& Objects.equals(this.createTime, that.createTime)
|
||||
&& Objects.equals(this.pivotConfig, that.pivotConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, source, dest, pivotConfig, description);
|
||||
return Objects.hash(id, source, dest, pivotConfig, description, createTime, transformVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -209,7 +242,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
|||
}
|
||||
|
||||
public DataFrameTransformConfig build() {
|
||||
return new DataFrameTransformConfig(id, source, dest, pivotConfig, description);
|
||||
return new DataFrameTransformConfig(id, source, dest, pivotConfig, description, null, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.client.dataframe.transforms.util;
|
||||
|
||||
import org.elasticsearch.common.time.DateFormatters;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Date;
|
||||
|
||||
public final class TimeUtil {
|
||||
|
||||
/**
|
||||
* Parse out a Date object given the current parser and field name.
|
||||
*
|
||||
* @param parser current XContentParser
|
||||
* @param fieldName the field's preferred name (utilized in exception)
|
||||
* @return parsed Date object
|
||||
* @throws IOException from XContentParser
|
||||
*/
|
||||
public static Date parseTimeField(XContentParser parser, String fieldName) throws IOException {
|
||||
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
|
||||
return new Date(parser.longValue());
|
||||
} else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
|
||||
return new Date(DateFormatters.from(DateTimeFormatter.ISO_INSTANT.parse(parser.text())).toInstant().toEpochMilli());
|
||||
}
|
||||
throw new IllegalArgumentException(
|
||||
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
|
||||
}
|
||||
|
||||
public static Instant parseTimeFieldToInstant(XContentParser parser, String fieldName) throws IOException {
|
||||
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
|
||||
return Instant.ofEpochMilli(parser.longValue());
|
||||
} else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
|
||||
return DateFormatters.from(DateTimeFormatter.ISO_INSTANT.parse(parser.text())).toInstant();
|
||||
}
|
||||
throw new IllegalArgumentException(
|
||||
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
|
||||
}
|
||||
|
||||
}
|
|
@ -195,7 +195,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
|||
client::getDataFrameTransformAsync);
|
||||
assertNull(getResponse.getInvalidTransforms());
|
||||
assertThat(getResponse.getTransformConfigurations(), hasSize(1));
|
||||
assertEquals(transform, getResponse.getTransformConfigurations().get(0));
|
||||
assertEquals(transform.getId(), getResponse.getTransformConfigurations().get(0).getId());
|
||||
}
|
||||
|
||||
public void testGetAllAndPageTransforms() throws IOException {
|
||||
|
@ -219,7 +219,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
|||
client::getDataFrameTransformAsync);
|
||||
assertNull(getResponse.getInvalidTransforms());
|
||||
assertThat(getResponse.getTransformConfigurations(), hasSize(2));
|
||||
assertEquals(transform, getResponse.getTransformConfigurations().get(1));
|
||||
assertEquals(transform.getId(), getResponse.getTransformConfigurations().get(1).getId());
|
||||
|
||||
getRequest.setPageParams(new PageParams(0,1));
|
||||
getResponse = execute(getRequest, client::getDataFrameTransform,
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.client.dataframe.transforms;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
|
@ -27,6 +28,7 @@ import org.elasticsearch.search.SearchModule;
|
|||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
|
@ -36,8 +38,13 @@ import static org.elasticsearch.client.dataframe.transforms.SourceConfigTests.ra
|
|||
public class DataFrameTransformConfigTests extends AbstractXContentTestCase<DataFrameTransformConfig> {
|
||||
|
||||
public static DataFrameTransformConfig randomDataFrameTransformConfig() {
|
||||
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(),
|
||||
randomDestConfig(), PivotConfigTests.randomPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100));
|
||||
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10),
|
||||
randomSourceConfig(),
|
||||
randomDestConfig(),
|
||||
PivotConfigTests.randomPivotConfig(),
|
||||
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100),
|
||||
randomBoolean() ? null : Instant.now(),
|
||||
randomBoolean() ? null : Version.CURRENT.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -478,7 +478,6 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
|
|||
|
||||
RestHighLevelClient client = highLevelClient();
|
||||
|
||||
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
|
||||
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer",
|
||||
TermsGroupSource.builder().setField("user_id").build()).build();
|
||||
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
|
||||
|
@ -564,7 +563,6 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
|
|||
public void testGetDataFrameTransform() throws IOException, InterruptedException {
|
||||
createIndex("source-data");
|
||||
|
||||
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
|
||||
GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer",
|
||||
TermsGroupSource.builder().setField("user_id").build()).build();
|
||||
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.core.dataframe.transforms;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.AbstractDiffable;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
|
@ -14,6 +15,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
@ -21,8 +23,10 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField;
|
|||
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
|
||||
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
|
||||
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.core.dataframe.utils.TimeUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
@ -42,6 +46,8 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
|
||||
|
||||
public static final ParseField DESCRIPTION = new ParseField("description");
|
||||
public static final ParseField VERSION = new ParseField("version");
|
||||
public static final ParseField CREATE_TIME = new ParseField("create_time");
|
||||
private static final ConstructingObjectParser<DataFrameTransformConfig, String> STRICT_PARSER = createParser(false);
|
||||
private static final ConstructingObjectParser<DataFrameTransformConfig, String> LENIENT_PARSER = createParser(true);
|
||||
private static final int MAX_DESCRIPTION_LENGTH = 1_000;
|
||||
|
@ -53,9 +59,17 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
// headers store the user context from the creating user, which allows us to run the transform as this user
|
||||
// the header only contains name, groups and other context but no authorization keys
|
||||
private Map<String, String> headers;
|
||||
private Version transformVersion;
|
||||
private Instant createTime;
|
||||
|
||||
private final PivotConfig pivotConfig;
|
||||
|
||||
private static void validateStrictParsingParams(Object arg, String parameterName) {
|
||||
if (arg != null) {
|
||||
throw new IllegalArgumentException("Found [" + parameterName + "], not allowed for strict parsing");
|
||||
}
|
||||
}
|
||||
|
||||
private static ConstructingObjectParser<DataFrameTransformConfig, String> createParser(boolean lenient) {
|
||||
ConstructingObjectParser<DataFrameTransformConfig, String> parser = new ConstructingObjectParser<>(NAME, lenient,
|
||||
(args, optionalId) -> {
|
||||
|
@ -74,9 +88,11 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
|
||||
// ignored, only for internal storage: String docType = (String) args[3];
|
||||
|
||||
// on strict parsing do not allow injection of headers
|
||||
if (lenient == false && args[4] != null) {
|
||||
throw new IllegalArgumentException("Found [headers], not allowed for strict parsing");
|
||||
// on strict parsing do not allow injection of headers, transform version, or create time
|
||||
if (lenient == false) {
|
||||
validateStrictParsingParams(args[4], HEADERS.getPreferredName());
|
||||
validateStrictParsingParams(args[7], CREATE_TIME.getPreferredName());
|
||||
validateStrictParsingParams(args[8], VERSION.getPreferredName());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -84,7 +100,14 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
|
||||
PivotConfig pivotConfig = (PivotConfig) args[5];
|
||||
String description = (String)args[6];
|
||||
return new DataFrameTransformConfig(id, source, dest, headers, pivotConfig, description);
|
||||
return new DataFrameTransformConfig(id,
|
||||
source,
|
||||
dest,
|
||||
headers,
|
||||
pivotConfig,
|
||||
description,
|
||||
(Instant)args[7],
|
||||
(String)args[8]);
|
||||
});
|
||||
|
||||
parser.declareString(optionalConstructorArg(), DataFrameField.ID);
|
||||
|
@ -95,7 +118,9 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), HEADERS);
|
||||
parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM);
|
||||
parser.declareString(optionalConstructorArg(), DESCRIPTION);
|
||||
|
||||
parser.declareField(optionalConstructorArg(),
|
||||
p -> TimeUtils.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE);
|
||||
parser.declareString(optionalConstructorArg(), VERSION);
|
||||
return parser;
|
||||
}
|
||||
|
||||
|
@ -103,12 +128,14 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
return NAME + "-" + transformId;
|
||||
}
|
||||
|
||||
public DataFrameTransformConfig(final String id,
|
||||
final SourceConfig source,
|
||||
final DestConfig dest,
|
||||
final Map<String, String> headers,
|
||||
final PivotConfig pivotConfig,
|
||||
final String description) {
|
||||
DataFrameTransformConfig(final String id,
|
||||
final SourceConfig source,
|
||||
final DestConfig dest,
|
||||
final Map<String, String> headers,
|
||||
final PivotConfig pivotConfig,
|
||||
final String description,
|
||||
final Instant createTime,
|
||||
final String version){
|
||||
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
|
||||
this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.SOURCE.getPreferredName());
|
||||
this.dest = ExceptionsHelper.requireNonNull(dest, DataFrameField.DESTINATION.getPreferredName());
|
||||
|
@ -123,6 +150,17 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
if (this.description != null && this.description.length() > MAX_DESCRIPTION_LENGTH) {
|
||||
throw new IllegalArgumentException("[description] must be less than 1000 characters in length.");
|
||||
}
|
||||
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
|
||||
this.transformVersion = version == null ? null : Version.fromString(version);
|
||||
}
|
||||
|
||||
public DataFrameTransformConfig(final String id,
|
||||
final SourceConfig source,
|
||||
final DestConfig dest,
|
||||
final Map<String, String> headers,
|
||||
final PivotConfig pivotConfig,
|
||||
final String description) {
|
||||
this(id, source, dest, headers, pivotConfig, description, null, null);
|
||||
}
|
||||
|
||||
public DataFrameTransformConfig(final StreamInput in) throws IOException {
|
||||
|
@ -132,6 +170,13 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
setHeaders(in.readMap(StreamInput::readString, StreamInput::readString));
|
||||
pivotConfig = in.readOptionalWriteable(PivotConfig::new);
|
||||
description = in.readOptionalString();
|
||||
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||
createTime = in.readOptionalInstant();
|
||||
transformVersion = in.readBoolean() ? Version.readVersion(in) : null;
|
||||
} else {
|
||||
createTime = null;
|
||||
transformVersion = null;
|
||||
}
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
|
@ -150,8 +195,28 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
return headers;
|
||||
}
|
||||
|
||||
public void setHeaders(Map<String, String> headers) {
|
||||
public DataFrameTransformConfig setHeaders(Map<String, String> headers) {
|
||||
this.headers = headers;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Version getVersion() {
|
||||
return transformVersion;
|
||||
}
|
||||
|
||||
public DataFrameTransformConfig setVersion(Version transformVersion) {
|
||||
this.transformVersion = transformVersion;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Instant getCreateTime() {
|
||||
return createTime;
|
||||
}
|
||||
|
||||
public DataFrameTransformConfig setCreateTime(Instant createTime) {
|
||||
ExceptionsHelper.requireNonNull(createTime, CREATE_TIME.getPreferredName());
|
||||
this.createTime = Instant.ofEpochMilli(createTime.toEpochMilli());
|
||||
return this;
|
||||
}
|
||||
|
||||
public PivotConfig getPivotConfig() {
|
||||
|
@ -179,6 +244,15 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
|
||||
out.writeOptionalWriteable(pivotConfig);
|
||||
out.writeOptionalString(description);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||
out.writeOptionalInstant(createTime);
|
||||
if (transformVersion != null) {
|
||||
out.writeBoolean(true);
|
||||
Version.writeVersion(transformVersion, out);
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -199,6 +273,12 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
if (description != null) {
|
||||
builder.field(DESCRIPTION.getPreferredName(), description);
|
||||
}
|
||||
if (transformVersion != null) {
|
||||
builder.field(VERSION.getPreferredName(), transformVersion);
|
||||
}
|
||||
if (createTime != null) {
|
||||
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -220,12 +300,14 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|||
&& Objects.equals(this.dest, that.dest)
|
||||
&& Objects.equals(this.headers, that.headers)
|
||||
&& Objects.equals(this.pivotConfig, that.pivotConfig)
|
||||
&& Objects.equals(this.description, that.description);
|
||||
&& Objects.equals(this.description, that.description)
|
||||
&& Objects.equals(this.createTime, that.createTime)
|
||||
&& Objects.equals(this.transformVersion, that.transformVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode(){
|
||||
return Objects.hash(id, source, dest, headers, pivotConfig, description);
|
||||
return Objects.hash(id, source, dest, headers, pivotConfig, description, createTime, transformVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.index.mapper.DateFieldMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Date;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -31,6 +32,16 @@ public final class TimeUtils {
|
|||
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
|
||||
}
|
||||
|
||||
public static Instant parseTimeFieldToInstant(XContentParser parser, String fieldName) throws IOException {
|
||||
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
|
||||
return Instant.ofEpochMilli(parser.longValue());
|
||||
} else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
|
||||
return Instant.ofEpochMilli(TimeUtils.dateStringToEpoch(parser.text()));
|
||||
}
|
||||
throw new IllegalArgumentException(
|
||||
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
|
||||
}
|
||||
|
||||
/**
|
||||
* First tries to parse the date first as a Long and convert that to an
|
||||
* epoch time. If the long number has more than 10 digits it is considered a
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.core.dataframe.transforms;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
|
@ -18,6 +19,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfigTests;
|
|||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -41,13 +43,25 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
|
|||
}
|
||||
|
||||
public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders(String id) {
|
||||
return new DataFrameTransformConfig(id, randomSourceConfig(), randomDestConfig(), null,
|
||||
PivotConfigTests.randomPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
|
||||
return new DataFrameTransformConfig(id,
|
||||
randomSourceConfig(),
|
||||
randomDestConfig(),
|
||||
null,
|
||||
PivotConfigTests.randomPivotConfig(),
|
||||
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
|
||||
null,
|
||||
null);
|
||||
}
|
||||
|
||||
public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) {
|
||||
return new DataFrameTransformConfig(id, randomSourceConfig(), randomDestConfig(), randomHeaders(),
|
||||
PivotConfigTests.randomPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
|
||||
return new DataFrameTransformConfig(id,
|
||||
randomSourceConfig(),
|
||||
randomDestConfig(),
|
||||
randomHeaders(),
|
||||
PivotConfigTests.randomPivotConfig(),
|
||||
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
|
||||
randomBoolean() ? null : Instant.now(),
|
||||
randomBoolean() ? null : Version.CURRENT.toString());
|
||||
}
|
||||
|
||||
public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() {
|
||||
|
@ -147,6 +161,48 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
|
|||
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection"));
|
||||
}
|
||||
|
||||
public void testPreventCreateTimeInjection() throws IOException {
|
||||
String pivotTransform = "{"
|
||||
+ " \"create_time\" : " + Instant.now().toEpochMilli() + " },"
|
||||
+ " \"source\" : {\"index\":\"src\"},"
|
||||
+ " \"dest\" : {\"index\": \"dest\"},"
|
||||
+ " \"pivot\" : {"
|
||||
+ " \"group_by\": {"
|
||||
+ " \"id\": {"
|
||||
+ " \"terms\": {"
|
||||
+ " \"field\": \"id\""
|
||||
+ "} } },"
|
||||
+ " \"aggs\": {"
|
||||
+ " \"avg\": {"
|
||||
+ " \"avg\": {"
|
||||
+ " \"field\": \"points\""
|
||||
+ "} } } } }";
|
||||
|
||||
expectThrows(IllegalArgumentException.class,
|
||||
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection"));
|
||||
}
|
||||
|
||||
public void testPreventVersionInjection() throws IOException {
|
||||
String pivotTransform = "{"
|
||||
+ " \"version\" : \"7.3.0\","
|
||||
+ " \"source\" : {\"index\":\"src\"},"
|
||||
+ " \"dest\" : {\"index\": \"dest\"},"
|
||||
+ " \"pivot\" : {"
|
||||
+ " \"group_by\": {"
|
||||
+ " \"id\": {"
|
||||
+ " \"terms\": {"
|
||||
+ " \"field\": \"id\""
|
||||
+ "} } },"
|
||||
+ " \"aggs\": {"
|
||||
+ " \"avg\": {"
|
||||
+ " \"avg\": {"
|
||||
+ " \"field\": \"points\""
|
||||
+ "} } } } }";
|
||||
|
||||
expectThrows(IllegalArgumentException.class,
|
||||
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection"));
|
||||
}
|
||||
|
||||
public void testXContentForInternalStorage() throws IOException {
|
||||
DataFrameTransformConfig dataFrameTransformConfig = randomDataFrameTransformConfig();
|
||||
|
||||
|
|
|
@ -15,6 +15,8 @@ import org.elasticsearch.client.RequestOptions;
|
|||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.elasticsearch.client.core.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
|
||||
import org.elasticsearch.client.dataframe.GetDataFrameTransformRequest;
|
||||
import org.elasticsearch.client.dataframe.GetDataFrameTransformResponse;
|
||||
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
|
||||
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
|
||||
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
|
||||
|
@ -57,6 +59,7 @@ import java.time.ZoneId;
|
|||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -118,6 +121,11 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
|
|||
return restClient.dataFrame().getDataFrameTransformStats(new GetDataFrameTransformStatsRequest(id), RequestOptions.DEFAULT);
|
||||
}
|
||||
|
||||
protected GetDataFrameTransformResponse getDataFrameTransform(String id) throws IOException {
|
||||
RestHighLevelClient restClient = new TestRestHighLevelClient();
|
||||
return restClient.dataFrame().getDataFrameTransform(new GetDataFrameTransformRequest(id), RequestOptions.DEFAULT);
|
||||
}
|
||||
|
||||
protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception {
|
||||
waitUntilCheckpoint(id, checkpoint, TimeValue.timeValueSeconds(30));
|
||||
}
|
||||
|
@ -321,9 +329,11 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
|
|||
.build();
|
||||
}
|
||||
|
||||
private class TestRestHighLevelClient extends RestHighLevelClient {
|
||||
private static class TestRestHighLevelClient extends RestHighLevelClient {
|
||||
private static final List<NamedXContentRegistry.Entry> X_CONTENT_ENTRIES =
|
||||
new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents();
|
||||
TestRestHighLevelClient() {
|
||||
super(client(), restClient -> {}, Collections.emptyList());
|
||||
super(client(), restClient -> {}, X_CONTENT_ENTRIES);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.dataframe.integration;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.core.IndexerState;
|
||||
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
|
||||
|
@ -17,6 +18,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter
|
|||
import org.junit.After;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -58,6 +60,11 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
|
|||
assertThat(getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0).getTransformState().getIndexerState(),
|
||||
equalTo(IndexerState.STOPPED)));
|
||||
stopDataFrameTransform(config.getId());
|
||||
|
||||
DataFrameTransformConfig storedConfig = getDataFrameTransform(config.getId()).getTransformConfigurations().get(0);
|
||||
assertThat(storedConfig.getVersion(), equalTo(Version.CURRENT));
|
||||
Instant now = Instant.now();
|
||||
assertTrue("[create_time] is not before current time", storedConfig.getCreateTime().isBefore(now));
|
||||
deleteDataFrameTransform(config.getId());
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.action;
|
|||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
|
@ -51,6 +52,7 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa
|
|||
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
|
@ -110,8 +112,10 @@ public class TransportPutDataFrameTransformAction
|
|||
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
DataFrameTransformConfig config = request.getConfig();
|
||||
config.setHeaders(filteredHeaders);
|
||||
DataFrameTransformConfig config = request.getConfig()
|
||||
.setHeaders(filteredHeaders)
|
||||
.setCreateTime(Instant.now())
|
||||
.setVersion(Version.CURRENT);
|
||||
|
||||
String transformId = config.getId();
|
||||
// quick check whether a transform has already been created under that name
|
||||
|
|
|
@ -90,6 +90,8 @@ setup:
|
|||
- match: { transforms.0.source.index.0: "airline-data" }
|
||||
- match: { transforms.0.dest.index: "airline-data-by-airline" }
|
||||
- is_true: transforms.0.source.query.match_all
|
||||
- is_true: transforms.0.create_time
|
||||
- is_true: transforms.0.version
|
||||
- match: { transforms.0.pivot.group_by.airline.terms.field: "airline" }
|
||||
- match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" }
|
||||
- match: { transforms.0.description: "yaml test transform on airline-data" }
|
||||
|
|
Loading…
Reference in New Issue