Merge branch 'master' into feature/sql

Original commit: elastic/x-pack-elasticsearch@989e27840f
This commit is contained in:
Nik Everett 2017-11-21 10:34:23 -05:00
commit 0d4a91af50
27 changed files with 386 additions and 183 deletions

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.lease.Releasable;
@ -106,11 +107,9 @@ public class MlDailyMaintenanceService implements Releasable {
private void triggerTasks() {
LOGGER.info("triggering scheduled [ML] maintenance tasks");
try {
client.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request());
} catch (Exception e) {
LOGGER.error("An error occurred during maintenance tasks execution", e);
}
client.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), ActionListener.wrap(
response -> LOGGER.info("Successfully completed [ML] maintenance tasks"),
e -> LOGGER.error("An error occurred during maintenance tasks execution", e)));
scheduleNext();
}
}

View File

@ -28,10 +28,15 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
public class DeleteExpiredDataAction extends Action<DeleteExpiredDataAction.Request, DeleteExpiredDataAction.Response,
@ -139,14 +144,24 @@ public class DeleteExpiredDataAction extends Action<DeleteExpiredDataAction.Requ
private void deleteExpiredData(ActionListener<Response> listener) {
Auditor auditor = new Auditor(client, clusterService);
ExpiredResultsRemover resultsRemover = new ExpiredResultsRemover(client, clusterService, auditor);
resultsRemover.trigger(() -> {
ExpiredModelSnapshotsRemover modelSnapshotsRemover = new ExpiredModelSnapshotsRemover(client, clusterService);
modelSnapshotsRemover.trigger(() -> {
logger.debug("Finished deleting expired data");
listener.onResponse(new Response(true));
});
});
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredModelSnapshotsRemover(client, clusterService)
);
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
deleteExpiredData(dataRemoversIterator, listener);
}
private void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator, ActionListener<Response> listener) {
if (mlDataRemoversIterator.hasNext()) {
MlDataRemover remover = mlDataRemoversIterator.next();
remover.remove(ActionListener.wrap(
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener),
listener::onFailure));
} else {
logger.info("Completed deletion of expired data");
listener.onResponse(new Response(true));
}
}
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import java.io.IOException;
import java.util.Objects;
@ -58,6 +59,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
public static final ParseField END_TIME = new ParseField("end");
public static final ParseField DURATION = new ParseField("duration");
public static final ParseField EXPIRES_IN = new ParseField("expires_in");
private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
@ -65,6 +67,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
PARSER.declareString((request, jobId) -> request.jobId = jobId, Job.ID);
PARSER.declareString(Request::setEndTime, END_TIME);
PARSER.declareString(Request::setDuration, DURATION);
PARSER.declareString(Request::setExpiresIn, EXPIRES_IN);
}
public static Request parseRequest(String jobId, XContentParser parser) {
@ -77,6 +80,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
private String endTime;
private TimeValue duration;
private TimeValue expiresIn;
Request() {
}
@ -101,11 +105,20 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
this.duration = TimeValue.parseTimeValue(duration, DURATION.getPreferredName());
}
public TimeValue getExpiresIn() {
return expiresIn;
}
public void setExpiresIn(String expiration) {
this.expiresIn = TimeValue.parseTimeValue(expiration, EXPIRES_IN.getPreferredName());
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.endTime = in.readOptionalString();
this.duration = in.readOptionalWriteable(TimeValue::new);
this.expiresIn = in.readOptionalWriteable(TimeValue::new);
}
@Override
@ -113,11 +126,12 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
super.writeTo(out);
out.writeOptionalString(endTime);
out.writeOptionalWriteable(duration);
out.writeOptionalWriteable(expiresIn);
}
@Override
public int hashCode() {
return Objects.hash(jobId, endTime, duration);
return Objects.hash(jobId, endTime, duration, expiresIn);
}
@Override
@ -129,7 +143,8 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(endTime, other.endTime) && Objects.equals(duration, other.duration);
return Objects.equals(jobId, other.jobId) && Objects.equals(endTime, other.endTime) &&
Objects.equals(duration, other.duration) && Objects.equals(expiresIn, other.expiresIn);
}
@Override
@ -142,6 +157,9 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
if (duration != null) {
builder.field(DURATION.getPreferredName(), duration.getStringRep());
}
if (expiresIn != null) {
builder.field(EXPIRES_IN.getPreferredName(), expiresIn.getStringRep());
}
builder.endObject();
return builder;
}
@ -157,56 +175,64 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean acknowledged;
private long id;
private long forecastId;
Response() {
super(null, null);
}
Response(boolean acknowledged, long id) {
Response(boolean acknowledged, long forecastId) {
super(null, null);
this.acknowledged = acknowledged;
this.id = id;
this.forecastId = forecastId;
}
public boolean isacknowledged() {
public boolean isAcknowledged() {
return acknowledged;
}
public long getForecastId() {
return forecastId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
forecastId = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
out.writeLong(forecastId);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("acknowledged", acknowledged);
builder.field("id", id);
builder.field(Forecast.FORECAST_ID.getPreferredName(), forecastId);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
public boolean equals(Object obj) {
if (obj == null) {
return false;
Response response = (Response) o;
return acknowledged == response.acknowledged;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return this.acknowledged == other.acknowledged && this.forecastId == other.forecastId;
}
@Override
public int hashCode() {
return Objects.hash(acknowledged);
return Objects.hash(acknowledged, forecastId);
}
}
@ -219,8 +245,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME,
processManager);
// ThreadPool.Names.SAME, because operations is executed by
// autodetect worker thread
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@Override
@ -239,11 +264,14 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
if (request.getDuration() != null) {
paramsBuilder.duration(request.getDuration());
}
if (request.getExpiresIn() != null) {
paramsBuilder.expiresIn(request.getExpiresIn());
}
ForecastParams params = paramsBuilder.build();
processManager.forecastJob(task, params, e -> {
if (e == null) {
listener.onResponse(new Response(true, params.getId()));
listener.onResponse(new Response(true, params.getForecastId()));
} else {
listener.onFailure(e);
}

View File

@ -222,8 +222,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
createTime = new Date(in.readVLong());
finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null;
lastDataTime = in.readBoolean() ? new Date(in.readVLong()) : null;
// TODO: set to V_6_1_0 after backporting
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
establishedModelMemory = in.readOptionalLong();
} else {
establishedModelMemory = null;
@ -484,8 +483,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
} else {
out.writeBoolean(false);
}
// TODO: set to V_6_1_0 after backporting
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeOptionalLong(establishedModelMemory);
}
analysisConfig.writeTo(out);
@ -707,8 +705,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
createTime = in.readBoolean() ? new Date(in.readVLong()) : null;
finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null;
lastDataTime = in.readBoolean() ? new Date(in.readVLong()) : null;
// TODO: set to V_6_1_0 after backporting
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
establishedModelMemory = in.readOptionalLong();
}
analysisConfig = in.readOptionalWriteable(AnalysisConfig::new);
@ -896,8 +893,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
} else {
out.writeBoolean(false);
}
// TODO: set to V_6_1_0 after backporting
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeOptionalLong(establishedModelMemory);
}
out.writeOptionalWriteable(analysisConfig);

