[7.x] Add DatafeedTimingStats.average_search_time_per_bucket_ms and TimingStats.total_bucket_processing_time_ms stats (#44125) (#44404)
This commit is contained in:
parent
4a79ccd324
commit
3f3a3d3f2b
|
@ -36,7 +36,9 @@ public class DatafeedTimingStats implements ToXContentObject {
|
|||
|
||||
public static final ParseField JOB_ID = new ParseField("job_id");
|
||||
public static final ParseField SEARCH_COUNT = new ParseField("search_count");
|
||||
public static final ParseField BUCKET_COUNT = new ParseField("bucket_count");
|
||||
public static final ParseField TOTAL_SEARCH_TIME_MS = new ParseField("total_search_time_ms");
|
||||
public static final ParseField AVG_SEARCH_TIME_PER_BUCKET_MS = new ParseField("average_search_time_per_bucket_ms");
|
||||
|
||||
public static final ParseField TYPE = new ParseField("datafeed_timing_stats");
|
||||
|
||||
|
@ -50,23 +52,37 @@ public class DatafeedTimingStats implements ToXContentObject {
|
|||
args -> {
|
||||
String jobId = (String) args[0];
|
||||
Long searchCount = (Long) args[1];
|
||||
Double totalSearchTimeMs = (Double) args[2];
|
||||
return new DatafeedTimingStats(jobId, getOrDefault(searchCount, 0L), getOrDefault(totalSearchTimeMs, 0.0));
|
||||
Long bucketCount = (Long) args[2];
|
||||
Double totalSearchTimeMs = (Double) args[3];
|
||||
Double avgSearchTimePerBucketMs = (Double) args[4];
|
||||
return new DatafeedTimingStats(
|
||||
jobId,
|
||||
getOrDefault(searchCount, 0L),
|
||||
getOrDefault(bucketCount, 0L),
|
||||
getOrDefault(totalSearchTimeMs, 0.0),
|
||||
avgSearchTimePerBucketMs);
|
||||
});
|
||||
parser.declareString(constructorArg(), JOB_ID);
|
||||
parser.declareLong(optionalConstructorArg(), SEARCH_COUNT);
|
||||
parser.declareLong(optionalConstructorArg(), BUCKET_COUNT);
|
||||
parser.declareDouble(optionalConstructorArg(), TOTAL_SEARCH_TIME_MS);
|
||||
parser.declareDouble(optionalConstructorArg(), AVG_SEARCH_TIME_PER_BUCKET_MS);
|
||||
return parser;
|
||||
}
|
||||
|
||||
private final String jobId;
|
||||
private long searchCount;
|
||||
private long bucketCount;
|
||||
private double totalSearchTimeMs;
|
||||
private Double avgSearchTimePerBucketMs;
|
||||
|
||||
public DatafeedTimingStats(String jobId, long searchCount, double totalSearchTimeMs) {
|
||||
public DatafeedTimingStats(
|
||||
String jobId, long searchCount, long bucketCount, double totalSearchTimeMs, @Nullable Double avgSearchTimePerBucketMs) {
|
||||
this.jobId = Objects.requireNonNull(jobId);
|
||||
this.searchCount = searchCount;
|
||||
this.bucketCount = bucketCount;
|
||||
this.totalSearchTimeMs = totalSearchTimeMs;
|
||||
this.avgSearchTimePerBucketMs = avgSearchTimePerBucketMs;
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
|
@ -77,16 +93,28 @@ public class DatafeedTimingStats implements ToXContentObject {
|
|||
return searchCount;
|
||||
}
|
||||
|
||||
public long getBucketCount() {
|
||||
return bucketCount;
|
||||
}
|
||||
|
||||
public double getTotalSearchTimeMs() {
|
||||
return totalSearchTimeMs;
|
||||
}
|
||||
|
||||
public Double getAvgSearchTimePerBucketMs() {
|
||||
return avgSearchTimePerBucketMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(JOB_ID.getPreferredName(), jobId);
|
||||
builder.field(SEARCH_COUNT.getPreferredName(), searchCount);
|
||||
builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
|
||||
builder.field(TOTAL_SEARCH_TIME_MS.getPreferredName(), totalSearchTimeMs);
|
||||
if (avgSearchTimePerBucketMs != null) {
|
||||
builder.field(AVG_SEARCH_TIME_PER_BUCKET_MS.getPreferredName(), avgSearchTimePerBucketMs);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -103,12 +131,14 @@ public class DatafeedTimingStats implements ToXContentObject {
|
|||
DatafeedTimingStats other = (DatafeedTimingStats) obj;
|
||||
return Objects.equals(this.jobId, other.jobId)
|
||||
&& this.searchCount == other.searchCount
|
||||
&& this.totalSearchTimeMs == other.totalSearchTimeMs;
|
||||
&& this.bucketCount == other.bucketCount
|
||||
&& this.totalSearchTimeMs == other.totalSearchTimeMs
|
||||
&& Objects.equals(this.avgSearchTimePerBucketMs, other.avgSearchTimePerBucketMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(jobId, searchCount, totalSearchTimeMs);
|
||||
return Objects.hash(jobId, searchCount, bucketCount, totalSearchTimeMs, avgSearchTimePerBucketMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -39,6 +39,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
|
|||
public class TimingStats implements ToXContentObject {
|
||||
|
||||
public static final ParseField BUCKET_COUNT = new ParseField("bucket_count");
|
||||
public static final ParseField TOTAL_BUCKET_PROCESSING_TIME_MS = new ParseField("total_bucket_processing_time_ms");
|
||||
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
|
||||
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
|
||||
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
|
||||
|
@ -49,12 +50,28 @@ public class TimingStats implements ToXContentObject {
|
|||
new ConstructingObjectParser<>(
|
||||
"timing_stats",
|
||||
true,
|
||||
args ->
|
||||
new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));
|
||||
args -> {
|
||||
String jobId = (String) args[0];
|
||||
Long bucketCount = (Long) args[1];
|
||||
Double totalBucketProcessingTimeMs = (Double) args[2];
|
||||
Double minBucketProcessingTimeMs = (Double) args[3];
|
||||
Double maxBucketProcessingTimeMs = (Double) args[4];
|
||||
Double avgBucketProcessingTimeMs = (Double) args[5];
|
||||
Double exponentialAvgBucketProcessingTimeMs = (Double) args[6];
|
||||
return new TimingStats(
|
||||
jobId,
|
||||
getOrDefault(bucketCount, 0L),
|
||||
getOrDefault(totalBucketProcessingTimeMs, 0.0),
|
||||
minBucketProcessingTimeMs,
|
||||
maxBucketProcessingTimeMs,
|
||||
avgBucketProcessingTimeMs,
|
||||
exponentialAvgBucketProcessingTimeMs);
|
||||
});
|
||||
|
||||
static {
|
||||
PARSER.declareString(constructorArg(), Job.ID);
|
||||
PARSER.declareLong(constructorArg(), BUCKET_COUNT);
|
||||
PARSER.declareLong(optionalConstructorArg(), BUCKET_COUNT);
|
||||
PARSER.declareDouble(optionalConstructorArg(), TOTAL_BUCKET_PROCESSING_TIME_MS);
|
||||
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
|
||||
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
|
||||
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
|
||||
|
@ -63,6 +80,7 @@ public class TimingStats implements ToXContentObject {
|
|||
|
||||
private final String jobId;
|
||||
private long bucketCount;
|
||||
private double totalBucketProcessingTimeMs;
|
||||
private Double minBucketProcessingTimeMs;
|
||||
private Double maxBucketProcessingTimeMs;
|
||||
private Double avgBucketProcessingTimeMs;
|
||||
|
@ -71,12 +89,14 @@ public class TimingStats implements ToXContentObject {
|
|||
public TimingStats(
|
||||
String jobId,
|
||||
long bucketCount,
|
||||
double totalBucketProcessingTimeMs,
|
||||
@Nullable Double minBucketProcessingTimeMs,
|
||||
@Nullable Double maxBucketProcessingTimeMs,
|
||||
@Nullable Double avgBucketProcessingTimeMs,
|
||||
@Nullable Double exponentialAvgBucketProcessingTimeMs) {
|
||||
this.jobId = jobId;
|
||||
this.bucketCount = bucketCount;
|
||||
this.totalBucketProcessingTimeMs = totalBucketProcessingTimeMs;
|
||||
this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
|
||||
this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
|
||||
this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
|
||||
|
@ -91,6 +111,10 @@ public class TimingStats implements ToXContentObject {
|
|||
return bucketCount;
|
||||
}
|
||||
|
||||
public double getTotalBucketProcessingTimeMs() {
|
||||
return totalBucketProcessingTimeMs;
|
||||
}
|
||||
|
||||
public Double getMinBucketProcessingTimeMs() {
|
||||
return minBucketProcessingTimeMs;
|
||||
}
|
||||
|
@ -112,6 +136,7 @@ public class TimingStats implements ToXContentObject {
|
|||
builder.startObject();
|
||||
builder.field(Job.ID.getPreferredName(), jobId);
|
||||
builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
|
||||
builder.field(TOTAL_BUCKET_PROCESSING_TIME_MS.getPreferredName(), totalBucketProcessingTimeMs);
|
||||
if (minBucketProcessingTimeMs != null) {
|
||||
builder.field(MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), minBucketProcessingTimeMs);
|
||||
}
|
||||
|
@ -135,6 +160,7 @@ public class TimingStats implements ToXContentObject {
|
|||
TimingStats that = (TimingStats) o;
|
||||
return Objects.equals(this.jobId, that.jobId)
|
||||
&& this.bucketCount == that.bucketCount
|
||||
&& this.totalBucketProcessingTimeMs == that.totalBucketProcessingTimeMs
|
||||
&& Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
|
||||
&& Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
|
||||
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs)
|
||||
|
@ -146,6 +172,7 @@ public class TimingStats implements ToXContentObject {
|
|||
return Objects.hash(
|
||||
jobId,
|
||||
bucketCount,
|
||||
totalBucketProcessingTimeMs,
|
||||
minBucketProcessingTimeMs,
|
||||
maxBucketProcessingTimeMs,
|
||||
avgBucketProcessingTimeMs,
|
||||
|
@ -156,4 +183,8 @@ public class TimingStats implements ToXContentObject {
|
|||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
private static <T> T getOrDefault(@Nullable T value, T defaultValue) {
|
||||
return value != null ? value : defaultValue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,13 +27,15 @@ import org.elasticsearch.test.AbstractXContentTestCase;
|
|||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class DatafeedTimingStatsTests extends AbstractXContentTestCase<DatafeedTimingStats> {
|
||||
|
||||
private static final String JOB_ID = "my-job-id";
|
||||
|
||||
public static DatafeedTimingStats createRandomInstance() {
|
||||
return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomDouble());
|
||||
return new DatafeedTimingStats(
|
||||
randomAlphaOfLength(10), randomLong(), randomLong(), randomDouble(), randomBoolean() ? null : randomDouble());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,14 +61,16 @@ public class DatafeedTimingStatsTests extends AbstractXContentTestCase<DatafeedT
|
|||
DatafeedTimingStats stats = DatafeedTimingStats.PARSER.apply(parser, null);
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getSearchCount(), equalTo(0L));
|
||||
assertThat(stats.getBucketCount(), equalTo(0L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void testEquals() {
|
||||
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
|
||||
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
|
||||
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
|
||||
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0);
|
||||
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0);
|
||||
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0, 20.0);
|
||||
|
||||
assertTrue(stats1.equals(stats1));
|
||||
assertTrue(stats1.equals(stats2));
|
||||
|
@ -74,9 +78,9 @@ public class DatafeedTimingStatsTests extends AbstractXContentTestCase<DatafeedT
|
|||
}
|
||||
|
||||
public void testHashCode() {
|
||||
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
|
||||
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
|
||||
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
|
||||
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0);
|
||||
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0);
|
||||
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0, 20.0);
|
||||
|
||||
assertEquals(stats1.hashCode(), stats1.hashCode());
|
||||
assertEquals(stats1.hashCode(), stats2.hashCode());
|
||||
|
@ -84,9 +88,11 @@ public class DatafeedTimingStatsTests extends AbstractXContentTestCase<DatafeedT
|
|||
}
|
||||
|
||||
public void testConstructorAndGetters() {
|
||||
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 123.456);
|
||||
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 123.456, 78.9);
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getSearchCount(), equalTo(5L));
|
||||
assertThat(stats.getBucketCount(), equalTo(10L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(123.456));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(78.9));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,14 @@
|
|||
*/
|
||||
package org.elasticsearch.client.ml.job.process;
|
||||
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
|
@ -32,6 +37,7 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
|
|||
return new TimingStats(
|
||||
jobId,
|
||||
randomLong(),
|
||||
randomDouble(),
|
||||
randomBoolean() ? null : randomDouble(),
|
||||
randomBoolean() ? null : randomDouble(),
|
||||
randomBoolean() ? null : randomDouble(),
|
||||
|
@ -54,10 +60,11 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
|
|||
}
|
||||
|
||||
public void testConstructor() {
|
||||
TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
|
||||
TimingStats stats = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89);
|
||||
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getBucketCount(), equalTo(7L));
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(8.61));
|
||||
assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
|
||||
assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
|
||||
assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
|
||||
|
@ -65,20 +72,37 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
|
|||
}
|
||||
|
||||
public void testConstructor_NullValues() {
|
||||
TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null, null);
|
||||
TimingStats stats = new TimingStats(JOB_ID, 7, 8.61, null, null, null, null);
|
||||
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getBucketCount(), equalTo(7L));
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(8.61));
|
||||
assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
|
||||
assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
|
||||
assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
|
||||
assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue());
|
||||
}
|
||||
|
||||
public void testParse_OptionalFieldsAbsent() throws IOException {
|
||||
String json = "{\"job_id\": \"my-job-id\"}";
|
||||
try (XContentParser parser =
|
||||
XContentFactory.xContent(XContentType.JSON).createParser(
|
||||
xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
|
||||
TimingStats stats = TimingStats.PARSER.apply(parser, null);
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getBucketCount(), equalTo(0L));
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(0.0));
|
||||
assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
|
||||
assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
|
||||
assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
|
||||
assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void testEquals() {
|
||||
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
|
||||
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
|
||||
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
|
||||
TimingStats stats1 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89);
|
||||
TimingStats stats2 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89);
|
||||
TimingStats stats3 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 3.0, 1.23, 7.89);
|
||||
|
||||
assertTrue(stats1.equals(stats1));
|
||||
assertTrue(stats1.equals(stats2));
|
||||
|
@ -86,9 +110,9 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
|
|||
}
|
||||
|
||||
public void testHashCode() {
|
||||
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
|
||||
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
|
||||
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
|
||||
TimingStats stats1 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89);
|
||||
TimingStats stats2 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89);
|
||||
TimingStats stats3 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 3.0, 1.23, 7.89);
|
||||
|
||||
assertEquals(stats1.hashCode(), stats1.hashCode());
|
||||
assertEquals(stats1.hashCode(), stats2.hashCode());
|
||||
|
|
|
@ -25,8 +25,10 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
|||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -39,6 +41,9 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDat
|
|||
|
||||
public static final String ALL = "_all";
|
||||
private static final String STATE = "state";
|
||||
private static final String NODE = "node";
|
||||
private static final String ASSIGNMENT_EXPLANATION = "assignment_explanation";
|
||||
private static final String TIMING_STATS = "timing_stats";
|
||||
|
||||
private GetDatafeedsStatsAction() {
|
||||
super(NAME);
|
||||
|
@ -186,7 +191,7 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDat
|
|||
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
|
||||
builder.field(STATE, datafeedState.toString());
|
||||
if (node != null) {
|
||||
builder.startObject("node");
|
||||
builder.startObject(NODE);
|
||||
builder.field("id", node.getId());
|
||||
builder.field("name", node.getName());
|
||||
builder.field("ephemeral_id", node.getEphemeralId());
|
||||
|
@ -202,10 +207,13 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDat
|
|||
builder.endObject();
|
||||
}
|
||||
if (assignmentExplanation != null) {
|
||||
builder.field("assignment_explanation", assignmentExplanation);
|
||||
builder.field(ASSIGNMENT_EXPLANATION, assignmentExplanation);
|
||||
}
|
||||
if (timingStats != null) {
|
||||
builder.field("timing_stats", timingStats);
|
||||
builder.field(
|
||||
TIMING_STATS,
|
||||
timingStats,
|
||||
new MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_CALCULATED_FIELDS, "true")));
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeSta
|
|||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
|
||||
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
@ -52,6 +53,8 @@ public class GetJobsStatsAction extends ActionType<GetJobsStatsAction.Response>
|
|||
private static final String FORECASTS_STATS = "forecasts_stats";
|
||||
private static final String STATE = "state";
|
||||
private static final String NODE = "node";
|
||||
private static final String ASSIGNMENT_EXPLANATION = "assignment_explanation";
|
||||
private static final String OPEN_TIME = "open_time";
|
||||
private static final String TIMING_STATS = "timing_stats";
|
||||
|
||||
private GetJobsStatsAction() {
|
||||
|
@ -275,13 +278,16 @@ public class GetJobsStatsAction extends ActionType<GetJobsStatsAction.Response>
|
|||
builder.endObject();
|
||||
}
|
||||
if (assignmentExplanation != null) {
|
||||
builder.field("assignment_explanation", assignmentExplanation);
|
||||
builder.field(ASSIGNMENT_EXPLANATION, assignmentExplanation);
|
||||
}
|
||||
if (openTime != null) {
|
||||
builder.field("open_time", openTime.getStringRep());
|
||||
builder.field(OPEN_TIME, openTime.getStringRep());
|
||||
}
|
||||
if (timingStats != null) {
|
||||
builder.field(TIMING_STATS, timingStats);
|
||||
builder.field(
|
||||
TIMING_STATS,
|
||||
timingStats,
|
||||
new MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_CALCULATED_FIELDS, "true")));
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
@ -26,7 +27,9 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
|
|||
|
||||
public static final ParseField JOB_ID = new ParseField("job_id");
|
||||
public static final ParseField SEARCH_COUNT = new ParseField("search_count");
|
||||
public static final ParseField BUCKET_COUNT = new ParseField("bucket_count");
|
||||
public static final ParseField TOTAL_SEARCH_TIME_MS = new ParseField("total_search_time_ms");
|
||||
public static final ParseField AVG_SEARCH_TIME_PER_BUCKET_MS = new ParseField("average_search_time_per_bucket_ms");
|
||||
|
||||
public static final ParseField TYPE = new ParseField("datafeed_timing_stats");
|
||||
|
||||
|
@ -40,11 +43,14 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
|
|||
args -> {
|
||||
String jobId = (String) args[0];
|
||||
Long searchCount = (Long) args[1];
|
||||
Double totalSearchTimeMs = (Double) args[2];
|
||||
return new DatafeedTimingStats(jobId, getOrDefault(searchCount, 0L), getOrDefault(totalSearchTimeMs, 0.0));
|
||||
Long bucketCount = (Long) args[2];
|
||||
Double totalSearchTimeMs = (Double) args[3];
|
||||
return new DatafeedTimingStats(
|
||||
jobId, getOrDefault(searchCount, 0L), getOrDefault(bucketCount, 0L), getOrDefault(totalSearchTimeMs, 0.0));
|
||||
});
|
||||
parser.declareString(constructorArg(), JOB_ID);
|
||||
parser.declareLong(optionalConstructorArg(), SEARCH_COUNT);
|
||||
parser.declareLong(optionalConstructorArg(), BUCKET_COUNT);
|
||||
parser.declareDouble(optionalConstructorArg(), TOTAL_SEARCH_TIME_MS);
|
||||
return parser;
|
||||
}
|
||||
|
@ -55,26 +61,29 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
|
|||
|
||||
private final String jobId;
|
||||
private long searchCount;
|
||||
private long bucketCount;
|
||||
private double totalSearchTimeMs;
|
||||
|
||||
public DatafeedTimingStats(String jobId, long searchCount, double totalSearchTimeMs) {
|
||||
public DatafeedTimingStats(String jobId, long searchCount, long bucketCount, double totalSearchTimeMs) {
|
||||
this.jobId = Objects.requireNonNull(jobId);
|
||||
this.searchCount = searchCount;
|
||||
this.bucketCount = bucketCount;
|
||||
this.totalSearchTimeMs = totalSearchTimeMs;
|
||||
}
|
||||
|
||||
public DatafeedTimingStats(String jobId) {
|
||||
this(jobId, 0, 0);
|
||||
this(jobId, 0, 0, 0.0);
|
||||
}
|
||||
|
||||
public DatafeedTimingStats(StreamInput in) throws IOException {
|
||||
jobId = in.readString();
|
||||
searchCount = in.readLong();
|
||||
bucketCount = in.readLong();
|
||||
totalSearchTimeMs = in.readDouble();
|
||||
}
|
||||
|
||||
public DatafeedTimingStats(DatafeedTimingStats other) {
|
||||
this(other.jobId, other.searchCount, other.totalSearchTimeMs);
|
||||
this(other.jobId, other.searchCount, other.bucketCount, other.totalSearchTimeMs);
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
|
@ -85,19 +94,34 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
|
|||
return searchCount;
|
||||
}
|
||||
|
||||
public long getBucketCount() {
|
||||
return bucketCount;
|
||||
}
|
||||
|
||||
public double getTotalSearchTimeMs() {
|
||||
return totalSearchTimeMs;
|
||||
}
|
||||
|
||||
public Double getAvgSearchTimePerBucketMs() {
|
||||
return bucketCount > 0
|
||||
? totalSearchTimeMs / bucketCount
|
||||
: null;
|
||||
}
|
||||
|
||||
public void incrementTotalSearchTimeMs(double searchTimeMs) {
|
||||
this.searchCount++;
|
||||
this.totalSearchTimeMs += searchTimeMs;
|
||||
}
|
||||
|
||||
public void setBucketCount(long bucketCount) {
|
||||
this.bucketCount = bucketCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(jobId);
|
||||
out.writeLong(searchCount);
|
||||
out.writeLong(bucketCount);
|
||||
out.writeDouble(totalSearchTimeMs);
|
||||
}
|
||||
|
||||
|
@ -106,7 +130,14 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
|
|||
builder.startObject();
|
||||
builder.field(JOB_ID.getPreferredName(), jobId);
|
||||
builder.field(SEARCH_COUNT.getPreferredName(), searchCount);
|
||||
builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
|
||||
builder.field(TOTAL_SEARCH_TIME_MS.getPreferredName(), totalSearchTimeMs);
|
||||
if (params.paramAsBoolean(ToXContentParams.INCLUDE_CALCULATED_FIELDS, false)) {
|
||||
Double avgSearchTimePerBucket = getAvgSearchTimePerBucketMs();
|
||||
if (avgSearchTimePerBucket != null) {
|
||||
builder.field(AVG_SEARCH_TIME_PER_BUCKET_MS.getPreferredName(), getAvgSearchTimePerBucketMs());
|
||||
}
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -123,12 +154,13 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
|
|||
DatafeedTimingStats other = (DatafeedTimingStats) obj;
|
||||
return Objects.equals(this.jobId, other.jobId)
|
||||
&& this.searchCount == other.searchCount
|
||||
&& this.bucketCount == other.bucketCount
|
||||
&& this.totalSearchTimeMs == other.totalSearchTimeMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(jobId, searchCount, totalSearchTimeMs);
|
||||
return Objects.hash(jobId, searchCount, bucketCount, totalSearchTimeMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -936,6 +936,7 @@ public class ElasticsearchMappings {
|
|||
|
||||
/**
|
||||
* {@link DatafeedTimingStats} mapping.
|
||||
* Does not include mapping for BUCKET_COUNT as this mapping is added by {@link #addDataCountsMapping} method.
|
||||
*
|
||||
* @throws IOException On builder write error
|
||||
*/
|
||||
|
@ -944,6 +945,7 @@ public class ElasticsearchMappings {
|
|||
.startObject(DatafeedTimingStats.SEARCH_COUNT.getPreferredName())
|
||||
.field(TYPE, LONG)
|
||||
.endObject()
|
||||
// re-used: BUCKET_COUNT
|
||||
.startObject(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName())
|
||||
.field(TYPE, DOUBLE)
|
||||
.endObject();
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
|||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
@ -28,6 +29,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
|
|||
public class TimingStats implements ToXContentObject, Writeable {
|
||||
|
||||
public static final ParseField BUCKET_COUNT = new ParseField("bucket_count");
|
||||
public static final ParseField TOTAL_BUCKET_PROCESSING_TIME_MS = new ParseField("total_bucket_processing_time_ms");
|
||||
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
|
||||
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
|
||||
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
|
||||
|
@ -40,8 +42,21 @@ public class TimingStats implements ToXContentObject, Writeable {
|
|||
new ConstructingObjectParser<>(
|
||||
TYPE.getPreferredName(),
|
||||
true,
|
||||
args ->
|
||||
new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));
|
||||
args -> {
|
||||
String jobId = (String) args[0];
|
||||
long bucketCount = (long) args[1];
|
||||
Double minBucketProcessingTimeMs = (Double) args[2];
|
||||
Double maxBucketProcessingTimeMs = (Double) args[3];
|
||||
Double avgBucketProcessingTimeMs = (Double) args[4];
|
||||
Double exponentialAvgBucketProcessingTimeMs = (Double) args[5];
|
||||
return new TimingStats(
|
||||
jobId,
|
||||
bucketCount,
|
||||
minBucketProcessingTimeMs,
|
||||
maxBucketProcessingTimeMs,
|
||||
avgBucketProcessingTimeMs,
|
||||
exponentialAvgBucketProcessingTimeMs);
|
||||
});
|
||||
|
||||
static {
|
||||
PARSER.declareString(constructorArg(), Job.ID);
|
||||
|
@ -109,6 +124,13 @@ public class TimingStats implements ToXContentObject, Writeable {
|
|||
return bucketCount;
|
||||
}
|
||||
|
||||
/** Calculates total bucket processing time as a product of the all-time average bucket processing time and the number of buckets. */
|
||||
public double getTotalBucketProcessingTimeMs() {
|
||||
return avgBucketProcessingTimeMs != null
|
||||
? bucketCount * avgBucketProcessingTimeMs
|
||||
: 0.0;
|
||||
}
|
||||
|
||||
public Double getMinBucketProcessingTimeMs() {
|
||||
return minBucketProcessingTimeMs;
|
||||
}
|
||||
|
@ -126,7 +148,7 @@ public class TimingStats implements ToXContentObject, Writeable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Updates the statistics (min, max, avg) for the given data point (bucket processing time).
|
||||
* Updates the statistics (min, max, avg, exponential avg) for the given data point (bucket processing time).
|
||||
*/
|
||||
public void updateStats(double bucketProcessingTimeMs) {
|
||||
if (bucketProcessingTimeMs < 0.0) {
|
||||
|
@ -175,6 +197,9 @@ public class TimingStats implements ToXContentObject, Writeable {
|
|||
builder.startObject();
|
||||
builder.field(Job.ID.getPreferredName(), jobId);
|
||||
builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
|
||||
if (params.paramAsBoolean(ToXContentParams.INCLUDE_CALCULATED_FIELDS, false)) {
|
||||
builder.field(TOTAL_BUCKET_PROCESSING_TIME_MS.getPreferredName(), getTotalBucketProcessingTimeMs());
|
||||
}
|
||||
if (minBucketProcessingTimeMs != null) {
|
||||
builder.field(MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), minBucketProcessingTimeMs);
|
||||
}
|
||||
|
|
|
@ -187,6 +187,7 @@ public final class ReservedFieldNames {
|
|||
TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
|
||||
|
||||
DatafeedTimingStats.SEARCH_COUNT.getPreferredName(),
|
||||
DatafeedTimingStats.BUCKET_COUNT.getPreferredName(),
|
||||
DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(),
|
||||
|
||||
GetResult._ID,
|
||||
|
|
|
@ -24,6 +24,12 @@ public final class ToXContentParams {
|
|||
*/
|
||||
public static final String INCLUDE_TYPE = "include_type";
|
||||
|
||||
/**
|
||||
* When serialising POJOs to X Content this indicates whether the calculated (i.e. not stored) fields
|
||||
* should be included or not
|
||||
*/
|
||||
public static final String INCLUDE_CALCULATED_FIELDS = "include_calculated_fields";
|
||||
|
||||
private ToXContentParams() {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public class GetDatafeedStatsActionResponseTests extends AbstractStreamableTestC
|
|||
Collections.emptySet(),
|
||||
Version.CURRENT);
|
||||
|
||||
DatafeedTimingStats timingStats = new DatafeedTimingStats("my-job-id", 5, 123.456);
|
||||
DatafeedTimingStats timingStats = new DatafeedTimingStats("my-job-id", 5, 10, 100.0);
|
||||
|
||||
Response.DatafeedStats stats = new Response.DatafeedStats("df-id", DatafeedState.STARTED, node, null, timingStats);
|
||||
|
||||
|
@ -109,9 +109,11 @@ public class GetDatafeedStatsActionResponseTests extends AbstractStreamableTestC
|
|||
assertThat(nodeAttributes, hasEntry("ml.max_open_jobs", "5"));
|
||||
|
||||
Map<String, Object> timingStatsMap = (Map<String, Object>) dfStatsMap.get("timing_stats");
|
||||
assertThat(timingStatsMap.size(), is(equalTo(3)));
|
||||
assertThat(timingStatsMap.size(), is(equalTo(5)));
|
||||
assertThat(timingStatsMap, hasEntry("job_id", "my-job-id"));
|
||||
assertThat(timingStatsMap, hasEntry("search_count", 5));
|
||||
assertThat(timingStatsMap, hasEntry("total_search_time_ms", 123.456));
|
||||
assertThat(timingStatsMap, hasEntry("bucket_count", 10));
|
||||
assertThat(timingStatsMap, hasEntry("total_search_time_ms", 100.0));
|
||||
assertThat(timingStatsMap, hasEntry("average_search_time_per_bucket_ms", 10.0));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,14 +14,16 @@ import org.elasticsearch.test.AbstractSerializingTestCase;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.closeTo;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<DatafeedTimingStats> {
|
||||
|
||||
private static final String JOB_ID = "my-job-id";
|
||||
|
||||
public static DatafeedTimingStats createRandom() {
|
||||
return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomDouble());
|
||||
return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomLong(), randomDouble());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -43,10 +45,12 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
|
|||
protected DatafeedTimingStats mutateInstance(DatafeedTimingStats instance) throws IOException {
|
||||
String jobId = instance.getJobId();
|
||||
long searchCount = instance.getSearchCount();
|
||||
long bucketCount = instance.getBucketCount();
|
||||
double totalSearchTimeMs = instance.getTotalSearchTimeMs();
|
||||
return new DatafeedTimingStats(
|
||||
jobId + randomAlphaOfLength(5),
|
||||
searchCount + 1,
|
||||
searchCount + 2,
|
||||
bucketCount + 1,
|
||||
totalSearchTimeMs + randomDoubleBetween(1.0, 100.0, true));
|
||||
}
|
||||
|
||||
|
@ -58,14 +62,16 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
|
|||
DatafeedTimingStats stats = DatafeedTimingStats.PARSER.apply(parser, null);
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getSearchCount(), equalTo(0L));
|
||||
assertThat(stats.getBucketCount(), equalTo(0L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), nullValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void testEquals() {
|
||||
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
|
||||
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
|
||||
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
|
||||
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
|
||||
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
|
||||
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0);
|
||||
|
||||
assertTrue(stats1.equals(stats1));
|
||||
assertTrue(stats1.equals(stats2));
|
||||
|
@ -73,9 +79,9 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
|
|||
}
|
||||
|
||||
public void testHashCode() {
|
||||
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
|
||||
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
|
||||
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
|
||||
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
|
||||
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
|
||||
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0);
|
||||
|
||||
assertEquals(stats1.hashCode(), stats1.hashCode());
|
||||
assertEquals(stats1.hashCode(), stats2.hashCode());
|
||||
|
@ -83,32 +89,72 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
|
|||
}
|
||||
|
||||
public void testConstructorsAndGetters() {
|
||||
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 123.456);
|
||||
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 123.456);
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getSearchCount(), equalTo(5L));
|
||||
assertThat(stats.getBucketCount(), equalTo(10L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(123.456));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), closeTo(12.3456, 1e-9));
|
||||
|
||||
stats = new DatafeedTimingStats(JOB_ID);
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getSearchCount(), equalTo(0L));
|
||||
assertThat(stats.getBucketCount(), equalTo(0L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), nullValue());
|
||||
}
|
||||
|
||||
public void testCopyConstructor() {
|
||||
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 123.456);
|
||||
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 123.456);
|
||||
DatafeedTimingStats stats2 = new DatafeedTimingStats(stats1);
|
||||
|
||||
assertThat(stats2.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats2.getSearchCount(), equalTo(5L));
|
||||
assertThat(stats2.getBucketCount(), equalTo(10L));
|
||||
assertThat(stats2.getTotalSearchTimeMs(), equalTo(123.456));
|
||||
assertThat(stats2.getAvgSearchTimePerBucketMs(), closeTo(12.3456, 1e-9));
|
||||
}
|
||||
|
||||
public void testIncrementTotalSearchTimeMs() {
|
||||
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 100.0);
|
||||
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
|
||||
stats.incrementTotalSearchTimeMs(200.0);
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getSearchCount(), equalTo(6L));
|
||||
assertThat(stats.getBucketCount(), equalTo(10L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(30.0));
|
||||
}
|
||||
|
||||
public void testSetBucketCount() {
|
||||
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
|
||||
stats.setBucketCount(20);
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getSearchCount(), equalTo(5L));
|
||||
assertThat(stats.getBucketCount(), equalTo(20L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(5.0));
|
||||
}
|
||||
|
||||
public void testAvgSearchTimePerBucketIsCalculatedProperlyAfterUpdates() {
|
||||
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
|
||||
assertThat(stats.getBucketCount(), equalTo(10L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(10.0));
|
||||
|
||||
stats.setBucketCount(20);
|
||||
assertThat(stats.getBucketCount(), equalTo(20L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(5.0));
|
||||
|
||||
stats.incrementTotalSearchTimeMs(200.0);
|
||||
assertThat(stats.getBucketCount(), equalTo(20L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(15.0));
|
||||
|
||||
stats.setBucketCount(25);
|
||||
assertThat(stats.getBucketCount(), equalTo(25L));
|
||||
assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
|
||||
assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(12.0));
|
||||
}
|
||||
|
||||
public void testDocumentId() {
|
||||
|
|
|
@ -69,6 +69,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
|
|||
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getBucketCount(), equalTo(0L));
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(0.0));
|
||||
assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
|
||||
assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
|
||||
assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
|
||||
|
@ -80,6 +81,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
|
|||
|
||||
assertThat(stats.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats.getBucketCount(), equalTo(7L));
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(8.61));
|
||||
assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
|
||||
assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
|
||||
assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
|
||||
|
@ -92,6 +94,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
|
|||
|
||||
assertThat(stats2.getJobId(), equalTo(JOB_ID));
|
||||
assertThat(stats2.getBucketCount(), equalTo(7L));
|
||||
assertThat(stats2.getTotalBucketProcessingTimeMs(), equalTo(8.61));
|
||||
assertThat(stats2.getMinBucketProcessingTimeMs(), equalTo(1.0));
|
||||
assertThat(stats2.getMaxBucketProcessingTimeMs(), equalTo(2.0));
|
||||
assertThat(stats2.getAvgBucketProcessingTimeMs(), equalTo(1.23));
|
||||
|
@ -119,6 +122,26 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
|
|||
assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 5, 1.0, 5.0, 3.0, 3.00029801), 1e-9));
|
||||
}
|
||||
|
||||
public void testTotalBucketProcessingTimeIsCalculatedProperlyAfterUpdates() {
|
||||
TimingStats stats = new TimingStats(JOB_ID);
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(0.0));
|
||||
|
||||
stats.updateStats(3);
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(3.0));
|
||||
|
||||
stats.updateStats(2);
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(5.0));
|
||||
|
||||
stats.updateStats(4);
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(9.0));
|
||||
|
||||
stats.updateStats(1);
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(10.0));
|
||||
|
||||
stats.updateStats(5);
|
||||
assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(15.0));
|
||||
}
|
||||
|
||||
public void testDocumentId() {
|
||||
assertThat(TimingStats.documentId("my-job-id"), equalTo("my-job-id_timing_stats"));
|
||||
}
|
||||
|
@ -138,6 +161,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
|
|||
protected boolean matchesSafely(TimingStats item) {
|
||||
return equalTo(operand.getJobId()).matches(item.getJobId())
|
||||
&& equalTo(operand.getBucketCount()).matches(item.getBucketCount())
|
||||
&& closeTo(operand.getTotalBucketProcessingTimeMs(), error).matches(item.getTotalBucketProcessingTimeMs())
|
||||
&& closeTo(operand.getMinBucketProcessingTimeMs(), error).matches(item.getMinBucketProcessingTimeMs())
|
||||
&& closeTo(operand.getMaxBucketProcessingTimeMs(), error).matches(item.getMaxBucketProcessingTimeMs())
|
||||
&& closeTo(operand.getAvgBucketProcessingTimeMs(), error).matches(item.getAvgBucketProcessingTimeMs())
|
||||
|
|
|
@ -250,7 +250,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
|||
datafeed,
|
||||
job,
|
||||
xContentRegistry,
|
||||
// Creating fake {@link TimingStatsReporter} so that search API call is not needed.
|
||||
// Creating fake DatafeedTimingStatsReporter so that search API call is not needed.
|
||||
new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), jobResultsPersister),
|
||||
ActionListener.wrap(
|
||||
unused ->
|
||||
|
|
|
@ -61,6 +61,7 @@ class DatafeedJob {
|
|||
private final long queryDelayMs;
|
||||
private final Client client;
|
||||
private final DataExtractorFactory dataExtractorFactory;
|
||||
private final DatafeedTimingStatsReporter timingStatsReporter;
|
||||
private final Supplier<Long> currentTimeSupplier;
|
||||
private final DelayedDataDetector delayedDataDetector;
|
||||
|
||||
|
@ -74,13 +75,15 @@ class DatafeedJob {
|
|||
private volatile boolean isIsolated;
|
||||
|
||||
DatafeedJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs,
|
||||
DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier<Long> currentTimeSupplier,
|
||||
DelayedDataDetector delayedDataDetector, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
|
||||
DataExtractorFactory dataExtractorFactory, DatafeedTimingStatsReporter timingStatsReporter, Client client,
|
||||
Auditor auditor, Supplier<Long> currentTimeSupplier, DelayedDataDetector delayedDataDetector,
|
||||
long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
|
||||
this.jobId = jobId;
|
||||
this.dataDescription = Objects.requireNonNull(dataDescription);
|
||||
this.frequencyMs = frequencyMs;
|
||||
this.queryDelayMs = queryDelayMs;
|
||||
this.dataExtractorFactory = dataExtractorFactory;
|
||||
this.timingStatsReporter = timingStatsReporter;
|
||||
this.client = client;
|
||||
this.auditor = auditor;
|
||||
this.currentTimeSupplier = currentTimeSupplier;
|
||||
|
@ -350,6 +353,7 @@ class DatafeedJob {
|
|||
try (InputStream in = extractedData.get()) {
|
||||
counts = postData(in, XContentType.JSON);
|
||||
LOGGER.trace("[{}] Processed another {} records", jobId, counts.getProcessedRecordCount());
|
||||
timingStatsReporter.reportDataCounts(counts);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
|
|
|
@ -69,10 +69,20 @@ public class DatafeedJobBuilder {
|
|||
TimeValue queryDelay = datafeedConfigHolder.get().getQueryDelay();
|
||||
DelayedDataDetector delayedDataDetector =
|
||||
DelayedDataDetectorFactory.buildDetector(jobHolder.get(), datafeedConfigHolder.get(), client, xContentRegistry);
|
||||
DatafeedJob datafeedJob = new DatafeedJob(jobHolder.get().getId(), buildDataDescription(jobHolder.get()),
|
||||
frequency.millis(), queryDelay.millis(),
|
||||
context.dataExtractorFactory, client, auditor, currentTimeSupplier, delayedDataDetector,
|
||||
context.latestFinalBucketEndMs, context.latestRecordTimeMs);
|
||||
DatafeedJob datafeedJob =
|
||||
new DatafeedJob(
|
||||
jobHolder.get().getId(),
|
||||
buildDataDescription(jobHolder.get()),
|
||||
frequency.millis(),
|
||||
queryDelay.millis(),
|
||||
context.dataExtractorFactory,
|
||||
context.timingStatsReporter,
|
||||
client,
|
||||
auditor,
|
||||
currentTimeSupplier,
|
||||
delayedDataDetector,
|
||||
context.latestFinalBucketEndMs,
|
||||
context.latestRecordTimeMs);
|
||||
|
||||
listener.onResponse(datafeedJob);
|
||||
};
|
||||
|
@ -92,12 +102,13 @@ public class DatafeedJobBuilder {
|
|||
|
||||
// Create data extractor factory
|
||||
Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = timingStats -> {
|
||||
context.timingStatsReporter = new DatafeedTimingStatsReporter(timingStats, jobResultsPersister);
|
||||
DataExtractorFactory.create(
|
||||
client,
|
||||
datafeedConfigHolder.get(),
|
||||
jobHolder.get(),
|
||||
xContentRegistry,
|
||||
new DatafeedTimingStatsReporter(timingStats, jobResultsPersister),
|
||||
context.timingStatsReporter,
|
||||
dataExtractorFactoryHandler);
|
||||
};
|
||||
|
||||
|
@ -189,5 +200,6 @@ public class DatafeedJobBuilder {
|
|||
volatile long latestFinalBucketEndMs = -1L;
|
||||
volatile long latestRecordTimeMs = -1L;
|
||||
volatile DataExtractorFactory dataExtractorFactory;
|
||||
volatile DatafeedTimingStatsReporter timingStatsReporter;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed;
|
|||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
|
||||
|
||||
import java.util.Objects;
|
||||
|
@ -46,33 +47,56 @@ public class DatafeedTimingStatsReporter {
|
|||
return;
|
||||
}
|
||||
currentTimingStats.incrementTotalSearchTimeMs(searchDuration.millis());
|
||||
flushIfDifferSignificantly();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reports the data counts received from the autodetect process.
|
||||
*/
|
||||
public void reportDataCounts(DataCounts dataCounts) {
|
||||
if (dataCounts == null) {
|
||||
return;
|
||||
}
|
||||
currentTimingStats.setBucketCount(dataCounts.getBucketCount());
|
||||
flushIfDifferSignificantly();
|
||||
}
|
||||
|
||||
private void flushIfDifferSignificantly() {
|
||||
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
|
||||
// TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action
|
||||
flush(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
flush();
|
||||
}
|
||||
}
|
||||
|
||||
private void flush(WriteRequest.RefreshPolicy refreshPolicy) {
|
||||
private void flush() {
|
||||
persistedTimingStats = new DatafeedTimingStats(currentTimingStats);
|
||||
jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
|
||||
// TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action
|
||||
jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics.
|
||||
*/
|
||||
public static boolean differSignificantly(DatafeedTimingStats stats1, DatafeedTimingStats stats2) {
|
||||
return differSignificantly(stats1.getTotalSearchTimeMs(), stats2.getTotalSearchTimeMs());
|
||||
return differSignificantly(stats1.getTotalSearchTimeMs(), stats2.getTotalSearchTimeMs())
|
||||
|| differSignificantly(stats1.getAvgSearchTimePerBucketMs(), stats2.getAvgSearchTimePerBucketMs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO.
|
||||
* Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO or
|
||||
* the absolute difference |value1 - value2| is greater than MAX_VALID_ABS_DIFFERENCE_MS.
|
||||
* This can be interpreted as values { value1, value2 } differing significantly from each other.
|
||||
* This method also returns:
|
||||
* - {@code true} in case one value is {@code null} while the other is not.
|
||||
* - {@code false} in case both values are {@code null}.
|
||||
*/
|
||||
private static boolean differSignificantly(double value1, double value2) {
|
||||
private static boolean differSignificantly(Double value1, Double value2) {
|
||||
if (value1 != null && value2 != null) {
|
||||
return (value2 / value1 < MIN_VALID_RATIO)
|
||||
|| (value1 / value2 < MIN_VALID_RATIO)
|
||||
|| Math.abs(value1 - value2) > MAX_VALID_ABS_DIFFERENCE_MS;
|
||||
}
|
||||
return (value1 != null) || (value2 != null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Minimum ratio of values that is interpreted as values being similar.
|
||||
|
|
|
@ -73,10 +73,11 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
private Auditor auditor;
|
||||
private DataExtractorFactory dataExtractorFactory;
|
||||
private DataExtractor dataExtractor;
|
||||
private DatafeedTimingStatsReporter timingStatsReporter;
|
||||
private Client client;
|
||||
private DelayedDataDetector delayedDataDetector;
|
||||
private DataDescription.Builder dataDescription;
|
||||
ActionFuture<PostDataAction.Response> postDataFuture;
|
||||
private ActionFuture<PostDataAction.Response> postDataFuture;
|
||||
private ActionFuture<FlushJobAction.Response> flushJobFuture;
|
||||
private ActionFuture<IndexResponse> indexFuture;
|
||||
private ArgumentCaptor<FlushJobAction.Request> flushJobRequests;
|
||||
|
@ -93,6 +94,7 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
dataExtractorFactory = mock(DataExtractorFactory.class);
|
||||
dataExtractor = mock(DataExtractor.class);
|
||||
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
|
||||
timingStatsReporter = mock(DatafeedTimingStatsReporter.class);
|
||||
client = mock(Client.class);
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
|
@ -455,7 +457,7 @@ public class DatafeedJobTests extends ESTestCase {
|
|||
private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
|
||||
long latestRecordTimeMs) {
|
||||
Supplier<Long> currentTimeSupplier = () -> currentTime;
|
||||
return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor,
|
||||
currentTimeSupplier, delayedDataDetector, latestFinalBucketEndTimeMs, latestRecordTimeMs);
|
||||
return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter,
|
||||
client, auditor, currentTimeSupplier, delayedDataDetector, latestFinalBucketEndTimeMs, latestRecordTimeMs);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
|
||||
import org.junit.Before;
|
||||
import org.mockito.InOrder;
|
||||
|
@ -18,6 +19,7 @@ import static org.hamcrest.Matchers.is;
|
|||
import static org.mockito.Mockito.inOrder;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
|
||||
public class DatafeedTimingStatsReporterTests extends ESTestCase {
|
||||
|
||||
|
@ -31,59 +33,106 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase {
|
|||
jobResultsPersister = mock(JobResultsPersister.class);
|
||||
}
|
||||
|
||||
public void testReportSearchDuration_Null() {
|
||||
DatafeedTimingStatsReporter timingStatsReporter =
|
||||
new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
|
||||
|
||||
timingStatsReporter.reportSearchDuration(null);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
|
||||
|
||||
verifyZeroInteractions(jobResultsPersister);
|
||||
}
|
||||
|
||||
public void testReportSearchDuration() {
|
||||
DatafeedTimingStatsReporter timingStatsReporter =
|
||||
new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10000.0), jobResultsPersister);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10000.0)));
|
||||
new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
|
||||
|
||||
timingStatsReporter.reportSearchDuration(ONE_SECOND);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 4, 11000.0)));
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 4, 10, 11000.0)));
|
||||
|
||||
timingStatsReporter.reportSearchDuration(ONE_SECOND);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 5, 12000.0)));
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 5, 10, 12000.0)));
|
||||
|
||||
timingStatsReporter.reportSearchDuration(ONE_SECOND);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 6, 13000.0)));
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 6, 10, 13000.0)));
|
||||
|
||||
timingStatsReporter.reportSearchDuration(ONE_SECOND);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 7, 14000.0)));
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 7, 10, 14000.0)));
|
||||
|
||||
InOrder inOrder = inOrder(jobResultsPersister);
|
||||
inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
|
||||
new DatafeedTimingStats(JOB_ID, 5, 12000.0), RefreshPolicy.IMMEDIATE);
|
||||
new DatafeedTimingStats(JOB_ID, 5, 10, 12000.0), RefreshPolicy.IMMEDIATE);
|
||||
inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
|
||||
new DatafeedTimingStats(JOB_ID, 7, 14000.0), RefreshPolicy.IMMEDIATE);
|
||||
new DatafeedTimingStats(JOB_ID, 7, 10, 14000.0), RefreshPolicy.IMMEDIATE);
|
||||
verifyNoMoreInteractions(jobResultsPersister);
|
||||
}
|
||||
|
||||
public void testReportDataCounts_Null() {
|
||||
DatafeedTimingStatsReporter timingStatsReporter =
|
||||
new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
|
||||
|
||||
timingStatsReporter.reportDataCounts(null);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
|
||||
|
||||
verifyZeroInteractions(jobResultsPersister);
|
||||
}
|
||||
|
||||
public void testReportDataCounts() {
|
||||
DataCounts dataCounts = new DataCounts(JOB_ID);
|
||||
dataCounts.incrementBucketCount(20);
|
||||
DatafeedTimingStatsReporter timingStatsReporter =
|
||||
new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, dataCounts.getBucketCount(), 10000.0), jobResultsPersister);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 20, 10000.0)));
|
||||
|
||||
dataCounts.incrementBucketCount(1);
|
||||
timingStatsReporter.reportDataCounts(dataCounts);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 21, 10000.0)));
|
||||
|
||||
dataCounts.incrementBucketCount(1);
|
||||
timingStatsReporter.reportDataCounts(dataCounts);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 22, 10000.0)));
|
||||
|
||||
dataCounts.incrementBucketCount(1);
|
||||
timingStatsReporter.reportDataCounts(dataCounts);
|
||||
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 23, 10000.0)));
|
||||
|
||||
InOrder inOrder = inOrder(jobResultsPersister);
|
||||
inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
|
||||
new DatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.IMMEDIATE);
|
||||
verifyNoMoreInteractions(jobResultsPersister);
|
||||
}
|
||||
|
||||
public void testTimingStatsDifferSignificantly() {
|
||||
assertThat(
|
||||
DatafeedTimingStatsReporter.differSignificantly(
|
||||
new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1000.0)),
|
||||
new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0)),
|
||||
is(false));
|
||||
assertThat(
|
||||
DatafeedTimingStatsReporter.differSignificantly(
|
||||
new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1100.0)),
|
||||
new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1100.0)),
|
||||
is(false));
|
||||
assertThat(
|
||||
DatafeedTimingStatsReporter.differSignificantly(
|
||||
new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1120.0)),
|
||||
new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1120.0)),
|
||||
is(true));
|
||||
assertThat(
|
||||
DatafeedTimingStatsReporter.differSignificantly(
|
||||
new DatafeedTimingStats(JOB_ID, 5, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 11000.0)),
|
||||
new DatafeedTimingStats(JOB_ID, 5, 10, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 11000.0)),
|
||||
is(false));
|
||||
assertThat(
|
||||
DatafeedTimingStatsReporter.differSignificantly(
|
||||
new DatafeedTimingStats(JOB_ID, 5, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 11200.0)),
|
||||
new DatafeedTimingStats(JOB_ID, 5, 10, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 11200.0)),
|
||||
is(true));
|
||||
assertThat(
|
||||
DatafeedTimingStatsReporter.differSignificantly(
|
||||
new DatafeedTimingStats(JOB_ID, 5, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 110000.0)),
|
||||
new DatafeedTimingStats(JOB_ID, 5, 10, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 110000.0)),
|
||||
is(false));
|
||||
assertThat(
|
||||
DatafeedTimingStatsReporter.differSignificantly(
|
||||
new DatafeedTimingStats(JOB_ID, 5, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 110001.0)),
|
||||
new DatafeedTimingStats(JOB_ID, 5, 10, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 110001.0)),
|
||||
is(true));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -244,7 +244,7 @@ public class JobResultsPersisterTests extends ESTestCase {
|
|||
.when(client).index(any(), any(ActionListener.class));
|
||||
|
||||
JobResultsPersister persister = new JobResultsPersister(client);
|
||||
DatafeedTimingStats timingStats = new DatafeedTimingStats("foo", 6, 666.0);
|
||||
DatafeedTimingStats timingStats = new DatafeedTimingStats("foo", 6, 66, 666.0);
|
||||
persister.persistDatafeedTimingStats(timingStats, WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
|
||||
ArgumentCaptor<IndexRequest> indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
|
||||
|
@ -256,6 +256,7 @@ public class JobResultsPersisterTests extends ESTestCase {
|
|||
Map<String, Object> expectedSourceAsMap = new HashMap<>();
|
||||
expectedSourceAsMap.put("job_id", "foo");
|
||||
expectedSourceAsMap.put("search_count", 6);
|
||||
expectedSourceAsMap.put("bucket_count", 66);
|
||||
expectedSourceAsMap.put("total_search_time_ms", 666.0);
|
||||
assertThat(indexRequest.sourceAsMap(), equalTo(expectedSourceAsMap));
|
||||
|
||||
|
|
|
@ -902,10 +902,12 @@ public class JobResultsProviderTests extends ESTestCase {
|
|||
Map<String, Object> sourceFooMap = new HashMap<>();
|
||||
sourceFooMap.put(Job.ID.getPreferredName(), "foo");
|
||||
sourceFooMap.put(DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 6);
|
||||
sourceFooMap.put(DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 66);
|
||||
sourceFooMap.put(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0);
|
||||
Map<String, Object> sourceBarMap = new HashMap<>();
|
||||
sourceBarMap.put(Job.ID.getPreferredName(), "bar");
|
||||
sourceBarMap.put(DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 7);
|
||||
sourceBarMap.put(DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 77);
|
||||
sourceBarMap.put(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 777.0);
|
||||
|
||||
List<Map<String, Object>> sourceFoo = Arrays.asList(sourceFooMap);
|
||||
|
@ -939,8 +941,8 @@ public class JobResultsProviderTests extends ESTestCase {
|
|||
new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(AnomalyDetectorsIndex.jobResultsAliasedName("bar")));
|
||||
|
||||
Map<String, DatafeedTimingStats> expectedStatsByJobId = new HashMap<>();
|
||||
expectedStatsByJobId.put("foo", new DatafeedTimingStats("foo", 6, 666.0));
|
||||
expectedStatsByJobId.put("bar", new DatafeedTimingStats("bar", 7, 777.0));
|
||||
expectedStatsByJobId.put("foo", new DatafeedTimingStats("foo", 6, 66, 666.0));
|
||||
expectedStatsByJobId.put("bar", new DatafeedTimingStats("bar", 7, 77, 777.0));
|
||||
JobResultsProvider provider = createProvider(client);
|
||||
provider.datafeedTimingStats(
|
||||
Arrays.asList("foo", "bar"),
|
||||
|
@ -960,6 +962,7 @@ public class JobResultsProviderTests extends ESTestCase {
|
|||
Map<String, Object> sourceFooMap = new HashMap<>();
|
||||
sourceFooMap.put(Job.ID.getPreferredName(), "foo");
|
||||
sourceFooMap.put(DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 6);
|
||||
sourceFooMap.put(DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 66);
|
||||
sourceFooMap.put(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0);
|
||||
List<Map<String, Object>> source = Arrays.asList(sourceFooMap);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
|
@ -971,7 +974,7 @@ public class JobResultsProviderTests extends ESTestCase {
|
|||
JobResultsProvider provider = createProvider(client);
|
||||
provider.datafeedTimingStats(
|
||||
"foo",
|
||||
stats -> assertThat(stats, equalTo(new DatafeedTimingStats("foo", 6, 666.0))),
|
||||
stats -> assertThat(stats, equalTo(new DatafeedTimingStats("foo", 6, 66, 666.0))),
|
||||
e -> { throw new AssertionError(); });
|
||||
|
||||
verify(client).prepareSearch(indexName);
|
||||
|
|
|
@ -175,6 +175,7 @@ public class JobStatsMonitoringDocTests extends BaseMonitoringDocTestCase<JobSta
|
|||
+ "\"timing_stats\":{"
|
||||
+ "\"job_id\":\"_job_id\","
|
||||
+ "\"bucket_count\":100,"
|
||||
+ "\"total_bucket_processing_time_ms\":2000.0,"
|
||||
+ "\"minimum_bucket_processing_time_ms\":10.0,"
|
||||
+ "\"maximum_bucket_processing_time_ms\":30.0,"
|
||||
+ "\"average_bucket_processing_time_ms\":20.0,"
|
||||
|
|
|
@ -175,6 +175,7 @@ setup:
|
|||
ml.start_datafeed:
|
||||
datafeed_id: "datafeed-1"
|
||||
start: 0
|
||||
- match: { started: true}
|
||||
|
||||
- do:
|
||||
ml.get_datafeed_stats:
|
||||
|
@ -183,7 +184,9 @@ setup:
|
|||
- match: { datafeeds.0.state: "started"}
|
||||
- match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1"}
|
||||
- match: { datafeeds.0.timing_stats.search_count: 0}
|
||||
- match: { datafeeds.0.timing_stats.bucket_count: 0}
|
||||
- match: { datafeeds.0.timing_stats.total_search_time_ms: 0.0}
|
||||
- is_false: datafeeds.0.timing_stats.average_search_time_per_bucket_ms
|
||||
|
||||
- do:
|
||||
ml.stop_datafeed:
|
||||
|
@ -196,8 +199,9 @@ setup:
|
|||
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
|
||||
- match: { datafeeds.0.state: "stopped"}
|
||||
- match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1"}
|
||||
# TODO: Change "gte 0" to "match 1" once https://github.com/elastic/elasticsearch/issues/44132 is fixed
|
||||
# We don't really know at this point if datafeed managed to perform at least one search, hence the very relaxed assertion
|
||||
- gte: { datafeeds.0.timing_stats.search_count: 0}
|
||||
- gte: { datafeeds.0.timing_stats.bucket_count: 0}
|
||||
- gte: { datafeeds.0.timing_stats.total_search_time_ms: 0.0}
|
||||
|
||||
---
|
||||
|
|
|
@ -101,6 +101,7 @@ setup:
|
|||
- is_true: jobs.0.open_time
|
||||
- match: { jobs.0.timing_stats.job_id: job-stats-test }
|
||||
- match: { jobs.0.timing_stats.bucket_count: 1 } # Records are 1h apart and bucket span is 1h so 1 bucket is produced
|
||||
- gte: { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 }
|
||||
- gte: { jobs.0.timing_stats.minimum_bucket_processing_time_ms: 0.0 }
|
||||
- gte: { jobs.0.timing_stats.maximum_bucket_processing_time_ms: 0.0 }
|
||||
- gte: { jobs.0.timing_stats.average_bucket_processing_time_ms: 0.0 }
|
||||
|
@ -140,6 +141,7 @@ setup:
|
|||
- is_false: jobs.0.open_time
|
||||
- match: { jobs.0.timing_stats.job_id: job-stats-test }
|
||||
- match: { jobs.0.timing_stats.bucket_count: 1 } # Records are 1h apart and bucket span is 1h so 1 bucket is produced
|
||||
- gte: { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 }
|
||||
- gte: { jobs.0.timing_stats.minimum_bucket_processing_time_ms: 0.0 }
|
||||
- gte: { jobs.0.timing_stats.maximum_bucket_processing_time_ms: 0.0 }
|
||||
- gte: { jobs.0.timing_stats.average_bucket_processing_time_ms: 0.0 }
|
||||
|
@ -158,6 +160,7 @@ setup:
|
|||
- is_true: jobs.0.open_time
|
||||
- match: { jobs.0.timing_stats.job_id: jobs-get-stats-datafeed-job }
|
||||
- match: { jobs.0.timing_stats.bucket_count: 0 }
|
||||
- match: { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 }
|
||||
- is_false: jobs.0.timing_stats.minimum_bucket_processing_time_ms
|
||||
- is_false: jobs.0.timing_stats.maximum_bucket_processing_time_ms
|
||||
- is_false: jobs.0.timing_stats.average_bucket_processing_time_ms
|
||||
|
@ -342,6 +345,7 @@ setup:
|
|||
- is_false: jobs.0.open_time
|
||||
- match: { jobs.0.timing_stats.job_id: job-stats-test }
|
||||
- match: { jobs.0.timing_stats.bucket_count: 0 }
|
||||
- match: { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 }
|
||||
- is_false: jobs.0.timing_stats.minimum_bucket_processing_time_ms
|
||||
- is_false: jobs.0.timing_stats.maximum_bucket_processing_time_ms
|
||||
- is_false: jobs.0.timing_stats.average_bucket_processing_time_ms
|
||||
|
@ -356,6 +360,7 @@ setup:
|
|||
- is_false: jobs.1.open_time
|
||||
- match: { jobs.1.timing_stats.job_id: jobs-get-stats-datafeed-job }
|
||||
- match: { jobs.1.timing_stats.bucket_count: 0 }
|
||||
- match: { jobs.1.timing_stats.total_bucket_processing_time_ms: 0.0 }
|
||||
- is_false: jobs.1.timing_stats.minimum_bucket_processing_time_ms
|
||||
- is_false: jobs.1.timing_stats.maximum_bucket_processing_time_ms
|
||||
- is_false: jobs.1.timing_stats.average_bucket_processing_time_ms
|
||||
|
|
Loading…
Reference in New Issue