[ML] Impove mechanism for ignoring maintenance windows (elastic/x-pack-elasticsearch#1914)

Currently, the autodetect process has an `ignoreDowntime`
parameter which, when set to true, results to time being
skipped over to the end of the bucket of the first data
point received. After that, skipping time requires closing
and opening the job. With regard to datafeeds, this does not
work well with real-time requests which use the advance-time
API in order to ensure results are created for data gaps.

This commit improves this functionality by making it more
flexible and less ambiguous.

- flush API now supports skip_time parameter which
sends a control message to the autodetect process
telling it to skip time to a given value
- the flush API now also returns the last_finalized_bucket_end
time which allows clients to resume data searches correctly
- the datafeed start API issues a skip_time request when the
given start time is after the resume point. It then resumes
the search from the last_finalized_bucket_end time.

relates elastic/x-pack-elasticsearch#1913


Original commit: elastic/x-pack-elasticsearch@caa5fe8016
This commit is contained in:
Dimitris Athanasiou 2017-07-05 11:33:42 +01:00 committed by GitHub
parent 595468a505
commit 15f9b1ed9c
33 changed files with 473 additions and 222 deletions

View File

@ -280,8 +280,7 @@ public class MachineLearning implements ActionPlugin {
throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e);
}
} else {
autodetectProcessFactory = (job, modelSnapshot, quantiles, filters,
ignoreDowntime, executorService, onProcessCrash) ->
autodetectProcessFactory = (job, modelSnapshot, quantiles, filters, executorService, onProcessCrash) ->
new BlackHoleAutodetectProcess(job.getId());
// factor of 1.0 makes renormalization a no-op
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization,

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
@ -13,6 +14,7 @@ import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -28,10 +30,12 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import java.io.IOException;
import java.util.Date;
import java.util.Objects;
public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobAction.Response, FlushJobAction.RequestBuilder> {
@ -59,6 +63,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
public static final ParseField START = new ParseField("start");
public static final ParseField END = new ParseField("end");
public static final ParseField ADVANCE_TIME = new ParseField("advance_time");
public static final ParseField SKIP_TIME = new ParseField("skip_time");
private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
@ -68,6 +73,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
PARSER.declareString(Request::setStart, START);
PARSER.declareString(Request::setEnd, END);
PARSER.declareString(Request::setAdvanceTime, ADVANCE_TIME);
PARSER.declareString(Request::setSkipTime, SKIP_TIME);
}
public static Request parseRequest(String jobId, XContentParser parser) {
@ -82,6 +88,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
private String start;
private String end;
private String advanceTime;
private String skipTime;
Request() {
}
@ -114,12 +121,22 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
this.end = end;
}
public String getAdvanceTime() { return advanceTime; }
public String getAdvanceTime() {
return advanceTime;
}
public void setAdvanceTime(String advanceTime) {
this.advanceTime = advanceTime;
}
public String getSkipTime() {
return skipTime;
}
public void setSkipTime(String skipTime) {
this.skipTime = skipTime;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -127,6 +144,9 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
start = in.readOptionalString();
end = in.readOptionalString();
advanceTime = in.readOptionalString();
if (in.getVersion().after(Version.V_5_5_0)) {
skipTime = in.readOptionalString();
}
}
@Override
@ -136,11 +156,14 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
out.writeOptionalString(start);
out.writeOptionalString(end);
out.writeOptionalString(advanceTime);
if (out.getVersion().after(Version.V_5_5_0)) {
out.writeOptionalString(skipTime);
}
}
@Override
public int hashCode() {
return Objects.hash(jobId, calcInterim, start, end, advanceTime);
return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime);
}
@Override
@ -156,7 +179,8 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
calcInterim == other.calcInterim &&
Objects.equals(start, other.start) &&
Objects.equals(end, other.end) &&
Objects.equals(advanceTime, other.advanceTime);
Objects.equals(advanceTime, other.advanceTime) &&
Objects.equals(skipTime, other.skipTime);
}
@Override
@ -173,6 +197,9 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
if (advanceTime != null) {
builder.field(ADVANCE_TIME.getPreferredName(), advanceTime);
}
if (skipTime != null) {
builder.field(SKIP_TIME.getPreferredName(), skipTime);
}
builder.endObject();
return builder;
}
@ -188,36 +215,52 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean flushed;
private Date lastFinalizedBucketEnd;
Response() {
super(null, null);
}
Response(boolean flushed) {
public Response(boolean flushed, @Nullable Date lastFinalizedBucketEnd) {
super(null, null);
this.flushed = flushed;
this.lastFinalizedBucketEnd = lastFinalizedBucketEnd;
}
public boolean isFlushed() {
return flushed;
}
public Date getLastFinalizedBucketEnd() {
return lastFinalizedBucketEnd;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
flushed = in.readBoolean();
if (in.getVersion().after(Version.V_5_5_0)) {
lastFinalizedBucketEnd = new Date(in.readVLong());
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(flushed);
if (out.getVersion().after(Version.V_5_5_0)) {
out.writeVLong(lastFinalizedBucketEnd.getTime());
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("flushed", flushed);
if (lastFinalizedBucketEnd != null) {
builder.dateField(FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName(),
FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string", lastFinalizedBucketEnd.getTime());
}
builder.endObject();
return builder;
}
@ -227,12 +270,13 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return flushed == response.flushed;
return flushed == response.flushed &&
Objects.equals(lastFinalizedBucketEnd, response.lastFinalizedBucketEnd);
}
@Override
public int hashCode() {
return Objects.hash(flushed);
return Objects.hash(flushed, lastFinalizedBucketEnd);
}
}
@ -261,6 +305,9 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
if (request.getAdvanceTime() != null) {
paramsBuilder.advanceTime(request.getAdvanceTime());
}
if (request.getSkipTime() != null) {
paramsBuilder.skipTime(request.getSkipTime());
}
TimeRange.Builder timeRangeBuilder = TimeRange.builder();
if (request.getStart() != null) {
timeRangeBuilder.startTime(request.getStart());
@ -269,13 +316,12 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
timeRangeBuilder.endTime(request.getEnd());
}
paramsBuilder.forTimeRange(timeRangeBuilder.build());
processManager.flushJob(task, paramsBuilder.build(), e -> {
if (e == null) {
listener.onResponse(new Response(true));
} else {
listener.onFailure(e);
}
});
processManager.flushJob(task, paramsBuilder.build(), ActionListener.wrap(
flushAcknowledgement -> {
listener.onResponse(new Response(true,
flushAcknowledgement == null ? null : flushAcknowledgement.getLastFinalizedBucketEnd()));
}, listener::onFailure
));
}
}
}

View File