View File

@ -124,8 +124,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
}
customSettings = in.readMap();
modelSnapshotId = in.readOptionalString();
// TODO: set to V_6_1_0 after backporting
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
establishedModelMemory = in.readOptionalLong();
} else {
establishedModelMemory = null;
@ -156,8 +155,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
}
out.writeMap(customSettings);
out.writeOptionalString(modelSnapshotId);
// TODO: set to V_6_1_0 after backporting
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeOptionalLong(establishedModelMemory);
}
}

View File

@ -340,14 +340,14 @@ public class ElasticsearchMappings {
.endObject();
// Forecast Stats Output
// re-used: PROCESSING_TIME_MS, PROCESSED_RECORD_COUNT, LATEST_RECORD_TIME
builder.startObject(ForecastRequestStats.START_TIME.getPreferredName())
// re-used: TIMESTAMP, PROCESSING_TIME_MS, PROCESSED_RECORD_COUNT, LATEST_RECORD_TIME
builder.startObject(ForecastRequestStats.END_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(ForecastRequestStats.END_TIME.getPreferredName())
.startObject(ForecastRequestStats.EXPIRY_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(ForecastRequestStats.MESSAGE.getPreferredName())
.startObject(ForecastRequestStats.MESSAGES.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ForecastRequestStats.PROGRESS.getPreferredName())

View File

@ -18,12 +18,14 @@ public class ForecastParams {
private final long endTime;
private final long duration;
private final long id;
private final long expiresIn;
private final long forecastId;
private ForecastParams(long id, long endTime, long duration) {
this.id = id;
private ForecastParams(long forecastId, long endTime, long duration, long expiresIn) {
this.forecastId = forecastId;
this.endTime = endTime;
this.duration = duration;
this.expiresIn = expiresIn;
}
/**
@ -42,18 +44,26 @@ public class ForecastParams {
return duration;
}
/**
* The forecast expiration in seconds (duration added to start time)
* @return The expiration in seconds
*/
public long getExpiresIn() {
return expiresIn;
}
/**
* The forecast id
*
* @return The forecast Id
*/
public long getId() {
return id;
public long getForecastId() {
return forecastId;
}
@Override
public int hashCode() {
return Objects.hash(id, endTime, duration);
return Objects.hash(forecastId, endTime, duration, expiresIn);
}
@Override
@ -65,7 +75,8 @@ public class ForecastParams {
return false;
}
ForecastParams other = (ForecastParams) obj;
return Objects.equals(id, other.id) && Objects.equals(endTime, other.endTime) && Objects.equals(duration, other.duration);
return Objects.equals(forecastId, other.forecastId) && Objects.equals(endTime, other.endTime) &&
Objects.equals(duration, other.duration) && Objects.equals(expiresIn, other.expiresIn);
}
public static Builder builder() {
@ -75,6 +86,7 @@ public class ForecastParams {
public static class Builder {
private long endTimeEpochSecs;
private long durationSecs;
private long expiresInSecs;
private long startTime;
private long forecastId;
@ -83,6 +95,9 @@ public class ForecastParams {
endTimeEpochSecs = 0;
forecastId = generateId();
durationSecs = 0;
// because 0 means never expire, the default is -1
expiresInSecs = -1;
}
private long generateId() {
@ -107,12 +122,17 @@ public class ForecastParams {
return this;
}
public Builder expiresIn(TimeValue expiresIn) {
expiresInSecs = expiresIn.seconds();
return this;
}
public ForecastParams build() {
if (endTimeEpochSecs != 0 && durationSecs != 0) {
throw new ElasticsearchParseException(Messages.getMessage(Messages.REST_INVALID_DURATION_AND_ENDTIME));
}
return new ForecastParams(forecastId, endTimeEpochSecs, durationSecs);
return new ForecastParams(forecastId, endTimeEpochSecs, durationSecs, expiresInSecs);
}
}
}

View File

@ -153,7 +153,7 @@ public class ControlMsgToProcessWriter {
public void writeForecastMessage(ForecastParams params) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("forecast_id", params.getId());
.field("forecast_id", params.getForecastId());
if (params.getEndTime() != 0) {
builder.field("end_time", params.getEndTime());
@ -161,6 +161,9 @@ public class ControlMsgToProcessWriter {
if (params.getDuration() != 0) {
builder.field("duration", params.getDuration());
}
if (params.getExpiresIn() != -1) {
builder.field("expires_in", params.getExpiresIn());
}
builder.endObject();
writeMessage(FORECAST_MESSAGE_CODE + builder.string());

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
@ -35,9 +36,9 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE);
public static final ParseField FORECAST_ID = new ParseField("forecast_id");
public static final ParseField START_TIME = new ParseField("forecast_start_timestamp");
public static final ParseField END_TIME = new ParseField("forecast_end_timestamp");
public static final ParseField MESSAGE = new ParseField("forecast_message");
public static final ParseField EXPIRY_TIME = new ParseField("forecast_expiry_timestamp");
public static final ParseField MESSAGES = new ParseField("forecast_messages");
public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms");
public static final ParseField PROGRESS = new ParseField("forecast_progress");
public static final ParseField PROCESSED_RECORD_COUNT = new ParseField("processed_record_count");
@ -53,11 +54,13 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
PARSER.declareString((modelForecastRequestStats, s) -> {}, Result.RESULT_TYPE);
PARSER.declareLong(ForecastRequestStats::setRecordCount, PROCESSED_RECORD_COUNT);
PARSER.declareString(ForecastRequestStats::setMessage, MESSAGE);
PARSER.declareField(ForecastRequestStats::setStartTimeStamp,
p -> Instant.ofEpochMilli(p.longValue()), START_TIME, ValueType.LONG);
PARSER.declareStringArray(ForecastRequestStats::setMessages, MESSAGES);
PARSER.declareField(ForecastRequestStats::setTimeStamp,
p -> Instant.ofEpochMilli(p.longValue()), Result.TIMESTAMP, ValueType.LONG);
PARSER.declareField(ForecastRequestStats::setEndTimeStamp,
p -> Instant.ofEpochMilli(p.longValue()), END_TIME, ValueType.LONG);
PARSER.declareField(ForecastRequestStats::setExpiryTimeStamp,
p -> Instant.ofEpochMilli(p.longValue()), EXPIRY_TIME, ValueType.LONG);
PARSER.declareDouble(ForecastRequestStats::setProgress, PROGRESS);
PARSER.declareLong(ForecastRequestStats::setProcessingTime, PROCESSING_TIME_MS);
PARSER.declareField(ForecastRequestStats::setStatus, p -> ForecastRequestStatus.fromString(p.text()), STATUS, ValueType.STRING);
@ -89,9 +92,10 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
private final String jobId;
private final long forecastId;
private long recordCount;
private String message;
private List<String> messages;
private Instant dateStarted = Instant.EPOCH;
private Instant dateEnded = Instant.EPOCH;
private Instant dateExpires = Instant.EPOCH;
private double progress;
private long processingTime;
private long memoryUsage;
@ -106,9 +110,14 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
jobId = in.readString();
forecastId = in.readLong();
recordCount = in.readLong();
message = in.readOptionalString();
if (in.readBoolean()) {
messages = in.readList(StreamInput::readString);
} else {
messages = null;
}
dateStarted = Instant.ofEpochMilli(in.readVLong());
dateEnded = Instant.ofEpochMilli(in.readVLong());
dateExpires = Instant.ofEpochMilli(in.readVLong());
progress = in.readDouble();
processingTime = in.readLong();
setMemoryUsage(in.readLong());
@ -120,9 +129,15 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
out.writeString(jobId);
out.writeLong(forecastId);
out.writeLong(recordCount);
out.writeOptionalString(message);
if (messages != null) {
out.writeBoolean(true);
out.writeStringList(messages);
} else {
out.writeBoolean(false);
}
out.writeVLong(dateStarted.toEpochMilli());
out.writeVLong(dateEnded.toEpochMilli());
out.writeVLong(dateExpires.toEpochMilli());
out.writeDouble(progress);
out.writeLong(processingTime);
out.writeLong(getMemoryUsage());
@ -136,15 +151,18 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
builder.field(FORECAST_ID.getPreferredName(), forecastId);
builder.field(PROCESSED_RECORD_COUNT.getPreferredName(), recordCount);
if (message != null) {
builder.field(MESSAGE.getPreferredName(), message);
if (messages != null) {
builder.field(MESSAGES.getPreferredName(), messages);
}
if (dateStarted.equals(Instant.EPOCH) == false) {
builder.field(START_TIME.getPreferredName(), dateStarted.toEpochMilli());
builder.field(Result.TIMESTAMP.getPreferredName(), dateStarted.toEpochMilli());
}
if (dateEnded.equals(Instant.EPOCH) == false) {
builder.field(END_TIME.getPreferredName(), dateEnded.toEpochMilli());
}
if (dateExpires.equals(Instant.EPOCH) == false) {
builder.field(EXPIRY_TIME.getPreferredName(), dateExpires.toEpochMilli());
}
builder.field(PROGRESS.getPreferredName(), progress);
builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTime);
builder.field(MEMORY_USAGE.getPreferredName(), getMemoryUsage());
@ -175,19 +193,19 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
return recordCount;
}
public String getMessage() {
return message;
public List<String> getMessages() {
return messages;
}
public void setMessage(String message) {
this.message = message;
public void setMessages(List<String> messages) {
this.messages = messages;
}
public Instant getDateStarted() {
return dateStarted;
}
public void setStartTimeStamp(Instant dateStarted) {
public void setTimeStamp(Instant dateStarted) {
this.dateStarted = dateStarted;
}
@ -199,6 +217,14 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
this.dateEnded = dateEnded;
}
public void setExpiryTimeStamp(Instant dateExpires) {
this.dateExpires = dateExpires;
}
public Instant getDateExpired() {
return dateExpires;
}
/**
* Progress information of the ForecastRequest in the range 0 to 1,
* while 1 means finished
@ -250,9 +276,10 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
return Objects.equals(this.jobId, that.jobId) &&
this.forecastId == that.forecastId &&
this.recordCount == that.recordCount &&
Objects.equals(this.message, that.message) &&
Objects.equals(this.messages, that.messages) &&
Objects.equals(this.dateStarted, that.dateStarted) &&
Objects.equals(this.dateEnded, that.dateEnded) &&
Objects.equals(this.dateExpires, that.dateExpires) &&
this.progress == that.progress &&
this.processingTime == that.processingTime &&
this.memoryUsage == that.memoryUsage &&
@ -261,7 +288,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
@Override
public int hashCode() {
return Objects.hash(jobId, forecastId, recordCount, message, dateStarted, dateEnded, progress,
processingTime, memoryUsage, status);
return Objects.hash(jobId, forecastId, recordCount, messages, dateStarted, dateEnded, dateExpires,
progress, processingTime, memoryUsage, status);
}
}

View File

@ -131,9 +131,10 @@ public final class ReservedFieldNames {
Forecast.FORECAST_PREDICTION.getPreferredName(),
Forecast.FORECAST_ID.getPreferredName(),
ForecastRequestStats.START_TIME.getPreferredName(),
//re-use: ForecastRequestStats.TIMESTAMP
ForecastRequestStats.END_TIME.getPreferredName(),
ForecastRequestStats.MESSAGE.getPreferredName(),
ForecastRequestStats.EXPIRY_TIME.getPreferredName(),
ForecastRequestStats.MESSAGES.getPreferredName(),
ForecastRequestStats.PROGRESS.getPreferredName(),
ForecastRequestStats.STATUS.getPreferredName(),

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
@ -13,6 +14,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
@ -29,7 +31,7 @@ import java.util.concurrent.TimeUnit;
* blocking the thread it was called at for too long. It does so by
* chaining the steps together.
*/
abstract class AbstractExpiredJobDataRemover {
abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
private final ClusterService clusterService;
@ -37,23 +39,24 @@ abstract class AbstractExpiredJobDataRemover {
this.clusterService = Objects.requireNonNull(clusterService);
}
public void trigger(Runnable onFinish) {
removeData(newJobIterator(), onFinish);
@Override
public void remove(ActionListener<Boolean> listener) {
removeData(newJobIterator(), listener);
}
private void removeData(Iterator<Job> jobIterator, Runnable onFinish) {
private void removeData(Iterator<Job> jobIterator, ActionListener<Boolean> listener) {
if (jobIterator.hasNext() == false) {
onFinish.run();
listener.onResponse(true);
return;
}
Job job = jobIterator.next();
Long retentionDays = getRetentionDays(job);
if (retentionDays == null) {
removeData(jobIterator, () -> removeData(jobIterator, onFinish));
removeData(jobIterator, listener);
return;
}
long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
removeDataBefore(job, cutoffEpochMs, () -> removeData(jobIterator, onFinish));
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure));
}
private Iterator<Job> newJobIterator() {
@ -79,33 +82,13 @@ abstract class AbstractExpiredJobDataRemover {
/**
* Template method to allow implementation details of various types of data (e.g. results, model snapshots).
* Implementors need to call {@code onFinish} when they are done in order to continue to the next job.
* Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job.
*/
protected abstract void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish);
protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);
protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
return QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"));
}
private static class VolatileCursorIterator<T> implements Iterator<T> {
private final List<T> items;
private volatile int cursor;
private VolatileCursorIterator(List<T> items) {
this.items = items;
this.cursor = 0;
}
@Override
public boolean hasNext() {
return cursor < items.size();
}
@Override
public T next() {
return items.get(cursor++);
}
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.retention;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
@ -57,10 +58,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
}
@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) {
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
if (job.getModelSnapshotId() == null) {
// No snapshot to remove
onFinish.run();
listener.onResponse(true);
return;
}
LOGGER.debug("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
@ -86,7 +87,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
for (SearchHit hit : searchResponse.getHits()) {
modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef()));
}
deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), onFinish);
deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), listener);
} catch (Exception e) {
onFailure(e);
}
@ -94,15 +95,14 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
@Override
public void onFailure(Exception e) {
LOGGER.error("[" + job.getId() + "] Search for expired snapshots failed", e);
onFinish.run();
listener.onFailure(new ElasticsearchException("[" + job.getId() + "] Search for expired snapshots failed", e));
}
});
}
private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, Runnable onFinish) {
private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, ActionListener<Boolean> listener) {
if (modelSnapshotIterator.hasNext() == false) {
onFinish.run();
listener.onResponse(true);
return;
}
ModelSnapshot modelSnapshot = modelSnapshotIterator.next();
@ -112,7 +112,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
@Override
public void onResponse(DeleteModelSnapshotAction.Response response) {
try {
deleteModelSnapshots(modelSnapshotIterator, onFinish);
deleteModelSnapshots(modelSnapshotIterator, listener);
} catch (Exception e) {
onFailure(e);
}
@ -120,9 +120,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
@Override
public void onFailure(Exception e) {
LOGGER.error("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
+ modelSnapshot.getSnapshotId() + "]", e);
deleteModelSnapshots(modelSnapshotIterator, onFinish);
listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
+ modelSnapshot.getSnapshotId() + "]", e));
}
});
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.retention;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
@ -55,7 +56,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
}
@Override
protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) {
protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs);
@ -66,7 +67,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
if (bulkByScrollResponse.getDeleted() > 0) {
auditResultsWereDeleted(job.getId(), cutoffEpochMs);
}
onFinish.run();
listener.onResponse(true);
} catch (Exception e) {
onFailure(e);
}
@ -74,8 +75,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
@Override
public void onFailure(Exception e) {
LOGGER.error("Failed to remove expired results for job [" + job.getId() + "]", e);
onFinish.run();
listener.onFailure(new ElasticsearchException("Failed to remove expired results for job [" + job.getId() + "]", e));
}
});
}

