Extract TimingStats-related functionality into TimingStatsReporter (#43371) (#43557)

This commit is contained in:
Przemysław Witek 2019-06-25 15:48:39 +02:00 committed by GitHub
parent c594a956e2
commit b15e40ffad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 447 additions and 363 deletions

View File

@ -864,7 +864,7 @@ public class ElasticsearchMappings {
.startObject(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.startObject(TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject();
}

View File

@ -31,7 +31,7 @@ public class TimingStats implements ToXContentObject, Writeable {
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
public static final ParseField EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS =
public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS =
new ParseField("exponential_average_bucket_processing_time_ms");
public static final ParseField TYPE = new ParseField("timing_stats");
@ -49,7 +49,7 @@ public class TimingStats implements ToXContentObject, Writeable {
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS);
}
public static String documentId(String jobId) {
@ -185,7 +185,7 @@ public class TimingStats implements ToXContentObject, Writeable {
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
}
if (exponentialAvgBucketProcessingTimeMs != null) {
builder.field(EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
}
builder.endObject();
return builder;
@ -219,34 +219,4 @@ public class TimingStats implements ToXContentObject, Writeable {
public String toString() {
return Strings.toString(this);
}
/**
 * Returns {@code true} if the given stats objects differ significantly from each other,
 * i.e. when at least one of the four tracked statistics (min/max/avg/exponential avg
 * bucket processing time) differs by more than 10% between the two objects.
 *
 * @param stats1 first stats object to compare, must not be {@code null}
 * @param stats2 second stats object to compare, must not be {@code null}
 * @return {@code true} iff at least one statistic differs significantly
 */
public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
// Short-circuits on the first statistic that differs significantly.
return differSignificantly(stats1.minBucketProcessingTimeMs, stats2.minBucketProcessingTimeMs)
|| differSignificantly(stats1.maxBucketProcessingTimeMs, stats2.maxBucketProcessingTimeMs)
|| differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs)
|| differSignificantly(stats1.exponentialAvgBucketProcessingTimeMs, stats2.exponentialAvgBucketProcessingTimeMs);
}
/**
 * Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO (0.9).
 * This can be interpreted as values { value1, value2 } differing significantly from each other.
 * This method also returns:
 * - {@code true} in case one value is {@code null} while the other is not.
 * - {@code false} in case both values are {@code null}.
 *
 * @param value1 first value, may be {@code null}
 * @param value2 second value, may be {@code null}
 * @return whether the two values differ significantly from each other
 */
static boolean differSignificantly(Double value1, Double value2) {
if (value1 != null && value2 != null) {
// Both ratios are checked, making the comparison symmetric in its arguments.
return (value2 / value1 < MIN_VALID_RATIO) || (value1 / value2 < MIN_VALID_RATIO);
}
// Exactly one value present -> significant difference; both absent -> no difference.
return (value1 != null) || (value2 != null);
}
/**
* Minimum ratio of values that is interpreted as values being similar.
* If the values ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different.
*/
private static final double MIN_VALID_RATIO = 0.9;
}

View File

@ -179,7 +179,7 @@ public final class ReservedFieldNames {
TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
GetResult._ID,
GetResult._INDEX,

View File

@ -13,7 +13,6 @@ import org.hamcrest.Matcher;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
@ -124,33 +123,6 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
assertThat(TimingStats.documentId("my-job-id"), equalTo("my-job-id_timing_stats"));
}
// Verifies the 10% significance threshold on whole stats objects: identical stats are not
// significantly different; changing one statistic from 10.0 to 11.0 (ratio ~0.909, within 10%)
// is still not significant, while changing it to 12.0 (ratio ~0.833, beyond 10%) is.
// NOTE(review): which statistic the changed positional argument maps to is not visible here —
// confirm against the TimingStats constructor.
public void testTimingStatsDifferSignificantly() {
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0)),
is(false));
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0, 10.0)),
is(false));
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0, 10.0)),
is(true));
}
// Exercises null handling and the ratio boundary of the pairwise comparison:
// both null -> false, exactly one null -> true, ratio exactly at the 0.9 threshold -> not
// significant, ratio just below 0.9 -> significant, and zero vs non-zero -> significant.
public void testValuesDifferSignificantly() {
assertThat(TimingStats.differSignificantly((Double) null, (Double) null), is(false));
assertThat(TimingStats.differSignificantly(1.0, null), is(true));
assertThat(TimingStats.differSignificantly(null, 1.0), is(true));
assertThat(TimingStats.differSignificantly(0.9, 1.0), is(false));
assertThat(TimingStats.differSignificantly(1.0, 0.9), is(false));
assertThat(TimingStats.differSignificantly(0.9, 1.000001), is(true));
assertThat(TimingStats.differSignificantly(1.0, 0.899999), is(true));
assertThat(TimingStats.differSignificantly(0.0, 1.0), is(true));
assertThat(TimingStats.differSignificantly(1.0, 0.0), is(true));
}
/**
* Creates a matcher of {@link TimingStats}s that matches when an examined stats are equal
* to the specified <code>operand</code>, within a range of +/- <code>error</code>.

View File

@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import java.util.Objects;
/**
* {@link TimingStatsReporter} class handles the logic of persisting {@link TimingStats} if they changed significantly since the last time
* they were persisted.
*
* This class is not thread-safe.
*/
/**
 * {@link TimingStatsReporter} handles the logic of persisting {@link TimingStats} if they changed significantly
 * since the last time they were persisted.
 * <p>
 * This class is not thread-safe.
 */
