[ML] Restore missing job stats (elastic/x-pack-elasticsearch#667)

* [ML] Pretty print dates in ModelSizeStats
* [ML] Add last_data_time field to DataCounts
* [ML] Add uptime to job stats
* [ML] Pretty print time fields in ModelSnapshot
* [ML] Rename uptime -> open_time
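
For context on the "pretty print" and open_time items above: the pattern used throughout this diff is to always write the raw numeric field and to add a human-readable "*_string" sibling only when the XContentBuilder is marked human readable (i.e. the REST request passed ?human=true). Below is a minimal sketch of that behaviour, assuming the 5.x-era XContentBuilder API the diff itself calls (dateField, timeValueField); the class name and sample values are illustrative only:

    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.json.JsonXContent;

    public class HumanReadableFieldsDemo {
        public static void main(String[] args) throws Exception {
            XContentBuilder builder = JsonXContent.contentBuilder();
            builder.humanReadable(true); // what ?human=true toggles on the REST layer
            builder.startObject();
            // writes log_time (epoch millis) plus a formatted log_time_string
            // sibling, because humanReadable is set
            builder.dateField("log_time", "log_time_string", 1488466460000L);
            // the same dual-field pattern applies to durations such as open_time
            builder.timeValueField("open_time", "open_time_string", TimeValue.timeValueSeconds(42));
            builder.endObject();
            System.out.println(builder.string());
        }
    }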

Original commit: elastic/x-pack-elasticsearch@4ce5258a77
David Kyle 2017-03-02 15:54:20 +00:00 committed by GitHub
parent 948a8594bb
commit 14a0167781
16 changed files with 163 additions and 55 deletions

View File

@@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
@@ -47,6 +48,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -157,20 +159,24 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
private DataCounts dataCounts;
@Nullable
private ModelSizeStats modelSizeStats;
@Nullable
private TimeValue openTime;
private JobState state;
@Nullable
private DiscoveryNode node;
@Nullable
private String assignmentExplanation;
JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state,
@Nullable DiscoveryNode node, @Nullable String assignmentExplanation) {
@Nullable DiscoveryNode node, @Nullable String assignmentExplanation, @Nullable TimeValue openTime) {
this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = modelSizeStats;
this.state = Objects.requireNonNull(state);
this.node = node;
this.assignmentExplanation = assignmentExplanation;
this.openTime = openTime;
}
JobStats(StreamInput in) throws IOException {
@@ -180,6 +186,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
state = JobState.fromStream(in);
node = in.readOptionalWriteable(DiscoveryNode::new);
assignmentExplanation = in.readOptionalString();
openTime = in.readOptionalWriteable(TimeValue::new);
}
public String getJobId() {
@@ -206,6 +213,10 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
return assignmentExplanation;
}
public TimeValue getOpenTime() {
return openTime;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -232,6 +243,9 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
if (assignmentExplanation != null) {
builder.field("assigment_explanation", assignmentExplanation);
}
if (openTime != null) {
builder.timeValueField("open_time", "open_time_string", openTime);
}
builder.endObject();
return builder;
}
@@ -244,11 +258,12 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
state.writeTo(out);
out.writeOptionalWriteable(node);
out.writeOptionalString(assignmentExplanation);
out.writeOptionalWriteable(openTime);
}
@Override
public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, state, node, assignmentExplanation);
return Objects.hash(jobId, dataCounts, modelSizeStats, state, node, assignmentExplanation, openTime);
}
@Override
@@ -265,7 +280,8 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.node, other.node)
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation)
&& Objects.equals(this.openTime, other.openTime);
}
}
@@ -401,8 +417,9 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
DiscoveryNode node = state.nodes().get(pTask.getExecutorNode());
JobState jobState = MlMetadata.getJobState(jobId, tasks);
String assignmentExplanation = pTask.getAssignment().getExplanation();
TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(jobId));
Response.JobStats jobStats = new Response.JobStats(jobId, stats.get().v1(), stats.get().v2(), jobState,
node, assignmentExplanation);
node, assignmentExplanation, openTime);
listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));
} else {
listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD));
@@ -432,7 +449,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
assignmentExplanation = pTask.getAssignment().getExplanation();
}
jobStats.set(slot, new Response.JobStats(jobId, dataCounts, modelSizeStats, jobState, null,
assignmentExplanation));
assignmentExplanation, null));
if (counter.decrementAndGet() == 0) {
List<Response.JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList().stream()
@@ -454,6 +471,14 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
}, errorHandler);
}
static TimeValue durationToTimeValue(Optional<Duration> duration) {
if (duration.isPresent()) {
return TimeValue.timeValueSeconds(duration.get().getSeconds());
} else {
return null;
}
}
static List<String> determineJobIdsWithoutLiveStats(List<String> requestedJobIds, List<Response.JobStats> stats) {
Set<String> excludeJobIds = stats.stream().map(Response.JobStats::getJobId).collect(Collectors.toSet());
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId)).collect(Collectors.toList());
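
Note on the helper above: jobOpenTime returns Optional.empty() when the job has no running process (see the AutodetectProcessManager hunk further down), and durationToTimeValue maps that to null rather than TimeValue.ZERO, which is what lets toXContent skip the open_time field for closed jobs. A more compact sketch with the same semantics, assuming plain Java 8 Optional:

    static TimeValue durationToTimeValue(Optional<Duration> duration) {
        // null (not ZERO) when absent, so toXContent omits open_time entirely
        return duration.map(d -> TimeValue.timeValueSeconds(d.getSeconds())).orElse(null);
    }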

View File

@@ -460,6 +460,9 @@ public class ElasticsearchMappings {
.startObject(DataCounts.LATEST_RECORD_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(DataCounts.LAST_DATA_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.endObject()
.endObject()
.endObject();

View File

@@ -255,6 +255,9 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
* Report the counts now regardless of whether or not we are at a reporting boundary.
*/
public void finishReporting() {
Date now = new Date();
incrementalRecordStats.setLastDataTimeStamp(now);
totalRecordStats.setLastDataTimeStamp(now);
dataCountsPersister.persistDataCounts(jobId, runningTotalStats(), new LoggingActionListener());
}
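
With this change finishReporting stamps both the incremental and the total counts with the current wall-clock time before persisting, which is what populates the new last_data_time field. A rough usage sketch, limited to reporter calls that appear in DataCountsReporterTests later in this diff (argument values are illustrative):

    // construction omitted; the reporter takes a thread pool, settings,
    // a job id and a JobDataCountsPersister (see DataCountsReporterTests)
    dataCountsReporter.setAnalysedFieldsPerRecord(3);
    dataCountsReporter.reportRecordWritten(5, 2000);  // fields written, record time
    dataCountsReporter.reportMissingField();
    dataCountsReporter.finishReporting();             // stamps last_data_time = "now", then persists
    Date lastSeen = dataCountsReporter.incrementalStats().getLastDataTimeStamp();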

View File

@@ -300,12 +300,12 @@ public class AutodetectProcessManager extends AbstractComponent {
return autoDetectCommunicatorByJob.get(jobId) != null;
}
public Duration jobUpTime(String jobId) {
public Optional<Duration> jobOpenTime(String jobId) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
return Duration.ZERO;
return Optional.empty();
}
return Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now());
return Optional.of(Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now()));
}
private void setJobState(long taskId, String jobId, JobState state) {

View File

@@ -47,6 +47,7 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
public static final String OUT_OF_ORDER_TIME_COUNT_STR = "out_of_order_timestamp_count";
public static final String EARLIEST_RECORD_TIME_STR = "earliest_record_timestamp";
public static final String LATEST_RECORD_TIME_STR = "latest_record_timestamp";
public static final String LAST_DATA_TIME_STR = "last_data_time";
public static final ParseField PROCESSED_RECORD_COUNT = new ParseField(PROCESSED_RECORD_COUNT_STR);
public static final ParseField PROCESSED_FIELD_COUNT = new ParseField(PROCESSED_FIELD_COUNT_STR);
@@ -58,12 +59,13 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
public static final ParseField OUT_OF_ORDER_TIME_COUNT = new ParseField(OUT_OF_ORDER_TIME_COUNT_STR);
public static final ParseField EARLIEST_RECORD_TIME = new ParseField(EARLIEST_RECORD_TIME_STR);
public static final ParseField LATEST_RECORD_TIME = new ParseField(LATEST_RECORD_TIME_STR);
public static final ParseField LAST_DATA_TIME = new ParseField(LAST_DATA_TIME_STR);
public static final ParseField TYPE = new ParseField("data_counts");
public static final ConstructingObjectParser<DataCounts, Void> PARSER =
new ConstructingObjectParser<>("data_counts", a -> new DataCounts((String) a[0], (long) a[1], (long) a[2], (long) a[3],
(long) a[4], (long) a[5], (long) a[6], (long) a[7], (Date) a[8], (Date) a[9]));
(long) a[4], (long) a[5], (long) a[6], (long) a[7], (Date) a[8], (Date) a[9], (Date) a[10]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
@@ -92,6 +94,15 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + LATEST_RECORD_TIME.getPreferredName() + "]");
}, LATEST_RECORD_TIME, ValueType.VALUE);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
if (p.currentToken() == Token.VALUE_NUMBER) {
return new Date(p.longValue());
} else if (p.currentToken() == Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + LAST_DATA_TIME.getPreferredName() + "]");
}, LAST_DATA_TIME, ValueType.VALUE);
PARSER.declareLong((t, u) -> {}, INPUT_RECORD_COUNT);
}
@@ -110,10 +121,11 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
// NORELEASE: Use Jodatime instead
private Date earliestRecordTimeStamp;
private Date latestRecordTimeStamp;
private Date lastDataTimeStamp;
public DataCounts(String jobId, long processedRecordCount, long processedFieldCount, long inputBytes,
long inputFieldCount, long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount,
Date earliestRecordTimeStamp, Date latestRecordTimeStamp) {
Date earliestRecordTimeStamp, Date latestRecordTimeStamp, Date lastDataTimeStamp) {
this.jobId = jobId;
this.processedRecordCount = processedRecordCount;
this.processedFieldCount = processedFieldCount;
@@ -124,6 +136,7 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
this.outOfOrderTimeStampCount = outOfOrderTimeStampCount;
this.latestRecordTimeStamp = latestRecordTimeStamp;
this.earliestRecordTimeStamp = earliestRecordTimeStamp;
this.lastDataTimeStamp = lastDataTimeStamp;
}
public DataCounts(String jobId) {
@@ -141,6 +154,7 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
outOfOrderTimeStampCount = lhs.outOfOrderTimeStampCount;
latestRecordTimeStamp = lhs.latestRecordTimeStamp;
earliestRecordTimeStamp = lhs.earliestRecordTimeStamp;
lastDataTimeStamp = lhs.lastDataTimeStamp;
}
public DataCounts(StreamInput in) throws IOException {
@@ -158,6 +172,9 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
if (in.readBoolean()) {
earliestRecordTimeStamp = new Date(in.readVLong());
}
if (in.readBoolean()) {
lastDataTimeStamp = new Date(in.readVLong());
}
in.readVLong(); // throw away inputRecordCount
}
@@ -327,6 +344,19 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
this.latestRecordTimeStamp = latestRecordTimeStamp;
}
/**
* The wall clock time at which the latest record was seen.
*
* @return Wall clock time of the latest record
*/
public Date getLastDataTimeStamp() {
return lastDataTimeStamp;
}
public void setLastDataTimeStamp(Date lastDataTimeStamp) {
this.lastDataTimeStamp = lastDataTimeStamp;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
@@ -349,6 +379,12 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
} else {
out.writeBoolean(false);
}
if (lastDataTimeStamp != null) {
out.writeBoolean(true);
out.writeVLong(lastDataTimeStamp.getTime());
} else {
out.writeBoolean(false);
}
out.writeVLong(getInputRecordCount());
}
@@ -377,6 +413,10 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
builder.dateField(LATEST_RECORD_TIME.getPreferredName(), LATEST_RECORD_TIME.getPreferredName() + "_string",
latestRecordTimeStamp.getTime());
}
if (lastDataTimeStamp != null) {
builder.dateField(LAST_DATA_TIME.getPreferredName(), LAST_DATA_TIME.getPreferredName() + "_string",
lastDataTimeStamp.getTime());
}
builder.field(INPUT_RECORD_COUNT.getPreferredName(), getInputRecordCount());
return builder;
@@ -406,7 +446,8 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
this.missingFieldCount == that.missingFieldCount &&
this.outOfOrderTimeStampCount == that.outOfOrderTimeStampCount &&
Objects.equals(this.latestRecordTimeStamp, that.latestRecordTimeStamp) &&
Objects.equals(this.earliestRecordTimeStamp, that.earliestRecordTimeStamp);
Objects.equals(this.earliestRecordTimeStamp, that.earliestRecordTimeStamp) &&
Objects.equals(this.lastDataTimeStamp, that.lastDataTimeStamp);
}
@@ -414,6 +455,6 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
public int hashCode() {
return Objects.hash(jobId, processedRecordCount, processedFieldCount,
inputBytes, inputFieldCount, invalidDateCount, missingFieldCount,
outOfOrderTimeStampCount, latestRecordTimeStamp, earliestRecordTimeStamp);
outOfOrderTimeStampCount, latestRecordTimeStamp, earliestRecordTimeStamp, lastDataTimeStamp);
}
}

View File

@@ -184,9 +184,9 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
builder.field(TOTAL_PARTITION_FIELD_COUNT_FIELD.getPreferredName(), totalPartitionFieldCount);
builder.field(BUCKET_ALLOCATION_FAILURES_COUNT_FIELD.getPreferredName(), bucketAllocationFailuresCount);
builder.field(MEMORY_STATUS_FIELD.getPreferredName(), memoryStatus);
builder.field(LOG_TIME_FIELD.getPreferredName(), logTime.getTime());
builder.dateField(LOG_TIME_FIELD.getPreferredName(), LOG_TIME_FIELD.getPreferredName() + "_string", logTime.getTime());
if (timestamp != null) {
builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp.getTime());
builder.dateField(TIMESTAMP_FIELD.getPreferredName(), TIMESTAMP_FIELD.getPreferredName() + "_string", timestamp.getTime());
}
return builder;
@@ -220,10 +220,18 @@ public class ModelSizeStats extends ToXContentToBytes implements Writeable {
return memoryStatus;
}
/**
* The timestamp of the last processed record when this instance was created.
* @return The record time
*/
public Date getTimestamp() {
return timestamp;
}
/**
* The wall clock time at the point when this instance was created.
* @return The wall clock time
*/
public Date getLogTime() {
return logTime;
}

View File

@@ -163,7 +163,7 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
if (timestamp != null) {
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
}
if (description != null) {
builder.field(DESCRIPTION.getPreferredName(), description);
@@ -176,10 +176,12 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
builder.field(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName(), modelSizeStats);
}
if (latestRecordTimeStamp != null) {
builder.field(LATEST_RECORD_TIME.getPreferredName(), latestRecordTimeStamp.getTime());
builder.dateField(LATEST_RECORD_TIME.getPreferredName(), LATEST_RECORD_TIME.getPreferredName() + "_string",
latestRecordTimeStamp.getTime());
}
if (latestResultTimeStamp != null) {
builder.field(LATEST_RESULT_TIME.getPreferredName(), latestResultTimeStamp.getTime());
builder.dateField(LATEST_RESULT_TIME.getPreferredName(), LATEST_RESULT_TIME.getPreferredName() + "_string",
latestResultTimeStamp.getTime());
}
if (quantiles != null) {
builder.field(Quantiles.TYPE.getPreferredName(), quantiles);

View File

@@ -108,6 +108,7 @@ public final class ReservedFieldNames {
DataCounts.OUT_OF_ORDER_TIME_COUNT.getPreferredName(),
DataCounts.LATEST_RECORD_TIME.getPreferredName(),
DataCounts.EARLIEST_RECORD_TIME.getPreferredName(),
DataCounts.LAST_DATA_TIME.getPreferredName(),
Influence.INFLUENCER_FIELD_NAME.getPreferredName(),
Influence.INFLUENCER_FIELD_VALUES.getPreferredName(),

View File

@@ -8,20 +8,23 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction.Response;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCountsTests;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
import org.joda.time.DateTime;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<Response> {
@Override
@@ -33,11 +36,7 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<R
for (int j = 0; j < listSize; j++) {
String jobId = randomAsciiOfLength(10);
DataCounts dataCounts = new DataCounts(randomAsciiOfLength(10), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate());
DataCounts dataCounts = new DataCountsTests().createTestInstance();
ModelSizeStats sizeStats = null;
if (randomBoolean()) {
sizeStats = new ModelSizeStats.Builder("foo").build();
@@ -52,7 +51,11 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<R
if (randomBoolean()) {
explanation = randomAsciiOfLength(3);
}
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobState, node, explanation);
TimeValue openTime = null;
if (randomBoolean()) {
openTime = parseTimeValue(randomPositiveTimeValue(), "open_time-Test");
}
Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobState, node, explanation, openTime);
jobStatsList.add(jobStats);
}

View File

@@ -5,13 +5,16 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static org.elasticsearch.xpack.ml.action.GetJobsStatsAction.TransportAction.determineJobIdsWithoutLiveStats;
@@ -23,7 +26,7 @@ public class GetJobsStatsActionTests extends ESTestCase {
assertEquals("id1", result.get(0));
result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.singletonList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null, null)));
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null, null, null)));
assertEquals(0, result.size());
result = determineJobIdsWithoutLiveStats(
@@ -36,24 +39,32 @@ public class GetJobsStatsActionTests extends ESTestCase {
result = determineJobIdsWithoutLiveStats(
Arrays.asList("id1", "id2", "id3"),
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null,
JobState.CLOSED, null, null))
JobState.CLOSED, null, null, null))
);
assertEquals(2, result.size());
assertEquals("id2", result.get(0));
assertEquals("id3", result.get(1));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED, null, null)
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED, null, null, null)
));
assertEquals(1, result.size());
assertEquals("id2", result.get(0));
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"),
Arrays.asList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null, null),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobState.CLOSED, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED, null, null)));
Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.CLOSED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobState.CLOSED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.CLOSED, null, null, null)));
assertEquals(0, result.size());
}
public void testDurationToTimeValue() {
assertNull(GetJobsStatsAction.TransportAction.durationToTimeValue(Optional.empty()));
Duration duration = Duration.ofSeconds(10L);
TimeValue timeValue = GetJobsStatsAction.TransportAction.durationToTimeValue(Optional.of(duration));
assertEquals(10L, timeValue.getSeconds());
}
}

View File

@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCountsTests;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
import org.joda.time.DateTime;
@@ -13,10 +14,7 @@ public class PostDataActionResponseTests extends AbstractStreamableTestCase<Post
@Override
protected PostDataAction.Response createTestInstance() {
DataCounts counts = new DataCounts(randomAsciiOfLength(10), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate());
DataCounts counts = new DataCountsTests().createTestInstance();
return new PostDataAction.Response(counts);
}

View File

@@ -183,7 +183,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
when(dataExtractor.next()).thenReturn(Optional.of(in));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
@@ -209,7 +209,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenThrow(new RuntimeException("dummy"));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
@@ -263,7 +263,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
when(dataExtractor.next()).thenReturn(Optional.of(in));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0));
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer();
boolean cancelled = randomBoolean();

View File

@@ -63,7 +63,7 @@ public class DatafeedJobTests extends ESTestCase {
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8));
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0));
PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id");
expectedRequest.setDataDescription(dataDescription.build());

View File

@@ -73,7 +73,7 @@ public class DataCountsReporterTests extends ESTestCase {
}
public void testComplexConstructor() throws Exception {
DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, new Date(), new Date());
DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, new Date(), new Date(), new Date());
try (DataCountsReporter dataCountsReporter =
new DataCountsReporter(threadPool, settings, JOB_ID, counts, jobDataCountsPersister)) {
@@ -249,13 +249,21 @@ public class DataCountsReporterTests extends ESTestCase {
dataCountsReporter.setAnalysedFieldsPerRecord(3);
DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, new Date(2000), new Date(3000));
Date now = new Date();
DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, new Date(2000), new Date(3000), now);
dataCountsReporter.reportRecordWritten(5, 2000);
dataCountsReporter.reportRecordWritten(5, 3000);
dataCountsReporter.reportMissingField();
dataCountsReporter.finishReporting();
long lastReportedTimeMs = dataCountsReporter.incrementalStats().getLastDataTimeStamp().getTime();
// check last data time is equal to now, give or take a second
assertTrue(lastReportedTimeMs >= now.getTime() && lastReportedTimeMs <= now.getTime() + 1000);
assertEquals(dataCountsReporter.incrementalStats().getLastDataTimeStamp(),
dataCountsReporter.runningTotalStats().getLastDataTimeStamp());
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), eq(dc), any());
dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp());
assertEquals(dc, dataCountsReporter.incrementalStats());
}
}

View File

@@ -17,11 +17,12 @@ import static org.hamcrest.Matchers.greaterThan;
public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
@Override
protected DataCounts createTestInstance() {
public DataCounts createTestInstance() {
return new DataCounts(randomAsciiOfLength(10), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000), randomIntBetween(1, 1_000_000),
new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate());
new DateTime(randomDateTimeZone()).toDate(), new DateTime(randomDateTimeZone()).toDate(),
new DateTime(randomDateTimeZone()).toDate());
}
@Override
@@ -35,22 +36,22 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
}
public void testCountsEquals_GivenEqualCounts() {
DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9);
DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9);
DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
assertTrue(counts1.equals(counts2));
assertTrue(counts2.equals(counts1));
}
public void testCountsHashCode_GivenEqualCounts() {
DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9);
DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9);
DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
DataCounts counts2 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
assertEquals(counts1.hashCode(), counts2.hashCode());
}
public void testCountsCopyConstructor() {
DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9);
DataCounts counts1 = createCounts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
DataCounts counts2 = new DataCounts(counts1);
assertEquals(counts1.hashCode(), counts2.hashCode());
@@ -62,7 +63,7 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
}
public void testCountCopyCreatedFieldsNotZero() throws Exception {
DataCounts counts1 = createCounts(1, 200, 400, 3, 4, 5, 6, 1479211200000L, 1479384000000L);
DataCounts counts1 = createCounts(1, 200, 400, 3, 4, 5, 6, 1479211200000L, 1479384000000L, 1488282343000L);
assertAllFieldsGreaterThanZero(counts1);
DataCounts counts2 = new DataCounts(counts1);
@@ -101,19 +102,19 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
}
public void testCalcProcessedFieldCount() {
DataCounts counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 0L, 0L, new Date(), new Date());
DataCounts counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 0L, 0L, new Date(), new Date(), new Date());
counts.calcProcessedFieldCount(3);
assertEquals(30, counts.getProcessedFieldCount());
counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 5L, 0L, new Date(), new Date());
counts = new DataCounts(randomAsciiOfLength(16), 10L, 0L, 0L, 0L, 0L, 5L, 0L, new Date(), new Date(), new Date());
counts.calcProcessedFieldCount(3);
assertEquals(25, counts.getProcessedFieldCount());
}
public void testEquals() {
DataCounts counts1 = new DataCounts(
randomAsciiOfLength(16), 10L, 5000L, 2000L, 300L, 6L, 15L, 0L, new Date(), new Date(1435000000L));
randomAsciiOfLength(16), 10L, 5000L, 2000L, 300L, 6L, 15L, 0L, new Date(), new Date(1435000000L), new Date(10L));
DataCounts counts2 = new DataCounts(counts1);
assertEquals(counts1, counts2);
@@ -155,11 +156,12 @@ public class DataCountsTests extends AbstractSerializingTestCase<DataCounts> {
private static DataCounts createCounts(
long processedRecordCount, long processedFieldCount, long inputBytes, long inputFieldCount,
long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount, long earliestRecordTime, long latestRecordTime) {
long invalidDateCount, long missingFieldCount, long outOfOrderTimeStampCount, long earliestRecordTime, long latestRecordTime,
long lastDataTime) {
DataCounts counts = new DataCounts("foo", processedRecordCount, processedFieldCount, inputBytes,
inputFieldCount, invalidDateCount, missingFieldCount, outOfOrderTimeStampCount,
new Date(earliestRecordTime), new Date(latestRecordTime));
new Date(earliestRecordTime), new Date(latestRecordTime), new Date(lastDataTime));
return counts;
}

View File

@@ -85,6 +85,7 @@ setup:
- is_true: jobs.0.node.name
- is_true: jobs.0.node.transport_address
- match: { jobs.0.node.attributes.max_running_jobs: "10" }
- gte: { jobs.0.open_time: 0 }
---
"Test get job stats for closed job":
@@ -121,9 +122,10 @@ setup:
- gt: { jobs.0.model_size_stats.model_bytes: 0 }
- match: { jobs.0.state: closed }
- is_false: jobs.0.node
- is_false: jobs.0.open_time
---
"Test get job stats of datafeed job that has not received and data":
"Test get job stats of datafeed job that has not received any data":
- do:
xpack.ml.get_job_stats:
@@ -132,6 +134,7 @@ setup:
- match: { jobs.0.data_counts.processed_record_count: 0 }
- match: { jobs.0.model_size_stats.model_bytes : 0 }
- match: { jobs.0.state: opened }
- gte: { jobs.0.open_time: 0 }
---
"Test get all job stats explicitly":