View File

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.retention;
import org.elasticsearch.action.ActionListener;
public interface MlDataRemover {
void remove(ActionListener<Boolean> listener);
}

View File

@ -46,6 +46,9 @@ public class RestForecastJobAction extends BaseRestHandler {
if (restRequest.hasParam(ForecastJobAction.Request.DURATION.getPreferredName())) {
request.setDuration(restRequest.param(ForecastJobAction.Request.DURATION.getPreferredName()));
}
if (restRequest.hasParam(ForecastJobAction.Request.EXPIRES_IN.getPreferredName())) {
request.setExpiresIn(restRequest.param(ForecastJobAction.Request.EXPIRES_IN.getPreferredName()));
}
}
return channel -> client.execute(ForecastJobAction.INSTANCE, request, new RestToXContentListener<>(channel));

View File

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import java.util.Iterator;
import java.util.List;
/**
* An iterator whose cursor is volatile. The intended usage
* is to allow safe iteration which is done serially but
* from potentially different threads. In particular, this
* allows iterating over a collection via callbacks, where
* each call deals with the next item and only calls the next
* callback once it's finished.
*/
public class VolatileCursorIterator<T> implements Iterator<T> {
private final List<T> items;
private volatile int cursor;
public VolatileCursorIterator(List<T> items) {
this.items = items;
this.cursor = 0;
}
@Override
public boolean hasNext() {
return cursor < items.size();
}
@Override
public T next() {
return items.get(cursor++);
}
}

