[ML] Remove model snapshot restore_priority and... (elastic/x-pack-elasticsearch#598)

...correctly update the job's model_snapshot_id whenever
a new model snapshot is persisted. Load the snapshot that
corresponds to the job's model_snapshot_id when a job is
opened.

This PR makes a series of changes that completes the change
of having job's active model snapshot persisted on the job.

Original commit: elastic/x-pack-elasticsearch@c2e23fa1ee
This commit is contained in:
Dimitris Athanasiou 2017-02-20 17:45:28 +00:00 committed by GitHub
parent b58b92dc79
commit 6c97ac7691
24 changed files with 226 additions and 183 deletions

View File

@ -305,21 +305,16 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
Consumer<Exception> errorHandler) {
logger.info("Reverting to snapshot '" + request.getSnapshotId() + "'");
provider.modelSnapshots(request.getJobId(), 0, 1, null, null,
ModelSnapshot.TIMESTAMP.getPreferredName(), true, request.getSnapshotId(), request.getDescription(),
page -> {
List<ModelSnapshot> revertCandidates = page.results();
if (revertCandidates == null || revertCandidates.isEmpty()) {
throw new ResourceNotFoundException(
Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getJobId()));
}
ModelSnapshot modelSnapshot = revertCandidates.get(0);
// The quantiles can be large, and totally dominate the output -
// it's clearer to remove them
modelSnapshot.setQuantiles(null);
handler.accept(modelSnapshot);
}, errorHandler);
provider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), modelSnapshot -> {
if (modelSnapshot == null) {
throw new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(),
request.getJobId()));
}
// The quantiles can be large, and totally dominate the output -
// it's clearer to remove them as they are not necessary for the revert op
modelSnapshot.setQuantiles(null);
handler.accept(modelSnapshot);
}, errorHandler);
}
private ActionListener<RevertModelSnapshotAction.Response> wrapDeleteOldDataListener(

View File

@ -194,7 +194,7 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
});
}
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, client, wrappedListener);
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, wrappedListener);
}
private void updateProcess(Request request, PutJobAction.Response updateConfigResponse,

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@ -197,9 +198,39 @@ public class JobManager extends AbstractComponent {
});
}
public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, Client client,
ActionListener<PutJobAction.Response> actionListener) {
public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener<PutJobAction.Response> actionListener) {
Job job = getJobOrThrowIfUnknown(jobId);
validate(jobUpdate, job, isValid -> {
if (isValid) {
internalJobUpdate(jobId, jobUpdate, request, actionListener);
} else {
actionListener.onFailure(new IllegalArgumentException("Invalid update to job [" + jobId + "]"));
}
}, actionListener::onFailure);
}
private void validate(JobUpdate jobUpdate, Job job, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
if (jobUpdate.getModelSnapshotId() != null) {
jobProvider.getModelSnapshot(job.getId(), jobUpdate.getModelSnapshotId(), newModelSnapshot -> {
if (newModelSnapshot == null) {
throw new ResourceNotFoundException(
Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, jobUpdate.getModelSnapshotId(), job.getId()));
}
jobProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> {
if (oldModelSnapshot != null && newModelSnapshot.getTimestamp().before(oldModelSnapshot.getTimestamp())) {
throw new IllegalArgumentException("Job [" + job.getId() + "] has a more recent model snapshot ["
+ oldModelSnapshot.getSnapshotId() + "]");
}
handler.accept(true);
}, errorHandler);
}, errorHandler);
} else {
handler.accept(true);
}
}
private void internalJobUpdate(String jobId, JobUpdate jobUpdate, AckedRequest request,
ActionListener<PutJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("update-job-" + jobId,
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
private Job updatedJob;

View File

@ -6,7 +6,7 @@
package org.elasticsearch.xpack.ml.job.config;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -36,6 +36,7 @@ public class JobUpdate implements Writeable, ToXContent {
PARSER.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS);
PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
}
private final String description;
@ -48,12 +49,14 @@ public class JobUpdate implements Writeable, ToXContent {
private final Long resultsRetentionDays;
private final List<String> categorizationFilters;
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private JobUpdate(@Nullable String description, @Nullable List<DetectorUpdate> detectorUpdates,
@Nullable ModelDebugConfig modelDebugConfig, @Nullable AnalysisLimits analysisLimits,
@Nullable Long backgroundPersistInterval, @Nullable Long renormalizationWindowDays,
@Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays,
@Nullable List<String> categorisationFilters, @Nullable Map<String, Object> customSettings) {
@Nullable List<String> categorisationFilters, @Nullable Map<String, Object> customSettings,
@Nullable String modelSnapshotId) {
this.description = description;
this.detectorUpdates = detectorUpdates;
this.modelDebugConfig = modelDebugConfig;
@ -64,6 +67,7 @@ public class JobUpdate implements Writeable, ToXContent {
this.resultsRetentionDays = resultsRetentionDays;
this.categorizationFilters = categorisationFilters;
this.customSettings = customSettings;
this.modelSnapshotId = modelSnapshotId;
}
public JobUpdate(StreamInput in) throws IOException {
@ -85,6 +89,7 @@ public class JobUpdate implements Writeable, ToXContent {
categorizationFilters = null;
}
customSettings = in.readMap();
modelSnapshotId = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
@ -104,6 +109,7 @@ public class JobUpdate implements Writeable, ToXContent {
out.writeStringList(categorizationFilters);
}
out.writeMap(customSettings);
out.writeOptionalString(modelSnapshotId);
}
public String getDescription() {
@ -146,6 +152,10 @@ public class JobUpdate implements Writeable, ToXContent {
return customSettings;
}
public String getModelSnapshotId() {
return modelSnapshotId;
}
public boolean isAutodetectProcessUpdate() {
return modelDebugConfig != null || detectorUpdates != null;
}
@ -183,6 +193,9 @@ public class JobUpdate implements Writeable, ToXContent {
if (customSettings != null) {
builder.field(Job.CUSTOM_SETTINGS.getPreferredName(), customSettings);
}
if (modelSnapshotId != null) {
builder.field(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId);
}
builder.endObject();
return builder;
}
@ -245,6 +258,9 @@ public class JobUpdate implements Writeable, ToXContent {
if (customSettings != null) {
builder.setCustomSettings(customSettings);
}
if (modelSnapshotId != null) {
builder.setModelSnapshotId(modelSnapshotId);
}
return builder.build();
}
@ -270,13 +286,15 @@ public class JobUpdate implements Writeable, ToXContent {
&& Objects.equals(this.modelSnapshotRetentionDays, that.modelSnapshotRetentionDays)
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.categorizationFilters, that.categorizationFilters)
&& Objects.equals(this.customSettings, that.customSettings);
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId);
}
@Override
public int hashCode() {
return Objects.hash(description, detectorUpdates, modelDebugConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings);
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
modelSnapshotId);
}
public static class DetectorUpdate implements Writeable, ToXContent {
@ -383,6 +401,7 @@ public class JobUpdate implements Writeable, ToXContent {
private Long resultsRetentionDays;
private List<String> categorizationFilters;
private Map<String, Object> customSettings;
private String modelSnapshotId;
public Builder() {}
@ -436,9 +455,15 @@ public class JobUpdate implements Writeable, ToXContent {
return this;
}
public Builder setModelSnapshotId(String modelSnapshotId) {
this.modelSnapshotId = modelSnapshotId;
return this;
}
public JobUpdate build() {
return new JobUpdate(description, detectorUpdates, modelDebugConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings);
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
modelSnapshotId);
}
}
}