@ -186,13 +186,15 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
public static class JobParams implements PersistentTaskParams {
/** TODO Remove in 7.0.0 */
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(TASK_NAME, JobParams::new);
static {
PARSER.declareString(JobParams::setJobId, Job.ID);
PARSER.declareBoolean(JobParams::setIgnoreDowntime, IGNORE_DOWNTIME);
PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
}
@ -210,7 +212,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
private String jobId;
private boolean ignoreDowntime = true;
// A big state can take a while to restore. For symmetry with the _close endpoint any
// changes here should be reflected there too.
private TimeValue timeout = MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT;
@ -224,7 +225,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
public JobParams(StreamInput in) throws IOException {
jobId = in.readString();
ignoreDowntime = in.readBoolean();
if (in.getVersion().onOrBefore(Version.V_5_5_0)) {
// Read `ignoreDowntime`
in.readBoolean();
}
timeout = TimeValue.timeValueMillis(in.readVLong());
}
@ -236,14 +240,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
this.jobId = jobId;
}
public boolean isIgnoreDowntime() {
return ignoreDowntime;
}
public void setIgnoreDowntime(boolean ignoreDowntime) {
this.ignoreDowntime = ignoreDowntime;
}
public TimeValue getTimeout() {
return timeout;
}
@ -260,7 +256,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeBoolean(ignoreDowntime);
if (out.getVersion().onOrBefore(Version.V_5_5_0)) {
// Write `ignoreDowntime` - true by default
out.writeBoolean(true);
}
out.writeVLong(timeout.millis());
}
@ -268,7 +267,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(IGNORE_DOWNTIME.getPreferredName(), ignoreDowntime);
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
builder.endObject();
return builder;
@ -276,7 +274,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
public int hashCode() {
return Objects.hash(jobId, ignoreDowntime, timeout);
return Objects.hash(jobId, timeout);
}
@Override
@ -289,7 +287,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
OpenJobAction.JobParams other = (OpenJobAction.JobParams) obj;
return Objects.equals(jobId, other.jobId) &&
Objects.equals(ignoreDowntime, other.ignoreDowntime) &&
Objects.equals(timeout, other.timeout);
}
@ -588,7 +585,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
protected void nodeOperation(AllocatedPersistentTask task, JobParams params) {
JobTask jobTask = (JobTask) task;
jobTask.autodetectProcessManager = autodetectProcessManager;
autodetectProcessManager.openJob(jobTask, params.isIgnoreDowntime(), e2 -> {
autodetectProcessManager.openJob(jobTask, e2 -> {
if (e2 == null) {
task.markAsCompleted();
} else {

View File

@ -77,7 +77,7 @@ class DatafeedJob {
}
Long runLookBack(long startTime, Long endTime) throws Exception {
lookbackStartTimeMs = (lastEndTimeMs != null && lastEndTimeMs + 1 > startTime) ? lastEndTimeMs + 1 : startTime;
lookbackStartTimeMs = skipToStartTime(startTime);
Optional<Long> endMs = Optional.ofNullable(endTime);
long lookbackEnd = endMs.orElse(currentTimeSupplier.get() - queryDelayMs);
boolean isLookbackOnly = endMs.isPresent();
@ -115,6 +115,22 @@ class DatafeedJob {
return null;
}
private long skipToStartTime(long startTime) {
if (lastEndTimeMs != null) {
if (lastEndTimeMs + 1 > startTime) {
// start time is before last checkpoint, thus continue from checkpoint
return lastEndTimeMs + 1;
}
// start time is after last checkpoint, thus we need to skip time
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setSkipTime(String.valueOf(startTime));
FlushJobAction.Response flushResponse = flushJob(request);
LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]");
return flushResponse.getLastFinalizedBucketEnd().getTime();
}
return startTime;
}
long runRealtime() throws Exception {
long start = lastEndTimeMs == null ? lookbackStartTimeMs : Math.max(lookbackStartTimeMs, lastEndTimeMs + 1);
long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs;
@ -265,10 +281,10 @@ class DatafeedJob {
return (epochMs / frequencyMs) * frequencyMs;
}
private void flushJob(FlushJobAction.Request flushRequest) {
private FlushJobAction.Response flushJob(FlushJobAction.Request flushRequest) {
try {
LOGGER.trace("[" + jobId + "] Sending flush request");
client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
return client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
} catch (Exception e) {
LOGGER.debug("[" + jobId + "] error while flushing job", e);

View File

@ -78,7 +78,6 @@ public class ProcessCtrl {
*/
static final String BUCKET_SPAN_ARG = "--bucketspan=";
public static final String DELETE_STATE_FILES_ARG = "--deleteStateFiles";
static final String IGNORE_DOWNTIME_ARG = "--ignoreDowntime";
static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput";
static final String MODEL_CONFIG_ARG = "--modelconfig=";
public static final String QUANTILES_STATE_PATH_ARG = "--quantilesState=";
@ -151,8 +150,7 @@ public class ProcessCtrl {
return rng.nextInt(SECONDS_IN_HOUR);
}
public static List<String> buildAutodetectCommand(Environment env, Settings settings, Job job, Logger logger, boolean ignoreDowntime,
long controllerPid) {
public static List<String> buildAutodetectCommand(Environment env, Settings settings, Job job, Logger logger, long controllerPid) {
List<String> command = new ArrayList<>();
command.add(AUTODETECT_PATH);
@ -213,10 +211,6 @@ public class ProcessCtrl {
int maxQuantileInterval = BASE_MAX_QUANTILE_INTERVAL + intervalStagger;
command.add(MAX_QUANTILE_INTERVAL_ARG + maxQuantileInterval);
if (ignoreDowntime) {
command.add(IGNORE_DOWNTIME_ARG);
}
if (modelConfigFilePresent(env)) {
String modelConfigFile = XPackPlugin.resolveConfigFile(env, ML_MODEL_CONF).toString();
command.add(MODEL_CONFIG_ARG + modelConfigFile);

View File

@ -43,7 +43,6 @@ public class AutodetectBuilder {
private Job job;
private List<Path> filesToDelete;
private Logger logger;
private boolean ignoreDowntime;
private Set<MlFilter> referencedFilters;
private Quantiles quantiles;
private Environment env;
@ -68,21 +67,9 @@ public class AutodetectBuilder {
this.job = Objects.requireNonNull(job);
this.filesToDelete = Objects.requireNonNull(filesToDelete);
this.logger = Objects.requireNonNull(logger);
ignoreDowntime = false;
referencedFilters = new HashSet<>();
}
/**
* Set ignoreDowntime
*
* @param ignoreDowntime If true set the ignore downtime flag overriding the
* setting in the job configuration
*/
public AutodetectBuilder ignoreDowntime(boolean ignoreDowntime) {
this.ignoreDowntime = ignoreDowntime;
return this;
}
public AutodetectBuilder referencedFilters(Set<MlFilter> filters) {
referencedFilters = filters;
return this;
@ -103,7 +90,7 @@ public class AutodetectBuilder {
*/
public void build() throws IOException, TimeoutException {
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job, logger, ignoreDowntime, controller.getPid());
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job, logger, controller.getPid());
buildLimits(command);
buildModelPlotConfig(command);

View File

@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -23,6 +24,7 @@ import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
@ -196,23 +198,24 @@ public class AutodetectCommunicator implements Closeable {
}, handler);
}
public void flushJob(FlushJobParams params, BiConsumer<Void, Exception> handler) {
public void flushJob(FlushJobParams params, BiConsumer<FlushAcknowledgement, Exception> handler) {
submitOperation(() -> {
String flushId = autodetectProcess.flushJob(params);
waitFlushToCompletion(flushId);
return null;
return waitFlushToCompletion(flushId);
}, handler);
}
void waitFlushToCompletion(String flushId) {
@Nullable
FlushAcknowledgement waitFlushToCompletion(String flushId) {
LOGGER.debug("[{}] waiting for flush", job.getId());
FlushAcknowledgement flushAcknowledgement;
try {
boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
while (isFlushComplete == false) {
flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
while (flushAcknowledgement == null) {
checkProcessIsAlive();
checkResultsProcessorIsAlive();
isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
}
} finally {
autoDetectResultProcessor.clearAwaitingFlush(flushId);
@ -225,6 +228,8 @@ public class AutodetectCommunicator implements Closeable {
LOGGER.debug("[{}] Flush completed", job.getId());
}
return flushAcknowledgement;
}
/**

View File

@ -25,13 +25,11 @@ public interface AutodetectProcessFactory {
* @param modelSnapshot The model snapshot to restore from
* @param quantiles The quantiles to push to the native process
* @param filters The filters to push to the native process
* @param ignoreDowntime Should gaps in data be treated as anomalous or as a maintenance window after a job re-start
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
* @param onProcessCrash Callback to execute if the process stops unexpectedly
* @return The process
*/
AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters,
boolean ignoreDowntime,
ExecutorService executorService,
Runnable onProcessCrash);
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
@ -201,23 +202,23 @@ public class AutodetectProcessManager extends AbstractComponent {
* @param params Parameters describing the controls that will accompany the flushing
* (e.g. calculating interim results, time control, etc.)
*/
public void flushJob(JobTask jobTask, FlushJobParams params, Consumer<Exception> handler) {
public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener<FlushAcknowledgement> handler) {
logger.debug("Flushing job {}", jobTask.getJobId());
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
handler.onFailure(ExceptionsHelper.conflictStatusException(message));
return;
}
communicator.flushJob(params, (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
communicator.flushJob(params, (flushAcknowledgement, e) -> {
if (e != null) {
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobTask.getJobId());
logger.error(msg);
handler.accept(ExceptionsHelper.serverError(msg, e));
handler.onFailure(ExceptionsHelper.serverError(msg, e));
} else {
handler.onResponse(flushAcknowledgement);
}
});
}
@ -240,7 +241,7 @@ public class AutodetectProcessManager extends AbstractComponent {
});
}
public void openJob(JobTask jobTask, boolean ignoreDowntime, Consumer<Exception> handler) {
public void openJob(JobTask jobTask, Consumer<Exception> handler) {
String jobId = jobTask.getJobId();
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
@ -263,7 +264,7 @@ public class AutodetectProcessManager extends AbstractComponent {
protected void doRun() throws Exception {
try {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobTask.getAllocationId(),
id -> create(jobTask, params, ignoreDowntime, handler));
id -> create(jobTask, params, handler));
communicator.init(params.modelSnapshot());
setJobState(jobTask, JobState.OPENED);
} catch (Exception e1) {
@ -286,8 +287,7 @@ public class AutodetectProcessManager extends AbstractComponent {
});
}
AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams,
boolean ignoreDowntime, Consumer<Exception> handler) {
AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams, Consumer<Exception> handler) {
if (autoDetectCommunicatorByJob.size() == maxAllowedRunningJobs) {
throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached",
RestStatus.TOO_MANY_REQUESTS);
@ -321,8 +321,8 @@ public class AutodetectProcessManager extends AbstractComponent {
renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime,
autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED));
autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService,
() -> setJobState(jobTask, JobState.FAILED));
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());
ExecutorService autodetectWorkerExecutor;

View File

@ -77,7 +77,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
*/
@Override
public String flushJob(FlushJobParams params) throws IOException {
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID);
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, null);
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement);
results.add(result);
return FLUSH_ID;

View File

@ -59,13 +59,12 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
@Override
public AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot,
Quantiles quantiles, Set<MlFilter> filters,
boolean ignoreDowntime,
ExecutorService executorService,
Runnable onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, ProcessCtrl.AUTODETECT, job.getId(),
true, false, true, true, modelSnapshot != null, !ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING.get(settings));
createNativeProcess(job, quantiles, filters, processPipes, ignoreDowntime, filesToDelete);
createNativeProcess(job, quantiles, filters, processPipes, filesToDelete);
int numberOfAnalysisFields = job.getAnalysisConfig().analysisFields().size();
StateProcessor stateProcessor = new StateProcessor(settings, client);
@ -88,11 +87,10 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
}
private void createNativeProcess(Job job, Quantiles quantiles, Set<MlFilter> filters, ProcessPipes processPipes,
boolean ignoreDowntime, List<Path> filesToDelete) {
List<Path> filesToDelete) {
try {
AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env,
settings, nativeController, processPipes)
.ignoreDowntime(ignoreDowntime)
.referencedFilters(filters);
// if state is null or empty it will be ignored

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.ml.MachineLearning;
@ -231,7 +232,7 @@ public class AutoDetectResultProcessor {
// through to the data store
context.bulkResultsPersister.executeRequest();
persister.commitResultWrites(context.jobId);
flushListener.acknowledgeFlush(flushAcknowledgement.getId());
flushListener.acknowledgeFlush(flushAcknowledgement);
// Interim results may have been produced by the flush,
// which need to be
// deleted when the next finalized results come through
@ -291,13 +292,11 @@ public class AutoDetectResultProcessor {
*
* @param flushId the id of the flush request to wait for
* @param timeout the timeout
* @return {@code true} if the flush has completed or the parsing finished; {@code false} if the timeout expired
* @return The {@link FlushAcknowledgement} if the flush has completed or the parsing finished; {@code null} if the timeout expired
*/
public boolean waitForFlushAcknowledgement(String flushId, Duration timeout) {
if (failed) {
return false;
}
return flushListener.waitForFlush(flushId, timeout);
@Nullable
public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) {
return failed ? null : flushListener.waitForFlush(flushId, timeout);
}
public void clearAwaitingFlush(String flushId) {

View File

@ -5,15 +5,20 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.io.IOException;
import java.util.Date;
import java.util.Objects;
/**
@ -25,44 +30,70 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
*/
public static final ParseField TYPE = new ParseField("flush");
public static final ParseField ID = new ParseField("id");
public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end");
public static final ConstructingObjectParser<FlushAcknowledgement, Void> PARSER = new ConstructingObjectParser<>(
TYPE.getPreferredName(), a -> new FlushAcknowledgement((String) a[0]));
TYPE.getPreferredName(), a -> new FlushAcknowledgement((String) a[0], (Date) a[1]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return new Date(p.longValue());
} else if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + LAST_FINALIZED_BUCKET_END.getPreferredName() + "]");
}, LAST_FINALIZED_BUCKET_END, ObjectParser.ValueType.VALUE);
}
private String id;
private Date lastFinalizedBucketEnd;
public FlushAcknowledgement(String id) {
public FlushAcknowledgement(String id, Date lastFinalizedBucketEnd) {
this.id = id;
this.lastFinalizedBucketEnd = lastFinalizedBucketEnd;
}
public FlushAcknowledgement(StreamInput in) throws IOException {
id = in.readString();
if (in.getVersion().after(Version.V_5_5_0)) {
lastFinalizedBucketEnd = new Date(in.readVLong());
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
if (out.getVersion().after(Version.V_5_5_0)) {
out.writeVLong(lastFinalizedBucketEnd.getTime());
}
}
public String getId() {
return id;
}
public Date getLastFinalizedBucketEnd() {
return lastFinalizedBucketEnd;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ID.getPreferredName(), id);
if (lastFinalizedBucketEnd != null) {
builder.dateField(LAST_FINALIZED_BUCKET_END.getPreferredName(), LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string",
lastFinalizedBucketEnd.getTime());
}
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, lastFinalizedBucketEnd);
}
@Override
@ -74,7 +105,8 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
return false;
}
FlushAcknowledgement other = (FlushAcknowledgement) obj;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id) &&
Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd);
}
}

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.elasticsearch.common.Nullable;
import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
@ -15,29 +17,34 @@ import java.util.concurrent.atomic.AtomicBoolean;
class FlushListener {
final ConcurrentMap<String, CountDownLatch> awaitingFlushed = new ConcurrentHashMap<>();
final ConcurrentMap<String, FlushAcknowledgementHolder> awaitingFlushed = new ConcurrentHashMap<>();
final AtomicBoolean cleared = new AtomicBoolean(false);
boolean waitForFlush(String flushId, Duration timeout) {
@Nullable
FlushAcknowledgement waitForFlush(String flushId, Duration timeout) {
if (cleared.get()) {
return false;
return null;
}
CountDownLatch latch = awaitingFlushed.computeIfAbsent(flushId, (key) -> new CountDownLatch(1));
FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId));
try {
return latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return holder.flushAcknowledgement;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
return null;
}
void acknowledgeFlush(String flushId) {
void acknowledgeFlush(FlushAcknowledgement flushAcknowledgement) {
// acknowledgeFlush(...) could be called before waitForFlush(...)
// a flush api call writes a flush command to the analytical process and then via a different thread the
// result reader then reads whether the flush has been acked.
CountDownLatch latch = awaitingFlushed.computeIfAbsent(flushId, (key) -> new CountDownLatch(1));
latch.countDown();
String flushId = flushAcknowledgement.getId();
FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId));
holder.flushAcknowledgement = flushAcknowledgement;
holder.latch.countDown();
}
void clear(String flushId) {
@ -46,11 +53,22 @@ class FlushListener {
void clear() {
if (cleared.compareAndSet(false, true)) {
Iterator<ConcurrentMap.Entry<String, CountDownLatch>> latches = awaitingFlushed.entrySet().iterator();
Iterator<ConcurrentMap.Entry<String, FlushAcknowledgementHolder>> latches = awaitingFlushed.entrySet().iterator();
while (latches.hasNext()) {
latches.next().getValue().countDown();
latches.next().getValue().latch.countDown();
latches.remove();
}
}
}
private static class FlushAcknowledgementHolder {
private final CountDownLatch latch;
private volatile FlushAcknowledgement flushAcknowledgement;
private FlushAcknowledgementHolder(String flushId) {
this.flushAcknowledgement = new FlushAcknowledgement(flushId, null);
this.latch = new CountDownLatch(1);
}
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
@ -13,14 +14,32 @@ import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.util.Objects;
public class FlushJobParams {
/**
* Whether interim results should be calculated
*/
private final boolean calcInterim;
/**
* The time range for which interim results should be calculated
*/
private final TimeRange timeRange;
/**
* The epoch (seconds) to advance time to
*/
private final Long advanceTimeSeconds;
private FlushJobParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds) {
/**
* The epoch (seconds) to skip time to
*/
private final Long skipTimeSeconds;
private FlushJobParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds, Long skipTimeSeconds) {
this.calcInterim = calcInterim;
this.timeRange = Objects.requireNonNull(timeRange);
this.advanceTimeSeconds = advanceTimeSeconds;
this.skipTimeSeconds = skipTimeSeconds;
}
public boolean shouldCalculateInterim() {
@ -31,6 +50,10 @@ public class FlushJobParams {
return advanceTimeSeconds != null;
}
public boolean shouldSkipTime() {
return skipTimeSeconds != null;
}
public String getStart() {
return timeRange.getStart();
}
@ -46,6 +69,13 @@ public class FlushJobParams {
return advanceTimeSeconds;
}
public long getSkipTime() {
if (!shouldSkipTime()) {
throw new IllegalStateException();
}
return skipTimeSeconds;
}
public static Builder builder() {
return new Builder();
}
@ -57,24 +87,20 @@ public class FlushJobParams {
FlushJobParams that = (FlushJobParams) o;
return calcInterim == that.calcInterim &&
Objects.equals(timeRange, that.timeRange) &&
Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds);
Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds) &&
Objects.equals(skipTimeSeconds, that.skipTimeSeconds);
}
@Override
public int hashCode() {
return Objects.hash(calcInterim, timeRange, advanceTimeSeconds);
return Objects.hash(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds);
}
public static class Builder {
private boolean calcInterim = false;
private TimeRange timeRange;
private TimeRange timeRange = TimeRange.builder().build();
private String advanceTime;
private Builder() {
calcInterim = false;
timeRange = TimeRange.builder().build();
advanceTime = "";
}
private String skipTime;
public Builder calcInterim(boolean value) {
calcInterim = value;
@ -87,14 +113,24 @@ public class FlushJobParams {
}
public Builder advanceTime(String timestamp) {
advanceTime = ExceptionsHelper.requireNonNull(timestamp, "advance");
advanceTime = ExceptionsHelper.requireNonNull(timestamp, "advance_time");
return this;
}
public Builder skipTime(String timestamp) {
skipTime = ExceptionsHelper.requireNonNull(timestamp, "skip_time");
return this;
}
public FlushJobParams build() {
checkValidFlushArgumentsCombination();
Long advanceTimeSeconds = checkAdvanceTimeParam();
return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds);
Long advanceTimeSeconds = parseTimeParam("advance_time", advanceTime);
Long skipTimeSeconds = parseTimeParam("skip_time", skipTime);
if (skipTimeSeconds != null && advanceTimeSeconds != null && advanceTimeSeconds <= skipTimeSeconds) {
throw ExceptionsHelper.badRequestException("advance_time [" + advanceTime + "] must be later than skip_time ["
+ skipTime + "]");
}
return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds);
}
private void checkValidFlushArgumentsCombination() {
@ -107,12 +143,12 @@ public class FlushJobParams {
}
}
private Long checkAdvanceTimeParam() {
if (advanceTime != null && !advanceTime.isEmpty()) {
return paramToEpochIfValidOrThrow("advance_time", advanceTime) / TimeRange.MILLISECONDS_IN_SECOND;
}
private Long parseTimeParam(String name, String value) {
if (Strings.isNullOrEmpty(value)) {
return null;
}
return paramToEpochIfValidOrThrow(name, value) / TimeRange.MILLISECONDS_IN_SECOND;
}
private long paramToEpochIfValidOrThrow(String paramName, String date) {
if (TimeRange.NOW.equals(date)) {

View File

@ -52,6 +52,11 @@ public class ControlMsgToProcessWriter {
*/
private static final String ADVANCE_TIME_MESSAGE_CODE = "t";
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
*/
private static final String SKIP_TIME_MESSAGE_CODE = "s";
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
*/
@ -108,6 +113,9 @@ public class ControlMsgToProcessWriter {
* (e.g. calculating interim results, time control, etc.)
*/
public void writeFlushControlMessage(FlushJobParams params) throws IOException {
if (params.shouldSkipTime()) {
writeMessage(SKIP_TIME_MESSAGE_CODE + params.getSkipTime());
}
if (params.shouldAdvanceTime()) {
writeMessage(ADVANCE_TIME_MESSAGE_CODE + params.getAdvanceTime());
}

View File

@ -24,6 +24,7 @@ public class RestFlushJobAction extends BaseRestHandler {
private final String DEFAULT_START = "";
private final String DEFAULT_END = "";
private final String DEFAULT_ADVANCE_TIME = "";
private final String DEFAULT_SKIP_TIME = "";
public RestFlushJobAction(Settings settings, RestController controller) {
super(settings);
@ -50,6 +51,7 @@ public class RestFlushJobAction extends BaseRestHandler {
request.setStart(restRequest.param(FlushJobAction.Request.START.getPreferredName(), DEFAULT_START));
request.setEnd(restRequest.param(FlushJobAction.Request.END.getPreferredName(), DEFAULT_END));
request.setAdvanceTime(restRequest.param(FlushJobAction.Request.ADVANCE_TIME.getPreferredName(), DEFAULT_ADVANCE_TIME));
request.setSkipTime(restRequest.param(FlushJobAction.Request.SKIP_TIME.getPreferredName(), DEFAULT_SKIP_TIME));
}
return channel -> client.execute(FlushJobAction.INSTANCE, request, new RestToXContentListener<>(channel));

View File

@ -42,7 +42,6 @@ public class RestOpenJobAction extends BaseRestHandler {
request = OpenJobAction.Request.parseRequest(restRequest.param(Job.ID.getPreferredName()), restRequest.contentParser());
} else {
OpenJobAction.JobParams jobParams = new OpenJobAction.JobParams(restRequest.param(Job.ID.getPreferredName()));
jobParams.setIgnoreDowntime(restRequest.paramAsBoolean(OpenJobAction.JobParams.IGNORE_DOWNTIME.getPreferredName(), true));
if (restRequest.hasParam(OpenJobAction.JobParams.TIMEOUT.getPreferredName())) {
TimeValue openTimeout = restRequest.paramAsTime(OpenJobAction.JobParams.TIMEOUT.getPreferredName(),
TimeValue.timeValueSeconds(20));

View File

@ -18,9 +18,6 @@ public class OpenJobActionRequestTests extends AbstractStreamableXContentTestCas
if (randomBoolean()) {
params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
if (randomBoolean()) {
params.setIgnoreDowntime(randomBoolean());
}
return new Request(params);
}

View File

@ -7,12 +7,13 @@ package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.action.FlushJobAction.Response;
import org.joda.time.DateTime;
public class PostDataFlushResponseTests extends AbstractStreamableTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
return new Response(randomBoolean(), new DateTime(randomDateTimeZone()).toDate());
}
@Override

View File

@ -27,9 +27,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
@ -139,6 +141,7 @@ public class DatafeedJobTests extends ESTestCase {
assertEquals(10000 + frequencyMs + 100, next);
verify(dataExtractorFactory).newExtractor(5000 + 1L, currentTime - queryDelayMs);
assertThat(flushJobRequests.getAllValues().size(), equalTo(1));
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
flushRequest.setCalcInterim(true);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
@ -148,14 +151,17 @@ public class DatafeedJobTests extends ESTestCase {
// We need to return empty counts so that the lookback doesn't update the last end time
when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(new DataCounts("_job_id")));
currentTime = 10000L;
currentTime = 9999L;
long latestFinalBucketEndTimeMs = 5000;
long latestRecordTimeMs = 5000;
FlushJobAction.Response skipTimeResponse = new FlushJobAction.Response(true, new Date(10000L));
when(flushJobFuture.actionGet()).thenReturn(skipTimeResponse);
long frequencyMs = 1000;
long queryDelayMs = 500;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs);
datafeedJob.runLookBack(10000L, null);
datafeedJob.runLookBack(currentTime, null);
// advance time
currentTime = 12000L;
@ -163,6 +169,13 @@ public class DatafeedJobTests extends ESTestCase {
expectThrows(DatafeedJob.EmptyDataCountException.class, () -> datafeedJob.runRealtime());
verify(dataExtractorFactory, times(1)).newExtractor(10000L, 11000L);
List<FlushJobAction.Request> capturedFlushJobRequests = flushJobRequests.getAllValues();
assertThat(capturedFlushJobRequests.size(), equalTo(2));
assertThat(capturedFlushJobRequests.get(0).getCalcInterim(), is(false));
assertThat(capturedFlushJobRequests.get(0).getSkipTime(), equalTo("9999"));
assertThat(capturedFlushJobRequests.get(1).getCalcInterim(), is(true));
assertThat(capturedFlushJobRequests.get(1).getSkipTime(), is(nullValue()));
assertThat(capturedFlushJobRequests.get(1).getAdvanceTime(), equalTo("11000"));
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
}

View File

@ -354,7 +354,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
}
private FlushAcknowledgement createFlushAcknowledgement() {
return new FlushAcknowledgement(randomAlphaOfLength(5));
return new FlushAcknowledgement(randomAlphaOfLength(5), new Date(randomNonNegativeLong()));
}
private class ResultsBuilder {

View File

@ -51,8 +51,8 @@ public class ProcessCtrlTests extends ESTestCase {
dd.setTimeField("tf");
job.setDataDescription(dd);
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, true, pid);
assertEquals(15, command.size());
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid);
assertEquals(14, command.size());
assertTrue(command.contains(ProcessCtrl.AUTODETECT_PATH));
assertTrue(command.contains(ProcessCtrl.BUCKET_SPAN_ARG + "120"));
assertTrue(command.contains(ProcessCtrl.LATENCY_ARG + "360"));
@ -73,7 +73,6 @@ public class ProcessCtrlTests extends ESTestCase {
assertTrue(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval));
int expectedMaxQuantileInterval = 21600 + ProcessCtrl.calculateStaggeringInterval(job.getId());
assertTrue(command.contains(ProcessCtrl.MAX_QUANTILE_INTERVAL_ARG + expectedMaxQuantileInterval));
assertTrue(command.contains(ProcessCtrl.IGNORE_DOWNTIME_ARG));
}
public void testBuildAutodetectCommand_defaultTimeField() {
@ -81,7 +80,7 @@ public class ProcessCtrlTests extends ESTestCase {
Environment env = new Environment(settings);
Job.Builder job = buildJobBuilder("unit-test-job");
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid);
assertTrue(command.contains(ProcessCtrl.TIME_FIELD_ARG + "time"));
}
@ -94,38 +93,16 @@ public class ProcessCtrlTests extends ESTestCase {
int expectedPersistInterval = 10800 + ProcessCtrl.calculateStaggeringInterval(job.getId());
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid);
assertFalse(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval));
settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
env = new Environment(settings);
command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid);
assertTrue(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval));
}
public void testBuildAutodetectCommand_GivenNoIgnoreDowntime() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = new Environment(
settings);
Job.Builder job = buildJobBuilder("foo");
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
assertFalse(command.contains("--ignoreDowntime"));
}
public void testBuildAutodetectCommand_GivenIgnoreDowntimeParam() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = new Environment(
settings);
Job.Builder job = buildJobBuilder("foo");
List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, true, pid);
assertTrue(command.contains("--ignoreDowntime"));
}
public void testBuildNormalizerCommand() throws IOException {
Environment env = new Environment(
Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build());

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
@ -35,9 +36,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@ -70,10 +73,13 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(true);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(flushAcknowledgement);
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) {
FlushJobParams params = FlushJobParams.builder().build();
communicator.flushJob(params, (aVoid, e) -> {});
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
communicator.flushJob(params, (f, e) -> flushAcknowledgementHolder.set(f));
assertThat(flushAcknowledgementHolder.get(), equalTo(flushAcknowledgement));
Mockito.verify(process).flushJob(params);
}
}
@ -83,7 +89,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
when(processor.isFailed()).thenReturn(true);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(false);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(null);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor);
expectThrows(ElasticsearchException.class, () -> communicator.waitFlushToCompletion("foo"));
}
@ -103,8 +109,9 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))))
.thenReturn(false).thenReturn(true);
.thenReturn(null).thenReturn(flushAcknowledgement);
FlushJobParams params = FlushJobParams.builder().build();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings;
@ -134,7 +135,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobTask.getJobId()).thenReturn(job.getId());
AtomicReference<Exception> errorHolder = new AtomicReference<>();
manager.openJob(jobTask, false, e -> errorHolder.set(e));
manager.openJob(jobTask, e -> errorHolder.set(e));
Exception error = errorHolder.get();
assertThat(error, is(notNullValue()));
@ -150,7 +151,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
when(jobTask.getAllocationId()).thenReturn(1L);
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
assertEquals(1, manager.numberOfOpenJobs());
assertTrue(manager.jobHasActiveAutodetectProcess(jobTask));
verify(jobTask).updatePersistentStatus(eq(new JobTaskStatus(JobState.OPENED, 1L)), any());
@ -175,7 +176,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(autodetectProcess.isProcessAlive()).thenReturn(true);
when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator());
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess;
(j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess;
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
@ -192,22 +193,22 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("bar");
when(jobTask.getAllocationId()).thenReturn(1L);
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("baz");
when(jobTask.getAllocationId()).thenReturn(2L);
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
assertEquals(3, manager.numberOfOpenJobs());
Exception[] holder = new Exception[1];
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foobar");
when(jobTask.getAllocationId()).thenReturn(3L);
manager.openJob(jobTask, false, e -> holder[0] = e);
manager.openJob(jobTask, e -> holder[0] = e);
Exception e = holder[0];
assertEquals("max running job capacity [3] reached", e.getMessage());
@ -216,7 +217,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobTask.getJobId()).thenReturn("baz");
manager.closeJob(jobTask, false, null);
assertEquals(2, manager.numberOfOpenJobs());
manager.openJob(jobTask, false, e1 -> {});
manager.openJob(jobTask, e1 -> {});
assertEquals(3, manager.numberOfOpenJobs());
}
@ -228,7 +229,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty());
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
params, (dataCounts1, e) -> {});
assertEquals(1, manager.numberOfOpenJobs());
@ -250,7 +251,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
Exception[] holder = new Exception[1];
manager.processData(jobTask, inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e);
assertNotNull(holder[0]);
@ -263,7 +264,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
@ -282,7 +283,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
InputStream inputStream = createInputStream("");
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, inputStream, xContentType, params, (dataCounts1, e) -> {});
verify(communicator).writeToJob(same(inputStream), same(xContentType), same(params), any());
}
@ -294,12 +295,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
InputStream inputStream = createInputStream("");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, inputStream, randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
FlushJobParams params = FlushJobParams.builder().build();
manager.flushJob(jobTask, params, e -> {});
manager.flushJob(jobTask, params, ActionListener.wrap(flushAcknowledgement -> {}, e -> fail(e.getMessage())));
verify(communicator).flushJob(same(params), any());
}
@ -318,7 +319,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
Exception[] holder = new Exception[1];
manager.flushJob(jobTask, params, e -> holder[0] = e);
manager.flushJob(jobTask, params, ActionListener.wrap(flushAcknowledgement -> {}, e -> holder[0] = e));
assertEquals("[foo] exception while flushing job", holder[0].getMessage());
}
@ -333,8 +334,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
// create a jobtask
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {
});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class),
(dataCounts1, e) -> {
});
@ -366,7 +366,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobTask.getJobId()).thenReturn("foo");
assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
@ -385,7 +385,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
when(communicator.getJobTask()).thenReturn(jobTask);
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
@ -408,7 +408,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
InputStream inputStream = createInputStream("");
DataCounts[] dataCounts = new DataCounts[1];
manager.processData(jobTask, inputStream,
@ -429,7 +429,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id"));
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess;
(j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
@ -437,7 +437,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("my_id");
expectThrows(EsRejectedExecutionException.class,
() -> manager.create(jobTask, buildAutodetectParams(), false, e -> {}));
() -> manager.create(jobTask, buildAutodetectParams(), e -> {}));
verify(autodetectProcess, times(1)).close();
}
@ -447,7 +447,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, buildAutodetectParams(), false, e -> {});
manager.create(jobTask, buildAutodetectParams(), e -> {});
String expectedNotification = "Loading model snapshot [N/A], job latest_record_timestamp [N/A]";
verify(auditor).info("foo", expectedNotification);
@ -463,7 +463,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, buildAutodetectParams(), false, e -> {});
manager.create(jobTask, buildAutodetectParams(), e -> {});
String expectedNotification = "Loading model snapshot [snapshot-1] with " +
"latest_record_timestamp [1970-01-01T00:00:00.000Z], " +
@ -482,7 +482,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, buildAutodetectParams(), false, e -> {});
manager.create(jobTask, buildAutodetectParams(), e -> {});
String expectedNotification = "Loading model snapshot [N/A], " +
"job latest_record_timestamp [1970-01-01T00:00:00.000Z]";
@ -501,7 +501,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown(jobId)).thenReturn(createJobDetails(jobId));
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess;
(j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess;
return new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
@ -531,7 +531,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
autodetectProcessFactory, normalizerFactory,
new NamedXContentRegistry(Collections.emptyList()), auditor);
manager = spy(manager);
doReturn(communicator).when(manager).create(any(), eq(buildAutodetectParams()), anyBoolean(), any());
doReturn(communicator).when(manager).create(any(), eq(buildAutodetectParams()), any());
return manager;
}
@ -539,7 +539,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts, e) -> {});
return manager;

