[ML] Autodetect should receive events from the earliest valid timestamp (elastic/x-pack-elasticsearch#3570)

When events are searched to be passed to the autodetect process, they
are currently calculated based on the latest record timestamp, when
a job opens, and `now` when the process is updated.

This commit changes both to be consistent and based on the earliest
valid timestamp for the job. The earliest valid timestamp is the
latest record timestamp minus the job latency.

Relates elastic/x-pack-elasticsearch#3016

Original commit: elastic/x-pack-elasticsearch@7f882ea053
This commit is contained in:
Dimitris Athanasiou 2018-01-15 18:07:48 +00:00 committed by GitHub
parent 57d887c9df
commit aff9a4a2ba
7 changed files with 53 additions and 13 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.xpack.ml.MlParserType; import org.elasticsearch.xpack.ml.MlParserType;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.MlStrings; import org.elasticsearch.xpack.ml.utils.MlStrings;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils; import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
@ -450,6 +451,23 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
+ PROCESS_MEMORY_OVERHEAD.getBytes(); + PROCESS_MEMORY_OVERHEAD.getBytes();
} }
/**
* Returns the timestamp before which data is not accepted by the job.
* This is the latest record timestamp minus the job latency.
* @param dataCounts the job data counts
* @return the timestamp before which data is not accepted by the job
*/
public long earliestValidTimestamp(DataCounts dataCounts) {
long currentTime = 0;
Date latestRecordTimestamp = dataCounts.getLatestRecordTimeStamp();
if (latestRecordTimestamp != null) {
TimeValue latency = analysisConfig.getLatency();
long latencyMillis = latency == null ? 0 : latency.millis();
currentTime = latestRecordTimestamp.getTime() - latencyMillis;
}
return currentTime;
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId); out.writeString(jobId);

View File

@ -369,10 +369,7 @@ public class JobProvider {
ActionListener<AutodetectParams.Builder> getScheduledEventsListener = ActionListener.wrap( ActionListener<AutodetectParams.Builder> getScheduledEventsListener = ActionListener.wrap(
paramsBuilder -> { paramsBuilder -> {
ScheduledEventsQueryBuilder scheduledEventsQueryBuilder = new ScheduledEventsQueryBuilder(); ScheduledEventsQueryBuilder scheduledEventsQueryBuilder = new ScheduledEventsQueryBuilder();
Date lastestRecordTime = paramsBuilder.getDataCounts().getLatestRecordTimeStamp(); scheduledEventsQueryBuilder.start(job.earliestValidTimestamp(paramsBuilder.getDataCounts()));
if (lastestRecordTime != null) {
scheduledEventsQueryBuilder.start(Long.toString(lastestRecordTime.getTime()));
}
scheduledEventsForJob(jobId, job.getGroups(), scheduledEventsQueryBuilder, ActionListener.wrap( scheduledEventsForJob(jobId, job.getGroups(), scheduledEventsQueryBuilder, ActionListener.wrap(
events -> { events -> {
paramsBuilder.setScheduledEvents(events.results()); paramsBuilder.setScheduledEvents(events.results());

View File

@ -41,6 +41,11 @@ public class ScheduledEventsQueryBuilder {
return this; return this;
} }
public ScheduledEventsQueryBuilder start(long start) {
this.start = Long.toString(start);
return this;
}
public ScheduledEventsQueryBuilder end(String end) { public ScheduledEventsQueryBuilder end(String end) {
this.end = end; this.end = end;
return this; return this;

View File

@ -88,8 +88,7 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction<Po
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
listener.onFailure( listener.onFailure(ExceptionsHelper.serverError("Error indexing event", e));
ExceptionsHelper.serverError("Error indexing event", e));
} }
}); });
}, },

View File

@ -28,7 +28,6 @@ import org.elasticsearch.xpack.ml.MLMetadataField;
import org.elasticsearch.xpack.ml.MlMetaIndex; import org.elasticsearch.xpack.ml.MlMetaIndex;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.calendars.Calendar; import org.elasticsearch.xpack.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException; import java.io.IOException;
@ -43,18 +42,16 @@ public class TransportPutCalendarAction extends HandledTransportAction<PutCalend
private final Client client; private final Client client;
private final ClusterService clusterService; private final ClusterService clusterService;
private final JobManager jobManager;
@Inject @Inject
public TransportPutCalendarAction(Settings settings, ThreadPool threadPool, public TransportPutCalendarAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IndexNameExpressionResolver indexNameExpressionResolver,
Client client, ClusterService clusterService, JobManager jobManager) { Client client, ClusterService clusterService) {
super(settings, PutCalendarAction.NAME, threadPool, transportService, actionFilters, super(settings, PutCalendarAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, PutCalendarAction.Request::new); indexNameExpressionResolver, PutCalendarAction.Request::new);
this.client = client; this.client = client;
this.clusterService = clusterService; this.clusterService = clusterService;
this.jobManager = jobManager;
} }
@Override @Override
@ -81,7 +78,6 @@ public class TransportPutCalendarAction extends HandledTransportAction<PutCalend
new ActionListener<IndexResponse>() { new ActionListener<IndexResponse>() {
@Override @Override
public void onResponse(IndexResponse indexResponse) { public void onResponse(IndexResponse indexResponse) {
jobManager.updateProcessOnCalendarChanged(calendar.getJobIds());
listener.onResponse(new PutCalendarAction.Response(calendar)); listener.onResponse(new PutCalendarAction.Response(calendar));
} }

View File

@ -60,7 +60,6 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.time.Duration; import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -280,7 +279,8 @@ public class AutodetectProcessManager extends AbstractComponent {
if (updateParams.isUpdateScheduledEvents()) { if (updateParams.isUpdateScheduledEvents()) {
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId()); Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(Long.toString(new Date().getTime())); DataCounts dataCounts = getStatistics(jobTask).get().v1();
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
} else { } else {
eventsListener.onResponse(null); eventsListener.onResponse(null);

View File

@ -22,6 +22,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MachineLearningClientActionPlugin; import org.elasticsearch.xpack.ml.MachineLearningClientActionPlugin;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -551,6 +552,30 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
builder.build().estimateMemoryFootprint()); builder.build().estimateMemoryFootprint());
} }
public void testEarliestValidTimestamp_GivenEmptyDataCounts() {
assertThat(createRandomizedJob().earliestValidTimestamp(new DataCounts("foo")), equalTo(0L));
}
public void testEarliestValidTimestamp_GivenDataCountsAndZeroLatency() {
Job.Builder builder = buildJobBuilder("foo");
DataCounts dataCounts = new DataCounts(builder.getId());
dataCounts.setLatestRecordTimeStamp(new Date(123456789L));
assertThat(builder.build().earliestValidTimestamp(dataCounts), equalTo(123456789L));
}
public void testEarliestValidTimestamp_GivenDataCountsAndLatency() {
Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(builder.build().getAnalysisConfig());
analysisConfig.setLatency(TimeValue.timeValueMillis(1000L));
builder.setAnalysisConfig(analysisConfig);
DataCounts dataCounts = new DataCounts(builder.getId());
dataCounts.setLatestRecordTimeStamp(new Date(123456789L));
assertThat(builder.build().earliestValidTimestamp(dataCounts), equalTo(123455789L));
}
public static Job.Builder buildJobBuilder(String id, Date date) { public static Job.Builder buildJobBuilder(String id, Date date) {
Job.Builder builder = new Job.Builder(id); Job.Builder builder = new Job.Builder(id);
builder.setCreateTime(date); builder.setCreateTime(date);