mirror of https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00

Merge branch 'master' into feature/sql

This should get us OK until 0170e0e8d3fa210a539737c91251dbdd520bb458.
Original commit: elastic/x-pack-elasticsearch@e0bea08365

This commit is contained in commit b2484bebe8.

@@ -98,6 +98,15 @@ If you send pre-aggregated data to a job for analysis, you must ensure
 that the `size` is configured correctly. Otherwise, some data might not be
 analyzed.
 
+
+[float]
+=== Time-based index patterns are not supported
+
+It is not possible to create an {xpackml} analysis job that uses time-based
+index patterns, for example `[logstash-]YYYY.MM.DD`.
+This applies to the single metric or multi metric job creation wizards in {kib}.
+
+
 [float]
 === Jobs created in {kib} use model plot config and pre-aggregated data
 //See x-pack-elasticsearch/#844

@@ -71,6 +71,7 @@ public class XPackLicenseState {
         messages.put(XPackPlugin.WATCHER, XPackLicenseState::watcherAcknowledgementMessages);
         messages.put(XPackPlugin.MONITORING, XPackLicenseState::monitoringAcknowledgementMessages);
         messages.put(XPackPlugin.GRAPH, XPackLicenseState::graphAcknowledgementMessages);
+        messages.put(XPackPlugin.MACHINE_LEARNING, XPackLicenseState::machineLearningAcknowledgementMessages);
         messages.put(XPackPlugin.LOGSTASH, XPackLicenseState::logstashAcknowledgementMessages);
         ACKNOWLEDGMENT_MESSAGES = Collections.unmodifiableMap(messages);
     }

@@ -177,6 +178,21 @@ public class XPackLicenseState {
         return Strings.EMPTY_ARRAY;
     }
 
+    private static String[] machineLearningAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) {
+        switch (newMode) {
+            case BASIC:
+            case STANDARD:
+            case GOLD:
+                switch (currentMode) {
+                    case TRIAL:
+                    case PLATINUM:
+                        return new String[] { "Machine learning will be disabled" };
+                }
+                break;
+        }
+        return Strings.EMPTY_ARRAY;
+    }
+
     private static String[] logstashAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) {
         switch (newMode) {
             case BASIC:
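
Aside (illustration, not part of the commit): the new machineLearningAcknowledgementMessages method only warns when an ML-capable license (trial or platinum) is downgraded to a tier without machine learning; the grouped case labels fall through intentionally so BASIC, STANDARD, and GOLD share one downgrade path. A minimal self-contained sketch of that decision table, using a simplified OperationMode enum and a hypothetical AckDemo class:

    import java.util.Arrays;

    // Simplified stand-in for the license decision table above (hypothetical names).
    enum OperationMode { TRIAL, BASIC, STANDARD, GOLD, PLATINUM }

    class AckDemo {
        static String[] mlAck(OperationMode current, OperationMode next) {
            switch (next) {
                case BASIC:
                case STANDARD:
                case GOLD:
                    // Only warn when leaving a tier that actually had ML.
                    switch (current) {
                        case TRIAL:
                        case PLATINUM:
                            return new String[] { "Machine learning will be disabled" };
                    }
                    break;
                default:
                    break;
            }
            return new String[0];
        }

        public static void main(String[] args) {
            // Downgrade PLATINUM -> BASIC triggers the warning; GOLD -> BASIC does not.
            System.out.println(Arrays.toString(mlAck(OperationMode.PLATINUM, OperationMode.BASIC)));
            System.out.println(Arrays.toString(mlAck(OperationMode.GOLD, OperationMode.BASIC)));
        }
    }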

@@ -280,8 +280,7 @@ public class MachineLearning implements ActionPlugin {
                 throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e);
             }
         } else {
-            autodetectProcessFactory = (job, modelSnapshot, quantiles, filters,
-                    ignoreDowntime, executorService, onProcessCrash) ->
+            autodetectProcessFactory = (job, modelSnapshot, quantiles, filters, executorService, onProcessCrash) ->
                     new BlackHoleAutodetectProcess(job.getId());
             // factor of 1.0 makes renormalization a no-op
             normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization,

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.action;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestBuilder;

@@ -13,6 +14,7 @@ import org.elasticsearch.action.support.tasks.BaseTasksResponse;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;

@@ -28,10 +30,12 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.Objects;
 
 public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobAction.Response, FlushJobAction.RequestBuilder> {

@@ -59,6 +63,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
         public static final ParseField START = new ParseField("start");
         public static final ParseField END = new ParseField("end");
         public static final ParseField ADVANCE_TIME = new ParseField("advance_time");
+        public static final ParseField SKIP_TIME = new ParseField("skip_time");
 
         private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
 

@@ -68,6 +73,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
             PARSER.declareString(Request::setStart, START);
             PARSER.declareString(Request::setEnd, END);
             PARSER.declareString(Request::setAdvanceTime, ADVANCE_TIME);
+            PARSER.declareString(Request::setSkipTime, SKIP_TIME);
         }
 
         public static Request parseRequest(String jobId, XContentParser parser) {

@@ -82,6 +88,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
         private String start;
         private String end;
         private String advanceTime;
+        private String skipTime;
 
         Request() {
         }

@@ -114,12 +121,22 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
             this.end = end;
         }
 
-        public String getAdvanceTime() { return advanceTime; }
+        public String getAdvanceTime() {
+            return advanceTime;
+        }
 
         public void setAdvanceTime(String advanceTime) {
             this.advanceTime = advanceTime;
         }
 
+        public String getSkipTime() {
+            return skipTime;
+        }
+
+        public void setSkipTime(String skipTime) {
+            this.skipTime = skipTime;
+        }
+
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);

@@ -127,6 +144,9 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
             start = in.readOptionalString();
             end = in.readOptionalString();
             advanceTime = in.readOptionalString();
+            if (in.getVersion().after(Version.V_5_5_0)) {
+                skipTime = in.readOptionalString();
+            }
         }
 
         @Override

@@ -136,11 +156,14 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
             out.writeOptionalString(start);
             out.writeOptionalString(end);
             out.writeOptionalString(advanceTime);
+            if (out.getVersion().after(Version.V_5_5_0)) {
+                out.writeOptionalString(skipTime);
+            }
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(jobId, calcInterim, start, end, advanceTime);
+            return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime);
         }
 
         @Override

@@ -156,7 +179,8 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
                     calcInterim == other.calcInterim &&
                     Objects.equals(start, other.start) &&
                     Objects.equals(end, other.end) &&
-                    Objects.equals(advanceTime, other.advanceTime);
+                    Objects.equals(advanceTime, other.advanceTime) &&
+                    Objects.equals(skipTime, other.skipTime);
         }
 
         @Override

@@ -173,6 +197,9 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
             if (advanceTime != null) {
                 builder.field(ADVANCE_TIME.getPreferredName(), advanceTime);
             }
+            if (skipTime != null) {
+                builder.field(SKIP_TIME.getPreferredName(), skipTime);
+            }
             builder.endObject();
             return builder;
         }

@@ -188,36 +215,52 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
     public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
 
         private boolean flushed;
+        private Date lastFinalizedBucketEnd;
 
         Response() {
             super(null, null);
         }
 
-        Response(boolean flushed) {
+        public Response(boolean flushed, @Nullable Date lastFinalizedBucketEnd) {
             super(null, null);
             this.flushed = flushed;
+            this.lastFinalizedBucketEnd = lastFinalizedBucketEnd;
         }
 
         public boolean isFlushed() {
             return flushed;
         }
 
+        public Date getLastFinalizedBucketEnd() {
+            return lastFinalizedBucketEnd;
+        }
+
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
             flushed = in.readBoolean();
+            if (in.getVersion().after(Version.V_5_5_0)) {
+                lastFinalizedBucketEnd = new Date(in.readVLong());
+            }
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeBoolean(flushed);
+            if (out.getVersion().after(Version.V_5_5_0)) {
+                out.writeVLong(lastFinalizedBucketEnd.getTime());
+            }
         }
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
             builder.field("flushed", flushed);
+            if (lastFinalizedBucketEnd != null) {
+                builder.dateField(FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName(),
+                        FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string", lastFinalizedBucketEnd.getTime());
+            }
             builder.endObject();
             return builder;
         }

@@ -227,12 +270,13 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             Response response = (Response) o;
-            return flushed == response.flushed;
+            return flushed == response.flushed &&
+                    Objects.equals(lastFinalizedBucketEnd, response.lastFinalizedBucketEnd);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(flushed);
+            return Objects.hash(flushed, lastFinalizedBucketEnd);
         }
     }
 

@@ -261,6 +305,9 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
             if (request.getAdvanceTime() != null) {
                 paramsBuilder.advanceTime(request.getAdvanceTime());
             }
+            if (request.getSkipTime() != null) {
+                paramsBuilder.skipTime(request.getSkipTime());
+            }
             TimeRange.Builder timeRangeBuilder = TimeRange.builder();
             if (request.getStart() != null) {
                 timeRangeBuilder.startTime(request.getStart());

@@ -269,13 +316,12 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
                 timeRangeBuilder.endTime(request.getEnd());
             }
             paramsBuilder.forTimeRange(timeRangeBuilder.build());
-            processManager.flushJob(task, paramsBuilder.build(), e -> {
-                if (e == null) {
-                    listener.onResponse(new Response(true));
-                } else {
-                    listener.onFailure(e);
-                }
-            });
+            processManager.flushJob(task, paramsBuilder.build(), ActionListener.wrap(
+                    flushAcknowledgement -> {
+                        listener.onResponse(new Response(true,
+                                flushAcknowledgement == null ? null : flushAcknowledgement.getLastFinalizedBucketEnd()));
+                    }, listener::onFailure
+            ));
         }
     }
 }
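
Aside (illustration, not part of the commit): skip_time rides the wire behind a version check, so nodes on or before 5.5.0 never see the field and mixed-version clusters keep working. A minimal self-contained sketch of that pattern using plain Java streams and a hypothetical VersionedMessage class; the real code uses Elasticsearch's StreamInput/StreamOutput and Version:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical analogue of the version-gated optional field above.
    class VersionedMessage {
        static final int V_5_5_0 = 5_05_00; // stand-in for Version.V_5_5_0

        String advanceTime;  // pre-existing optional field
        String skipTime;     // field introduced after 5.5.0

        void writeTo(DataOutputStream out, int remoteVersion) throws IOException {
            writeOptionalString(out, advanceTime);
            if (remoteVersion > V_5_5_0) {         // older peers don't know the field
                writeOptionalString(out, skipTime);
            }
        }

        void readFrom(DataInputStream in, int remoteVersion) throws IOException {
            advanceTime = readOptionalString(in);
            if (remoteVersion > V_5_5_0) {
                skipTime = readOptionalString(in); // otherwise stays null
            }
        }

        private static void writeOptionalString(DataOutputStream out, String s) throws IOException {
            out.writeBoolean(s != null);
            if (s != null) {
                out.writeUTF(s);
            }
        }

        private static String readOptionalString(DataInputStream in) throws IOException {
            return in.readBoolean() ? in.readUTF() : null;
        }
    }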

@@ -186,13 +186,15 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
 
     public static class JobParams implements PersistentTaskParams {
 
+        /** TODO Remove in 7.0.0 */
         public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
+
         public static final ParseField TIMEOUT = new ParseField("timeout");
         public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(TASK_NAME, JobParams::new);
 
         static {
             PARSER.declareString(JobParams::setJobId, Job.ID);
-            PARSER.declareBoolean(JobParams::setIgnoreDowntime, IGNORE_DOWNTIME);
+            PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME);
             PARSER.declareString((params, val) ->
                     params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
         }

@@ -210,7 +212,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
         }
 
         private String jobId;
-        private boolean ignoreDowntime = true;
         // A big state can take a while to restore. For symmetry with the _close endpoint any
         // changes here should be reflected there too.
         private TimeValue timeout = MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT;

@@ -224,7 +225,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
 
         public JobParams(StreamInput in) throws IOException {
             jobId = in.readString();
-            ignoreDowntime = in.readBoolean();
+            if (in.getVersion().onOrBefore(Version.V_5_5_0)) {
+                // Read `ignoreDowntime`
+                in.readBoolean();
+            }
             timeout = TimeValue.timeValueMillis(in.readVLong());
         }
 

@@ -236,14 +240,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
             this.jobId = jobId;
         }
 
-        public boolean isIgnoreDowntime() {
-            return ignoreDowntime;
-        }
-
-        public void setIgnoreDowntime(boolean ignoreDowntime) {
-            this.ignoreDowntime = ignoreDowntime;
-        }
-
         public TimeValue getTimeout() {
             return timeout;
         }

@@ -260,7 +256,10 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeString(jobId);
-            out.writeBoolean(ignoreDowntime);
+            if (out.getVersion().onOrBefore(Version.V_5_5_0)) {
+                // Write `ignoreDowntime` - true by default
+                out.writeBoolean(true);
+            }
             out.writeVLong(timeout.millis());
         }
 

@@ -268,7 +267,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
             builder.field(Job.ID.getPreferredName(), jobId);
-            builder.field(IGNORE_DOWNTIME.getPreferredName(), ignoreDowntime);
             builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
             builder.endObject();
             return builder;

@@ -276,7 +274,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
 
         @Override
         public int hashCode() {
-            return Objects.hash(jobId, ignoreDowntime, timeout);
+            return Objects.hash(jobId, timeout);
         }
 
         @Override

@@ -289,7 +287,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
             }
             OpenJobAction.JobParams other = (OpenJobAction.JobParams) obj;
             return Objects.equals(jobId, other.jobId) &&
-                    Objects.equals(ignoreDowntime, other.ignoreDowntime) &&
                     Objects.equals(timeout, other.timeout);
         }
 

@@ -588,7 +585,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
     protected void nodeOperation(AllocatedPersistentTask task, JobParams params) {
         JobTask jobTask = (JobTask) task;
         jobTask.autodetectProcessManager = autodetectProcessManager;
-        autodetectProcessManager.openJob(jobTask, params.isIgnoreDowntime(), e2 -> {
+        autodetectProcessManager.openJob(jobTask, e2 -> {
             if (e2 == null) {
                 task.markAsCompleted();
             } else {

@@ -77,7 +77,7 @@ class DatafeedJob {
     }
 
     Long runLookBack(long startTime, Long endTime) throws Exception {
-        lookbackStartTimeMs = (lastEndTimeMs != null && lastEndTimeMs + 1 > startTime) ? lastEndTimeMs + 1 : startTime;
+        lookbackStartTimeMs = skipToStartTime(startTime);
         Optional<Long> endMs = Optional.ofNullable(endTime);
         long lookbackEnd = endMs.orElse(currentTimeSupplier.get() - queryDelayMs);
         boolean isLookbackOnly = endMs.isPresent();

@@ -115,6 +115,22 @@ class DatafeedJob {
         return null;
     }
 
+    private long skipToStartTime(long startTime) {
+        if (lastEndTimeMs != null) {
+            if (lastEndTimeMs + 1 > startTime) {
+                // start time is before last checkpoint, thus continue from checkpoint
+                return lastEndTimeMs + 1;
+            }
+            // start time is after last checkpoint, thus we need to skip time
+            FlushJobAction.Request request = new FlushJobAction.Request(jobId);
+            request.setSkipTime(String.valueOf(startTime));
+            FlushJobAction.Response flushResponse = flushJob(request);
+            LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]");
+            return flushResponse.getLastFinalizedBucketEnd().getTime();
+        }
+        return startTime;
+    }
+
     long runRealtime() throws Exception {
         long start = lastEndTimeMs == null ? lookbackStartTimeMs : Math.max(lookbackStartTimeMs, lastEndTimeMs + 1);
         long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs;

@@ -265,10 +281,10 @@ class DatafeedJob {
         return (epochMs / frequencyMs) * frequencyMs;
     }
 
-    private void flushJob(FlushJobAction.Request flushRequest) {
+    private FlushJobAction.Response flushJob(FlushJobAction.Request flushRequest) {
         try {
             LOGGER.trace("[" + jobId + "] Sending flush request");
-            client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
+            return client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
         } catch (Exception e) {
             LOGGER.debug("[" + jobId + "] error while flushing job", e);
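
Aside (illustration, not part of the commit): skipToStartTime has two branches, resume from the checkpoint when it is ahead of the requested start, otherwise ask the job to skip forward. A standalone sketch of the same rule with invented numbers (the real code issues a flush request with skip_time for the second branch):

    // Hypothetical, simplified mirror of the checkpoint rule in skipToStartTime().
    class SkipToStartDemo {
        static long skipToStartTime(Long lastEndTimeMs, long startTimeMs) {
            if (lastEndTimeMs != null && lastEndTimeMs + 1 > startTimeMs) {
                return lastEndTimeMs + 1; // continue from the checkpoint
            }
            return startTimeMs;           // skip ahead (flush with skip_time in the real code)
        }

        public static void main(String[] args) {
            System.out.println(skipToStartTime(9999L, 5000L)); // 10000: checkpoint wins
            System.out.println(skipToStartTime(4000L, 9999L)); // 9999: skip forward
            System.out.println(skipToStartTime(null, 5000L));  // 5000: no checkpoint yet
        }
    }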

@@ -78,7 +78,6 @@ public class ProcessCtrl {
      */
     static final String BUCKET_SPAN_ARG = "--bucketspan=";
     public static final String DELETE_STATE_FILES_ARG = "--deleteStateFiles";
-    static final String IGNORE_DOWNTIME_ARG = "--ignoreDowntime";
     static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput";
     static final String MODEL_CONFIG_ARG = "--modelconfig=";
     public static final String QUANTILES_STATE_PATH_ARG = "--quantilesState=";

@@ -151,8 +150,7 @@ public class ProcessCtrl {
         return rng.nextInt(SECONDS_IN_HOUR);
     }
 
-    public static List<String> buildAutodetectCommand(Environment env, Settings settings, Job job, Logger logger, boolean ignoreDowntime,
-                                                      long controllerPid) {
+    public static List<String> buildAutodetectCommand(Environment env, Settings settings, Job job, Logger logger, long controllerPid) {
         List<String> command = new ArrayList<>();
         command.add(AUTODETECT_PATH);
 

@@ -213,10 +211,6 @@ public class ProcessCtrl {
         int maxQuantileInterval = BASE_MAX_QUANTILE_INTERVAL + intervalStagger;
         command.add(MAX_QUANTILE_INTERVAL_ARG + maxQuantileInterval);
 
-        if (ignoreDowntime) {
-            command.add(IGNORE_DOWNTIME_ARG);
-        }
-
         if (modelConfigFilePresent(env)) {
             String modelConfigFile = XPackPlugin.resolveConfigFile(env, ML_MODEL_CONF).toString();
             command.add(MODEL_CONFIG_ARG + modelConfigFile);

@@ -43,7 +43,6 @@ public class AutodetectBuilder {
     private Job job;
     private List<Path> filesToDelete;
     private Logger logger;
-    private boolean ignoreDowntime;
    private Set<MlFilter> referencedFilters;
    private Quantiles quantiles;
    private Environment env;

@@ -68,21 +67,9 @@ public class AutodetectBuilder {
         this.job = Objects.requireNonNull(job);
         this.filesToDelete = Objects.requireNonNull(filesToDelete);
         this.logger = Objects.requireNonNull(logger);
-        ignoreDowntime = false;
         referencedFilters = new HashSet<>();
     }
 
-    /**
-     * Set ignoreDowntime
-     *
-     * @param ignoreDowntime If true set the ignore downtime flag overriding the
-     *                       setting in the job configuration
-     */
-    public AutodetectBuilder ignoreDowntime(boolean ignoreDowntime) {
-        this.ignoreDowntime = ignoreDowntime;
-        return this;
-    }
-
     public AutodetectBuilder referencedFilters(Set<MlFilter> filters) {
         referencedFilters = filters;
         return this;

@@ -103,7 +90,7 @@ public class AutodetectBuilder {
      */
     public void build() throws IOException, TimeoutException {
 
-        List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job, logger, ignoreDowntime, controller.getPid());
+        List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job, logger, controller.getPid());
 
         buildLimits(command);
         buildModelPlotConfig(command);

@@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.CheckedSupplier;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;

@@ -23,6 +24,7 @@ import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
 import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
 import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;

@@ -196,23 +198,24 @@ public class AutodetectCommunicator implements Closeable {
         }, handler);
     }
 
-    public void flushJob(FlushJobParams params, BiConsumer<Void, Exception> handler) {
+    public void flushJob(FlushJobParams params, BiConsumer<FlushAcknowledgement, Exception> handler) {
         submitOperation(() -> {
             String flushId = autodetectProcess.flushJob(params);
-            waitFlushToCompletion(flushId);
-            return null;
+            return waitFlushToCompletion(flushId);
         }, handler);
     }
 
-    void waitFlushToCompletion(String flushId) {
+    @Nullable
+    FlushAcknowledgement waitFlushToCompletion(String flushId) {
         LOGGER.debug("[{}] waiting for flush", job.getId());
 
+        FlushAcknowledgement flushAcknowledgement;
         try {
-            boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
-            while (isFlushComplete == false) {
+            flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
+            while (flushAcknowledgement == null) {
                 checkProcessIsAlive();
                 checkResultsProcessorIsAlive();
-                isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
+                flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
             }
         } finally {
             autoDetectResultProcessor.clearAwaitingFlush(flushId);

@@ -225,6 +228,8 @@ public class AutodetectCommunicator implements Closeable {
 
             LOGGER.debug("[{}] Flush completed", job.getId());
         }
+
+        return flushAcknowledgement;
     }
 
     /**

@@ -25,13 +25,11 @@ public interface AutodetectProcessFactory {
      * @param modelSnapshot   The model snapshot to restore from
      * @param quantiles       The quantiles to push to the native process
      * @param filters         The filters to push to the native process
-     * @param ignoreDowntime  Should gaps in data be treated as anomalous or as a maintenance window after a job re-start
      * @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
      * @param onProcessCrash  Callback to execute if the process stops unexpectedly
      * @return The process
      */
     AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters,
-                                              boolean ignoreDowntime,
                                               ExecutorService executorService,
                                               Runnable onProcessCrash);
 }

@@ -37,6 +37,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
 import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;

@@ -201,23 +202,23 @@ public class AutodetectProcessManager extends AbstractComponent {
      * @param params Parameters describing the controls that will accompany the flushing
      *               (e.g. calculating interim results, time control, etc.)
      */
-    public void flushJob(JobTask jobTask, FlushJobParams params, Consumer<Exception> handler) {
+    public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener<FlushAcknowledgement> handler) {
         logger.debug("Flushing job {}", jobTask.getJobId());
         AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
         if (communicator == null) {
             String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
             logger.debug(message);
-            handler.accept(ExceptionsHelper.conflictStatusException(message));
+            handler.onFailure(ExceptionsHelper.conflictStatusException(message));
             return;
         }
 
-        communicator.flushJob(params, (aVoid, e) -> {
-            if (e == null) {
-                handler.accept(null);
-            } else {
+        communicator.flushJob(params, (flushAcknowledgement, e) -> {
+            if (e != null) {
                 String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobTask.getJobId());
                 logger.error(msg);
-                handler.accept(ExceptionsHelper.serverError(msg, e));
+                handler.onFailure(ExceptionsHelper.serverError(msg, e));
+            } else {
+                handler.onResponse(flushAcknowledgement);
             }
         });
     }

@@ -240,7 +241,7 @@ public class AutodetectProcessManager extends AbstractComponent {
         });
     }
 
-    public void openJob(JobTask jobTask, boolean ignoreDowntime, Consumer<Exception> handler) {
+    public void openJob(JobTask jobTask, Consumer<Exception> handler) {
         String jobId = jobTask.getJobId();
         Job job = jobManager.getJobOrThrowIfUnknown(jobId);
 

@@ -263,7 +264,7 @@ public class AutodetectProcessManager extends AbstractComponent {
             protected void doRun() throws Exception {
                 try {
                     AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobTask.getAllocationId(),
-                            id -> create(jobTask, params, ignoreDowntime, handler));
+                            id -> create(jobTask, params, handler));
                     communicator.init(params.modelSnapshot());
                     setJobState(jobTask, JobState.OPENED);
                 } catch (Exception e1) {

@@ -286,8 +287,7 @@ public class AutodetectProcessManager extends AbstractComponent {
         });
     }
 
-    AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams,
-                                  boolean ignoreDowntime, Consumer<Exception> handler) {
+    AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams, Consumer<Exception> handler) {
         if (autoDetectCommunicatorByJob.size() == maxAllowedRunningJobs) {
             throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached",
                     RestStatus.TOO_MANY_REQUESTS);

@@ -321,8 +321,8 @@ public class AutodetectProcessManager extends AbstractComponent {
                 renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());
 
         AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
-                autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime,
-                autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED));
+                autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService,
+                () -> setJobState(jobTask, JobState.FAILED));
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
                 client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());
         ExecutorService autodetectWorkerExecutor;

@@ -77,7 +77,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
      */
     @Override
     public String flushJob(FlushJobParams params) throws IOException {
-        FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID);
+        FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, null);
         AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement);
         results.add(result);
         return FLUSH_ID;

@@ -59,13 +59,12 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
     @Override
     public AutodetectProcess createAutodetectProcess(Job job, ModelSnapshot modelSnapshot,
                                                      Quantiles quantiles, Set<MlFilter> filters,
-                                                     boolean ignoreDowntime,
                                                      ExecutorService executorService,
                                                      Runnable onProcessCrash) {
         List<Path> filesToDelete = new ArrayList<>();
         ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, ProcessCtrl.AUTODETECT, job.getId(),
                 true, false, true, true, modelSnapshot != null, !ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING.get(settings));
-        createNativeProcess(job, quantiles, filters, processPipes, ignoreDowntime, filesToDelete);
+        createNativeProcess(job, quantiles, filters, processPipes, filesToDelete);
         int numberOfAnalysisFields = job.getAnalysisConfig().analysisFields().size();
 
         StateProcessor stateProcessor = new StateProcessor(settings, client);

@@ -88,11 +87,10 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
     }
 
     private void createNativeProcess(Job job, Quantiles quantiles, Set<MlFilter> filters, ProcessPipes processPipes,
-                                     boolean ignoreDowntime, List<Path> filesToDelete) {
+                                     List<Path> filesToDelete) {
         try {
             AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env,
                     settings, nativeController, processPipes)
-                    .ignoreDowntime(ignoreDowntime)
                     .referencedFilters(filters);
 
             // if state is null or empty it will be ignored

@@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.xpack.ml.MachineLearning;

@@ -231,7 +232,7 @@ public class AutoDetectResultProcessor {
                 // through to the data store
                 context.bulkResultsPersister.executeRequest();
                 persister.commitResultWrites(context.jobId);
-                flushListener.acknowledgeFlush(flushAcknowledgement.getId());
+                flushListener.acknowledgeFlush(flushAcknowledgement);
                 // Interim results may have been produced by the flush,
                 // which need to be
                 // deleted when the next finalized results come through

@@ -291,13 +292,11 @@ public class AutoDetectResultProcessor {
      *
      * @param flushId the id of the flush request to wait for
      * @param timeout the timeout
-     * @return {@code true} if the flush has completed or the parsing finished; {@code false} if the timeout expired
+     * @return The {@link FlushAcknowledgement} if the flush has completed or the parsing finished; {@code null} if the timeout expired
      */
-    public boolean waitForFlushAcknowledgement(String flushId, Duration timeout) {
-        if (failed) {
-            return false;
-        }
-        return flushListener.waitForFlush(flushId, timeout);
+    @Nullable
+    public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) {
+        return failed ? null : flushListener.waitForFlush(flushId, timeout);
     }
 
     public void clearAwaitingFlush(String flushId) {

@@ -5,15 +5,20 @@
  */
 package org.elasticsearch.xpack.ml.job.process.autodetect.output;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.Objects;
 
 /**

@@ -25,44 +30,70 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
      */
     public static final ParseField TYPE = new ParseField("flush");
     public static final ParseField ID = new ParseField("id");
+    public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end");
 
     public static final ConstructingObjectParser<FlushAcknowledgement, Void> PARSER = new ConstructingObjectParser<>(
-            TYPE.getPreferredName(), a -> new FlushAcknowledgement((String) a[0]));
+            TYPE.getPreferredName(), a -> new FlushAcknowledgement((String) a[0], (Date) a[1]));
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
+        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
+            if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
+                return new Date(p.longValue());
+            } else if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
+                return new Date(TimeUtils.dateStringToEpoch(p.text()));
+            }
+            throw new IllegalArgumentException(
+                    "unexpected token [" + p.currentToken() + "] for [" + LAST_FINALIZED_BUCKET_END.getPreferredName() + "]");
+        }, LAST_FINALIZED_BUCKET_END, ObjectParser.ValueType.VALUE);
     }
 
     private String id;
+    private Date lastFinalizedBucketEnd;
 
-    public FlushAcknowledgement(String id) {
+    public FlushAcknowledgement(String id, Date lastFinalizedBucketEnd) {
         this.id = id;
+        this.lastFinalizedBucketEnd = lastFinalizedBucketEnd;
     }
 
     public FlushAcknowledgement(StreamInput in) throws IOException {
         id = in.readString();
+        if (in.getVersion().after(Version.V_5_5_0)) {
+            lastFinalizedBucketEnd = new Date(in.readVLong());
+        }
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(id);
+        if (out.getVersion().after(Version.V_5_5_0)) {
+            out.writeVLong(lastFinalizedBucketEnd.getTime());
+        }
     }
 
     public String getId() {
         return id;
     }
 
+    public Date getLastFinalizedBucketEnd() {
+        return lastFinalizedBucketEnd;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
         builder.field(ID.getPreferredName(), id);
+        if (lastFinalizedBucketEnd != null) {
+            builder.dateField(LAST_FINALIZED_BUCKET_END.getPreferredName(), LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string",
+                    lastFinalizedBucketEnd.getTime());
+        }
         builder.endObject();
         return builder;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id);
+        return Objects.hash(id, lastFinalizedBucketEnd);
     }
 
     @Override

@@ -74,7 +105,8 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
             return false;
         }
         FlushAcknowledgement other = (FlushAcknowledgement) obj;
-        return Objects.equals(id, other.id);
+        return Objects.equals(id, other.id) &&
+                Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd);
     }
 }

@@ -5,6 +5,8 @@
  */
 package org.elasticsearch.xpack.ml.job.process.autodetect.output;
 
+import org.elasticsearch.common.Nullable;
+
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;

@@ -15,29 +17,34 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 class FlushListener {
 
-    final ConcurrentMap<String, CountDownLatch> awaitingFlushed = new ConcurrentHashMap<>();
+    final ConcurrentMap<String, FlushAcknowledgementHolder> awaitingFlushed = new ConcurrentHashMap<>();
     final AtomicBoolean cleared = new AtomicBoolean(false);
 
-    boolean waitForFlush(String flushId, Duration timeout) {
+    @Nullable
+    FlushAcknowledgement waitForFlush(String flushId, Duration timeout) {
         if (cleared.get()) {
-            return false;
+            return null;
         }
 
-        CountDownLatch latch = awaitingFlushed.computeIfAbsent(flushId, (key) -> new CountDownLatch(1));
+        FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId));
         try {
-            return latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
+            if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
+                return holder.flushAcknowledgement;
+            }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            return false;
         }
+        return null;
     }
 
-    void acknowledgeFlush(String flushId) {
+    void acknowledgeFlush(FlushAcknowledgement flushAcknowledgement) {
         // acknowledgeFlush(...) could be called before waitForFlush(...)
         // a flush api call writes a flush command to the analytical process and then via a different thread the
         // result reader then reads whether the flush has been acked.
-        CountDownLatch latch = awaitingFlushed.computeIfAbsent(flushId, (key) -> new CountDownLatch(1));
-        latch.countDown();
+        String flushId = flushAcknowledgement.getId();
+        FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId));
+        holder.flushAcknowledgement = flushAcknowledgement;
+        holder.latch.countDown();
     }
 
     void clear(String flushId) {

@@ -46,11 +53,22 @@ class FlushListener {
 
     void clear() {
         if (cleared.compareAndSet(false, true)) {
-            Iterator<ConcurrentMap.Entry<String, CountDownLatch>> latches = awaitingFlushed.entrySet().iterator();
+            Iterator<ConcurrentMap.Entry<String, FlushAcknowledgementHolder>> latches = awaitingFlushed.entrySet().iterator();
             while (latches.hasNext()) {
-                latches.next().getValue().countDown();
+                latches.next().getValue().latch.countDown();
                 latches.remove();
             }
         }
     }
+
+    private static class FlushAcknowledgementHolder {
+
+        private final CountDownLatch latch;
+        private volatile FlushAcknowledgement flushAcknowledgement;
+
+        private FlushAcknowledgementHolder(String flushId) {
+            this.flushAcknowledgement = new FlushAcknowledgement(flushId, null);
+            this.latch = new CountDownLatch(1);
+        }
+    }
 }
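
Aside (illustration, not part of the commit): the holder replaces a bare CountDownLatch so the waiter receives a value, not just a signal; because the volatile write happens before countDown(), a waiter that returns from await() is guaranteed to see the acknowledgement. A generic self-contained sketch of the same handoff, with hypothetical names:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Hypothetical generic version of the latch-plus-value pattern above.
    class ResultListener<T> {
        private static final class Holder<T> {
            final CountDownLatch latch = new CountDownLatch(1);
            volatile T value;
        }

        private final ConcurrentMap<String, Holder<T>> waiting = new ConcurrentHashMap<>();

        // Safe to call before or after await(); computeIfAbsent covers both orders.
        void publish(String id, T value) {
            Holder<T> h = waiting.computeIfAbsent(id, k -> new Holder<>());
            h.value = value;       // volatile write before countDown(): visible to the awaiter
            h.latch.countDown();
        }

        // Returns null on timeout, mirroring waitForFlush() above.
        T await(String id, long millis) throws InterruptedException {
            Holder<T> h = waiting.computeIfAbsent(id, k -> new Holder<>());
            return h.latch.await(millis, TimeUnit.MILLISECONDS) ? h.value : null;
        }
    }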

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.job.process.autodetect.params;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.xpack.ml.job.messages.Messages;
 import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.utils.time.TimeUtils;

@@ -13,14 +14,32 @@ import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
 import java.util.Objects;
 
 public class FlushJobParams {
 
     /**
      * Whether interim results should be calculated
      */
     private final boolean calcInterim;
 
     /**
      * The time range for which interim results should be calculated
      */
     private final TimeRange timeRange;
 
     /**
      * The epoch (seconds) to advance time to
      */
     private final Long advanceTimeSeconds;
 
-    private FlushJobParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds) {
+    /**
+     * The epoch (seconds) to skip time to
+     */
+    private final Long skipTimeSeconds;
+
+    private FlushJobParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds, Long skipTimeSeconds) {
         this.calcInterim = calcInterim;
         this.timeRange = Objects.requireNonNull(timeRange);
         this.advanceTimeSeconds = advanceTimeSeconds;
+        this.skipTimeSeconds = skipTimeSeconds;
     }
 
     public boolean shouldCalculateInterim() {

@@ -31,6 +50,10 @@ public class FlushJobParams {
         return advanceTimeSeconds != null;
     }
 
+    public boolean shouldSkipTime() {
+        return skipTimeSeconds != null;
+    }
+
     public String getStart() {
         return timeRange.getStart();
     }

@@ -46,6 +69,13 @@ public class FlushJobParams {
         return advanceTimeSeconds;
     }
 
+    public long getSkipTime() {
+        if (!shouldSkipTime()) {
+            throw new IllegalStateException();
+        }
+        return skipTimeSeconds;
+    }
+
     public static Builder builder() {
         return new Builder();
     }

@@ -57,24 +87,20 @@ public class FlushJobParams {
         FlushJobParams that = (FlushJobParams) o;
         return calcInterim == that.calcInterim &&
                 Objects.equals(timeRange, that.timeRange) &&
-                Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds);
+                Objects.equals(advanceTimeSeconds, that.advanceTimeSeconds) &&
+                Objects.equals(skipTimeSeconds, that.skipTimeSeconds);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(calcInterim, timeRange, advanceTimeSeconds);
+        return Objects.hash(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds);
     }
 
     public static class Builder {
         private boolean calcInterim = false;
-        private TimeRange timeRange;
+        private TimeRange timeRange = TimeRange.builder().build();
         private String advanceTime;
+        private String skipTime;
 
-        private Builder() {
-            calcInterim = false;
-            timeRange = TimeRange.builder().build();
-            advanceTime = "";
-        }
-
         public Builder calcInterim(boolean value) {
             calcInterim = value;

@@ -87,14 +113,24 @@ public class FlushJobParams {
         }
 
         public Builder advanceTime(String timestamp) {
-            advanceTime = ExceptionsHelper.requireNonNull(timestamp, "advance");
+            advanceTime = ExceptionsHelper.requireNonNull(timestamp, "advance_time");
             return this;
         }
 
+        public Builder skipTime(String timestamp) {
+            skipTime = ExceptionsHelper.requireNonNull(timestamp, "skip_time");
+            return this;
+        }
+
         public FlushJobParams build() {
             checkValidFlushArgumentsCombination();
-            Long advanceTimeSeconds = checkAdvanceTimeParam();
-            return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds);
+            Long advanceTimeSeconds = parseTimeParam("advance_time", advanceTime);
+            Long skipTimeSeconds = parseTimeParam("skip_time", skipTime);
+            if (skipTimeSeconds != null && advanceTimeSeconds != null && advanceTimeSeconds <= skipTimeSeconds) {
+                throw ExceptionsHelper.badRequestException("advance_time [" + advanceTime + "] must be later than skip_time ["
+                        + skipTime + "]");
+            }
+            return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds);
         }
 
         private void checkValidFlushArgumentsCombination() {

@@ -107,11 +143,11 @@ public class FlushJobParams {
             }
         }
 
-        private Long checkAdvanceTimeParam() {
-            if (advanceTime != null && !advanceTime.isEmpty()) {
-                return paramToEpochIfValidOrThrow("advance_time", advanceTime) / TimeRange.MILLISECONDS_IN_SECOND;
+        private Long parseTimeParam(String name, String value) {
+            if (Strings.isNullOrEmpty(value)) {
+                return null;
             }
-            return null;
+            return paramToEpochIfValidOrThrow(name, value) / TimeRange.MILLISECONDS_IN_SECOND;
         }
 
         private long paramToEpochIfValidOrThrow(String paramName, String date) {
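
Aside (illustration, not part of the commit): a hypothetical usage of the builder above, with invented epoch-second strings, showing the new skip_time/advance_time validation:

    // skip first, then advance; advance_time must be strictly later than skip_time.
    FlushJobParams ok = FlushJobParams.builder()
            .calcInterim(true)
            .skipTime("1000")
            .advanceTime("2000")
            .build();

    // Swapping the values (skipTime("2000"), advanceTime("1000")) fails validation with:
    // "advance_time [1000] must be later than skip_time [2000]"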

@@ -52,6 +52,11 @@ public class ControlMsgToProcessWriter {
      */
     private static final String ADVANCE_TIME_MESSAGE_CODE = "t";
 
+    /**
+     * This must match the code defined in the api::CAnomalyDetector C++ class.
+     */
+    private static final String SKIP_TIME_MESSAGE_CODE = "s";
+
     /**
      * This must match the code defined in the api::CAnomalyDetector C++ class.
      */

@@ -108,6 +113,9 @@ public class ControlMsgToProcessWriter {
      *               (e.g. calculating interim results, time control, etc.)
      */
     public void writeFlushControlMessage(FlushJobParams params) throws IOException {
+        if (params.shouldSkipTime()) {
+            writeMessage(SKIP_TIME_MESSAGE_CODE + params.getSkipTime());
+        }
         if (params.shouldAdvanceTime()) {
             writeMessage(ADVANCE_TIME_MESSAGE_CODE + params.getAdvanceTime());
         }
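
Aside (illustration, not part of the commit): for skip_time=1000 and advance_time=2000, writeFlushControlMessage(params) would emit the control messages below, skip before advance, using the codes defined above (values invented):

    writeMessage("s1000"); // SKIP_TIME_MESSAGE_CODE + params.getSkipTime()
    writeMessage("t2000"); // ADVANCE_TIME_MESSAGE_CODE + params.getAdvanceTime()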

@@ -24,6 +24,7 @@ public class RestFlushJobAction extends BaseRestHandler {
     private final String DEFAULT_START = "";
     private final String DEFAULT_END = "";
     private final String DEFAULT_ADVANCE_TIME = "";
+    private final String DEFAULT_SKIP_TIME = "";
 
     public RestFlushJobAction(Settings settings, RestController controller) {
         super(settings);

@@ -50,6 +51,7 @@ public class RestFlushJobAction extends BaseRestHandler {
             request.setStart(restRequest.param(FlushJobAction.Request.START.getPreferredName(), DEFAULT_START));
             request.setEnd(restRequest.param(FlushJobAction.Request.END.getPreferredName(), DEFAULT_END));
             request.setAdvanceTime(restRequest.param(FlushJobAction.Request.ADVANCE_TIME.getPreferredName(), DEFAULT_ADVANCE_TIME));
+            request.setSkipTime(restRequest.param(FlushJobAction.Request.SKIP_TIME.getPreferredName(), DEFAULT_SKIP_TIME));
         }
 
         return channel -> client.execute(FlushJobAction.INSTANCE, request, new RestToXContentListener<>(channel));

@@ -42,7 +42,6 @@ public class RestOpenJobAction extends BaseRestHandler {
             request = OpenJobAction.Request.parseRequest(restRequest.param(Job.ID.getPreferredName()), restRequest.contentParser());
         } else {
             OpenJobAction.JobParams jobParams = new OpenJobAction.JobParams(restRequest.param(Job.ID.getPreferredName()));
-            jobParams.setIgnoreDowntime(restRequest.paramAsBoolean(OpenJobAction.JobParams.IGNORE_DOWNTIME.getPreferredName(), true));
             if (restRequest.hasParam(OpenJobAction.JobParams.TIMEOUT.getPreferredName())) {
                 TimeValue openTimeout = restRequest.paramAsTime(OpenJobAction.JobParams.TIMEOUT.getPreferredName(),
                         TimeValue.timeValueSeconds(20));

@@ -136,16 +136,18 @@ public class ReportingAttachmentParser implements EmailAttachmentParser<Reportin
                 logger.trace("Watch[{}] reporting[{}] pdf is not ready, polling in [{}] again", context.watch().id(), attachment.id(),
                         TimeValue.timeValueMillis(sleepMillis));
             } else if (response.status() >= 400) {
+                String body = response.body() != null ? response.body().utf8ToString() : null;
                 throw new ElasticsearchException("Watch[{}] reporting[{}] Error when polling pdf from host[{}], port[{}], " +
-                        "method[{}], path[{}], status[{}]", context.watch().id(), attachment.id(), request.host(), request.port(),
-                        request.method(), request.path(), reportGenerationResponse.status());
+                        "method[{}], path[{}], status[{}], body[{}]", context.watch().id(), attachment.id(), request.host(),
+                        request.port(), request.method(), request.path(), response.status(), body);
             } else if (response.status() == 200) {
                 return new Attachment.Bytes(attachment.id(), BytesReference.toBytes(response.body()),
                         response.contentType(), attachment.inline());
             } else {
+                String body = response.body() != null ? response.body().utf8ToString() : null;
                 String message = LoggerMessageFormat.format("", "Watch[{}] reporting[{}] Unexpected status code host[{}], port[{}], " +
-                        "method[{}], path[{}], status[{}]", context.watch().id(), attachment.id(), request.host(), request.port(),
-                        request.method(), request.path(), reportGenerationResponse.status());
+                        "method[{}], path[{}], status[{}], body[{}]", context.watch().id(), attachment.id(), request.host(),
+                        request.port(), request.method(), request.path(), response.status(), body);
                 throw new IllegalStateException(message);
             }
         }

@@ -18,9 +18,6 @@ public class OpenJobActionRequestTests extends AbstractStreamableXContentTestCas
         if (randomBoolean()) {
             params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
         }
-        if (randomBoolean()) {
-            params.setIgnoreDowntime(randomBoolean());
-        }
         return new Request(params);
     }
 

@@ -7,12 +7,13 @@ package org.elasticsearch.xpack.ml.action;
 
 import org.elasticsearch.test.AbstractStreamableTestCase;
 import org.elasticsearch.xpack.ml.action.FlushJobAction.Response;
+import org.joda.time.DateTime;
 
 public class PostDataFlushResponseTests extends AbstractStreamableTestCase<Response> {
 
     @Override
     protected Response createTestInstance() {
-        return new Response(randomBoolean());
+        return new Response(randomBoolean(), new DateTime(randomDateTimeZone()).toDate());
     }
 
     @Override

@@ -27,9 +27,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.List;
 import java.util.Optional;
 import java.util.function.Supplier;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Matchers.any;

@@ -139,6 +141,7 @@ public class DatafeedJobTests extends ESTestCase {
         assertEquals(10000 + frequencyMs + 100, next);
 
         verify(dataExtractorFactory).newExtractor(5000 + 1L, currentTime - queryDelayMs);
+        assertThat(flushJobRequests.getAllValues().size(), equalTo(1));
         FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
         flushRequest.setCalcInterim(true);
         verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));

@@ -148,14 +151,17 @@ public class DatafeedJobTests extends ESTestCase {
         // We need to return empty counts so that the lookback doesn't update the last end time
         when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(new DataCounts("_job_id")));
 
-        currentTime = 10000L;
+        currentTime = 9999L;
         long latestFinalBucketEndTimeMs = 5000;
         long latestRecordTimeMs = 5000;
 
+        FlushJobAction.Response skipTimeResponse = new FlushJobAction.Response(true, new Date(10000L));
+        when(flushJobFuture.actionGet()).thenReturn(skipTimeResponse);
+
         long frequencyMs = 1000;
         long queryDelayMs = 500;
         DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs);
-        datafeedJob.runLookBack(10000L, null);
+        datafeedJob.runLookBack(currentTime, null);
 
         // advance time
         currentTime = 12000L;

@@ -163,6 +169,13 @@ public class DatafeedJobTests extends ESTestCase {
         expectThrows(DatafeedJob.EmptyDataCountException.class, () -> datafeedJob.runRealtime());
 
         verify(dataExtractorFactory, times(1)).newExtractor(10000L, 11000L);
+        List<FlushJobAction.Request> capturedFlushJobRequests = flushJobRequests.getAllValues();
+        assertThat(capturedFlushJobRequests.size(), equalTo(2));
+        assertThat(capturedFlushJobRequests.get(0).getCalcInterim(), is(false));
+        assertThat(capturedFlushJobRequests.get(0).getSkipTime(), equalTo("9999"));
+        assertThat(capturedFlushJobRequests.get(1).getCalcInterim(), is(true));
+        assertThat(capturedFlushJobRequests.get(1).getSkipTime(), is(nullValue()));
+        assertThat(capturedFlushJobRequests.get(1).getAdvanceTime(), equalTo("11000"));
         Mockito.verifyNoMoreInteractions(dataExtractorFactory);
     }
 

@@ -354,7 +354,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
     }
 
     private FlushAcknowledgement createFlushAcknowledgement() {
-        return new FlushAcknowledgement(randomAlphaOfLength(5));
+        return new FlushAcknowledgement(randomAlphaOfLength(5), new Date(randomNonNegativeLong()));
     }
 
     private class ResultsBuilder {

@@ -444,8 +444,9 @@ public class MlJobIT extends ESRestTestCase {
         response = client().performRequest("delete", indexName + "/_alias/" + writeAliasName);
         assertEquals(200, response.getStatusLine().getStatusCode());
 
-        // check alias was deleted
-        expectThrows(ResponseException.class, () -> client().performRequest("get", "_cat/aliases"));
+        // check aliases were deleted
+        expectThrows(ResponseException.class, () -> client().performRequest("get", indexName + "/_alias/" + readAliasName));
+        expectThrows(ResponseException.class, () -> client().performRequest("get", indexName + "/_alias/" + writeAliasName));
 
         response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
         assertThat(response.getStatusLine().getStatusCode(), equalTo(200));

@@ -51,8 +51,8 @@ public class ProcessCtrlTests extends ESTestCase {
         dd.setTimeField("tf");
         job.setDataDescription(dd);
 
-        List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, true, pid);
-        assertEquals(15, command.size());
+        List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid);
+        assertEquals(14, command.size());
         assertTrue(command.contains(ProcessCtrl.AUTODETECT_PATH));
         assertTrue(command.contains(ProcessCtrl.BUCKET_SPAN_ARG + "120"));
         assertTrue(command.contains(ProcessCtrl.LATENCY_ARG + "360"));

@@ -73,7 +73,6 @@ public class ProcessCtrlTests extends ESTestCase {
         assertTrue(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval));
         int expectedMaxQuantileInterval = 21600 + ProcessCtrl.calculateStaggeringInterval(job.getId());
         assertTrue(command.contains(ProcessCtrl.MAX_QUANTILE_INTERVAL_ARG + expectedMaxQuantileInterval));
-        assertTrue(command.contains(ProcessCtrl.IGNORE_DOWNTIME_ARG));
     }
 
     public void testBuildAutodetectCommand_defaultTimeField() {

@@ -81,7 +80,7 @@ public class ProcessCtrlTests extends ESTestCase {
         Environment env = new Environment(settings);
         Job.Builder job = buildJobBuilder("unit-test-job");
 
-        List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
+        List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid);
 
         assertTrue(command.contains(ProcessCtrl.TIME_FIELD_ARG + "time"));
     }

@@ -94,38 +93,16 @@ public class ProcessCtrlTests extends ESTestCase {
 
         int expectedPersistInterval = 10800 + ProcessCtrl.calculateStaggeringInterval(job.getId());
 
-        List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
+        List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid);
         assertFalse(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval));
 
         settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
         env = new Environment(settings);
 
-        command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
+        command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, pid);
         assertTrue(command.contains(ProcessCtrl.PERSIST_INTERVAL_ARG + expectedPersistInterval));
     }
 
-    public void testBuildAutodetectCommand_GivenNoIgnoreDowntime() {
-        Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
-        Environment env = new Environment(
-                settings);
-        Job.Builder job = buildJobBuilder("foo");
-
-        List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, false, pid);
-
-        assertFalse(command.contains("--ignoreDowntime"));
-    }
-
-    public void testBuildAutodetectCommand_GivenIgnoreDowntimeParam() {
-        Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
-        Environment env = new Environment(
-                settings);
-        Job.Builder job = buildJobBuilder("foo");
-
-        List<String> command = ProcessCtrl.buildAutodetectCommand(env, settings, job.build(), logger, true, pid);
-
-        assertTrue(command.contains("--ignoreDowntime"));
-    }
-
     public void testBuildNormalizerCommand() throws IOException {
         Environment env = new Environment(
                 Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build());
@ -18,6 +18,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
@ -35,9 +36,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@ -70,10 +73,13 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(true);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(flushAcknowledgement);
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor)) {
FlushJobParams params = FlushJobParams.builder().build();
communicator.flushJob(params, (aVoid, e) -> {});
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
communicator.flushJob(params, (f, e) -> flushAcknowledgementHolder.set(f));
assertThat(flushAcknowledgementHolder.get(), equalTo(flushAcknowledgement));
Mockito.verify(process).flushJob(params);
}
}
@ -83,7 +89,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
when(processor.isFailed()).thenReturn(true);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(false);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(null);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor);
expectThrows(ElasticsearchException.class, () -> communicator.waitFlushToCompletion("foo"));
}
@ -103,8 +109,9 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(autoDetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1))))
.thenReturn(false).thenReturn(true);
.thenReturn(null).thenReturn(flushAcknowledgement);
FlushJobParams params = FlushJobParams.builder().build();

try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, autoDetectResultProcessor)) {

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings;
@ -134,7 +135,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobTask.getJobId()).thenReturn(job.getId());

AtomicReference<Exception> errorHolder = new AtomicReference<>();
manager.openJob(jobTask, false, e -> errorHolder.set(e));
manager.openJob(jobTask, e -> errorHolder.set(e));

Exception error = errorHolder.get();
assertThat(error, is(notNullValue()));
@ -150,7 +151,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
when(jobTask.getAllocationId()).thenReturn(1L);
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
assertEquals(1, manager.numberOfOpenJobs());
assertTrue(manager.jobHasActiveAutodetectProcess(jobTask));
verify(jobTask).updatePersistentStatus(eq(new JobTaskStatus(JobState.OPENED, 1L)), any());
@ -175,7 +176,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(autodetectProcess.isProcessAlive()).thenReturn(true);
when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator());
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess;
(j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess;
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
@ -192,22 +193,22 @@ public class AutodetectProcessManagerTests extends ESTestCase {

JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("bar");
when(jobTask.getAllocationId()).thenReturn(1L);
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("baz");
when(jobTask.getAllocationId()).thenReturn(2L);
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
assertEquals(3, manager.numberOfOpenJobs());

Exception[] holder = new Exception[1];
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foobar");
when(jobTask.getAllocationId()).thenReturn(3L);
manager.openJob(jobTask, false, e -> holder[0] = e);
manager.openJob(jobTask, e -> holder[0] = e);
Exception e = holder[0];
assertEquals("max running job capacity [3] reached", e.getMessage());

@ -216,7 +217,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobTask.getJobId()).thenReturn("baz");
manager.closeJob(jobTask, false, null);
assertEquals(2, manager.numberOfOpenJobs());
manager.openJob(jobTask, false, e1 -> {});
manager.openJob(jobTask, e1 -> {});
assertEquals(3, manager.numberOfOpenJobs());
}

@ -228,7 +229,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty());
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
params, (dataCounts1, e) -> {});
assertEquals(1, manager.numberOfOpenJobs());
@ -250,7 +251,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {

JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
Exception[] holder = new Exception[1];
manager.processData(jobTask, inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e);
assertNotNull(holder[0]);
@ -263,7 +264,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {

JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});

@ -282,7 +283,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
InputStream inputStream = createInputStream("");
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, inputStream, xContentType, params, (dataCounts1, e) -> {});
verify(communicator).writeToJob(same(inputStream), same(xContentType), same(params), any());
}
@ -294,12 +295,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
InputStream inputStream = createInputStream("");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, inputStream, randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});

FlushJobParams params = FlushJobParams.builder().build();
manager.flushJob(jobTask, params, e -> {});
manager.flushJob(jobTask, params, ActionListener.wrap(flushAcknowledgement -> {}, e -> fail(e.getMessage())));

verify(communicator).flushJob(same(params), any());
}
@ -318,7 +319,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
Exception[] holder = new Exception[1];
manager.flushJob(jobTask, params, e -> holder[0] = e);
manager.flushJob(jobTask, params, ActionListener.wrap(flushAcknowledgement -> {}, e -> holder[0] = e));
assertEquals("[foo] exception while flushing job", holder[0].getMessage());
}

@ -333,8 +334,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
// create a jobtask
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {
});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class),
(dataCounts1, e) -> {
});
@ -366,7 +366,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobTask.getJobId()).thenReturn("foo");
assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));

manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});

@ -385,7 +385,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
when(communicator.getJobTask()).thenReturn(jobTask);

manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});

@ -408,7 +408,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {

JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
InputStream inputStream = createInputStream("");
DataCounts[] dataCounts = new DataCounts[1];
manager.processData(jobTask, inputStream,
@ -429,7 +429,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id"));
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess;
(j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
@ -437,7 +437,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("my_id");
expectThrows(EsRejectedExecutionException.class,
() -> manager.create(jobTask, buildAutodetectParams(), false, e -> {}));
() -> manager.create(jobTask, buildAutodetectParams(), e -> {}));
verify(autodetectProcess, times(1)).close();
}

@ -447,7 +447,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {

JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, buildAutodetectParams(), false, e -> {});
manager.create(jobTask, buildAutodetectParams(), e -> {});

String expectedNotification = "Loading model snapshot [N/A], job latest_record_timestamp [N/A]";
verify(auditor).info("foo", expectedNotification);
@ -463,7 +463,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {

JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, buildAutodetectParams(), false, e -> {});
manager.create(jobTask, buildAutodetectParams(), e -> {});

String expectedNotification = "Loading model snapshot [snapshot-1] with " +
"latest_record_timestamp [1970-01-01T00:00:00.000Z], " +
@ -482,7 +482,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {

JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, buildAutodetectParams(), false, e -> {});
manager.create(jobTask, buildAutodetectParams(), e -> {});

String expectedNotification = "Loading model snapshot [N/A], " +
"job latest_record_timestamp [1970-01-01T00:00:00.000Z]";
@ -501,7 +501,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown(jobId)).thenReturn(createJobDetails(jobId));
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess;
(j, modelSnapshot, quantiles, filters, e, onProcessCrash) -> autodetectProcess;
return new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
@ -531,7 +531,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
autodetectProcessFactory, normalizerFactory,
new NamedXContentRegistry(Collections.emptyList()), auditor);
manager = spy(manager);
doReturn(communicator).when(manager).create(any(), eq(buildAutodetectParams()), anyBoolean(), any());
doReturn(communicator).when(manager).create(any(), eq(buildAutodetectParams()), any());
return manager;
}

@ -539,7 +539,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.openJob(jobTask, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts, e) -> {});
return manager;

@ -34,6 +34,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
@ -200,7 +202,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
processorUnderTest.processResult(context, result);

verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
verify(flushListener, times(1)).acknowledgeFlush(flushAcknowledgement);
verify(persister, times(1)).commitResultWrites(JOB_ID);
verify(bulkBuilder, times(1)).executeRequest();
verifyNoMoreInteractions(persister);
@ -225,7 +227,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
inOrder.verify(bulkBuilder, times(1)).executeRequest();
inOrder.verify(persister, times(1)).commitResultWrites(JOB_ID);
inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
inOrder.verify(flushListener, times(1)).acknowledgeFlush(flushAcknowledgement);
verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired);
}
@ -342,7 +344,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
assertTrue(processorUnderTest.isFailed());

// Wait for flush should return immediately
assertFalse(processorUnderTest.waitForFlushAcknowledgement("foo", Duration.of(300, ChronoUnit.SECONDS)));
FlushAcknowledgement flushAcknowledgement = processorUnderTest.waitForFlushAcknowledgement(
"foo", Duration.of(300, ChronoUnit.SECONDS));
assertThat(flushAcknowledgement, is(nullValue()));
}

public void testKill() throws TimeoutException {

@ -9,6 +9,8 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;

import java.util.Date;

public class FlushAcknowledgementTests extends AbstractSerializingTestCase<FlushAcknowledgement> {

@Override
@ -18,7 +20,7 @@ public class FlushAcknowledgementTests extends AbstractSerializingTestCase<Flush

@Override
protected FlushAcknowledgement createTestInstance() {
return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20));
return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), new Date(randomNonNegativeLong()));
}

@Override

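As the serialization test now shows, a FlushAcknowledgement carries a timestamp alongside its id; judging by the REST assertions later in this diff, that Date is the last finalized bucket end surfaced as last_finalized_bucket_end. A minimal construction (the id is illustrative):

FlushAcknowledgement ack = new FlushAcknowledgement("flush-42", new Date(1403488800000L));
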
@ -9,22 +9,26 @@ import org.elasticsearch.test.ESTestCase;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.is;

public class FlushListenerTests extends ESTestCase {

public void testAcknowledgeFlush() throws Exception {
FlushListener listener = new FlushListener();
AtomicBoolean bool = new AtomicBoolean();
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
new Thread(() -> {
boolean result = listener.waitForFlush("_id", Duration.ofMillis(10000));
bool.set(result);
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
}).start();
assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
assertFalse(bool.get());
listener.acknowledgeFlush("_id");
assertBusy(() -> assertTrue(bool.get()));
assertNull(flushAcknowledgementHolder.get());
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", new Date(12345678L));
listener.acknowledgeFlush(flushAcknowledgement);
assertBusy(() -> assertNotNull(flushAcknowledgementHolder.get()));
assertEquals(1, listener.awaitingFlushed.size());

listener.clear("_id");
@ -35,27 +39,26 @@ public class FlushListenerTests extends ESTestCase {
FlushListener listener = new FlushListener();

int numWaits = 9;
List<AtomicBoolean> bools = new ArrayList<>(numWaits);
List<AtomicReference<FlushAcknowledgement>> flushAcknowledgementHolders = new ArrayList<>(numWaits);
for (int i = 0; i < numWaits; i++) {
int id = i;
AtomicBoolean bool = new AtomicBoolean();
bools.add(bool);
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
flushAcknowledgementHolders.add(flushAcknowledgementHolder);
new Thread(() -> {
boolean result = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
bool.set(result);
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
}).start();
}
assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));
for (AtomicBoolean bool : bools) {
assertFalse(bool.get());
}
assertThat(flushAcknowledgementHolders.stream().map(f -> f.get()).filter(f -> f != null).findAny().isPresent(), is(false));
assertFalse(listener.cleared.get());

listener.clear();
for (AtomicBoolean bool : bools) {
assertBusy(() -> assertTrue(bool.get()));

for (AtomicReference<FlushAcknowledgement> f : flushAcknowledgementHolders) {
assertBusy(() -> assertNotNull(f.get()));
}
assertTrue(listener.awaitingFlushed.isEmpty());
assertTrue(listener.cleared.get());
}

}

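The reworked listener test captures the new FlushListener contract: waitForFlush hands back the FlushAcknowledgement itself rather than a boolean, and the holder stays null until a matching acknowledgeFlush arrives. A minimal sketch, using only the calls exercised above:

FlushListener listener = new FlushListener();
new Thread(() -> {
    // Blocks until a flush with a matching id is acknowledged (or the wait ends).
    FlushAcknowledgement ack = listener.waitForFlush("_id", Duration.ofMillis(10000));
}).start();
listener.acknowledgeFlush(new FlushAcknowledgement("_id", new Date(12345678L)));
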
@ -5,27 +5,31 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.params;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

public class FlushJobParamsTests extends ESTestCase {

public void testBuilder_GivenDefault() {
FlushJobParams params = FlushJobParams.builder().build();
assertFalse(params.shouldCalculateInterim());
assertFalse(params.shouldAdvanceTime());
assertFalse(params.shouldSkipTime());
assertEquals("", params.getStart());
assertEquals("", params.getEnd());
}

public void testBuilder_GivenCalcInterim() {
FlushJobParams params = FlushJobParams.builder().calcInterim(true).build();
assertTrue(params.shouldCalculateInterim());
assertFalse(params.shouldAdvanceTime());
assertFalse(params.shouldSkipTime());
assertEquals("", params.getStart());
assertEquals("", params.getEnd());
}

public void testBuilder_GivenCalcInterimAndStart() {
FlushJobParams params = FlushJobParams.builder()
.calcInterim(true)
@ -33,6 +37,7 @@ public class FlushJobParamsTests extends ESTestCase {
.build();
assertTrue(params.shouldCalculateInterim());
assertFalse(params.shouldAdvanceTime());
assertFalse(params.shouldSkipTime());
assertEquals("42", params.getStart());
assertEquals("43", params.getEnd());
}
@ -47,7 +52,6 @@ public class FlushJobParamsTests extends ESTestCase {
assertEquals("Invalid flush parameters: 'start' has not been specified.", e.getMessage());
}

public void testBuilder_GivenCalcInterimAndStartAndEnd() {
FlushJobParams params = FlushJobParams.builder()
.calcInterim(true)
@ -59,7 +63,6 @@ public class FlushJobParamsTests extends ESTestCase {
assertEquals("7200", params.getEnd());
}

public void testBuilder_GivenAdvanceTime() {
FlushJobParams params = FlushJobParams.builder().advanceTime("1821").build();
assertFalse(params.shouldCalculateInterim());
@ -69,7 +72,6 @@ public class FlushJobParamsTests extends ESTestCase {
assertEquals(1821, params.getAdvanceTime());
}

public void testBuilder_GivenCalcInterimAndAdvanceTime() {
FlushJobParams params = FlushJobParams.builder()
.calcInterim(true)
@ -82,7 +84,6 @@ public class FlushJobParamsTests extends ESTestCase {
assertEquals(1940, params.getAdvanceTime());
}

public void testBuilder_GivenCalcInterimWithTimeRangeAndAdvanceTime() {
FlushJobParams params = FlushJobParams.builder()
.calcInterim(true)
@ -96,6 +97,27 @@ public class FlushJobParamsTests extends ESTestCase {
assertEquals(1940, params.getAdvanceTime());
}

public void testBuilder_GivenAdvanceTimeIsEarlierThanSkipTime() {
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> FlushJobParams.builder().advanceTime("2017-01-01T00:00:00Z").skipTime("2017-02-01T00:00:00Z").build());

assertEquals("advance_time [2017-01-01T00:00:00Z] must be later than skip_time [2017-02-01T00:00:00Z]", e.getMessage());
}

public void testBuilder_GivenAdvanceTimeIsEqualToSkipTime() {
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> FlushJobParams.builder().advanceTime("2017-01-01T00:00:00Z").skipTime("2017-01-01T00:00:00Z").build());

assertEquals("advance_time [2017-01-01T00:00:00Z] must be later than skip_time [2017-01-01T00:00:00Z]", e.getMessage());
}

public void testBuilder_GivenAdvanceTimeIsLaterToSkipTime() {
FlushJobParams params = FlushJobParams.builder().advanceTime("2017-02-01T00:00:00Z").skipTime("2017-01-01T00:00:00Z").build();

assertThat(params.getSkipTime(), equalTo(1483228800L));
assertThat(params.getAdvanceTime(), equalTo(1485907200L));
}

public void testValidate_GivenOnlyStartSpecified() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> FlushJobParams.builder().forTimeRange(TimeRange.builder().startTime("1").build()).build());

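The three new tests fix the ordering rule between the two time parameters: when both are supplied, advance_time must be strictly later than skip_time, and both parse to epoch seconds. A compact sketch of the accepted case, using the builder exactly as the tests do:

FlushJobParams params = FlushJobParams.builder()
    .skipTime("2017-01-01T00:00:00Z")
    .advanceTime("2017-02-01T00:00:00Z")
    .build();
// params.getSkipTime() is 1483228800L and params.getAdvanceTime() is 1485907200L (epoch seconds);
// reversing or equating the two values makes build() throw ElasticsearchStatusException.
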
@ -41,8 +41,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {

public void testWriteFlushControlMessage_GivenAdvanceTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
FlushJobParams flushJobParams = FlushJobParams.builder()
.advanceTime("1234567890").build();
FlushJobParams flushJobParams = FlushJobParams.builder().advanceTime("1234567890").build();

writer.writeFlushControlMessage(flushJobParams);

@ -53,6 +52,30 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
verifyNoMoreInteractions(lengthEncodedWriter);
}

public void testWriteFlushControlMessage_GivenSkipTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
FlushJobParams flushJobParams = FlushJobParams.builder().skipTime("1234567890").build();

writer.writeFlushControlMessage(flushJobParams);

InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(4);
inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
inOrder.verify(lengthEncodedWriter).writeField("s1234567890");
verifyNoMoreInteractions(lengthEncodedWriter);
}

public void testWriteFlushControlMessage_GivenSkipAndAdvanceTime() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
FlushJobParams flushJobParams = FlushJobParams.builder().skipTime("1000").advanceTime("2000").build();

writer.writeFlushControlMessage(flushJobParams);

InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeField("s1000");
inOrder.verify(lengthEncodedWriter).writeField("t2000");
}

public void testWriteFlushControlMessage_GivenCalcInterimResultsWithNoTimeParams() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
FlushJobParams flushJobParams = FlushJobParams.builder()
@ -67,7 +90,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
verifyNoMoreInteractions(lengthEncodedWriter);
}

public void testWriteFlushControlMessage_GivenNeitherCalcInterimNorAdvanceTime() throws IOException {
public void testWriteFlushControlMessage_GivenPlainFlush() throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
FlushJobParams flushJobParams = FlushJobParams.builder().build();

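The verifications above imply the control-message wire format: skip_time is written as the field "s" plus epoch seconds, advance_time as "t" plus epoch seconds, with skip written before advance. A hedged sketch of that encoding (the helper is hypothetical, not part of the commit; the getters are the ones exercised in FlushJobParamsTests above):

static List<String> flushControlFields(FlushJobParams params) {
    List<String> fields = new ArrayList<>();
    if (params.shouldSkipTime()) {
        fields.add("s" + params.getSkipTime());    // e.g. "s1000"
    }
    if (params.shouldAdvanceTime()) {
        fields.add("t" + params.getAdvanceTime()); // e.g. "t2000"
    }
    return fields;
}
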
@ -91,7 +91,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
categoryDefinition = null;
}
if (randomBoolean()) {
flushAcknowledgement = new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20));
flushAcknowledgement = new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), new Date(randomNonNegativeLong()));
} else {
flushAcknowledgement = null;
}

@ -240,9 +240,10 @@ public class ReportingAttachmentParserTests extends ESTestCase {
}

public void testPollingRequestIsError() throws Exception {
boolean hasBody = randomBoolean();
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(200, "{\"path\":\"whatever\"}"))
.thenReturn(new HttpResponse(403));
.thenReturn(new HttpResponse(403, hasBody ? "no permissions" : null));

ReportingAttachment attachment =
new ReportingAttachment("foo", "http://www.example.org/", randomBoolean(), TimeValue.timeValueMillis(1), 10, null, null);
@ -250,6 +251,9 @@ public class ReportingAttachmentParserTests extends ESTestCase {
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
assertThat(e.getMessage(), containsString("Error when polling pdf"));
if (hasBody) {
assertThat(e.getMessage(), containsString("body[no permissions]"));
}
}

public void testPollingRequestRetryIsExceeded() throws Exception {

@ -1,6 +1,6 @@
{
  "xpack.info": {
    "documentation": "Retrieve information about xpack, including build number/timestamp and license status",
    "documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/info-api.html",
    "methods": [ "GET" ],
    "url": {
      "path": "/_xpack",

@ -1,6 +1,6 @@
{
  "xpack.ml.close_job": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-close-job.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-close-job.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/_close",

@ -1,6 +1,6 @@
{
  "xpack.ml.delete_datafeed": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-delete-datafeed.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-delete-datafeed.html",
    "methods": [ "DELETE" ],
    "url": {
      "path": "/_xpack/ml/datafeeds/{datafeed_id}",

@ -1,6 +1,6 @@
{
  "xpack.ml.delete_job": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-delete-job.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-delete-job.html",
    "methods": [ "DELETE" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}",

@ -1,6 +1,6 @@
{
  "xpack.ml.delete_model_snapshot": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-delete-snapshot.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-delete-snapshot.html",
    "methods": [ "DELETE" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/model_snapshots/{snapshot_id}",

@ -1,6 +1,6 @@
{
  "xpack.ml.flush_job": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-flush-job.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html",
    "methods": [
      "POST"
    ],
@ -31,7 +31,11 @@
        },
        "advance_time": {
          "type": "string",
          "description": "Setting this tells the Engine API that no data prior to advance_time is expected"
          "description": "Advances time to the given value generating results and updating the model for the advanced interval"
        },
        "skip_time": {
          "type": "string",
          "description": "Skips time to the given value without generating results or updating the model for the skipped interval"
        }
      }
    },

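Combined with the anomaly detectors paths used throughout these specs, a request exercising the new parameter would look roughly like the following (the _flush endpoint path is assumed from the surrounding spec files, and the job id is illustrative; the YAML test near the end of this diff drives the same call through the test harness):

POST /_xpack/ml/anomaly_detectors/post-data-job/_flush?skip_time=1403488700
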
@ -1,6 +1,6 @@
{
  "xpack.ml.get_buckets": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-get-bucket.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-bucket.html",
    "methods": [ "GET", "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/results/buckets/{timestamp}",

@ -1,6 +1,6 @@
{
  "xpack.ml.get_categories": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-get-category.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-category.html",
    "methods": [ "GET", "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/results/categories/{category_id}",

@ -1,6 +1,6 @@
{
  "xpack.ml.get_datafeed_stats": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-get-datafeed-stats.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-datafeed-stats.html",
    "methods": [ "GET"],
    "url": {
      "path": "/_xpack/ml/datafeeds/{datafeed_id}/_stats",

@ -1,6 +1,6 @@
{
  "xpack.ml.get_datafeeds": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-get-datafeed.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-datafeed.html",
    "methods": [ "GET"],
    "url": {
      "path": "/_xpack/ml/datafeeds/{datafeed_id}",

@ -1,6 +1,6 @@
{
  "xpack.ml.get_influencers": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-get-influencer.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-influencer.html",
    "methods": [ "GET", "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/results/influencers",

@ -1,6 +1,6 @@
{
  "xpack.ml.get_job_stats": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-get-job-stats.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-job-stats.html",
    "methods": [ "GET"],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/_stats",

@ -1,6 +1,6 @@
{
  "xpack.ml.get_jobs": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-get-job.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-job.html",
    "methods": [ "GET"],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}",

@ -1,6 +1,6 @@
{
  "xpack.ml.get_model_snapshots": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-get-snapshot.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-snapshot.html",
    "methods": [ "GET", "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/model_snapshots/{snapshot_id}",

@ -1,6 +1,6 @@
{
  "xpack.ml.get_records": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-get-record.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-record.html",
    "methods": ["GET", "POST"],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/results/records",

@ -1,6 +1,6 @@
{
  "xpack.ml.open_job": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-open-job.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-open-job.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/_open",

@ -1,6 +1,6 @@
{
  "xpack.ml.post_data": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-post-data.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-post-data.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/_data",
@ -30,4 +30,3 @@
    }
  }
}

@ -1,6 +1,6 @@
{
  "xpack.ml.preview_datafeed": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-preview-datafeed.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-preview-datafeed.html",
    "methods": [ "GET" ],
    "url": {
      "path": "/_xpack/ml/datafeeds/{datafeed_id}/_preview",

@ -1,6 +1,6 @@
{
  "xpack.ml.put_datafeed": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-put-datafeed.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-put-datafeed.html",
    "methods": [ "PUT" ],
    "url": {
      "path": "/_xpack/ml/datafeeds/{datafeed_id}",

@ -1,6 +1,6 @@
{
  "xpack.ml.put_job": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-put-job.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-put-job.html",
    "methods": [ "PUT" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}",
@ -16,6 +16,6 @@
    "body": {
      "description" : "The job",
      "required" : true
    }
  }
}

@ -1,6 +1,6 @@
{
  "xpack.ml.revert_model_snapshot": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-revert-snapshot.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-revert-snapshot.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/model_snapshots/{snapshot_id}/_revert",

@ -1,6 +1,6 @@
{
  "xpack.ml.start_datafeed": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-start-datafeed.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-start-datafeed.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/ml/datafeeds/{datafeed_id}/_start",

@ -1,6 +1,6 @@
{
  "xpack.ml.stop_datafeed": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-stop-datafeed.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-stop-datafeed.html",
    "methods": [
      "POST"
    ],

@ -1,6 +1,6 @@
{
  "xpack.ml.update_datafeed": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-update-datafeed.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-update-datafeed.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/ml/datafeeds/{datafeed_id}/_update",

@ -1,6 +1,6 @@
{
  "xpack.ml.update_job": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-update-job.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-update-job.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/_update",

@ -1,6 +1,6 @@
{
  "xpack.ml.update_model_snapshot": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-update-snapshot.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-update-snapshot.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/{job_id}/model_snapshots/{snapshot_id}/_update",

@ -1,6 +1,5 @@
{
  "xpack.ml.validate": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-valid-job.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/_validate",
@ -10,6 +9,6 @@
    "body": {
      "description" : "The job config",
      "required" : true
    }
  }
}

@ -1,6 +1,5 @@
{
  "xpack.ml.validate_detector": {
    "documentation": "http://www.elastic.co/guide/en/x-pack/current/ml-valid-detector.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/ml/anomaly_detectors/_validate/detector",
@ -10,6 +9,6 @@
    "body": {
      "description" : "The detector",
      "required" : true
    }
  }
}

@ -1,6 +1,6 @@
{
  "xpack.watcher.ack_watch": {
    "documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-ack-watch.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-ack-watch.html",
    "methods": [ "PUT", "POST" ],
    "url": {
      "path": "/_xpack/watcher/watch/{watch_id}/_ack",

@ -1,6 +1,6 @@
{
  "xpack.watcher.activate_watch": {
    "documentation": "https://www.elastic.co/guide/en/watcher/current/api-rest.html#api-rest-activate-watch",
    "documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-activate-watch.html",
    "methods": [ "PUT", "POST" ],
    "url": {
      "path": "/_xpack/watcher/watch/{watch_id}/_activate",

@ -1,6 +1,6 @@
{
  "xpack.watcher.deactivate_watch": {
    "documentation": "https://www.elastic.co/guide/en/watcher/current/api-rest.html#api-rest-deactivate-watch",
    "documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-deactivate-watch.html",
    "methods": [ "PUT", "POST" ],
    "url": {
      "path": "/_xpack/watcher/watch/{watch_id}/_deactivate",

@ -1,6 +1,7 @@
{
  "xpack.watcher.delete_watch": {
    "documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-delete-watch.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-delete-watch.html",

    "methods": [ "DELETE" ],
    "url": {
      "path": "/_xpack/watcher/watch/{id}",

@ -1,6 +1,6 @@
{
  "xpack.watcher.execute_watch": {
    "documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-execute-watch.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-execute-watch.html",
    "methods": [ "PUT", "POST" ],
    "url": {
      "path": "/_xpack/watcher/watch/{id}/_execute",
@ -22,6 +22,6 @@
    "body": {
      "description" : "Execution control",
      "required" : false
    }
  }
}

@ -1,6 +1,6 @@
{
  "xpack.watcher.get_watch": {
    "documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-get-watch.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-get-watch.html",
    "methods": [ "GET" ],
    "url": {
      "path": "/_xpack/watcher/watch/{id}",

@ -1,6 +1,6 @@
{
  "xpack.watcher.put_watch": {
    "documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-put-watch.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-put-watch.html",
    "methods": [ "PUT", "POST" ],
    "url": {
      "path": "/_xpack/watcher/watch/{id}",
@ -26,6 +26,6 @@
    "body": {
      "description" : "The watch",
      "required" : true
    }
  }
}

@ -1,6 +1,6 @@
{
  "xpack.watcher.restart": {
    "documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-service.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-restart.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/watcher/_restart",

@ -1,6 +1,6 @@
{
  "xpack.watcher.start": {
    "documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-service.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-start.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/watcher/_start",

@ -1,6 +1,6 @@
{
  "xpack.watcher.stats": {
    "documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-stats.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-stats.html",
    "methods": [ "GET" ],
    "url": {
      "path": "/_xpack/watcher/stats",

@ -1,6 +1,6 @@
{
  "xpack.watcher.stop": {
    "documentation": "http://www.elastic.co/guide/en/watcher/current/appendix-api-service.html",
    "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-stop.html",
    "methods": [ "POST" ],
    "url": {
      "path": "/_xpack/watcher/_stop",

@ -66,6 +66,7 @@ setup:
        xpack.ml.flush_job:
          job_id: post-data-job
  - match: { flushed: true }
  - match: { last_finalized_bucket_end: 1403481600000 }

  - do:
      xpack.ml.close_job:
@ -100,6 +101,7 @@ setup:
        xpack.ml.flush_job:
          job_id: post-data-job
  - match: { flushed: true }
  - match: { last_finalized_bucket_end: 0 }

  - do:
      xpack.ml.close_job:
@ -111,6 +113,72 @@ setup:
          job_id: post-data-job
  - match: { jobs.0.state: "closed" }

---
"Test flush with skip_time":

  - do:
      xpack.ml.post_data:
        job_id: post-data-job
        body:
          - airline: AAL
            responsetime: 132.2046
            sourcetype: post-data-job
            time: 1403481600
          - airline: AAL
            responsetime: 990.4628
            sourcetype: post-data-job
            time: 1403485200

  # Skip a bucket
  - do:
      xpack.ml.flush_job:
        job_id: post-data-job
        skip_time: 1403488700
  - match: { flushed: true }
  - match: { last_finalized_bucket_end: 1403488800000 }

  # Send some data that should be ignored
  - do:
      xpack.ml.post_data:
        job_id: post-data-job
        body:
          - airline: AAL
            responsetime: 132.2046
            sourcetype: post-data-job
            time: 1403488600
          - airline: AAL
            responsetime: 990.4628
            sourcetype: post-data-job
            time: 1403488700

  # Send data that will create results for the bucket after the skipped one
  - do:
      xpack.ml.post_data:
        job_id: post-data-job
        body:
          - airline: AAL
            responsetime: 132.2046
            sourcetype: post-data-job
            time: 1403488900
          - airline: AAL
            responsetime: 132.2046
            sourcetype: post-data-job
            time: 1403492400

  - do:
      xpack.ml.close_job:
        job_id: post-data-job
  - match: { closed: true }

  - do:
      xpack.ml.get_buckets:
        job_id: "post-data-job"
  - match: { count: 2 }
  - match: { buckets.0.timestamp: 1403481600000 }
  - match: { buckets.0.event_count: 1 }
  - match: { buckets.1.timestamp: 1403488800000 }
  - match: { buckets.1.event_count: 1 }

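For reference, the matched values follow from the job's hourly bucket span (an inference from the 3600-second spacing of the timestamps above): skip_time 1403488700 rounds up to the next bucket boundary, 3600 * ceil(1403488700 / 3600) = 1403488800 seconds, hence last_finalized_bucket_end of 1403488800000 ms. Because the skipped interval produces no results, the bucket at 1403485200 never appears, leaving only the buckets at 1403481600 and 1403488800, with one event each.
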
---
"Test POST data with invalid parameters":