View File

@ -574,9 +574,6 @@ public class ElasticsearchMappings {
.startObject(ModelSnapshot.DESCRIPTION.getPreferredName())
.field(TYPE, TEXT)
.endObject()
.startObject(ModelSnapshot.RESTORE_PRIORITY.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(ModelSnapshot.SNAPSHOT_ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()

View File

@ -106,20 +106,21 @@ public class JobDataDeleter {
* @param modelSnapshot the model snapshot to delete
*/
public void deleteModelSnapshot(ModelSnapshot modelSnapshot) {
String snapshotId = modelSnapshot.getSnapshotId();
String snapshotDocId = ModelSnapshot.documentId(modelSnapshot);
int docCount = modelSnapshot.getSnapshotDocCount();
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
// Deduce the document IDs of the state documents from the information
// in the snapshot document - we cannot query the state itself as it's
// too big and has no mappings
for (int i = 0; i < docCount; ++i) {
String stateId = snapshotId + '_' + i;
// too big and has no mappings.
// Note: state docs are 1-based
for (int i = 1; i <= docCount; ++i) {
String stateId = snapshotDocId + '_' + i;
bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ModelState.TYPE.getPreferredName(), stateId));
++deletedModelStateCount;
}
bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()),
ModelSnapshot.TYPE.getPreferredName(), snapshotId));
ModelSnapshot.TYPE.getPreferredName(), snapshotDocId));
++deletedModelSnapshotCount;
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetAction;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
@ -29,6 +30,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -60,6 +62,7 @@ import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder.BucketsQuery;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
@ -971,7 +974,20 @@ public class JobProvider {
}
/**
* Get model snapshots for the job ordered by descending restore priority.
* Get a job's model snapshot by its id
*/
public void getModelSnapshot(String jobId, @Nullable String modelSnapshotId, Consumer<ModelSnapshot> handler,
Consumer<Exception> errorHandler) {
if (modelSnapshotId == null) {
handler.accept(null);
return;
}
get(jobId, AnomalyDetectorsIndex.jobResultsIndexName(jobId), ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(jobId, modelSnapshotId), handler, errorHandler, ModelSnapshot.PARSER, () -> null);
}
/**
* Get model snapshots for the job ordered by descending timestamp (newest first).
*
* @param jobId the job id
* @param from number of snapshots to from
@ -979,7 +995,7 @@ public class JobProvider {
*/
public void modelSnapshots(String jobId, int from, int size, Consumer<QueryPage<ModelSnapshot>> handler,
Consumer<Exception> errorHandler) {
modelSnapshots(jobId, from, size, null, false, QueryBuilders.matchAllQuery(), handler, errorHandler);
modelSnapshots(jobId, from, size, null, true, QueryBuilders.matchAllQuery(), handler, errorHandler);
}
/**
@ -1036,7 +1052,7 @@ public class JobProvider {
Consumer<QueryPage<ModelSnapshot>> handler,
Consumer<Exception> errorHandler) {
if (Strings.isEmpty(sortField)) {
sortField = ModelSnapshot.RESTORE_PRIORITY.getPreferredName();
sortField = ModelSnapshot.TIMESTAMP.getPreferredName();
}
FieldSortBuilder sb = new FieldSortBuilder(sortField)

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
@ -239,13 +240,13 @@ public class JobResultsPersister extends AbstractComponent {
*/
public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(),
modelSnapshot.documentId());
ModelSnapshot.documentId(modelSnapshot));
persistable.persist(AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId()));
}
public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
String index = AnomalyDetectorsIndex.jobResultsIndexName(modelSnapshot.getJobId());
IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), modelSnapshot.documentId());
IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot));
try {
indexRequest.source(toXContentBuilder(modelSnapshot));
} catch (IOException e) {

View File

@ -211,8 +211,7 @@ public class AutodetectProcessManager extends AbstractComponent {
void gatherRequiredInformation(String jobId, MultiConsumer handler, Consumer<Exception> errorHandler) {
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
jobProvider.dataCounts(jobId, dataCounts -> {
jobProvider.modelSnapshots(jobId, 0, 1, page -> {
ModelSnapshot modelSnapshot = page.results().isEmpty() ? null : page.results().get(0);
jobProvider.getModelSnapshot(jobId, job.getModelSnapshotId(), modelSnapshot -> {
jobProvider.getQuantiles(jobId, quantiles -> {
Set<String> ids = job.getAnalysisConfig().extractReferencedFilters();
jobProvider.getFilters(filterDocument -> handler.accept(dataCounts, modelSnapshot, quantiles, filterDocument),
@ -248,7 +247,7 @@ public class AutodetectProcessManager extends AbstractComponent {
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, modelSnapshot, quantiles, filters,
ignoreDowntime, executorService);
boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization();
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(jobId, renormalizer, jobResultsPersister);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(client, jobId, renormalizer, jobResultsPersister);
try {
executorService.submit(() -> processor.process(process, usePerPartitionNormalization));
} catch (EsRejectedExecutionException e) {

View File

@ -7,7 +7,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
@ -48,6 +53,7 @@ public class AutoDetectResultProcessor {
private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class);
private final Client client;
private final String jobId;
private final Renormalizer renormalizer;
private final JobResultsPersister persister;
@ -57,11 +63,13 @@ public class AutoDetectResultProcessor {
private volatile ModelSizeStats latestModelSizeStats;
public AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister) {
this(jobId, renormalizer, persister, new FlushListener());
public AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister) {
this(client, jobId, renormalizer, persister, new FlushListener());
}
AutoDetectResultProcessor(String jobId, Renormalizer renormalizer, JobResultsPersister persister, FlushListener flushListener) {
AutoDetectResultProcessor(Client client,String jobId, Renormalizer renormalizer, JobResultsPersister persister,
FlushListener flushListener) {
this.client = client;
this.jobId = jobId;
this.renormalizer = renormalizer;
this.persister = persister;
@ -144,6 +152,7 @@ public class AutoDetectResultProcessor {
ModelSnapshot modelSnapshot = result.getModelSnapshot();
if (modelSnapshot != null) {
persister.persistModelSnapshot(modelSnapshot);
updateModelSnapshotIdOnJob(modelSnapshot);
}
Quantiles quantiles = result.getQuantiles();
if (quantiles != null) {
@ -168,6 +177,22 @@ public class AutoDetectResultProcessor {
}
}
protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
JobUpdate update = new JobUpdate.Builder().setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update);
client.execute(UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId());
}
@Override
public void onFailure(Exception e) {
LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + modelSnapshot.getSnapshotId() + "]", e);
}
});
}
public void awaitCompletion() {
try {
completionLatch.await();

View File

@ -31,7 +31,6 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
*/
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField RESTORE_PRIORITY = new ParseField("restore_priority");
public static final ParseField SNAPSHOT_ID = new ParseField("snapshot_id");
public static final ParseField SNAPSHOT_DOC_COUNT = new ParseField("snapshot_doc_count");
public static final ParseField LATEST_RECORD_TIME = new ParseField("latest_record_time_stamp");
@ -59,7 +58,6 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ValueType.VALUE);
PARSER.declareString(ModelSnapshot::setDescription, DESCRIPTION);
PARSER.declareLong(ModelSnapshot::setRestorePriority, RESTORE_PRIORITY);
PARSER.declareString(ModelSnapshot::setSnapshotId, SNAPSHOT_ID);
PARSER.declareInt(ModelSnapshot::setSnapshotDocCount, SNAPSHOT_DOC_COUNT);
PARSER.declareObject(ModelSnapshot::setModelSizeStats, ModelSizeStats.PARSER, ModelSizeStats.RESULT_TYPE_FIELD);
@ -87,7 +85,6 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
private final String jobId;
private Date timestamp;
private String description;
private long restorePriority;
private String snapshotId;
private int snapshotDocCount;
private ModelSizeStats modelSizeStats;
@ -105,7 +102,6 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
timestamp = new Date(in.readLong());
}
description = in.readOptionalString();
restorePriority = in.readLong();
snapshotId = in.readOptionalString();
snapshotDocCount = in.readInt();
if (in.readBoolean()) {
@ -131,7 +127,6 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
out.writeLong(timestamp.getTime());
}
out.writeOptionalString(description);
out.writeLong(restorePriority);
out.writeOptionalString(snapshotId);
out.writeInt(snapshotDocCount);
boolean hasModelSizeStats = modelSizeStats != null;
@ -166,7 +161,6 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
if (description != null) {
builder.field(DESCRIPTION.getPreferredName(), description);
}
builder.field(RESTORE_PRIORITY.getPreferredName(), restorePriority);
if (snapshotId != null) {
builder.field(SNAPSHOT_ID.getPreferredName(), snapshotId);
}
@ -191,10 +185,6 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
return jobId;
}
public String documentId() {
return jobId + "-" + snapshotId;
}
public Date getTimestamp() {
return timestamp;
}
@ -211,14 +201,6 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
this.description = description;
}
public long getRestorePriority() {
return restorePriority;
}
public void setRestorePriority(long restorePriority) {
this.restorePriority = restorePriority;
}
public String getSnapshotId() {
return snapshotId;
}
@ -269,8 +251,8 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
return Objects.hash(jobId, timestamp, description, restorePriority, snapshotId, quantiles,
snapshotDocCount, modelSizeStats, latestRecordTimeStamp, latestResultTimeStamp);
return Objects.hash(jobId, timestamp, description, snapshotId, quantiles, snapshotDocCount, modelSizeStats, latestRecordTimeStamp,
latestResultTimeStamp);
}
/**
@ -291,7 +273,6 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
return Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.timestamp, that.timestamp)
&& Objects.equals(this.description, that.description)
&& this.restorePriority == that.restorePriority
&& Objects.equals(this.snapshotId, that.snapshotId)
&& this.snapshotDocCount == that.snapshotDocCount
&& Objects.equals(this.modelSizeStats, that.modelSizeStats)
@ -299,4 +280,12 @@ public class ModelSnapshot extends ToXContentToBytes implements Writeable {
&& Objects.equals(this.latestRecordTimeStamp, that.latestRecordTimeStamp)
&& Objects.equals(this.latestResultTimeStamp, that.latestResultTimeStamp);
}
public static String documentId(ModelSnapshot snapshot) {
return documentId(snapshot.getJobId(), snapshot.getSnapshotId());
}
public static String documentId(String jobId, String snapshotId) {
return jobId + "-" + snapshotId;
}
}

View File

@ -137,7 +137,6 @@ public final class ReservedFieldNames {
ModelSizeStats.LOG_TIME_FIELD.getPreferredName(),
ModelSnapshot.DESCRIPTION.getPreferredName(),
ModelSnapshot.RESTORE_PRIORITY.getPreferredName(),
ModelSnapshot.SNAPSHOT_ID.getPreferredName(),
ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName(),
ModelSnapshot.LATEST_RECORD_TIME.getPreferredName(),

View File

@ -158,7 +158,7 @@ rest.invalid.from.size.sum = The sum of parameters ''from'' and ''size'' cannot
rest.start.after.end = Invalid time range: end time ''{0}'' is earlier than start time ''{1}''.
rest.reset.bucket.no.latency = Bucket resetting is not supported when no latency is configured.
rest.job.not.closed.revert = Can only revert to a model snapshot when the job is closed.
rest.no.such.model.snapshot = No matching model snapshot exists for job ''{0}''
rest.no.such.model.snapshot = No model snapshot with id [{0}] exists for job [{1}]
rest.description.already.used = Model snapshot description ''{0}'' has already been used for job ''{1}''
rest.cannot.delete.highest.priority = Model snapshot ''{0}'' is the active snapshot for job ''{1}'', so cannot be deleted

View File

@ -76,11 +76,14 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
private Renormalizer renormalizer;
private JobResultsPersister jobResultsPersister;
private JobProvider jobProvider;
private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests;
private AutoDetectResultProcessor resultProcessor;
@Override
protected Settings nodeSettings() {
return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
return Settings.builder().put(super.nodeSettings())
.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
.build();
}
@Override
@ -95,11 +98,17 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
Settings.Builder builder = Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
jobProvider = new JobProvider(client(), 1, builder.build());
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, jobResultsPersister) {
@Override
protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
}
};
}
public void testProcessResults() throws Exception {
createJob();
AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister);
ResultsBuilder builder = new ResultsBuilder();
Bucket bucket = createBucket(false);
@ -151,6 +160,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
QueryPage<ModelSnapshot> persistedModelSnapshot = getModelSnapshots();
assertEquals(1, persistedModelSnapshot.count());
assertEquals(modelSnapshot, persistedModelSnapshot.results().get(0));
assertEquals(Arrays.asList(modelSnapshot), capturedUpdateModelSnapshotOnJobRequests);
Optional<Quantiles> persistedQuantiles = getQuantiles();
assertTrue(persistedQuantiles.isPresent());
@ -159,7 +169,6 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
public void testDeleteInterimResults() throws Exception {
createJob();
AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister);
Bucket nonInterimBucket = createBucket(false);
Bucket interimBucket = createBucket(true);
@ -189,7 +198,6 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
public void testMultipleFlushesBetweenPersisting() throws Exception {
createJob();
AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister);
Bucket finalBucket = createBucket(true);
List<AnomalyRecord> finalAnomalyRecords = createRecords(true);
@ -220,7 +228,6 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
public void testEndOfStreamTriggersPersisting() throws Exception {
createJob();
AutoDetectResultProcessor resultProcessor = new AutoDetectResultProcessor(JOB_ID, renormalizer, jobResultsPersister);
Bucket bucket = createBucket(false);
List<AnomalyRecord> firstSetOfRecords = createRecords(false);
List<AnomalyRecord> secondSetOfRecords = createRecords(false);

View File

@ -70,6 +70,9 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
if (randomBoolean()) {
update.setCustomSettings(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
update.setModelSnapshotId(randomAsciiOfLength(10));
}
return update.build();
}
@ -111,6 +114,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
updateBuilder.setRenormalizationWindowDays(randomNonNegativeLong());
updateBuilder.setCategorizationFilters(categorizationFilters);
updateBuilder.setCustomSettings(customSettings);
updateBuilder.setModelSnapshotId(randomAsciiOfLength(10));
JobUpdate update = updateBuilder.build();
Job.Builder jobBuilder = new Job.Builder("foo");
@ -133,6 +137,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
assertEquals(update.getResultsRetentionDays(), updatedJob.getResultsRetentionDays());
assertEquals(update.getCategorizationFilters(), updatedJob.getAnalysisConfig().getCategorizationFilters());
assertEquals(update.getCustomSettings(), updatedJob.getCustomSettings());
assertEquals(update.getModelSnapshotId(), updatedJob.getModelSnapshotId());
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
assertNotNull(updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getIndex()).getDetectorDescription());
assertEquals(detectorUpdate.getDescription(),

View File

@ -35,9 +35,8 @@ public class JobDataDeleterTests extends ESTestCase {
public void testDeleteResultsFromTime() {
final long TOTAL_HIT_COUNT = 100L;
final int PER_SCROLL_SEARCH_HIT_COUNT = 20;
SearchResponse response = createSearchResponseWithHits(TOTAL_HIT_COUNT, PER_SCROLL_SEARCH_HIT_COUNT);
SearchResponse response = createSearchResponseWithHits(TOTAL_HIT_COUNT);
BulkResponse bulkResponse = Mockito.mock(BulkResponse.class);
Client client = new MockClientBuilder("myCluster")
@ -94,18 +93,18 @@ public class JobDataDeleterTests extends ESTestCase {
.prepareDelete(eq(AnomalyDetectorsIndex.jobStateIndexName()), eq(ModelState.TYPE.getPreferredName()), anyString());
verify(client, times(1))
.prepareDelete(eq(AnomalyDetectorsIndex.jobResultsIndexName(jobId)), eq(ModelSnapshot.TYPE.getPreferredName()),
eq("snap-1"));
eq("foo-snap-1"));
}
private SearchResponse createSearchResponseWithHits(long totalHitCount, int hitsPerSearchResult) {
SearchHits hits = mockSearchHits(totalHitCount, hitsPerSearchResult);
private SearchResponse createSearchResponseWithHits(long totalHitCount) {
SearchHits hits = mockSearchHits(totalHitCount);
SearchResponse searchResponse = Mockito.mock(SearchResponse.class);
when(searchResponse.getHits()).thenReturn(hits);
when(searchResponse.getScrollId()).thenReturn("scroll1");
return searchResponse;
}
private SearchHits mockSearchHits(long totalHitCount, int hitsPerSearchResult) {
private SearchHits mockSearchHits(long totalHitCount) {
List<SearchHit> hitList = new ArrayList<>();
for (int i=0; i<20; i++) {

View File

@ -90,7 +90,6 @@ import static org.mockito.Mockito.when;
public class JobProviderTests extends ESTestCase {
private static final String CLUSTER_NAME = "myCluster";
private static final String JOB_ID = "foo";
private static final String STATE_INDEX_NAME = ".ml-state";
public void testGetQuantiles_GivenNoQuantilesForJob() throws Exception {
GetResponse getResponse = createGetResponse(false, null);
@ -1089,7 +1088,6 @@ public class JobProviderTests extends ESTestCase {
Map<String, Object> recordMap1 = new HashMap<>();
recordMap1.put("job_id", "foo");
recordMap1.put("description", "snapshot1");
recordMap1.put("restore_priority", 1);
recordMap1.put("timestamp", now.getTime());
recordMap1.put("snapshot_doc_count", 5);
recordMap1.put("latest_record_time_stamp", now.getTime());
@ -1097,7 +1095,6 @@ public class JobProviderTests extends ESTestCase {
Map<String, Object> recordMap2 = new HashMap<>();
recordMap2.put("job_id", "foo");
recordMap2.put("description", "snapshot2");
recordMap2.put("restore_priority", 999);
recordMap2.put("timestamp", now.getTime());
recordMap2.put("snapshot_doc_count", 6);
recordMap2.put("latest_record_time_stamp", now.getTime());
@ -1123,14 +1120,12 @@ public class JobProviderTests extends ESTestCase {
assertEquals(now, snapshots.get(0).getLatestRecordTimeStamp());
assertEquals(now, snapshots.get(0).getLatestResultTimeStamp());
assertEquals("snapshot1", snapshots.get(0).getDescription());
assertEquals(1L, snapshots.get(0).getRestorePriority());
assertEquals(5, snapshots.get(0).getSnapshotDocCount());
assertEquals(now, snapshots.get(1).getTimestamp());
assertEquals(now, snapshots.get(1).getLatestRecordTimeStamp());
assertEquals(now, snapshots.get(1).getLatestResultTimeStamp());
assertEquals("snapshot2", snapshots.get(1).getDescription());
assertEquals(999L, snapshots.get(1).getRestorePriority());
assertEquals(6, snapshots.get(1).getSnapshotDocCount());
}
@ -1143,7 +1138,6 @@ public class JobProviderTests extends ESTestCase {
Map<String, Object> recordMap1 = new HashMap<>();
recordMap1.put("job_id", "foo");
recordMap1.put("description", "snapshot1");
recordMap1.put("restore_priority", 1);
recordMap1.put("timestamp", now.getTime());
recordMap1.put("snapshot_doc_count", 5);
recordMap1.put("latest_record_time_stamp", now.getTime());
@ -1151,7 +1145,6 @@ public class JobProviderTests extends ESTestCase {
Map<String, Object> recordMap2 = new HashMap<>();
recordMap2.put("job_id", "foo");
recordMap2.put("description", "snapshot2");
recordMap2.put("restore_priority", 999);
recordMap2.put("timestamp", now.getTime());
recordMap2.put("snapshot_doc_count", 6);
recordMap2.put("latest_record_time_stamp", now.getTime());
@ -1178,14 +1171,12 @@ public class JobProviderTests extends ESTestCase {
assertEquals(now, snapshots.get(0).getLatestRecordTimeStamp());
assertEquals(now, snapshots.get(0).getLatestResultTimeStamp());
assertEquals("snapshot1", snapshots.get(0).getDescription());
assertEquals(1L, snapshots.get(0).getRestorePriority());
assertEquals(5, snapshots.get(0).getSnapshotDocCount());
assertEquals(now, snapshots.get(1).getTimestamp());
assertEquals(now, snapshots.get(1).getLatestRecordTimeStamp());
assertEquals(now, snapshots.get(1).getLatestResultTimeStamp());
assertEquals("snapshot2", snapshots.get(1).getDescription());
assertEquals(999L, snapshots.get(1).getRestorePriority());
assertEquals(6, snapshots.get(1).getSnapshotDocCount());
String queryString = qbHolder[0].toString();

View File

@ -61,7 +61,6 @@ import static org.elasticsearch.mock.orig.Mockito.when;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@ -104,10 +103,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
}).when(jobProvider).dataCounts(any(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<QueryPage<ModelSnapshot>> handler = (Consumer<QueryPage<ModelSnapshot>>) invocationOnMock.getArguments()[3];
handler.accept(new QueryPage<>(Collections.singletonList(modelSnapshot), 1, ModelSnapshot.RESULTS_FIELD));
Consumer<ModelSnapshot> handler = (Consumer<ModelSnapshot>) invocationOnMock.getArguments()[2];
handler.accept(modelSnapshot);
return null;
}).when(jobProvider).modelSnapshots(any(), anyInt(), anyInt(), any(), any());
}).when(jobProvider).getModelSnapshot(anyString(), anyString(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<Quantiles> handler = (Consumer<Quantiles>) invocationOnMock.getArguments()[1];

View File

@ -5,7 +5,10 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.elasticsearch.client.Client;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
@ -19,6 +22,7 @@ import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.junit.Before;
import org.mockito.InOrder;
import java.util.Arrays;
@ -27,6 +31,8 @@ import java.util.Iterator;
import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -39,6 +45,21 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
private static final String JOB_ID = "_id";
private Client client;
private Renormalizer renormalizer;
private JobResultsPersister persister;
private FlushListener flushListener;
private AutoDetectResultProcessor processorUnderTest;
@Before
public void setUpMocks() {
client = mock(Client.class);
renormalizer = mock(Renormalizer.class);
persister = mock(JobResultsPersister.class);
flushListener = mock(FlushListener.class);
processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister, flushListener);
}
public void testProcess() {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked")
@ -47,28 +68,22 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(iterator.next()).thenReturn(autodetectResult);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister);
processor.process(process, randomBoolean());
processorUnderTest.process(process, randomBoolean());
verify(renormalizer, times(1)).waitUntilIdle();
assertEquals(0, processor.completionLatch.getCount());
assertEquals(0, processorUnderTest.completionLatch.getCount());
}
public void testProcessResult_bucket() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
when(result.getBucket()).thenReturn(bucket);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(bulkBuilder, times(1)).executeRequest();
@ -78,19 +93,16 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_bucket_deleteInterimRequired() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = true;
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
when(result.getBucket()).thenReturn(bucket);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(bulkBuilder, times(1)).executeRequest();
@ -101,11 +113,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_records() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder);
context.deleteInterimRequired = false;
@ -114,7 +123,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
AnomalyRecord record2 = new AnomalyRecord("foo", new Date(123), 123, 2);
List<AnomalyRecord> records = Arrays.asList(record1, record2);
when(result.getRecords()).thenReturn(records);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistRecords(records);
verify(bulkBuilder, never()).executeRequest();
@ -122,11 +131,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_records_isPerPartitionNormalization() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder);
context.deleteInterimRequired = false;
@ -137,7 +143,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
record2.setPartitionFieldValue("pValue");
List<AnomalyRecord> records = Arrays.asList(record1, record2);
when(result.getRecords()).thenReturn(records);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class));
verify(bulkBuilder, times(1)).persistRecords(records);
@ -146,11 +152,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_influencers() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -159,7 +162,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
Influencer influencer2 = new Influencer(JOB_ID, "infField2", "infValue2", new Date(123), 123, 1);
List<Influencer> influencers = Arrays.asList(influencer1, influencer2);
when(result.getInfluencers()).thenReturn(influencers);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistInfluencers(influencers);
verify(bulkBuilder, never()).executeRequest();
@ -167,18 +170,15 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_categoryDefinition() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, never()).executeRequest();
verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
@ -186,12 +186,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_flushAcknowledgement() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -199,7 +195,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
verify(persister, times(1)).commitResultWrites(JOB_ID);
@ -209,11 +205,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -225,7 +217,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
InOrder inOrder = inOrder(persister, bulkBuilder, flushListener);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
inOrder.verify(bulkBuilder, times(1)).executeRequest();
@ -236,69 +228,62 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_modelDebugOutput() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelDebugOutput modelDebugOutput = mock(ModelDebugOutput.class);
when(result.getModelDebugOutput()).thenReturn(modelDebugOutput);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistModelDebugOutput(modelDebugOutput);
verifyNoMoreInteractions(persister);
}
public void testProcessResult_modelSizeStats() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistModelSizeStats(modelSizeStats);
verifyNoMoreInteractions(persister);
assertEquals(modelSizeStats, processor.modelSizeStats());
assertEquals(modelSizeStats, processorUnderTest.modelSizeStats());
}
public void testProcessResult_modelSnapshot() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = mock(ModelSnapshot.class);
ModelSnapshot modelSnapshot = new ModelSnapshot(JOB_ID);
modelSnapshot.setSnapshotId("a_snapshot_id");
when(result.getModelSnapshot()).thenReturn(modelSnapshot);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistModelSnapshot(modelSnapshot);
UpdateJobAction.Request expectedJobUpdateRequest = new UpdateJobAction.Request(JOB_ID,
new JobUpdate.Builder().setModelSnapshotId("a_snapshot_id").build());
verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
verifyNoMoreInteractions(persister);
}
public void testProcessResult_quantiles() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(JOB_ID, renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
when(result.getQuantiles()).thenReturn(quantiles);
processor.processResult(context, result);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
verify(renormalizer, times(1)).renormalize(quantiles);

View File

@ -17,7 +17,6 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
private static final Date DEFAULT_TIMESTAMP = new Date();
private static final String DEFAULT_DESCRIPTION = "a snapshot";
private static final String DEFAULT_ID = "my_id";
private static final long DEFAULT_PRIORITY = 1234L;
private static final int DEFAULT_DOC_COUNT = 7;
private static final Date DEFAULT_LATEST_RESULT_TIMESTAMP = new Date(12345678901234L);
private static final Date DEFAULT_LATEST_RECORD_TIMESTAMP = new Date(12345678904321L);
@ -67,17 +66,6 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
assertFalse(modelSnapshot2.equals(modelSnapshot1));
}
public void testEquals_GivenDifferentRestorePriority() {
ModelSnapshot modelSnapshot1 = createFullyPopulated();
ModelSnapshot modelSnapshot2 = createFullyPopulated();
modelSnapshot2.setRestorePriority(modelSnapshot2.getRestorePriority() + 1);
assertFalse(modelSnapshot1.equals(modelSnapshot2));
assertFalse(modelSnapshot2.equals(modelSnapshot1));
}
public void testEquals_GivenDifferentId() {
ModelSnapshot modelSnapshot1 = createFullyPopulated();
ModelSnapshot modelSnapshot2 = createFullyPopulated();
@ -146,7 +134,6 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
modelSnapshot.setTimestamp(DEFAULT_TIMESTAMP);
modelSnapshot.setDescription(DEFAULT_DESCRIPTION);
modelSnapshot.setRestorePriority(DEFAULT_PRIORITY);
modelSnapshot.setSnapshotId(DEFAULT_ID);
modelSnapshot.setSnapshotDocCount(DEFAULT_DOC_COUNT);
ModelSizeStats.Builder modelSizeStatsBuilder = new ModelSizeStats.Builder("foo");
@ -163,7 +150,6 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
ModelSnapshot modelSnapshot = new ModelSnapshot(randomAsciiOfLengthBetween(1, 20));
modelSnapshot.setTimestamp(new Date(TimeUtils.dateStringToEpoch(randomTimeValue())));
modelSnapshot.setDescription(randomAsciiOfLengthBetween(1, 20));
modelSnapshot.setRestorePriority(randomLong());
modelSnapshot.setSnapshotId(randomAsciiOfLengthBetween(1, 20));
modelSnapshot.setSnapshotDocCount(randomInt());
ModelSizeStats.Builder stats = new ModelSizeStats.Builder(randomAsciiOfLengthBetween(1, 20));
@ -212,4 +198,17 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
protected ModelSnapshot parseInstance(XContentParser parser) {
return ModelSnapshot.PARSER.apply(parser, null);
}
public void testDocumentId() {
ModelSnapshot snapshot1 = new ModelSnapshot("foo");
snapshot1.setSnapshotId("1");
ModelSnapshot snapshot2 = new ModelSnapshot("foo");
snapshot2.setSnapshotId("2");
ModelSnapshot snapshot3 = new ModelSnapshot("bar");
snapshot3.setSnapshotId("1");
assertEquals("foo-1", ModelSnapshot.documentId(snapshot1));
assertEquals("foo-2", ModelSnapshot.documentId(snapshot2));
assertEquals("bar-1", ModelSnapshot.documentId(snapshot3));
}
}

View File

@ -34,7 +34,6 @@ setup:
{
"job_id": "foo",
"timestamp": "2016-06-02T00:00:00Z",
"restore_priority": "1",
"snapshot_id": "foo1",
"description": "first",
"latest_record_time_stamp": "2016-06-02T00:00:00Z",
@ -71,7 +70,6 @@ setup:
{
"job_id": "foo",
"timestamp": "2016-06-01T00:00:00Z",
"restore_priority": "2",
"snapshot_id": "foo2",
"description": "second",
"latest_record_time_stamp": "2016-06-01T00:00:00Z",

View File

@ -8,22 +8,20 @@ setup:
properties:
"timestamp":
type: date
"restore_priority":
type: integer
- do:
index:
index: .ml-anomalies-foo
type: model_snapshot
id: "foo-1"
body: { "job_id": "foo", "snapshot_id": "1", "timestamp": "2016-06-02T00:00:00Z", "restore_priority": "1" }
body: { "job_id": "foo", "snapshot_id": "1", "timestamp": "2016-06-02T00:00:00Z" }
- do:
index:
index: .ml-anomalies-foo
type: model_snapshot
id: "foo-2"
body: { "job_id": "foo", "snapshot_id": "2", "timestamp": "2016-06-01T00:00:00Z", "restore_priority": "2" }
body: { "job_id": "foo", "snapshot_id": "2", "timestamp": "2016-06-01T00:00:00Z" }
- do:
indices.refresh:
@ -36,7 +34,6 @@ setup:
job_id: "foo"
- match: { count: 2 }
- match: { model_snapshots.0.restore_priority: 2 }
- match: { model_snapshots.0.timestamp: 1464739200000 }
---
@ -47,7 +44,6 @@ setup:
snapshot_id: "_all"
- match: { count: 2 }
- match: { model_snapshots.0.restore_priority: 2 }
- match: { model_snapshots.0.timestamp: 1464739200000 }
---
@ -58,7 +54,6 @@ setup:
snapshot_id: "2"
- match: { count: 1 }
- match: { model_snapshots.0.restore_priority: 2 }
- match: { model_snapshots.0.timestamp: 1464739200000 }
---
@ -70,7 +65,6 @@ setup:
end: "2016-07-01T00:00:00Z"
- match: { count: 2 }
- match: { model_snapshots.0.restore_priority: 2 }
- match: { model_snapshots.0.timestamp: 1464739200000 }
---
@ -81,7 +75,6 @@ setup:
desc: false
- match: { count: 2 }
- match: { model_snapshots.0.restore_priority: 1 }
- match: { model_snapshots.0.timestamp: 1464825600000 }
---
@ -92,7 +85,6 @@ setup:
size: 1
- match: { count: 2 }
- match: { model_snapshots.0.restore_priority: 2 }
- match: { model_snapshots.0.timestamp: 1464739200000 }
- length: { model_snapshots: 1 }
@ -104,6 +96,5 @@ setup:
from: 1
- match: { count: 2 }
- match: { model_snapshots.0.restore_priority: 1 }
- match: { model_snapshots.0.timestamp: 1464825600000 }
- length: { model_snapshots: 1 }

View File

@ -34,7 +34,6 @@ setup:
{
"job_id": "foo",
"timestamp": "2016-06-02T00:00:00Z",
"restore_priority": "1",
"snapshot_id": "foo1",
"description": "first",
"latest_record_time_stamp": "2016-06-02T00:00:00Z",
@ -50,7 +49,6 @@ setup:
{
"job_id": "foo",
"timestamp": "2016-06-01T00:00:00Z",
"restore_priority": "2",
"snapshot_id": "foo2",
"description": "second",
"latest_record_time_stamp": "2016-06-01T00:00:00Z",
@ -151,7 +149,6 @@ setup:
- match: { acknowledged: true }
- match: { model.job_id: "foo" }
- match: { model.timestamp: 1464825600000 }
- match: { model.restore_priority: 1 }
- match: { model.snapshot_id: "foo1" }
- match: { model.snapshot_doc_count: 0 }
@ -163,7 +160,6 @@ setup:
- match: { acknowledged: true }
- match: { model.job_id: "foo" }
- match: { model.timestamp: 1464739200000 }
- match: { model.restore_priority: 2 }
- match: { model.snapshot_id: "foo2" }
- match: { model.snapshot_doc_count: 0 }

View File

@ -8,8 +8,6 @@ setup:
properties:
"timestamp":
type: date
"restore_priority":
type: integer
- do:
index:
@ -20,7 +18,6 @@ setup:
{
"job_id" : "foo",
"timestamp": "2016-06-02T00:00:00Z",
"restore_priority": "1",
"snapshot_id": "foo"
}
@ -33,7 +30,6 @@ setup:
{
"job_id": "foo",
"timestamp": "2016-06-01T00:00:00Z",
"restore_priority": "2",
"snapshot_id": "bar",
"description": "bar"
}
@ -86,7 +82,6 @@ setup:
description: "new_description"
- match: { count: 1 }
- match: { model_snapshots.0.restore_priority: 1 }
- match: { model_snapshots.0.timestamp: 1464825600000 }
---