View File

@ -34,6 +34,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
@ -200,7 +202,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
processorUnderTest.processResult(context, result);
verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
verify(flushListener, times(1)).acknowledgeFlush(flushAcknowledgement);
verify(persister, times(1)).commitResultWrites(JOB_ID);
verify(bulkBuilder, times(1)).executeRequest();
verifyNoMoreInteractions(persister);
@ -225,7 +227,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
inOrder.verify(bulkBuilder, times(1)).executeRequest();
inOrder.verify(persister, times(1)).commitResultWrites(JOB_ID);
inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
inOrder.verify(flushListener, times(1)).acknowledgeFlush(flushAcknowledgement);
verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired);
}
@ -342,7 +344,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
assertTrue(processorUnderTest.isFailed());
// Wait for flush should return immediately
assertFalse(processorUnderTest.waitForFlushAcknowledgement("foo", Duration.of(300, ChronoUnit.SECONDS)));
FlushAcknowledgement flushAcknowledgement = processorUnderTest.waitForFlushAcknowledgement(
"foo", Duration.of(300, ChronoUnit.SECONDS));
assertThat(flushAcknowledgement, is(nullValue()));
}
public void testKill() throws TimeoutException {

View File

@ -9,6 +9,8 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.util.Date;
public class FlushAcknowledgementTests extends AbstractSerializingTestCase<FlushAcknowledgement> {
@Override
@ -18,7 +20,7 @@ public class FlushAcknowledgementTests extends AbstractSerializingTestCase<Flush
@Override
protected FlushAcknowledgement createTestInstance() {
return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20));
return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), new Date(randomNonNegativeLong()));
}
@Override

