diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java index df2fce15130..96c8495ecca 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -47,6 +48,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasks; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -157,20 +159,24 @@ public class GetJobsStatsAction extends Action(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD)); } else { listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD)); @@ -432,7 +449,7 @@ public class GetJobsStatsAction extends Action results = response.getResponse().results(); results.addAll(jobStats.asList().stream() @@ -454,6 +471,14 @@ public class GetJobsStatsAction extends Action duration) { + if (duration.isPresent()) { + return TimeValue.timeValueSeconds(duration.get().getSeconds()); + } else { + return null; + } + } + static List determineJobIdsWithoutLiveStats(List requestedJobIds, List stats) { Set excludeJobIds = stats.stream().map(Response.JobStats::getJobId).collect(Collectors.toSet()); return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId)).collect(Collectors.toList()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index d29adf66d5a..61087f44d8f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -460,6 +460,9 @@ public class ElasticsearchMappings { .startObject(DataCounts.LATEST_RECORD_TIME.getPreferredName()) .field(TYPE, DATE) .endObject() + .startObject(DataCounts.LAST_DATA_TIME.getPreferredName()) + .field(TYPE, DATE) + .endObject() .endObject() .endObject() .endObject(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index 24c8453032a..cba69a3d20d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -255,6 +255,9 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { * Report the counts now regardless of whether or not we are at a reporting boundary. */ public void finishReporting() { + Date now = new Date(); + incrementalRecordStats.setLastDataTimeStamp(now); + totalRecordStats.setLastDataTimeStamp(now); dataCountsPersister.persistDataCounts(jobId, runningTotalStats(), new LoggingActionListener()); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index f547f9dc1e5..0c122bb876b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -300,12 +300,12 @@ public class AutodetectProcessManager extends AbstractComponent { return autoDetectCommunicatorByJob.get(jobId) != null; } - public Duration jobUpTime(String jobId) { + public Optional jobOpenTime(String jobId) { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { - return Duration.ZERO; + return Optional.empty(); } - return Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now()); + return Optional.of(Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now())); } private void setJobState(long taskId, String jobId, JobState state) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java index 784779cb30d..4ddb26f0680 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java @@ -47,6 +47,7 @@ public class DataCounts extends ToXContentToBytes implements Writeable { public static final String OUT_OF_ORDER_TIME_COUNT_STR = "out_of_order_timestamp_count"; public static final String EARLIEST_RECORD_TIME_STR = "earliest_record_timestamp"; public static final String LATEST_RECORD_TIME_STR = "latest_record_timestamp"; + public static final String LAST_DATA_TIME_STR = "last_data_time"; public static final ParseField PROCESSED_RECORD_COUNT = new ParseField(PROCESSED_RECORD_COUNT_STR); public static final ParseField PROCESSED_FIELD_COUNT = new ParseField(PROCESSED_FIELD_COUNT_STR); @@ -58,12 +59,13 @@ public class DataCounts extends ToXContentToBytes implements Writeable { public static final ParseField OUT_OF_ORDER_TIME_COUNT = new ParseField(OUT_OF_ORDER_TIME_COUNT_STR); public static final ParseField EARLIEST_RECORD_TIME = new ParseField(EARLIEST_RECORD_TIME_STR); public static final ParseField LATEST_RECORD_TIME = new ParseField(LATEST_RECORD_TIME_STR); + public static final ParseField LAST_DATA_TIME = new ParseField(LAST_DATA_TIME_STR); public static final ParseField TYPE = new ParseField("data_counts"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_counts", a -> new DataCounts((String) a[0], (long) a[1], (long) a[2], (long) a[3], - (long) a[4], (long) a[5], (long) a[6], (long) a[7], (Date) a[8], (Date) a[9])); + (long) a[4], (long) a[5], (long) a[6], (long) a[7], (Date) a[8], (Date) a[9], (Date) a[10])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); @@ -92,6 +94,15 @@ public class DataCounts extends ToXContentToBytes implements Writeable { throw new IllegalArgumentException( "unexpected token [" + p.currentToken() + "] for [" + LATEST_RECORD_TIME.getPreferredName() + "]"); }, LATEST_RECORD_TIME, ValueType.VALUE); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> { + if (p.currentToken() == Token.VALUE_NUMBER) { + return new Date(p.longValue()); + } else if (p.currentToken() == Token.VALUE_STRING) { + return new Date(TimeUtils.dateStringToEpoch(p.text())); + } + throw new IllegalArgumentException( + "unexpected token [" + p.currentToken() + "] for [" + LAST_DATA_TIME.getPreferredName() + "]"); + }, LAST_DATA_TIME, ValueType.VALUE); PARSER.declareLong((t, u) -> {;}, INPUT_RECORD_COUNT); } @@ -110,10 +121,11 @@ public class DataCounts extends ToXContentToBytes implements Writeable { // NORELEASE: Use Jodatime instead private Date earliestRecordTimeStamp; private Date latestRecordTimeStamp; + private Date lastDataTimeStamp; public DataCounts(String jobId, long processedRecordCount, long processedFieldCount, long inputBytes, long inputFieldCount, long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount, - Date earliestRecordTimeStamp, Date latestRecordTimeStamp) { + Date earliestRecordTimeStamp, Date latestRecordTimeStamp, Date lastDataTimeStamp) { this.jobId = jobId; this.processedRecordCount = processedRecordCount; this.processedFieldCount = processedFieldCount; @@ -124,6 +136,7 @@ public class DataCounts extends ToXContentToBytes implements Writeable { this.outOfOrderTimeStampCount = outOfOrderTimeStampCount; this.latestRecordTimeStamp = latestRecordTimeStamp; this.earliestRecordTimeStamp = earliestRecordTimeStamp; + this.lastDataTimeStamp = lastDataTimeStamp; } public DataCounts(String jobId) { @@ -141,6 +154,7 @@ public class DataCounts extends ToXContentToBytes implements Writeable { outOfOrderTimeStampCount = lhs.outOfOrderTimeStampCount; latestRecordTimeStamp = lhs.latestRecordTimeStamp; earliestRecordTimeStamp = lhs.earliestRecordTimeStamp; + lastDataTimeStamp = lhs.lastDataTimeStamp; } public DataCounts(StreamInput in) throws IOException { @@ -158,6 +172,9 @@ public class DataCounts extends ToXContentToBytes implements Writeable { if (in.readBoolean()) { earliestRecordTimeStamp = new Date(in.readVLong()); } + if (in.readBoolean()) { + lastDataTimeStamp = new Date(in.readVLong()); + } in.readVLong(); // throw away inputRecordCount } @@ -327,6 +344,19 @@ public class DataCounts extends ToXContentToBytes implements Writeable { this.latestRecordTimeStamp = latestRecordTimeStamp; } + /** + * The wall clock time the latest record was seen. + * + * @return Wall clock time of the lastest record + */ + public Date getLastDataTimeStamp() { + return lastDataTimeStamp; + } + + public void setLastDataTimeStamp(Date lastDataTimeStamp) { + this.lastDataTimeStamp = lastDataTimeStamp; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); @@ -349,6 +379,12 @@ public class DataCounts extends ToXContentToBytes implements Writeable { } else { out.writeBoolean(false); } + if (lastDataTimeStamp != null) { + out.writeBoolean(true); + out.writeVLong(lastDataTimeStamp.getTime()); + } else { + out.writeBoolean(false); + } out.writeVLong(getInputRecordCount()); } @@ -377,6 +413,10 @@ public class DataCounts extends ToXContentToBytes implements Writeable { builder.dateField(LATEST_RECORD_TIME.getPreferredName(), LATEST_RECORD_TIME.getPreferredName() + "_string", latestRecordTimeStamp.getTime()); } + if (lastDataTimeStamp != null) { + builder.dateField(LAST_DATA_TIME.getPreferredName(), LAST_DATA_TIME.getPreferredName() + "_string", + lastDataTimeStamp.getTime()); + } builder.field(INPUT_RECORD_COUNT.getPreferredName(), getInputRecordCount()); return builder; @@ -406,7 +446,8 @@ public class DataCounts extends ToXContentToBytes implements Writeable { this.missingFieldCount == that.missingFieldCount && this.outOfOrderTimeStampCount == that.outOfOrderTimeStampCount && Objects.equals(this.latestRecordTimeStamp, that.latestRecordTimeStamp) && - Objects.equals(this.earliestRecordTimeStamp, that.earliestRecordTimeStamp); + Objects.equals(this.earliestRecordTimeStamp, that.earliestRecordTimeStamp) && + Objects.equals(this.lastDataTimeStamp, that.lastDataTimeStamp); } @@ -414,6 +455,6 @@ public class DataCounts extends ToXContentToBytes implements Writeable { public int hashCode() { return Objects.hash(jobId, processedRecordCount, processedFieldCount, inputBytes, inputFieldCount, invalidDateCount, missingFieldCount, - outOfOrderTimeStampCount, latestRecordTimeStamp, earliestRecordTimeStamp); + outOfOrderTimeStampCount, latestRecordTimeStamp, earliestRecordTimeStamp, lastDataTimeStamp); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStats.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStats.java index 6ce95838814..1721e490cff 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStats.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSizeStats.java @@ -184,9 +184,9 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable { builder.field(TOTAL_PARTITION_FIELD_COUNT_FIELD.getPreferredName(), totalPartitionFieldCount); builder.field(BUCKET_ALLOCATION_FAILURES_COUNT_FIELD.getPreferredName(), bucketAllocationFailuresCount); builder.field(MEMORY_STATUS_FIELD.getPreferredName(), memoryStatus); - builder.field(LOG_TIME_FIELD.getPreferredName(), logTime.getTime()); + builder.dateField(LOG_TIME_FIELD.getPreferredName(), LOG_TIME_FIELD.getPreferredName() + "_string", logTime.getTime()); if (timestamp != null) { - builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp.getTime()); + builder.dateField(TIMESTAMP_FIELD.getPreferredName(), TIMESTAMP_FIELD.getPreferredName() + "_string", timestamp.getTime()); } return builder; @@ -220,10 +220,18 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable { return memoryStatus; } + /** + * The timestamp of the last processed record when this instance was created. + * @return The record time + */ public Date getTimestamp() { return timestamp; } + /** + * The wall clock time at the point when this instance was created. + * @return The wall clock time + */ public Date getLogTime() { return logTime; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java index a110663dd4f..fdeabd87e7a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/ModelSnapshot.java @@ -163,7 +163,7 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { builder.startObject(); builder.field(Job.ID.getPreferredName(), jobId); if (timestamp != null) { - builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); + builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); } if (description != null) { builder.field(DESCRIPTION.getPreferredName(), description); @@ -176,10 +176,12 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable { builder.field(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName(), modelSizeStats); } if (latestRecordTimeStamp != null) { - builder.field(LATEST_RECORD_TIME.getPreferredName(), latestRecordTimeStamp.getTime()); + builder.dateField(LATEST_RECORD_TIME.getPreferredName(), LATEST_RECORD_TIME.getPreferredName() + "_string", + latestRecordTimeStamp.getTime()); } if (latestResultTimeStamp != null) { - builder.field(LATEST_RESULT_TIME.getPreferredName(), latestResultTimeStamp.getTime()); + builder.dateField(LATEST_RESULT_TIME.getPreferredName(), LATEST_RESULT_TIME.getPreferredName() + "_string", + latestResultTimeStamp.getTime()); } if (quantiles != null) { builder.field(Quantiles.TYPE.getPreferredName(), quantiles); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java index f3bbca2b348..27d2daeacaa 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java @@ -108,6 +108,7 @@ public final class ReservedFieldNames { DataCounts.OUT_OF_ORDER_TIME_COUNT.getPreferredName(), DataCounts.LATEST_RECORD_TIME.getPreferredName(), DataCounts.EARLIEST_RECORD_TIME.getPreferredName(), + DataCounts.LAST_DATA_TIME.getPreferredName(), Influence.INFLUENCER_FIELD_NAME.getPreferredName(), Influence.INFLUENCER_FIELD_VALUES.getPreferredName(), diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobStatsActionResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobStatsActionResponseTests.java index 966e33a84e2..a63727bc449 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobStatsActionResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/GetJobStatsActionResponseTests.java @@ -8,20 +8,23 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction.Response; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCountsTests; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; -import org.joda.time.DateTime; import java.net.InetAddress; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; + public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase { @Override @@ -33,11 +36,7 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase handler = mockConsumer(); StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); @@ -209,7 +209,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.next()).thenThrow(new RuntimeException("dummy")); - DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); + DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0)); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); @@ -263,7 +263,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); when(dataExtractor.next()).thenReturn(Optional.of(in)); - DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); + DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0)); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); boolean cancelled = randomBoolean(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 4d1a9671064..0a85e166677 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -63,7 +63,7 @@ public class DatafeedJobTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8)); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); - DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0)); + DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0)); PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id"); expectedRequest.setDataDescription(dataDescription.build()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java index f80f84b09ba..5da8415b197 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java @@ -73,7 +73,7 @@ public class DataCountsReporterTests extends ESTestCase { } public void testComplexConstructor() throws Exception { - DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, new Date(), new Date()); + DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, new Date(), new Date(), new Date()); try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, counts, jobDataCountsPersister)) { @@ -249,13 +249,21 @@ public class DataCountsReporterTests extends ESTestCase { dataCountsReporter.setAnalysedFieldsPerRecord(3); - DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, new Date(2000), new Date(3000)); + Date now = new Date(); + DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, new Date(2000), new Date(3000), now); dataCountsReporter.reportRecordWritten(5, 2000); dataCountsReporter.reportRecordWritten(5, 3000); dataCountsReporter.reportMissingField(); dataCountsReporter.finishReporting(); + long lastReportedTimeMs = dataCountsReporter.incrementalStats().getLastDataTimeStamp().getTime(); + // check last data time is equal to now give or take a second + assertTrue(lastReportedTimeMs >= now.getTime() && lastReportedTimeMs <= now.getTime() +1); + assertEquals(dataCountsReporter.incrementalStats().getLastDataTimeStamp(), + dataCountsReporter.runningTotalStats().getLastDataTimeStamp()); + Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), eq(dc), any()); + dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp()); assertEquals(dc, dataCountsReporter.incrementalStats()); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCountsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCountsTests.java index df0e44089b1..a3ebec4f190 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCountsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCountsTests.java @@ -17,11 +17,12 @@ import static org.hamcrest.Matchers.greaterThan; public class DataCountsTests extends AbstractSerializingTestCase { @Override - protected DataCounts createTestInstance() { + public DataCounts createTestInstance() { return new DataCounts(randomAsciiOfLength(10), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), - new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate()); + new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate(), + new DateTime(randomDateTimeZone()).toDate()); } @Override @@ -35,22 +36,22 @@ public class DataCountsTests extends AbstractSerializingTestCase { } public void testCountsEquals_GivenEqualCounts() { - DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9); - DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9); + DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); assertTrue(counts1.equals(counts2)); assertTrue(counts2.equals(counts1)); } public void testCountsHashCode_GivenEqualCounts() { - DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9); - DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9); + DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); assertEquals(counts1.hashCode(), counts2.hashCode()); } public void testCountsCopyConstructor() { - DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9); + DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); DataCounts counts2 = new DataCounts(counts1); assertEquals(counts1.hashCode(), counts2.hashCode()); @@ -62,7 +63,7 @@ public class DataCountsTests extends AbstractSerializingTestCase { } public void testCountCopyCreatedFieldsNotZero() throws Exception { - DataCounts counts1 = createCounts(1, 200, 400, 3, 4, 5, 6, 1479211200000L, 1479384000000L); + DataCounts counts1 = createCounts(1, 200, 400, 3, 4, 5, 6, 1479211200000L, 1479384000000L, 1488282343000L); assertAllFieldsGreaterThanZero(counts1); DataCounts counts2 = new DataCounts(counts1); @@ -101,19 +102,19 @@ public class DataCountsTests extends AbstractSerializingTestCase { } public void testCalcProcessedFieldCount() { - DataCounts counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 0L, 0L, new Date(), new Date()); + DataCounts counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 0L, 0L, new Date(), new Date(), new Date()); counts.calcProcessedFieldCount(3); assertEquals(30, counts.getProcessedFieldCount()); - counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 5L, 0L, new Date(), new Date()); + counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 5L, 0L, new Date(), new Date(), new Date()); counts.calcProcessedFieldCount(3); assertEquals(25, counts.getProcessedFieldCount()); } public void testEquals() { DataCounts counts1 = new DataCounts( - randomAsciiOfLength(16), 10L, 5000L, 2000L, 300L, 6L, 15L, 0L, new Date(), new Date(1435000000L)); + randomAsciiOfLength(16), 10L, 5000L, 2000L, 300L, 6L, 15L, 0L, new Date(), new Date(1435000000L), new Date(10L)); DataCounts counts2 = new DataCounts(counts1); assertEquals(counts1, counts2); @@ -155,11 +156,12 @@ public class DataCountsTests extends AbstractSerializingTestCase { private static DataCounts createCounts( long processedRecordCount, long processedFieldCount, long inputBytes, long inputFieldCount, - long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount, long earliestRecordTime, long latestRecordTime) { + long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount, long earliestRecordTime, long latestRecordTime, + long lastDataTime) { DataCounts counts = new DataCounts("foo", processedRecordCount, processedFieldCount, inputBytes, inputFieldCount, invalidDateCount, missingFieldCount, outOfOrderTimeStampCount, - new Date(earliestRecordTime), new Date(latestRecordTime)); + new Date(earliestRecordTime), new Date(latestRecordTime), new Date(lastDataTime)); return counts; } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml index 572ed0307e8..6000b19f0c6 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml @@ -85,6 +85,7 @@ setup: - is_true: jobs.0.node.name - is_true: jobs.0.node.transport_address - match: { jobs.0.node.attributes.max_running_jobs: "10"} + - gte: { jobs.0.open_time: 0} --- "Test get job stats for closed job": @@ -121,9 +122,10 @@ setup: - gt: { jobs.0.model_size_stats.model_bytes: 0 } - match: { jobs.0.state: closed } - is_false: jobs.0.node + - is_false: jobs.0.open_time --- -"Test get job stats of datafeed job that has not received and data": +"Test get job stats of datafeed job that has not received any data": - do: xpack.ml.get_job_stats: @@ -132,6 +134,7 @@ setup: - match: { jobs.0.data_counts.processed_record_count: 0 } - match: { jobs.0.model_size_stats.model_bytes : 0 } - match: { jobs.0.state: opened } + - gte: { jobs.0.open_time: 0} --- "Test get all job stats explicitly":