public class TimingStatsReporter {

    /**
     * Minimum ratio of values that is interpreted as values being similar.
     * If the values ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different.
     */
    private static final double MIN_VALID_RATIO = 0.9;

    /** Snapshot of the stats as they were last persisted. May be stale relative to {@link #currentTimingStats}. */
    private TimingStats persistedTimingStats;
    /** Up-to-date stats, updated on every reported bucket processing time. */
    private TimingStats currentTimingStats;
    /** Object used to persist current timing stats. Never reassigned, hence final. */
    private final JobResultsPersister.Builder bulkResultsPersister;

    /**
     * @param timingStats initial timing stats, defensively copied; must not be {@code null}
     * @param jobResultsPersister bulk persister used to write timing stats documents; must not be {@code null}
     */
    public TimingStatsReporter(TimingStats timingStats, JobResultsPersister.Builder jobResultsPersister) {
        Objects.requireNonNull(timingStats);
        // Copy-construct both snapshots so later mutations of currentTimingStats cannot leak into the caller's object.
        this.persistedTimingStats = new TimingStats(timingStats);
        this.currentTimingStats = new TimingStats(timingStats);
        this.bulkResultsPersister = Objects.requireNonNull(jobResultsPersister);
    }

    /** Returns a defensive copy of the current (possibly not-yet-persisted) timing stats. */
    public TimingStats getCurrentTimingStats() {
        return new TimingStats(currentTimingStats);
    }

    /**
     * Records the processing time of a single bucket and queues the updated stats for
     * persistence if they now differ significantly from the last persisted snapshot.
     *
     * @param bucketProcessingTimeMs processing time of one bucket, in milliseconds
     */
    public void reportBucketProcessingTime(long bucketProcessingTimeMs) {
        currentTimingStats.updateStats(bucketProcessingTimeMs);
        if (differSignificantly(currentTimingStats, persistedTimingStats)) {
            flush();
        }
    }

    /**
     * Unconditionally queues the current stats for persistence and makes them the persisted snapshot.
     * Note this only hands the document to the bulk persister; it does not itself execute the bulk request.
     */
    public void flush() {
        persistedTimingStats = new TimingStats(currentTimingStats);
        bulkResultsPersister.persistTimingStats(persistedTimingStats);
    }

    /**
     * Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics.
     */
    public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
        return differSignificantly(stats1.getMinBucketProcessingTimeMs(), stats2.getMinBucketProcessingTimeMs())
            || differSignificantly(stats1.getMaxBucketProcessingTimeMs(), stats2.getMaxBucketProcessingTimeMs())
            || differSignificantly(stats1.getAvgBucketProcessingTimeMs(), stats2.getAvgBucketProcessingTimeMs())
            || differSignificantly(stats1.getExponentialAvgBucketProcessingTimeMs(), stats2.getExponentialAvgBucketProcessingTimeMs());
    }

    /**
     * Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO.
     * This can be interpreted as values { value1, value2 } differing significantly from each other.
     * This method also returns:
     *  - {@code true} in case one value is {@code null} while the other is not.
     *  - {@code false} in case both values are {@code null}.
     */
    static boolean differSignificantly(Double value1, Double value2) {
        if (value1 != null && value2 != null) {
            return (value2 / value1 < MIN_VALID_RATIO) || (value1 / value2 < MIN_VALID_RATIO);
        }
        return (value1 != null) || (value2 != null);
    }
}

View File

@ -517,12 +517,13 @@ public class AutodetectProcessManager implements ClusterStateListener {
jobId,
renormalizer,
jobResultsPersister,
process,
autodetectParams.modelSizeStats(),
autodetectParams.timingStats());
ExecutorService autodetectWorkerExecutor;
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService);
autodetectExecutorService.submit(() -> processor.process(process));
autodetectExecutorService.submit(processor::process);
} catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped.

View File