View File

@ -9,22 +9,26 @@ import org.elasticsearch.test.ESTestCase;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.is;
public class FlushListenerTests extends ESTestCase {
public void testAcknowledgeFlush() throws Exception {
FlushListener listener = new FlushListener();
AtomicBoolean bool = new AtomicBoolean();
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
new Thread(() -> {
boolean result = listener.waitForFlush("_id", Duration.ofMillis(10000));
bool.set(result);
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
}).start();
assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
assertFalse(bool.get());
listener.acknowledgeFlush("_id");
assertBusy(() -> assertTrue(bool.get()));
assertNull(flushAcknowledgementHolder.get());
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", new Date(12345678L));
listener.acknowledgeFlush(flushAcknowledgement);
assertBusy(() -> assertNotNull(flushAcknowledgementHolder.get()));
assertEquals(1, listener.awaitingFlushed.size());
listener.clear("_id");
@ -35,27 +39,26 @@ public class FlushListenerTests extends ESTestCase {
FlushListener listener = new FlushListener();
int numWaits = 9;
List<AtomicBoolean> bools = new ArrayList<>(numWaits);
List<AtomicReference<FlushAcknowledgement>> flushAcknowledgementHolders = new ArrayList<>(numWaits);
for (int i = 0; i < numWaits; i++) {
int id = i;
AtomicBoolean bool = new AtomicBoolean();
bools.add(bool);
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
flushAcknowledgementHolders.add(flushAcknowledgementHolder);
new Thread(() -> {
boolean result = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
bool.set(result);
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
}).start();
}
assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));
for (AtomicBoolean bool : bools) {
assertFalse(bool.get());
}
assertThat(flushAcknowledgementHolders.stream().map(f -> f.get()).filter(f -> f != null).findAny().isPresent(), is(false));
assertFalse(listener.cleared.get());
listener.clear();
for (AtomicBoolean bool : bools) {
assertBusy(() -> assertTrue(bool.get()));
for (AtomicReference<FlushAcknowledgement> f : flushAcknowledgementHolders) {
assertBusy(() -> assertNotNull(f.get()));
}
assertTrue(listener.awaitingFlushed.isEmpty());
assertTrue(listener.cleared.get());
}
}

