[ML] Change chunking_config.time_span into a TimeValue (elastic/x-pack-elasticsearch#808)
Original commit: elastic/x-pack-elasticsearch@42d7b06e3f
This commit is contained in:
parent
e739d86f00
commit
99e3508267
|
@ -244,7 +244,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
|
||||||
builder.field("assigment_explanation", assignmentExplanation);
|
builder.field("assigment_explanation", assignmentExplanation);
|
||||||
}
|
}
|
||||||
if (openTime != null) {
|
if (openTime != null) {
|
||||||
builder.timeValueField("open_time", "open_time_string", openTime);
|
builder.field("open_time", openTime.getStringRep());
|
||||||
}
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.common.ParseField;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
|
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
@ -30,7 +31,7 @@ public class ChunkingConfig extends ToXContentToBytes implements Writeable {
|
||||||
public static final ParseField TIME_SPAN_FIELD = new ParseField("time_span");
|
public static final ParseField TIME_SPAN_FIELD = new ParseField("time_span");
|
||||||
|
|
||||||
public static final ConstructingObjectParser<ChunkingConfig, Void> PARSER = new ConstructingObjectParser<>(
|
public static final ConstructingObjectParser<ChunkingConfig, Void> PARSER = new ConstructingObjectParser<>(
|
||||||
"chunking_config", a -> new ChunkingConfig((Mode) a[0], (Long) a[1]));
|
"chunking_config", a -> new ChunkingConfig((Mode) a[0], (TimeValue) a[1]));
|
||||||
|
|
||||||
static {
|
static {
|
||||||
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
|
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
|
||||||
|
@ -39,31 +40,36 @@ public class ChunkingConfig extends ToXContentToBytes implements Writeable {
|
||||||
}
|
}
|
||||||
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
|
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
|
||||||
}, MODE_FIELD, ValueType.STRING);
|
}, MODE_FIELD, ValueType.STRING);
|
||||||
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIME_SPAN_FIELD);
|
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
|
||||||
|
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
|
||||||
|
return TimeValue.parseTimeValue(p.text(), TIME_SPAN_FIELD.getPreferredName());
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
|
||||||
|
}, TIME_SPAN_FIELD, ValueType.STRING);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Mode mode;
|
private final Mode mode;
|
||||||
private final Long timeSpan;
|
private final TimeValue timeSpan;
|
||||||
|
|
||||||
public ChunkingConfig(StreamInput in) throws IOException {
|
public ChunkingConfig(StreamInput in) throws IOException {
|
||||||
mode = Mode.readFromStream(in);
|
mode = Mode.readFromStream(in);
|
||||||
timeSpan = in.readOptionalLong();
|
timeSpan = in.readOptionalWriteable(TimeValue::new);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
mode.writeTo(out);
|
mode.writeTo(out);
|
||||||
out.writeOptionalLong(timeSpan);
|
out.writeOptionalWriteable(timeSpan);
|
||||||
}
|
}
|
||||||
|
|
||||||
ChunkingConfig(Mode mode, @Nullable Long timeSpan) {
|
ChunkingConfig(Mode mode, @Nullable TimeValue timeSpan) {
|
||||||
this.mode = ExceptionsHelper.requireNonNull(mode, MODE_FIELD.getPreferredName());
|
this.mode = ExceptionsHelper.requireNonNull(mode, MODE_FIELD.getPreferredName());
|
||||||
this.timeSpan = timeSpan;
|
this.timeSpan = timeSpan;
|
||||||
if (mode == Mode.MANUAL) {
|
if (mode == Mode.MANUAL) {
|
||||||
if (timeSpan == null) {
|
if (timeSpan == null) {
|
||||||
throw new IllegalArgumentException("when chunk mode is manual time_span is required");
|
throw new IllegalArgumentException("when chunk mode is manual time_span is required");
|
||||||
}
|
}
|
||||||
if (timeSpan <= 0) {
|
if (timeSpan.getMillis() <= 0) {
|
||||||
throw new IllegalArgumentException("chunk time_span has to be positive");
|
throw new IllegalArgumentException("chunk time_span has to be positive");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -74,7 +80,7 @@ public class ChunkingConfig extends ToXContentToBytes implements Writeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public Long getTimeSpan() {
|
public TimeValue getTimeSpan() {
|
||||||
return timeSpan;
|
return timeSpan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +93,7 @@ public class ChunkingConfig extends ToXContentToBytes implements Writeable {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
builder.field(MODE_FIELD.getPreferredName(), mode);
|
builder.field(MODE_FIELD.getPreferredName(), mode);
|
||||||
if (timeSpan != null) {
|
if (timeSpan != null) {
|
||||||
builder.field(TIME_SPAN_FIELD.getPreferredName(), timeSpan);
|
builder.field(TIME_SPAN_FIELD.getPreferredName(), timeSpan.getStringRep());
|
||||||
}
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
|
@ -124,7 +130,7 @@ public class ChunkingConfig extends ToXContentToBytes implements Writeable {
|
||||||
return new ChunkingConfig(Mode.OFF, null);
|
return new ChunkingConfig(Mode.OFF, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ChunkingConfig newManual(long timeSpan) {
|
public static ChunkingConfig newManual(TimeValue timeSpan) {
|
||||||
return new ChunkingConfig(Mode.MANUAL, timeSpan);
|
return new ChunkingConfig(Mode.MANUAL, timeSpan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,7 @@ public class ChunkedDataExtractor implements DataExtractor {
|
||||||
if (dataSummary.totalHits > 0) {
|
if (dataSummary.totalHits > 0) {
|
||||||
currentStart = dataSummary.earliestTime;
|
currentStart = dataSummary.earliestTime;
|
||||||
currentEnd = currentStart;
|
currentEnd = currentStart;
|
||||||
chunkSpan = context.chunkSpan == null ? dataSummary.estimateChunk() : context.chunkSpan;
|
chunkSpan = context.chunkSpan == null ? dataSummary.estimateChunk() : context.chunkSpan.getMillis();
|
||||||
LOGGER.info("Chunked search configured: totalHits = {}, dataTimeSpread = {} ms, chunk span = {} ms",
|
LOGGER.info("Chunked search configured: totalHits = {}, dataTimeSpread = {} ms, chunk span = {} ms",
|
||||||
dataSummary.totalHits, dataSummary.getDataTimeSpread(), chunkSpan);
|
dataSummary.totalHits, dataSummary.getDataTimeSpread(), chunkSpan);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
|
package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
|
||||||
|
|
||||||
import org.elasticsearch.common.inject.internal.Nullable;
|
import org.elasticsearch.common.inject.internal.Nullable;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.query.QueryBuilder;
|
import org.elasticsearch.index.query.QueryBuilder;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -21,10 +22,10 @@ class ChunkedDataExtractorContext {
|
||||||
final int scrollSize;
|
final int scrollSize;
|
||||||
final long start;
|
final long start;
|
||||||
final long end;
|
final long end;
|
||||||
final Long chunkSpan;
|
final TimeValue chunkSpan;
|
||||||
|
|
||||||
ChunkedDataExtractorContext(String jobId, String timeField, List<String> indexes, List<String> types,
|
ChunkedDataExtractorContext(String jobId, String timeField, List<String> indexes, List<String> types,
|
||||||
QueryBuilder query, int scrollSize, long start, long end, @Nullable Long chunkSpan) {
|
QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan) {
|
||||||
this.jobId = Objects.requireNonNull(jobId);
|
this.jobId = Objects.requireNonNull(jobId);
|
||||||
this.timeField = Objects.requireNonNull(timeField);
|
this.timeField = Objects.requireNonNull(timeField);
|
||||||
this.indexes = indexes.toArray(new String[indexes.size()]);
|
this.indexes = indexes.toArray(new String[indexes.size()]);
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
package org.elasticsearch.xpack.ml.datafeed;
|
package org.elasticsearch.xpack.ml.datafeed;
|
||||||
|
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
|
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
|
||||||
|
|
||||||
|
@ -29,11 +30,11 @@ public class ChunkingConfigTests extends AbstractSerializingTestCase<ChunkingCon
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testConstructorGivenAutoAndTimeSpan() {
|
public void testConstructorGivenAutoAndTimeSpan() {
|
||||||
expectThrows(IllegalArgumentException.class, () ->new ChunkingConfig(ChunkingConfig.Mode.AUTO, 1000L));
|
expectThrows(IllegalArgumentException.class, () ->new ChunkingConfig(ChunkingConfig.Mode.AUTO, TimeValue.timeValueMillis(1000)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testConstructorGivenOffAndTimeSpan() {
|
public void testConstructorGivenOffAndTimeSpan() {
|
||||||
expectThrows(IllegalArgumentException.class, () ->new ChunkingConfig(ChunkingConfig.Mode.OFF, 1000L));
|
expectThrows(IllegalArgumentException.class, () ->new ChunkingConfig(ChunkingConfig.Mode.OFF, TimeValue.timeValueMillis(1000)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testConstructorGivenManualAndNoTimeSpan() {
|
public void testConstructorGivenManualAndNoTimeSpan() {
|
||||||
|
@ -42,18 +43,15 @@ public class ChunkingConfigTests extends AbstractSerializingTestCase<ChunkingCon
|
||||||
|
|
||||||
public void testIsEnabled() {
|
public void testIsEnabled() {
|
||||||
assertThat(ChunkingConfig.newAuto().isEnabled(), is(true));
|
assertThat(ChunkingConfig.newAuto().isEnabled(), is(true));
|
||||||
assertThat(ChunkingConfig.newManual(1000).isEnabled(), is(true));
|
assertThat(ChunkingConfig.newManual(TimeValue.timeValueMillis(1000)).isEnabled(), is(true));
|
||||||
assertThat(ChunkingConfig.newOff().isEnabled(), is(false));
|
assertThat(ChunkingConfig.newOff().isEnabled(), is(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ChunkingConfig createRandomizedChunk() {
|
public static ChunkingConfig createRandomizedChunk() {
|
||||||
ChunkingConfig.Mode mode = randomFrom(ChunkingConfig.Mode.values());
|
ChunkingConfig.Mode mode = randomFrom(ChunkingConfig.Mode.values());
|
||||||
Long timeSpan = null;
|
TimeValue timeSpan = null;
|
||||||
if (mode == ChunkingConfig.Mode.MANUAL) {
|
if (mode == ChunkingConfig.Mode.MANUAL) {
|
||||||
timeSpan = randomNonNegativeLong();
|
timeSpan = TimeValue.parseTimeValue(randomPositiveTimeValue(), "test");
|
||||||
if (timeSpan == 0L) {
|
|
||||||
timeSpan = 1L;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return new ChunkingConfig(mode, timeSpan);
|
return new ChunkingConfig(mode, timeSpan);
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,7 +129,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
||||||
update.setScriptFields(Arrays.asList(new SearchSourceBuilder.ScriptField("a", new Script("b"), false)));
|
update.setScriptFields(Arrays.asList(new SearchSourceBuilder.ScriptField("a", new Script("b"), false)));
|
||||||
update.setScrollSize(8000);
|
update.setScrollSize(8000);
|
||||||
update.setSource(true);
|
update.setSource(true);
|
||||||
update.setChunkingConfig(ChunkingConfig.newManual(3600L));
|
update.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1)));
|
||||||
|
|
||||||
DatafeedConfig updatedDatafeed = update.build().apply(datafeed);
|
DatafeedConfig updatedDatafeed = update.build().apply(datafeed);
|
||||||
|
|
||||||
|
@ -144,7 +144,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
|
||||||
equalTo(Arrays.asList(new SearchSourceBuilder.ScriptField("a", new Script("b"), false))));
|
equalTo(Arrays.asList(new SearchSourceBuilder.ScriptField("a", new Script("b"), false))));
|
||||||
assertThat(updatedDatafeed.getScrollSize(), equalTo(8000));
|
assertThat(updatedDatafeed.getScrollSize(), equalTo(8000));
|
||||||
assertThat(updatedDatafeed.isSource(), is(true));
|
assertThat(updatedDatafeed.isSource(), is(true));
|
||||||
assertThat(updatedDatafeed.getChunkingConfig(), equalTo(ChunkingConfig.newManual(3600L)));
|
assertThat(updatedDatafeed.getChunkingConfig(), equalTo(ChunkingConfig.newManual(TimeValue.timeValueHours(1))));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testApply_givenAggregations() {
|
public void testApply_givenAggregations() {
|
||||||
|
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.query.QueryBuilder;
|
import org.elasticsearch.index.query.QueryBuilder;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.mock.orig.Mockito;
|
import org.elasticsearch.mock.orig.Mockito;
|
||||||
|
@ -46,7 +47,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
|
||||||
private List<String> indexes;
|
private List<String> indexes;
|
||||||
private QueryBuilder query;
|
private QueryBuilder query;
|
||||||
private int scrollSize;
|
private int scrollSize;
|
||||||
private Long chunkSpan;
|
private TimeValue chunkSpan;
|
||||||
private DataExtractorFactory dataExtractorFactory;
|
private DataExtractorFactory dataExtractorFactory;
|
||||||
|
|
||||||
private class TestDataExtractor extends ChunkedDataExtractor {
|
private class TestDataExtractor extends ChunkedDataExtractor {
|
||||||
|
@ -93,7 +94,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExtractionGivenSpecifiedChunk() throws IOException {
|
public void testExtractionGivenSpecifiedChunk() throws IOException {
|
||||||
chunkSpan = 1000L;
|
chunkSpan = TimeValue.timeValueSeconds(1);
|
||||||
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
||||||
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
|
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
|
||||||
|
|
||||||
|
@ -317,7 +318,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCancelGivenNextWasNeverCalled() throws IOException {
|
public void testCancelGivenNextWasNeverCalled() throws IOException {
|
||||||
chunkSpan = 1000L;
|
chunkSpan = TimeValue.timeValueSeconds(1);
|
||||||
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
||||||
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
|
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
|
||||||
|
|
||||||
|
@ -336,7 +337,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCancelGivenCurrentSubExtractorHasMore() throws IOException {
|
public void testCancelGivenCurrentSubExtractorHasMore() throws IOException {
|
||||||
chunkSpan = 1000L;
|
chunkSpan = TimeValue.timeValueSeconds(1);
|
||||||
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
||||||
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
|
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
|
||||||
|
|
||||||
|
@ -363,7 +364,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCancelGivenCurrentSubExtractorIsDone() throws IOException {
|
public void testCancelGivenCurrentSubExtractorIsDone() throws IOException {
|
||||||
chunkSpan = 1000L;
|
chunkSpan = TimeValue.timeValueSeconds(1);
|
||||||
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
||||||
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
|
extractor.setNextResponse(createSearchResponse(10L, 1000L, 2200L));
|
||||||
|
|
||||||
|
@ -387,7 +388,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDataSummaryRequestIsNotOk() {
|
public void testDataSummaryRequestIsNotOk() {
|
||||||
chunkSpan = 2000L;
|
chunkSpan = TimeValue.timeValueSeconds(2);
|
||||||
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
||||||
extractor.setNextResponse(createErrorResponse());
|
extractor.setNextResponse(createErrorResponse());
|
||||||
|
|
||||||
|
@ -396,7 +397,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDataSummaryRequestHasShardFailures() {
|
public void testDataSummaryRequestHasShardFailures() {
|
||||||
chunkSpan = 2000L;
|
chunkSpan = TimeValue.timeValueSeconds(2);
|
||||||
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L);
|
||||||
extractor.setNextResponse(createResponseWithShardFailures());
|
extractor.setNextResponse(createResponseWithShardFailures());
|
||||||
|
|
||||||
|
|
|
@ -271,11 +271,11 @@ setup:
|
||||||
"job_id":"job-1",
|
"job_id":"job-1",
|
||||||
"indexes":["index-foo"],
|
"indexes":["index-foo"],
|
||||||
"types":["type-bar"],
|
"types":["type-bar"],
|
||||||
"chunking_config": {"mode":"manual","time_span": 3600}
|
"chunking_config": {"mode":"manual","time_span": "1h"}
|
||||||
}
|
}
|
||||||
- match: { datafeed_id: "test-datafeed-1" }
|
- match: { datafeed_id: "test-datafeed-1" }
|
||||||
- match: { chunking_config.mode: "manual" }
|
- match: { chunking_config.mode: "manual" }
|
||||||
- match: { chunking_config.time_span: 3600 }
|
- match: { chunking_config.time_span: "1h" }
|
||||||
|
|
||||||
---
|
---
|
||||||
"Test delete datafeed":
|
"Test delete datafeed":
|
||||||
|
|
Loading…
Reference in New Issue