@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.TimingStatsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
@ -53,7 +54,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
/**
* A runnable class that reads the autodetect process output in the
* {@link #process(AutodetectProcess)} method and persists parsed
* {@link #process()} method and persists parsed
* results via the {@linkplain JobResultsPersister} passed in the constructor.
* <p>
* Has methods to register and remove alert observers.
@ -77,6 +78,8 @@ public class AutodetectResultProcessor {
private final String jobId;
private final Renormalizer renormalizer;
private final JobResultsPersister persister;
private final AutodetectProcess process;
private final TimingStatsReporter timingStatsReporter;
final CountDownLatch completionLatch = new CountDownLatch(1);
final Semaphore updateModelSnapshotSemaphore = new Semaphore(1);
@ -84,57 +87,55 @@ public class AutodetectResultProcessor {
private volatile boolean processKilled;
private volatile boolean failed;
private int bucketCount; // only used from the process() thread, so doesn't need to be volatile
private final JobResultsPersister.Builder bulkResultsPersister;
private boolean deleteInterimRequired;
/**
* New model size stats are read as the process is running
*/
private volatile ModelSizeStats latestModelSizeStats;
/**
* Current timing stats
*/
private volatile TimingStats timingStats;
/**
* Persisted timing stats. May be stale
*/
private TimingStats persistedTimingStats; // only used from the process() thread, so doesn't need to be volatile
public AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
public AutodetectResultProcessor(Client client,
Auditor auditor,
String jobId,
Renormalizer renormalizer,
JobResultsPersister persister,
AutodetectProcess process,
ModelSizeStats latestModelSizeStats,
TimingStats timingStats) {
this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, timingStats, new FlushListener());
this(client, auditor, jobId, renormalizer, persister, process, latestModelSizeStats, timingStats, new FlushListener());
}
// Visible for testing
AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats,
JobResultsPersister persister, AutodetectProcess autodetectProcess, ModelSizeStats latestModelSizeStats,
TimingStats timingStats,
FlushListener flushListener) {
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
this.jobId = Objects.requireNonNull(jobId);
this.renormalizer = Objects.requireNonNull(renormalizer);
this.persister = Objects.requireNonNull(persister);
this.process = Objects.requireNonNull(autodetectProcess);
this.flushListener = Objects.requireNonNull(flushListener);
this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
this.persistedTimingStats = Objects.requireNonNull(timingStats);
this.timingStats = new TimingStats(persistedTimingStats);
this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId);
this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
this.deleteInterimRequired = true;
}
public void process(AutodetectProcess process) {
Context context = new Context(jobId, persister.bulkPersisterBuilder(jobId));
public void process() {
// If a function call in this throws for some reason we don't want it
// to kill the results reader thread as autodetect will be blocked
// trying to write its output.
try {
readResults(process, context);
readResults();
try {
if (processKilled == false) {
context.bulkResultsPersister
.persistTimingStats(timingStats)
.executeRequest();
timingStatsReporter.flush();
bulkResultsPersister.executeRequest();
}
} catch (Exception e) {
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
@ -164,14 +165,14 @@ public class AutodetectResultProcessor {
}
}
private void readResults(AutodetectProcess process, Context context) {
private void readResults() {
bucketCount = 0;
try {
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
while (iterator.hasNext()) {
try {
AutodetectResult result = iterator.next();
processResult(context, result);
processResult(result);
if (result.getBucket() != null) {
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
}
@ -195,35 +196,35 @@ public class AutodetectResultProcessor {
renormalizer.shutdown();
}
void processResult(Context context, AutodetectResult result) {
void processResult(AutodetectResult result) {
if (processKilled) {
return;
}
Bucket bucket = result.getBucket();
if (bucket != null) {
if (context.deleteInterimRequired) {
if (deleteInterimRequired) {
// Delete any existing interim results generated by a Flush command
// which have not been replaced or superseded by new results.
LOGGER.trace("[{}] Deleting interim results", context.jobId);
persister.deleteInterimResults(context.jobId);
context.deleteInterimRequired = false;
LOGGER.trace("[{}] Deleting interim results", jobId);
persister.deleteInterimResults(jobId);
deleteInterimRequired = false;
}
// persist after deleting interim results in case the new
// results are also interim
processTimingStats(context, bucket.getProcessingTimeMs());
context.bulkResultsPersister.persistBucket(bucket).executeRequest();
timingStatsReporter.reportBucketProcessingTime(bucket.getProcessingTimeMs());
bulkResultsPersister.persistBucket(bucket).executeRequest();
++bucketCount;
}
List<AnomalyRecord> records = result.getRecords();
if (records != null && !records.isEmpty()) {
context.bulkResultsPersister.persistRecords(records);
bulkResultsPersister.persistRecords(records);
}
List<Influencer> influencers = result.getInfluencers();
if (influencers != null && !influencers.isEmpty()) {
context.bulkResultsPersister.persistInfluencers(influencers);
bulkResultsPersister.persistInfluencers(influencers);
}
CategoryDefinition categoryDefinition = result.getCategoryDefinition();
if (categoryDefinition != null) {
@ -231,16 +232,16 @@ public class AutodetectResultProcessor {
}
ModelPlot modelPlot = result.getModelPlot();
if (modelPlot != null) {
context.bulkResultsPersister.persistModelPlot(modelPlot);
bulkResultsPersister.persistModelPlot(modelPlot);
}
Forecast forecast = result.getForecast();
if (forecast != null) {
context.bulkResultsPersister.persistForecast(forecast);
bulkResultsPersister.persistForecast(forecast);
}
ForecastRequestStats forecastRequestStats = result.getForecastRequestStats();
if (forecastRequestStats != null) {
LOGGER.trace("Received Forecast Stats [{}]", forecastRequestStats.getId());
context.bulkResultsPersister.persistForecastRequestStats(forecastRequestStats);
bulkResultsPersister.persistForecastRequestStats(forecastRequestStats);
// execute the bulk request only in some cases or in doubt
// otherwise rely on the count-based trigger
@ -252,13 +253,13 @@ public class AutodetectResultProcessor {
case SCHEDULED:
case FINISHED:
default:
context.bulkResultsPersister.executeRequest();
bulkResultsPersister.executeRequest();
}
}
ModelSizeStats modelSizeStats = result.getModelSizeStats();
if (modelSizeStats != null) {
processModelSizeStats(context, modelSizeStats);
processModelSizeStats(modelSizeStats);
}
ModelSnapshot modelSnapshot = result.getModelSnapshot();
if (modelSnapshot != null) {
@ -270,64 +271,55 @@ public class AutodetectResultProcessor {
}
Quantiles quantiles = result.getQuantiles();
if (quantiles != null) {
LOGGER.debug("[{}] Parsed Quantiles with timestamp {}", context.jobId, quantiles.getTimestamp());
LOGGER.debug("[{}] Parsed Quantiles with timestamp {}", jobId, quantiles.getTimestamp());
persister.persistQuantiles(quantiles);
context.bulkResultsPersister.executeRequest();
bulkResultsPersister.executeRequest();
if (processKilled == false && renormalizer.isEnabled()) {
// We need to make all results written up to these quantiles available for renormalization
persister.commitResultWrites(context.jobId);
LOGGER.debug("[{}] Quantiles queued for renormalization", context.jobId);
persister.commitResultWrites(jobId);
LOGGER.debug("[{}] Quantiles queued for renormalization", jobId);
renormalizer.renormalize(quantiles);
}
}
FlushAcknowledgement flushAcknowledgement = result.getFlushAcknowledgement();
if (flushAcknowledgement != null) {
LOGGER.debug("[{}] Flush acknowledgement parsed from output for ID {}", context.jobId, flushAcknowledgement.getId());
LOGGER.debug("[{}] Flush acknowledgement parsed from output for ID {}", jobId, flushAcknowledgement.getId());
// Commit previous writes here, effectively continuing
// the flush from the C++ autodetect process right
// through to the data store
context.bulkResultsPersister.executeRequest();
persister.commitResultWrites(context.jobId);
bulkResultsPersister.executeRequest();
persister.commitResultWrites(jobId);
flushListener.acknowledgeFlush(flushAcknowledgement);
// Interim results may have been produced by the flush,
// which need to be
// deleted when the next finalized results come through
context.deleteInterimRequired = true;
deleteInterimRequired = true;
}
}
private void processTimingStats(Context context, long bucketProcessingTimeMs) {
timingStats.updateStats(bucketProcessingTimeMs);
if (TimingStats.differSignificantly(timingStats, persistedTimingStats)) {
context.bulkResultsPersister.persistTimingStats(timingStats);
persistedTimingStats = timingStats;
timingStats = new TimingStats(persistedTimingStats);
}
}
private void processModelSizeStats(Context context, ModelSizeStats modelSizeStats) {
private void processModelSizeStats(ModelSizeStats modelSizeStats) {
LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
context.jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
modelSizeStats.getTotalOverFieldCount(), modelSizeStats.getTotalPartitionFieldCount(),
modelSizeStats.getBucketAllocationFailuresCount(), modelSizeStats.getMemoryStatus());
persister.persistModelSizeStats(modelSizeStats);
notifyModelMemoryStatusChange(context, modelSizeStats);
notifyModelMemoryStatusChange(modelSizeStats);
latestModelSizeStats = modelSizeStats;
}
private void notifyModelMemoryStatusChange(Context context, ModelSizeStats modelSizeStats) {
private void notifyModelMemoryStatusChange(ModelSizeStats modelSizeStats) {
ModelSizeStats.MemoryStatus memoryStatus = modelSizeStats.getMemoryStatus();
if (memoryStatus != latestModelSizeStats.getMemoryStatus()) {
if (memoryStatus == ModelSizeStats.MemoryStatus.SOFT_LIMIT) {
auditor.warning(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
} else if (memoryStatus == ModelSizeStats.MemoryStatus.HARD_LIMIT) {
if (modelSizeStats.getModelBytesMemoryLimit() == null || modelSizeStats.getModelBytesExceeded() == null) {
auditor.error(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2,
auditor.error(jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2,
new ByteSizeValue(modelSizeStats.getModelBytes(), ByteSizeUnit.BYTES).toString()));
} else {
auditor.error(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT,
auditor.error(jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT,
new ByteSizeValue(modelSizeStats.getModelBytesMemoryLimit(), ByteSizeUnit.BYTES).toString(),
new ByteSizeValue(modelSizeStats.getModelBytesExceeded(), ByteSizeUnit.BYTES).toString()));
}
@ -419,25 +411,19 @@ public class AutodetectResultProcessor {
return failed;
}
/**
 * Per-run mutable state threaded through result processing: the job id, the bulk persister
 * used to batch result writes, and whether interim results still need to be deleted.
 */
static class Context {
private final String jobId;
private JobResultsPersister.Builder bulkResultsPersister;
// Starts as true so stale interim results are removed before the first new bucket is persisted.
boolean deleteInterimRequired;
Context(String jobId, JobResultsPersister.Builder bulkResultsPersister) {
this.jobId = jobId;
this.deleteInterimRequired = true;
this.bulkResultsPersister = bulkResultsPersister;
}
}
public ModelSizeStats modelSizeStats() {
return latestModelSizeStats;
}
public TimingStats timingStats() {
return timingStats;
return timingStatsReporter.getCurrentTimingStats();
}
boolean isDeleteInterimRequired() {
return deleteInterimRequired;
}
void setDeleteInterimRequired(boolean deleteInterimRequired) {
this.deleteInterimRequired = deleteInterimRequired;
}
}

View File

@ -50,7 +50,6 @@ import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@ -77,6 +76,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests;
private AutodetectResultProcessor resultProcessor;
private Renormalizer renormalizer;
private AutodetectProcess process;
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
@ -90,9 +90,17 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
Auditor auditor = new Auditor(client(), "test_node");
jobResultsProvider = new JobResultsProvider(client(), builder.build());
renormalizer = mock(Renormalizer.class);
process = mock(AutodetectProcess.class);
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
resultProcessor = new AutodetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) {
resultProcessor = new AutodetectResultProcessor(
client(),
auditor,
JOB_ID,
renormalizer,
new JobResultsPersister(client()),
process,
new ModelSizeStats.Builder(JOB_ID).build(),
new TimingStats(JOB_ID)) {
@Override
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
@ -110,25 +118,26 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
}
public void testProcessResults() throws Exception {
ResultsBuilder builder = new ResultsBuilder();
ResultsBuilder resultsBuilder = new ResultsBuilder();
Bucket bucket = createBucket(false);
builder.addBucket(bucket);
resultsBuilder.addBucket(bucket);
List<AnomalyRecord> records = createRecords(false);
builder.addRecords(records);
resultsBuilder.addRecords(records);
List<Influencer> influencers = createInfluencers(false);
builder.addInfluencers(influencers);
resultsBuilder.addInfluencers(influencers);
CategoryDefinition categoryDefinition = createCategoryDefinition();
builder.addCategoryDefinition(categoryDefinition);
resultsBuilder.addCategoryDefinition(categoryDefinition);
ModelPlot modelPlot = createModelPlot();
builder.addModelPlot(modelPlot);
resultsBuilder.addModelPlot(modelPlot);
ModelSizeStats modelSizeStats = createModelSizeStats();
builder.addModelSizeStats(modelSizeStats);
resultsBuilder.addModelSizeStats(modelSizeStats);
ModelSnapshot modelSnapshot = createModelSnapshot();
builder.addModelSnapshot(modelSnapshot);
resultsBuilder.addModelSnapshot(modelSnapshot);
Quantiles quantiles = createQuantiles();
builder.addQuantiles(quantiles);
resultsBuilder.addQuantiles(quantiles);
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
resultProcessor.process(builder.buildTestProcess());
resultProcessor.process();
resultProcessor.awaitCompletion();
BucketsQueryBuilder bucketsQuery = new BucketsQueryBuilder().includeInterim(true);
@ -167,7 +176,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
}
public void testProcessResults_TimingStats() throws Exception {
ResultsBuilder resultBuilder = new ResultsBuilder()
ResultsBuilder resultsBuilder = new ResultsBuilder()
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000))
.addBucket(createBucket(true, 100))
@ -178,8 +187,9 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
.addBucket(createBucket(true, 1000))
.addBucket(createBucket(true, 100))
.addBucket(createBucket(true, 1000));
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
resultProcessor.process(resultBuilder.buildTestProcess());
resultProcessor.process();
resultProcessor.awaitCompletion();
TimingStats timingStats = resultProcessor.timingStats();
@ -194,11 +204,12 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
public void testParseQuantiles_GivenRenormalizationIsEnabled() throws Exception {
when(renormalizer.isEnabled()).thenReturn(true);
ResultsBuilder builder = new ResultsBuilder();
ResultsBuilder resultsBuilder = new ResultsBuilder();
Quantiles quantiles = createQuantiles();
builder.addQuantiles(quantiles);
resultsBuilder.addQuantiles(quantiles);
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
resultProcessor.process(builder.buildTestProcess());
resultProcessor.process();
resultProcessor.awaitCompletion();
Optional<Quantiles> persistedQuantiles = getQuantiles();
@ -210,11 +221,12 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception {
when(renormalizer.isEnabled()).thenReturn(false);
ResultsBuilder builder = new ResultsBuilder();
ResultsBuilder resultsBuilder = new ResultsBuilder();
Quantiles quantiles = createQuantiles();
builder.addQuantiles(quantiles);
resultsBuilder.addQuantiles(quantiles);
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
resultProcessor.process(builder.buildTestProcess());
resultProcessor.process();
resultProcessor.awaitCompletion();
Optional<Quantiles> persistedQuantiles = getQuantiles();
@ -227,14 +239,15 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
Bucket nonInterimBucket = createBucket(false);
Bucket interimBucket = createBucket(true);
ResultsBuilder resultBuilder = new ResultsBuilder()
ResultsBuilder resultsBuilder = new ResultsBuilder()
.addRecords(createRecords(true))
.addInfluencers(createInfluencers(true))
.addBucket(interimBucket) // this will persist the interim results
.addFlushAcknowledgement(createFlushAcknowledgement())
.addBucket(nonInterimBucket); // and this will delete the interim results
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
resultProcessor.process(resultBuilder.buildTestProcess());
resultProcessor.process();
resultProcessor.awaitCompletion();
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
@ -255,7 +268,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
Bucket finalBucket = createBucket(true);
List<AnomalyRecord> finalAnomalyRecords = createRecords(true);
ResultsBuilder resultBuilder = new ResultsBuilder()
ResultsBuilder resultsBuilder = new ResultsBuilder()
.addRecords(createRecords(true))
.addInfluencers(createInfluencers(true))
.addBucket(createBucket(true)) // this will persist the interim results
@ -265,8 +278,9 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
.addFlushAcknowledgement(createFlushAcknowledgement())
.addRecords(finalAnomalyRecords)
.addBucket(finalBucket); // this deletes the previous interim and persists final bucket & records
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
resultProcessor.process(resultBuilder.buildTestProcess());
resultProcessor.process();
resultProcessor.awaitCompletion();
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
@ -285,12 +299,13 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
List<AnomalyRecord> firstSetOfRecords = createRecords(false);
List<AnomalyRecord> secondSetOfRecords = createRecords(false);
ResultsBuilder resultBuilder = new ResultsBuilder()
ResultsBuilder resultsBuilder = new ResultsBuilder()
.addRecords(firstSetOfRecords)
.addBucket(bucket) // bucket triggers persistence
.addRecords(secondSetOfRecords);
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
resultProcessor.process(resultBuilder.buildTestProcess());
resultProcessor.process();
resultProcessor.awaitCompletion();
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
@ -389,9 +404,9 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
return new FlushAcknowledgement(randomAlphaOfLength(5), randomDate());
}
private class ResultsBuilder {
private static class ResultsBuilder {
private List<AutodetectResult> results = new ArrayList<>();
private final List<AutodetectResult> results = new ArrayList<>();
ResultsBuilder addBucket(Bucket bucket) {
results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null, null, null));
@@ -438,12 +453,8 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
return this;
}
AutodetectProcess buildTestProcess() {
AutodetectResult[] results = this.results.toArray(new AutodetectResult[0]);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(Arrays.asList(results).iterator());
return process;
Iterable<AutodetectResult> build() {
return results;
}
}

View File

@@ -838,7 +838,7 @@ public class JobResultsProviderTests extends ESTestCase {
timingStatsMap.put(TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1.0);
timingStatsMap.put(TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1000.0);
timingStatsMap.put(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 666.0);
timingStatsMap.put(TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 777.0);
timingStatsMap.put(TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 777.0);
List<Map<String, Object>> source = Arrays.asList(timingStatsMap);
SearchResponse response = createSearchResponse(source);

View File

@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.junit.Before;
import org.mockito.InOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
public class TimingStatsReporterTests extends ESTestCase {
private static final String JOB_ID = "my-job-id";
private JobResultsPersister.Builder bulkResultsPersister;
@Before
public void setUpTests() {
bulkResultsPersister = mock(JobResultsPersister.Builder.class);
}
public void testGetCurrentTimingStats() {
TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStatsReporter reporter = new TimingStatsReporter(stats, bulkResultsPersister);
assertThat(reporter.getCurrentTimingStats(), equalTo(stats));
verifyZeroInteractions(bulkResultsPersister);
}
public void testReporting() {
TimingStatsReporter reporter = new TimingStatsReporter(new TimingStats(JOB_ID), bulkResultsPersister);
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID)));
reporter.reportBucketProcessingTime(10);
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0)));
reporter.reportBucketProcessingTime(20);
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1)));
reporter.reportBucketProcessingTime(15);
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 20.0, 15.0, 10.149)));
InOrder inOrder = inOrder(bulkResultsPersister);
inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0));
inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1));
inOrder.verifyNoMoreInteractions();
}
public void testFlush() {
TimingStatsReporter reporter = new TimingStatsReporter(new TimingStats(JOB_ID), bulkResultsPersister);
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID)));
reporter.reportBucketProcessingTime(10);
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0)));
reporter.reportBucketProcessingTime(10);
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 2, 10.0, 10.0, 10.0, 10.0)));
reporter.reportBucketProcessingTime(10);
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0)));
reporter.flush();
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0)));
InOrder inOrder = inOrder(bulkResultsPersister);
inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0));
inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0));
inOrder.verifyNoMoreInteractions();
}
public void testTimingStatsDifferSignificantly() {
assertThat(
TimingStatsReporter.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0)),
is(false));
assertThat(
TimingStatsReporter.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0, 10.0)),
is(false));
assertThat(
TimingStatsReporter.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0, 10.0)),
is(true));
}
public void testValuesDifferSignificantly() {
assertThat(TimingStatsReporter.differSignificantly((Double) null, (Double) null), is(false));
assertThat(TimingStatsReporter.differSignificantly(1.0, null), is(true));
assertThat(TimingStatsReporter.differSignificantly(null, 1.0), is(true));
assertThat(TimingStatsReporter.differSignificantly(0.9, 1.0), is(false));
assertThat(TimingStatsReporter.differSignificantly(1.0, 0.9), is(false));
assertThat(TimingStatsReporter.differSignificantly(0.9, 1.000001), is(true));
assertThat(TimingStatsReporter.differSignificantly(1.0, 0.899999), is(true));
assertThat(TimingStatsReporter.differSignificantly(0.0, 1.0), is(true));
assertThat(TimingStatsReporter.differSignificantly(1.0, 0.0), is(true));
}
}