View File

@ -5,27 +5,31 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
public class FlushJobParamsTests extends ESTestCase {
public void testBuilder_GivenDefault() {
FlushJobParams params = FlushJobParams.builder().build();
assertFalse(params.shouldCalculateInterim());
assertFalse(params.shouldAdvanceTime());
assertFalse(params.shouldSkipTime());
assertEquals("", params.getStart());
assertEquals("", params.getEnd());
}
public void testBuilder_GivenCalcInterim() {
FlushJobParams params = FlushJobParams.builder().calcInterim(true).build();
assertTrue(params.shouldCalculateInterim());
assertFalse(params.shouldAdvanceTime());
assertFalse(params.shouldSkipTime());
assertEquals("", params.getStart());
assertEquals("", params.getEnd());
}
public void testBuilder_GivenCalcInterimAndStart() {
FlushJobParams params = FlushJobParams.builder()
.calcInterim(true)
@ -33,6 +37,7 @@ public class FlushJobParamsTests extends ESTestCase {
.build();
assertTrue(params.shouldCalculateInterim());
assertFalse(params.shouldAdvanceTime());
assertFalse(params.shouldSkipTime());
assertEquals("42", params.getStart());
assertEquals("43", params.getEnd());
}
@ -47,7 +52,6 @@ public class FlushJobParamsTests extends ESTestCase {
assertEquals("Invalid flush parameters: 'start' has not been specified.", e.getMessage());
}
public void testBuilder_GivenCalcInterimAndStartAndEnd() {
FlushJobParams params = FlushJobParams.builder()
.calcInterim(true)
@ -59,7 +63,6 @@ public class FlushJobParamsTests extends ESTestCase {
assertEquals("7200", params.getEnd());
}
public void testBuilder_GivenAdvanceTime() {
FlushJobParams params = FlushJobParams.builder().advanceTime("1821").build();
assertFalse(params.shouldCalculateInterim());
@ -69,7 +72,6 @@ public class FlushJobParamsTests extends ESTestCase {
assertEquals(1821, params.getAdvanceTime());
}
public void testBuilder_GivenCalcInterimAndAdvanceTime() {
FlushJobParams params = FlushJobParams.builder()
.calcInterim(true)
@ -82,7 +84,6 @@ public class FlushJobParamsTests extends ESTestCase {
assertEquals(1940, params.getAdvanceTime());
}
public void testBuilder_GivenCalcInterimWithTimeRangeAndAdvanceTime() {
FlushJobParams params = FlushJobParams.builder()
.calcInterim(true)
@ -96,6 +97,27 @@ public class FlushJobParamsTests extends ESTestCase {
assertEquals(1940, params.getAdvanceTime());
}
public void testBuilder_GivenAdvanceTimeIsEarlierThanSkipTime() {
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> FlushJobParams.builder().advanceTime("2017-01-01T00:00:00Z").skipTime("2017-02-01T00:00:00Z").build());
assertEquals("advance_time [2017-01-01T00:00:00Z] must be later than skip_time [2017-02-01T00:00:00Z]", e.getMessage());
}
public void testBuilder_GivenAdvanceTimeIsEqualToSkipTime() {
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> FlushJobParams.builder().advanceTime("2017-01-01T00:00:00Z").skipTime("2017-01-01T00:00:00Z").build());
assertEquals("advance_time [2017-01-01T00:00:00Z] must be later than skip_time [2017-01-01T00:00:00Z]", e.getMessage());
}
public void testBuilder_GivenAdvanceTimeIsLaterToSkipTime() {
FlushJobParams params = FlushJobParams.builder().advanceTime("2017-02-01T00:00:00Z").skipTime("2017-01-01T00:00:00Z").build();
assertThat(params.getSkipTime(), equalTo(1483228800L));
assertThat(params.getAdvanceTime(), equalTo(1485907200L));
}
public void testValidate_GivenOnlyStartSpecified() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> FlushJobParams.builder().forTimeRange(TimeRange.builder().startTime("1").build()).build());

View File

@ -41,8 +41,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
public void testWriteFlushControlMessage_GivenAdvanceTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
FlushJobParams flushJobParams = FlushJobParams.builder()
.advanceTime("1234567890").build();
FlushJobParams flushJobParams = FlushJobParams.builder().advanceTime("1234567890").build();
writer.writeFlushControlMessage(flushJobParams);
@ -53,6 +52,30 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
verifyNoMoreInteractions(lengthEncodedWriter);
}
public void testWriteFlushControlMessage_GivenSkipTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
FlushJobParams flushJobParams = FlushJobParams.builder().skipTime("1234567890").build();
writer.writeFlushControlMessage(flushJobParams);
InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(4);
inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
inOrder.verify(lengthEncodedWriter).writeField("s1234567890");
verifyNoMoreInteractions(lengthEncodedWriter);
}
public void testWriteFlushControlMessage_GivenSkipAndAdvanceTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
FlushJobParams flushJobParams = FlushJobParams.builder().skipTime("1000").advanceTime("2000").build();
writer.writeFlushControlMessage(flushJobParams);
InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeField("s1000");
inOrder.verify(lengthEncodedWriter).writeField("t2000");
}
public void testWriteFlushControlMessage_GivenCalcInterimResultsWithNoTimeParams() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
FlushJobParams flushJobParams = FlushJobParams.builder()
@ -67,7 +90,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
verifyNoMoreInteractions(lengthEncodedWriter);
}
public void testWriteFlushControlMessage_GivenNeitherCalcInterimNorAdvanceTime() throws IOException {
public void testWriteFlushControlMessage_GivenPlainFlush() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
FlushJobParams flushJobParams = FlushJobParams.builder().build();

View File

@ -91,7 +91,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
categoryDefinition = null;
}
if (randomBoolean()) {
flushAcknowledgement = new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20));
flushAcknowledgement = new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), new Date(randomNonNegativeLong()));
} else {
flushAcknowledgement = null;
}