View File

@ -139,7 +139,8 @@ public class SecurityIndexSearcherWrapper extends IndexSearcherWrapper {
filter.add(roleQuery, SHOULD);
if (queryShardContext.getMapperService().hasNested()) {
// If access is allowed on root doc then also access is allowed on all nested docs of that root document:
BitSetProducer rootDocs = queryShardContext.bitsetFilter(Queries.newNonNestedFilter());
BitSetProducer rootDocs = queryShardContext.bitsetFilter(
Queries.newNonNestedFilter(queryShardContext.indexVersionCreated()));
ToChildBlockJoinQuery includeNestedDocs = new ToChildBlockJoinQuery(roleQuery, rootDocs);
filter.add(includeNestedDocs, SHOULD);
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mockito;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -46,7 +47,7 @@ public class MlDailyManagementServiceTests extends ESTestCase {
latch.await(1, TimeUnit.SECONDS);
}
verify(client, org.mockito.Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any());
verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
}
private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) {

View File

@ -12,6 +12,8 @@ import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats.ForecastReque
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
public class ForecastRequestStatsTests extends AbstractSerializingTestCase<ForecastRequestStats> {
@ -32,14 +34,22 @@ public class ForecastRequestStatsTests extends AbstractSerializingTestCase<Forec
forecastRequestStats.setRecordCount(randomLong());
}
if (randomBoolean()) {
forecastRequestStats.setMessage(randomAlphaOfLengthBetween(1, 20));
int size = scaledRandomIntBetween(1, 20);
List<String> list = new ArrayList<>();
for (int i = 0; i < size; i++) {
list.add(randomAlphaOfLength(40));
}
forecastRequestStats.setMessages(list);
}
if (randomBoolean()) {
forecastRequestStats.setStartTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
forecastRequestStats.setTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setEndTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setExpiryTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setProgress(randomDouble());
}
@ -65,5 +75,4 @@ public class ForecastRequestStatsTests extends AbstractSerializingTestCase<Forec
protected ForecastRequestStats doParseInstance(XContentParser parser) throws IOException {
return ForecastRequestStats.PARSER.apply(parser, null);
}
}

