[ML][Data Frame] Fixing default delay set in timesync (#44281) (#44293)

* [ML][Data Frame] Fixing default delay set in timesync

* disallowing explicit null, don't do duration check on write
This commit is contained in:
Benjamin Trent 2019-07-12 15:21:47 -05:00 committed by GitHub
parent d187fcb9de
commit 79c62fd724
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 19 deletions

View File

@ -27,6 +27,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class TimeSyncConfig implements SyncConfig { public class TimeSyncConfig implements SyncConfig {
public static final TimeValue DEFAULT_DELAY = TimeValue.timeValueSeconds(60);
private static final String NAME = "data_frame_transform_pivot_sync_time"; private static final String NAME = "data_frame_transform_pivot_sync_time";
private final String field; private final String field;
@ -39,16 +40,14 @@ public class TimeSyncConfig implements SyncConfig {
ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient, ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient,
args -> { args -> {
String field = (String) args[0]; String field = (String) args[0];
TimeValue delay = args[1] != null ? (TimeValue) args[1] : TimeValue.ZERO; TimeValue delay = (TimeValue) args[1];
return new TimeSyncConfig(field, delay); return new TimeSyncConfig(field, delay);
}); });
parser.declareString(constructorArg(), DataFrameField.FIELD); parser.declareString(constructorArg(), DataFrameField.FIELD);
parser.declareField(optionalConstructorArg(), parser.declareField(optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.textOrNull(), DataFrameField.DELAY.getPreferredName()), DataFrameField.DELAY, (p, c) -> TimeValue.parseTimeValue(p.text(), DEFAULT_DELAY, DataFrameField.DELAY.getPreferredName()),
ObjectParser.ValueType.STRING_OR_NULL); DataFrameField.DELAY,
ObjectParser.ValueType.STRING);
return parser; return parser;
} }
@ -58,7 +57,7 @@ public class TimeSyncConfig implements SyncConfig {
public TimeSyncConfig(final String field, final TimeValue delay) { public TimeSyncConfig(final String field, final TimeValue delay) {
this.field = ExceptionsHelper.requireNonNull(field, DataFrameField.FIELD.getPreferredName()); this.field = ExceptionsHelper.requireNonNull(field, DataFrameField.FIELD.getPreferredName());
this.delay = ExceptionsHelper.requireNonNull(delay, DataFrameField.DELAY.getPreferredName()); this.delay = delay == null ? DEFAULT_DELAY : delay;
} }
public TimeSyncConfig(StreamInput in) throws IOException { public TimeSyncConfig(StreamInput in) throws IOException {
@ -89,9 +88,7 @@ public class TimeSyncConfig implements SyncConfig {
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject(); builder.startObject();
builder.field(DataFrameField.FIELD.getPreferredName(), field); builder.field(DataFrameField.FIELD.getPreferredName(), field);
if (delay.duration() > 0) {
builder.field(DataFrameField.DELAY.getPreferredName(), delay.getStringRep()); builder.field(DataFrameField.DELAY.getPreferredName(), delay.getStringRep());
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }

View File

@ -14,6 +14,8 @@ import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
import java.io.IOException; import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
public class TimeSyncConfigTests extends AbstractSerializingTestCase<TimeSyncConfig> { public class TimeSyncConfigTests extends AbstractSerializingTestCase<TimeSyncConfig> {
public static TimeSyncConfig randomTimeSyncConfig() { public static TimeSyncConfig randomTimeSyncConfig() {
@ -35,4 +37,8 @@ public class TimeSyncConfigTests extends AbstractSerializingTestCase<TimeSyncCon
return TimeSyncConfig::new; return TimeSyncConfig::new;
} }
public void testDefaultDelay() {
TimeSyncConfig config = new TimeSyncConfig(randomAlphaOfLength(10), null);
assertThat(config.getDelay(), equalTo(TimeSyncConfig.DEFAULT_DELAY));
}
} }

View File

@ -205,8 +205,7 @@ public class TransportPutDataFrameTransformAction
final Pivot pivot = new Pivot(config.getPivotConfig()); final Pivot pivot = new Pivot(config.getPivotConfig());
// <3> Return to the listener
// <5> Return the listener, or clean up destination index on failure.
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap( ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(
putTransformConfigurationResult -> { putTransformConfigurationResult -> {
auditor.info(config.getId(), "Created data frame transform."); auditor.info(config.getId(), "Created data frame transform.");
@ -215,7 +214,7 @@ public class TransportPutDataFrameTransformAction
listener::onFailure listener::onFailure
); );
// <4> Put our transform // <2> Put our transform
ActionListener<Boolean> pivotValidationListener = ActionListener.wrap( ActionListener<Boolean> pivotValidationListener = ActionListener.wrap(
validationResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener), validationResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener),
validationException -> listener.onFailure( validationException -> listener.onFailure(

View File

@ -259,6 +259,39 @@ setup:
- match: { transforms.0.sync.time.field: "time" } - match: { transforms.0.sync.time.field: "time" }
- match: { transforms.0.sync.time.delay: "90m" } - match: { transforms.0.sync.time.delay: "90m" }
--- ---
"Test PUT continuous transform without delay set":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-continuous"
body: >
{
"source": {
"index": "airline-data"
},
"dest": { "index": "airline-data-by-airline-continuous" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"sync": {
"time": {
"field": "time"
}
}
}
- match: { acknowledged: true }
- do:
data_frame.get_data_frame_transform:
transform_id: "airline-transform-continuous"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-continuous" }
- match: { transforms.0.source.index.0: "airline-data" }
- match: { transforms.0.dest.index: "airline-data-by-airline-continuous" }
- match: { transforms.0.pivot.group_by.airline.terms.field: "airline" }
- match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" }
- match: { transforms.0.sync.time.field: "time" }
- match: { transforms.0.sync.time.delay: "60s" }
---
"Test transform with invalid page parameter": "Test transform with invalid page parameter":
- do: - do:
catch: /Param \[size\] has a max acceptable value of \[1000\]/ catch: /Param \[size\] has a max acceptable value of \[1000\]/