View File

@ -31,7 +31,11 @@
},
"advance_time": {
"type": "string",
"description": "Setting this tells the Engine API that no data prior to advance_time is expected"
"description": "Advances time to the given value generating results and updating the model for the advanced interval"
},
"skip_time": {
"type": "string",
"description": "Skips time to the given value without generating results or updating the model for the skipped interval"
}
}
},

View File

@ -66,6 +66,7 @@ setup:
xpack.ml.flush_job:
job_id: post-data-job
- match: { flushed: true }
- match: { last_finalized_bucket_end: 1403481600000 }
- do:
xpack.ml.close_job:
@ -100,6 +101,7 @@ setup:
xpack.ml.flush_job:
job_id: post-data-job
- match: { flushed: true }
- match: { last_finalized_bucket_end: 0 }
- do:
xpack.ml.close_job:
@ -111,6 +113,68 @@ setup:
job_id: post-data-job
- match: { jobs.0.state: "closed" }
---
"Test flush with skip_time":
- do:
xpack.ml.post_data:
job_id: post-data-job
body:
- airline: AAL
responsetime: 132.2046
sourcetype: post-data-job
time: 1403481600
- airline: AAL
responsetime: 990.4628
sourcetype: post-data-job
time: 1403485200
# Skip a bucket
- do:
xpack.ml.flush_job:
job_id: post-data-job
skip_time: 1403488700
- match: { flushed: true }
- match: { last_finalized_bucket_end: 1403488800000 }
# Send some data that should be ignored
- do:
xpack.ml.post_data:
job_id: post-data-job
body:
airline: AAL
responsetime: 132.2046
sourcetype: post-data-job
time: 1403488700
# Send data that will create results for the bucket after the skipped one
- do:
xpack.ml.post_data:
job_id: post-data-job
body:
- airline: AAL
responsetime: 132.2046
sourcetype: post-data-job
time: 1403488900
- airline: AAL
responsetime: 132.2046
sourcetype: post-data-job
time: 1403492400
- do:
xpack.ml.close_job:
job_id: post-data-job
- match: { closed: true }
- do:
xpack.ml.get_buckets:
job_id: "post-data-job"
- match: { count: 2 }
- match: { buckets.0.timestamp: 1403481600000 }
- match: { buckets.0.event_count: 1 }
- match: { buckets.1.timestamp: 1403488800000 }
- match: { buckets.1.event_count: 1 }
---
"Test POST data with invalid parameters":