View File

@ -53,7 +53,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
private List<SearchRequest> capturedSearchRequests;
private List<DeleteModelSnapshotAction.Request> capturedDeleteModelSnapshotRequests;
private List<SearchResponse> searchResponsesPerCall;
private Runnable onFinish;
private ActionListener<Boolean> listener;
@Before
public void setUpTests() {
@ -64,33 +64,33 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
client = mock(Client.class);
onFinish = mock(Runnable.class);
listener = mock(ActionListener.class);
}
public void testTrigger_GivenJobsWithoutRetentionPolicy() {
public void testRemove_GivenJobsWithoutRetentionPolicy() {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("foo").build(),
JobTests.buildJobBuilder("bar").build()
));
createExpiredModelSnapshotsRemover().trigger(onFinish);
createExpiredModelSnapshotsRemover().remove(listener);
verify(onFinish).run();
verify(listener).onResponse(true);
Mockito.verifyNoMoreInteractions(client);
}
public void testTrigger_GivenJobWithoutActiveSnapshot() {
public void testRemove_GivenJobWithoutActiveSnapshot() {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build()));
createExpiredModelSnapshotsRemover().trigger(onFinish);
createExpiredModelSnapshotsRemover().remove(listener);
verify(onFinish).run();
verify(listener).onResponse(true);
Mockito.verifyNoMoreInteractions(client);
}
public void testTrigger_GivenJobsWithMixedRetentionPolicies() throws IOException {
public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -104,7 +104,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().trigger(onFinish);
createExpiredModelSnapshotsRemover().remove(listener);
assertThat(capturedSearchRequests.size(), equalTo(2));
SearchRequest searchRequest = capturedSearchRequests.get(0);
@ -123,10 +123,10 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
verify(onFinish).run();
verify(listener).onResponse(true);
}
public void testTrigger_GivenClientSearchRequestsFail() throws IOException {
public void testRemove_GivenClientSearchRequestsFail() throws IOException {
givenClientSearchRequestsFail();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -140,20 +140,18 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().trigger(onFinish);
createExpiredModelSnapshotsRemover().remove(listener);
assertThat(capturedSearchRequests.size(), equalTo(2));
assertThat(capturedSearchRequests.size(), equalTo(1));
SearchRequest searchRequest = capturedSearchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0));
verify(onFinish).run();
verify(listener).onFailure(any());
}
public void testTrigger_GivenClientDeleteSnapshotRequestsFail() throws IOException {
public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException {
givenClientDeleteModelSnapshotRequestsFail();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -167,26 +165,18 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots));
createExpiredModelSnapshotsRemover().trigger(onFinish);
createExpiredModelSnapshotsRemover().remove(listener);
assertThat(capturedSearchRequests.size(), equalTo(2));
assertThat(capturedSearchRequests.size(), equalTo(1));
SearchRequest searchRequest = capturedSearchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
searchRequest = capturedSearchRequests.get(1);
assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")}));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3));
assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1));
DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2"));
deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2);
assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
verify(onFinish).run();
verify(listener).onFailure(any());
}
private void givenJobs(List<Job> jobs) {

View File

@ -46,7 +46,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
private ClusterService clusterService;
private ClusterState clusterState;
private List<DeleteByQueryRequest> capturedDeleteByQueryRequests;
private Runnable onFinish;
private ActionListener<Boolean> listener;
@Before
public void setUpTests() {
@ -55,43 +55,33 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
client = mock(Client.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]);
ActionListener<BulkByScrollResponse> listener =
(ActionListener<BulkByScrollResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(null);
return null;
}
}).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any());
onFinish = mock(Runnable.class);
listener = mock(ActionListener.class);
}
public void testTrigger_GivenNoJobs() {
public void testRemove_GivenNoJobs() {
givenClientRequestsSucceed();
givenJobs(Collections.emptyList());
createExpiredResultsRemover().trigger(onFinish);
createExpiredResultsRemover().remove(listener);
verify(onFinish).run();
verify(listener).onResponse(true);
Mockito.verifyNoMoreInteractions(client);
}
public void testTrigger_GivenJobsWithoutRetentionPolicy() {
public void testRemove_GivenJobsWithoutRetentionPolicy() {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("foo").build(),
JobTests.buildJobBuilder("bar").build()
));
createExpiredResultsRemover().trigger(onFinish);
createExpiredResultsRemover().remove(listener);
verify(onFinish).run();
verify(listener).onResponse(true);
Mockito.verifyNoMoreInteractions(client);
}
public void testTrigger_GivenJobsWithAndWithoutRetentionPolicy() throws IOException {
public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception {
givenClientRequestsSucceed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -99,17 +89,17 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
));
createExpiredResultsRemover().trigger(onFinish);
createExpiredResultsRemover().remove(listener);
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")}));
dbqRequest = capturedDeleteByQueryRequests.get(1);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")}));
verify(onFinish).run();
verify(listener).onResponse(true);
}
public void testTrigger_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException {
public void testRemove_GivenClientRequestsFailed() throws IOException {
givenClientRequestsFailed();
givenJobs(Arrays.asList(
JobTests.buildJobBuilder("none").build(),
@ -117,14 +107,12 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
));
createExpiredResultsRemover().trigger(onFinish);
createExpiredResultsRemover().remove(listener);
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
assertThat(capturedDeleteByQueryRequests.size(), equalTo(1));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")}));
dbqRequest = capturedDeleteByQueryRequests.get(1);
assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")}));
verify(onFinish).run();
verify(listener).onFailure(any());
}
private void givenClientRequestsSucceed() {
@ -143,7 +131,9 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
ActionListener<BulkByScrollResponse> listener =
(ActionListener<BulkByScrollResponse>) invocationOnMock.getArguments()[2];
if (shouldSucceed) {
listener.onResponse(null);
BulkByScrollResponse bulkByScrollResponse = mock(BulkByScrollResponse.class);
when(bulkByScrollResponse.getDeleted()).thenReturn(42L);
listener.onResponse(bulkByScrollResponse);
} else {
listener.onFailure(new RuntimeException("failed"));
}

View File

@ -41,6 +41,9 @@ setup:
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "10mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
@ -60,6 +63,9 @@ setup:
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "10mb"
},
"data_description" : {
"time_field":"time"
}

View File

@ -12,6 +12,9 @@
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
@ -32,6 +35,9 @@
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
@ -339,6 +345,9 @@
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
@ -486,6 +495,9 @@
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"format":"xcontent"
}
@ -532,6 +544,9 @@
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"mlcategory"}],
"categorization_field_name": "message"
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"format":"xcontent"
}
@ -637,6 +652,9 @@
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"time_field":"time",
"time_format":"epoch"