View File

@@ -52,6 +52,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
@@ -77,6 +78,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
private Auditor auditor;
private Renormalizer renormalizer;
private JobResultsPersister persister;
private JobResultsPersister.Builder bulkBuilder;
private AutodetectProcess process;
private FlushListener flushListener;
private AutodetectResultProcessor processorUnderTest;
private ScheduledThreadPoolExecutor executor;
@@ -91,8 +94,9 @@ public class AutodetectResultProcessorTests extends ESTestCase {
auditor = mock(Auditor.class);
renormalizer = mock(Renormalizer.class);
persister = mock(JobResultsPersister.class);
when(persister.persistModelSnapshot(any(), any()))
.thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true));
bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
process = mock(AutodetectProcess.class);
flushListener = mock(FlushListener.class);
processorUnderTest = new AutodetectResultProcessor(
client,
@@ -100,6 +104,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
JOB_ID,
renormalizer,
persister,
process,
new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(),
new TimingStats(JOB_ID),
flushListener);
@@ -107,143 +112,120 @@ public class AutodetectResultProcessorTests extends ESTestCase {
@After
public void cleanup() {
verifyNoMoreInteractions(auditor, renormalizer, persister);
executor.shutdown();
}
public void testProcess() throws TimeoutException {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(autodetectResult);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
processorUnderTest.process(process);
when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
processorUnderTest.process();
processorUnderTest.awaitCompletion();
verify(renormalizer, times(1)).waitUntilIdle();
assertEquals(0, processorUnderTest.completionLatch.getCount());
assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
verify(renormalizer).waitUntilIdle();
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister).commitResultWrites(JOB_ID);
verify(persister).commitStateWrites(JOB_ID);
}
public void testProcessResult_bucket() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
when(result.getBucket()).thenReturn(bucket);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(bulkBuilder, times(1)).executeRequest();
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder).persistBucket(bucket);
verify(bulkBuilder).executeRequest();
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister, never()).deleteInterimResults(JOB_ID);
verifyNoMoreInteractions(persister);
}
public void testProcessResult_bucket_deleteInterimRequired() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = true;
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
when(result.getBucket()).thenReturn(bucket);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(bulkBuilder, times(1)).executeRequest();
verify(persister, times(1)).deleteInterimResults(JOB_ID);
verifyNoMoreInteractions(persister);
assertFalse(context.deleteInterimRequired);
processorUnderTest.processResult(result);
assertFalse(processorUnderTest.isDeleteInterimRequired());
verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder).persistBucket(bucket);
verify(bulkBuilder).executeRequest();
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister).deleteInterimResults(JOB_ID);
}
public void testProcessResult_records() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context("foo", bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123);
AnomalyRecord record2 = new AnomalyRecord("foo", new Date(123), 123);
List<AnomalyRecord> records = Arrays.asList(record1, record2);
List<AnomalyRecord> records =
Arrays.asList(
new AnomalyRecord(JOB_ID, new Date(123), 123),
new AnomalyRecord(JOB_ID, new Date(123), 123));
when(result.getRecords()).thenReturn(records);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistRecords(records);
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(bulkBuilder).persistRecords(records);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister);
verify(persister).bulkPersisterBuilder(JOB_ID);
}
public void testProcessResult_influencers() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123);
Influencer influencer2 = new Influencer(JOB_ID, "infField2", "infValue2", new Date(123), 123);
List<Influencer> influencers = Arrays.asList(influencer1, influencer2);
List<Influencer> influencers =
Arrays.asList(
new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123),
new Influencer(JOB_ID, "infField2", "infValue2", new Date(123), 123));
when(result.getInfluencers()).thenReturn(influencers);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistInfluencers(influencers);
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(bulkBuilder).persistInfluencers(influencers);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister);
verify(persister).bulkPersisterBuilder(JOB_ID);
}
public void testProcessResult_categoryDefinition() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
processorUnderTest.processResult(context, result);
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(bulkBuilder, never()).executeRequest();
verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
verifyNoMoreInteractions(persister);
verify(persister).persistCategoryDefinition(categoryDefinition);
verify(persister).bulkPersisterBuilder(JOB_ID);
}
public void testProcessResult_flushAcknowledgement() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
processorUnderTest.processResult(context, result);
verify(flushListener, times(1)).acknowledgeFlush(flushAcknowledgement);
verify(persister, times(1)).commitResultWrites(JOB_ID);
verify(bulkBuilder, times(1)).executeRequest();
verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired);
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
assertTrue(processorUnderTest.isDeleteInterimRequired());
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(flushListener).acknowledgeFlush(flushAcknowledgement);
verify(persister).commitResultWrites(JOB_ID);
verify(bulkBuilder).executeRequest();
}
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
@@ -251,64 +233,61 @@ public class AutodetectResultProcessorTests extends ESTestCase {
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
InOrder inOrder = inOrder(persister, bulkBuilder, flushListener);
processorUnderTest.processResult(context, result);
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
assertTrue(processorUnderTest.isDeleteInterimRequired());
inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
inOrder.verify(bulkBuilder, times(1)).executeRequest();
inOrder.verify(persister, times(1)).commitResultWrites(JOB_ID);
inOrder.verify(flushListener, times(1)).acknowledgeFlush(flushAcknowledgement);
verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired);
InOrder inOrder = inOrder(persister, bulkBuilder, flushListener);
inOrder.verify(persister).bulkPersisterBuilder(JOB_ID);
inOrder.verify(persister).persistCategoryDefinition(categoryDefinition);
inOrder.verify(bulkBuilder).executeRequest();
inOrder.verify(persister).commitResultWrites(JOB_ID);
inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement);
}
public void testProcessResult_modelPlot() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelPlot modelPlot = mock(ModelPlot.class);
when(result.getModelPlot()).thenReturn(modelPlot);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistModelPlot(modelPlot);
verifyNoMoreInteractions(persister);
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(bulkBuilder).persistModelPlot(modelPlot);
}
public void testProcessResult_modelSizeStats() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistModelSizeStats(modelSizeStats);
verifyNoMoreInteractions(persister);
assertEquals(modelSizeStats, processorUnderTest.modelSizeStats());
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
assertThat(processorUnderTest.modelSizeStats(), is(equalTo(modelSizeStats)));
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister).persistModelSizeStats(modelSizeStats);
}
public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
TimeValue delay = TimeValue.timeValueSeconds(5);
// Set up schedule delay time
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
.thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[0], delay.nanos(), TimeUnit.NANOSECONDS));
setupScheduleDelayTime(TimeValue.timeValueSeconds(5));
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
processorUnderTest.setDeleteInterimRequired(false);
// First one with soft_limit
ModelSizeStats modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.SOFT_LIMIT).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(context, result);
processorUnderTest.processResult(result);
// Another with soft_limit
modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.SOFT_LIMIT).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(context, result);
processorUnderTest.processResult(result);
// Now with hard_limit
modelSizeStats = new ModelSizeStats.Builder(JOB_ID)
@@ -317,115 +296,104 @@ public class AutodetectResultProcessorTests extends ESTestCase {
.setModelBytesExceeded(new ByteSizeValue(1, ByteSizeUnit.KB).getBytes())
.build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(context, result);
processorUnderTest.processResult(result);
// And another with hard_limit
modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.HARD_LIMIT).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(context, result);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister, times(4)).persistModelSizeStats(any(ModelSizeStats.class));
// We should have only fired two notifications: one for soft_limit and one for hard_limit
verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
verifyNoMoreInteractions(auditor);
}
public void testProcessResult_modelSnapshot() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
.setSnapshotId("a_snapshot_id")
.setMinVersion(Version.CURRENT)
.build();
.setSnapshotId("a_snapshot_id")
.setMinVersion(Version.CURRENT)
.build();
when(result.getModelSnapshot()).thenReturn(modelSnapshot);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
when(persister.persistModelSnapshot(any(), any()))
.thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true));
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID,
new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build());
verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
verifyNoMoreInteractions(persister);
}
public void testProcessResult_quantiles_givenRenormalizationIsEnabled() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
when(result.getQuantiles()).thenReturn(quantiles);
when(renormalizer.isEnabled()).thenReturn(true);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister).persistQuantiles(quantiles);
verify(bulkBuilder).executeRequest();
verify(persister).commitResultWrites(JOB_ID);
verify(renormalizer, times(1)).isEnabled();
verify(renormalizer, times(1)).renormalize(quantiles);
verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormalizer);
verify(renormalizer).isEnabled();
verify(renormalizer).renormalize(quantiles);
}
public void testProcessResult_quantiles_givenRenormalizationIsDisabled() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
when(result.getQuantiles()).thenReturn(quantiles);
when(renormalizer.isEnabled()).thenReturn(false);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister).persistQuantiles(quantiles);
verify(bulkBuilder).executeRequest();
verify(renormalizer, times(1)).isEnabled();
verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormalizer);
verify(renormalizer).isEnabled();
}
public void testAwaitCompletion() throws TimeoutException {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(autodetectResult);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
processorUnderTest.process(process);
when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
processorUnderTest.process();
processorUnderTest.awaitCompletion();
assertEquals(0, processorUnderTest.completionLatch.getCount());
assertEquals(1, processorUnderTest.updateModelSnapshotSemaphore.availablePermits());
assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1)));
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister).commitResultWrites(JOB_ID);
verify(persister).commitStateWrites(JOB_ID);
verify(renormalizer).waitUntilIdle();
}
public void testPersisterThrowingDoesntBlockProcessing() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutodetectResult autodetectResult = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = mock(ModelSnapshot.class);
when(autodetectResult.getModelSnapshot()).thenReturn(modelSnapshot);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(autodetectResult);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.isProcessAlive()).thenReturn(true);
when(process.isProcessAliveAfterWaiting()).thenReturn(true);
when(process.readAutodetectResults()).thenReturn(iterator);
when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult, autodetectResult).iterator());
doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any(), any());
processorUnderTest.process(process);
processorUnderTest.process();
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE));
}
@@ -433,44 +401,36 @@ public class AutodetectResultProcessorTests extends ESTestCase {
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenThrow(new ElasticsearchParseException("this test throws"));
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
assertFalse(processorUnderTest.isFailed());
processorUnderTest.process(process);
processorUnderTest.process();
assertTrue(processorUnderTest.isFailed());
// Wait for flush should return immediately
FlushAcknowledgement flushAcknowledgement = processorUnderTest.waitForFlushAcknowledgement(
"foo", Duration.of(300, ChronoUnit.SECONDS));
FlushAcknowledgement flushAcknowledgement =
processorUnderTest.waitForFlushAcknowledgement(JOB_ID, Duration.of(300, ChronoUnit.SECONDS));
assertThat(flushAcknowledgement, is(nullValue()));
verify(persister).bulkPersisterBuilder(JOB_ID);
}
public void testKill() throws TimeoutException {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenReturn(true).thenReturn(false);
when(iterator.next()).thenReturn(autodetectResult);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
processorUnderTest.setProcessKilled();
processorUnderTest.process(process);
processorUnderTest.process();
processorUnderTest.awaitCompletion();
assertEquals(0, processorUnderTest.completionLatch.getCount());
assertEquals(1, processorUnderTest.updateModelSnapshotSemaphore.availablePermits());
assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1)));
verify(persister, times(1)).commitResultWrites(JOB_ID);
verify(persister, times(1)).commitStateWrites(JOB_ID);
verify(persister).bulkPersisterBuilder(JOB_ID);
verify(persister).commitResultWrites(JOB_ID);
verify(persister).commitStateWrites(JOB_ID);
verify(renormalizer, never()).renormalize(any());
verify(renormalizer).shutdown();
verify(renormalizer, times(1)).waitUntilIdle();
verify(flushListener, times(1)).clear();
}
private void setupScheduleDelayTime(TimeValue delay) {
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
.thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[0], delay.nanos(), TimeUnit.NANOSECONDS));
verify(renormalizer).waitUntilIdle();
verify(flushListener).clear();
}
}