View File

@ -10,6 +10,9 @@ setup:
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "10mb"
},
"data_description" : {}
}
@ -24,6 +27,9 @@ setup:
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "10mb"
},
"data_description" : {}
}
@ -38,6 +44,9 @@ setup:
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "10mb"
},
"data_description" : {}
}
@ -52,6 +61,9 @@ setup:
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "10mb"
},
"data_description" : {}
}
@ -65,6 +77,9 @@ setup:
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "10mb"
},
"data_description" : {}
}

View File

@ -404,6 +404,9 @@
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"time_field":"time"
}
@ -492,6 +495,9 @@
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"format":"xcontent"
}
@ -523,6 +529,9 @@
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"format":"xcontent"
}
@ -538,6 +547,9 @@
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"format":"xcontent"
}
@ -569,6 +581,9 @@
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {}
}
@ -580,6 +595,9 @@
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {}
}
@ -591,6 +609,9 @@
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {}
}
@ -654,6 +675,9 @@
"bucket_span":"1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
@ -713,6 +737,9 @@
"analysis_config" : {
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "20mb"
},
"data_description" : {
"format":"xcontent"
}

View File

@ -12,6 +12,9 @@ setup:
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "10mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
@ -38,6 +41,9 @@ setup:
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "10mb"
},
"data_description" : {
"format" : "xcontent",
"time_field":"time",
@ -201,6 +207,9 @@ setup:
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "10mb"
},
"data_description" : {
}
}

View File

@ -29,6 +29,9 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}],
"influencers": ["airport"]
},
"analysis_limits" : {
"model_memory_limit": "30mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
@ -203,6 +206,9 @@ setup:
"bucket_span": "1h",
"detectors" :[{"function":"count","by_field_name":"airline2"}]
},
"analysis_limits" : {
"model_memory_limit": "30mb"
},
"data_description" : {
"time_field":"time"
}
@ -274,6 +280,9 @@ setup:
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "30mb"
},
"data_description" : {
"time_field":"time"
}
@ -288,6 +297,9 @@ setup:
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "30mb"
},
"data_description" : {
"time_field":"time"
}
@ -302,6 +314,9 @@ setup:
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"analysis_limits" : {
"model_memory_limit": "30mb"
},
"data_description" : {
"time_field":"time"
}