[7.x] Make Annotation a result type (#56342) (#57508)

This commit is contained in:
Przemysław Witek 2020-06-02 11:56:41 +02:00 committed by GitHub
parent 9bc9d01b84
commit ea6cfb7c3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 155 additions and 72 deletions

View File

@ -54,6 +54,11 @@ public class Annotation implements ToXContentObject, Writeable {
}
}
/**
* Result type is needed due to the fact that {@link Annotation} can be returned from C++ as an ML result.
*/
public static final ParseField RESULTS_FIELD = new ParseField("annotation");
public static final ParseField ANNOTATION = new ParseField("annotation");
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField CREATE_USERNAME = new ParseField("create_username");
@ -81,7 +86,8 @@ public class Annotation implements ToXContentObject, Writeable {
/**
* Strict parser for cases when {@link Annotation} is returned from C++ as an ML result.
*/
private static final ObjectParser<Builder, Void> STRICT_PARSER = new ObjectParser<>(ANNOTATION.getPreferredName(), false, Builder::new);
private static final ObjectParser<Builder, Void> STRICT_PARSER =
new ObjectParser<>(RESULTS_FIELD.getPreferredName(), false, Builder::new);
static {
STRICT_PARSER.declareString(Builder::setAnnotation, ANNOTATION);

View File

@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationTests;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
@ -89,13 +90,18 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@ -168,7 +174,9 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
AcknowledgedResponse response = client().execute(DeleteJobAction.INSTANCE, request).actionGet();
assertTrue(response.isAcknowledged());
// Verify that deleting job also deletes associated model snapshots annotations
assertThat(getAnnotations(), empty());
assertThat(
getAnnotations().stream().map(Annotation::getAnnotation).collect(toList()),
everyItem(not(startsWith("Job model snapshot"))));
}
public void testProcessResults() throws Exception {
@ -183,6 +191,8 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
resultsBuilder.addCategoryDefinition(categoryDefinition);
ModelPlot modelPlot = createModelPlot();
resultsBuilder.addModelPlot(modelPlot);
Annotation annotation = createAnnotation();
resultsBuilder.addAnnotation(annotation);
ModelSizeStats modelSizeStats = createModelSizeStats();
resultsBuilder.addModelSizeStats(modelSizeStats);
ModelSnapshot modelSnapshot = createModelSnapshot();
@ -224,17 +234,20 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
assertEquals(modelSnapshot, persistedModelSnapshot.results().get(0));
assertEquals(Collections.singletonList(modelSnapshot), capturedUpdateModelSnapshotOnJobRequests);
// Verify that creating model snapshot also creates associated annotation
List<Annotation> annotations = getAnnotations();
assertThat(annotations, hasSize(1));
assertThat(
annotations.get(0).getAnnotation(),
is(equalTo(
new ParameterizedMessage("Job model snapshot with id [{}] stored", modelSnapshot.getSnapshotId()).getFormattedMessage())));
Optional<Quantiles> persistedQuantiles = getQuantiles();
assertTrue(persistedQuantiles.isPresent());
assertEquals(quantiles, persistedQuantiles.get());
// Verify that there are two annotations:
// 1. one related to creating model snapshot
// 2. one for {@link Annotation} result
List<Annotation> annotations = getAnnotations();
assertThat("Annotations were: " + annotations.toString(), annotations, hasSize(2));
assertThat(
annotations.stream().map(Annotation::getAnnotation).collect(toList()),
containsInAnyOrder(
new ParameterizedMessage("Job model snapshot with id [{}] stored", modelSnapshot.getSnapshotId()).getFormattedMessage(),
annotation.getAnnotation()));
}
public void testProcessResults_ModelSnapshot() throws Exception {
@ -466,6 +479,10 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
return new ModelPlotTests().createTestInstance(JOB_ID);
}
private static Annotation createAnnotation() {
return AnnotationTests.randomAnnotation(JOB_ID);
}
private static ModelSizeStats createModelSizeStats() {
ModelSizeStats.Builder builder = new ModelSizeStats.Builder(JOB_ID);
builder.setTimestamp(randomDate());
@ -500,47 +517,53 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
private final List<AutodetectResult> results = new ArrayList<>();
ResultsBuilder addBucket(Bucket bucket) {
results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null, null, null));
results.add(
new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null, null, null, null));
return this;
}
ResultsBuilder addRecords(List<AnomalyRecord> records) {
results.add(new AutodetectResult(null, records, null, null, null, null, null, null, null, null, null));
results.add(new AutodetectResult(null, records, null, null, null, null, null, null, null, null, null, null));
return this;
}
ResultsBuilder addInfluencers(List<Influencer> influencers) {
results.add(new AutodetectResult(null, null, influencers, null, null, null, null, null, null, null, null));
results.add(new AutodetectResult(null, null, influencers, null, null, null, null, null, null, null, null, null));
return this;
}
ResultsBuilder addCategoryDefinition(CategoryDefinition categoryDefinition) {
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, categoryDefinition, null));
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, null, categoryDefinition, null));
return this;
}
ResultsBuilder addModelPlot(ModelPlot modelPlot) {
results.add(new AutodetectResult(null, null, null, null, null, null, modelPlot, null, null, null, null));
results.add(new AutodetectResult(null, null, null, null, null, null, modelPlot, null, null, null, null, null));
return this;
}
ResultsBuilder addAnnotation(Annotation annotation) {
results.add(new AutodetectResult(null, null, null, null, null, null, null, annotation, null, null, null, null));
return this;
}
ResultsBuilder addModelSizeStats(ModelSizeStats modelSizeStats) {
results.add(new AutodetectResult(null, null, null, null, null, modelSizeStats, null, null, null, null, null));
results.add(new AutodetectResult(null, null, null, null, null, modelSizeStats, null, null, null, null, null, null));
return this;
}
ResultsBuilder addModelSnapshot(ModelSnapshot modelSnapshot) {
results.add(new AutodetectResult(null, null, null, null, modelSnapshot, null, null, null, null, null, null));
results.add(new AutodetectResult(null, null, null, null, modelSnapshot, null, null, null, null, null, null, null));
return this;
}
ResultsBuilder addQuantiles(Quantiles quantiles) {
results.add(new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null));
results.add(new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null, null));
return this;
}
ResultsBuilder addFlushAcknowledgement(FlushAcknowledgement flushAcknowledgement) {
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, null, flushAcknowledgement));
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null, flushAcknowledgement));
return this;
}

View File

@ -71,7 +71,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
if (Arrays.asList(record).contains(MAGIC_FAILURE_VALUE)) {
open = false;
onProcessCrash.accept("simulated failure");
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null);
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null, null);
results.add(result);
}
}
@ -104,7 +104,8 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
@Override
public String flushJob(FlushJobParams params) throws IOException {
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, null);
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, flushAcknowledgement);
AutodetectResult result =
new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null,flushAcknowledgement);
results.add(result);
return FLUSH_ID;
}
@ -121,7 +122,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
public void close() throws IOException {
if (open) {
Quantiles quantiles = new Quantiles(jobId, new Date(), "black hole quantiles");
AutodetectResult result = new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null);
AutodetectResult result = new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null, null);
results.add(result);
open = false;
}

View File

@ -88,7 +88,6 @@ public class AutodetectResultProcessor {
private final String jobId;
private final Renormalizer renormalizer;
private final JobResultsPersister persister;
private final AnnotationPersister annotationPersister;
private final AutodetectProcess process;
private final TimingStatsReporter timingStatsReporter;
private final Clock clock;
@ -102,6 +101,7 @@ public class AutodetectResultProcessor {
private final long priorRunsBucketCount;
private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile
private final JobResultsPersister.Builder bulkResultsPersister;
private final AnnotationPersister.Builder bulkAnnotationsPersister;
private boolean deleteInterimRequired;
/**
@ -131,11 +131,11 @@ public class AutodetectResultProcessor {
this.jobId = Objects.requireNonNull(jobId);
this.renormalizer = Objects.requireNonNull(renormalizer);
this.persister = Objects.requireNonNull(persister);
this.annotationPersister = Objects.requireNonNull(annotationPersister);
this.process = Objects.requireNonNull(autodetectProcess);
this.flushListener = Objects.requireNonNull(flushListener);
this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId).shouldRetry(this::isAlive);
this.bulkAnnotationsPersister = annotationPersister.bulkPersisterBuilder(jobId).shouldRetry(this::isAlive);
this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
this.clock = Objects.requireNonNull(clock);
this.deleteInterimRequired = true;
@ -155,6 +155,7 @@ public class AutodetectResultProcessor {
if (processKilled == false) {
timingStatsReporter.finishReporting();
bulkResultsPersister.executeRequest();
bulkAnnotationsPersister.executeRequest();
}
} catch (Exception e) {
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
@ -253,6 +254,7 @@ public class AutodetectResultProcessor {
// results are also interim
timingStatsReporter.reportBucket(bucket);
bulkResultsPersister.persistBucket(bucket).executeRequest();
bulkAnnotationsPersister.executeRequest();
++currentRunBucketCount;
}
List<AnomalyRecord> records = result.getRecords();
@ -271,6 +273,10 @@ public class AutodetectResultProcessor {
if (modelPlot != null) {
bulkResultsPersister.persistModelPlot(modelPlot);
}
Annotation annotation = result.getAnnotation();
if (annotation != null) {
bulkAnnotationsPersister.persistAnnotation(annotation);
}
Forecast forecast = result.getForecast();
if (forecast != null) {
bulkResultsPersister.persistForecast(forecast);
@ -313,7 +319,7 @@ public class AutodetectResultProcessor {
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
updateModelSnapshotOnJob(modelSnapshot);
}
annotationPersister.persistAnnotation(
bulkAnnotationsPersister.persistAnnotation(
ModelSnapshot.annotationDocumentId(modelSnapshot), createModelSnapshotAnnotation(modelSnapshot));
}
Quantiles quantiles = result.getQuantiles();
@ -338,6 +344,7 @@ public class AutodetectResultProcessor {
Exception exception = null;
try {
bulkResultsPersister.executeRequest();
bulkAnnotationsPersister.executeRequest();
persister.commitResultWrites(jobId);
LOGGER.debug("[{}] Flush acknowledgement sent to listener for ID {}", jobId, flushAcknowledgement.getId());
} catch (Exception e) {

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
@ -38,8 +39,8 @@ public class AutodetectResult implements ToXContentObject, Writeable {
public static final ConstructingObjectParser<AutodetectResult, Void> PARSER = new ConstructingObjectParser<>(
TYPE.getPreferredName(), a -> new AutodetectResult((Bucket) a[0], (List<AnomalyRecord>) a[1], (List<Influencer>) a[2],
(Quantiles) a[3], a[4] == null ? null : ((ModelSnapshot.Builder) a[4]).build(),
a[5] == null ? null : ((ModelSizeStats.Builder) a[5]).build(), (ModelPlot) a[6],
(Forecast) a[7], (ForecastRequestStats) a[8], (CategoryDefinition) a[9], (FlushAcknowledgement) a[10]));
a[5] == null ? null : ((ModelSizeStats.Builder) a[5]).build(), (ModelPlot) a[6], (Annotation) a[7],
(Forecast) a[8], (ForecastRequestStats) a[9], (CategoryDefinition) a[10], (FlushAcknowledgement) a[11]));
static {
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Bucket.STRICT_PARSER, Bucket.RESULT_TYPE_FIELD);
@ -51,6 +52,7 @@ public class AutodetectResult implements ToXContentObject, Writeable {
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSizeStats.STRICT_PARSER,
ModelSizeStats.RESULT_TYPE_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelPlot.STRICT_PARSER, ModelPlot.RESULTS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Annotation::fromXContent, Annotation.RESULTS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Forecast.STRICT_PARSER, Forecast.RESULTS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ForecastRequestStats.STRICT_PARSER,
ForecastRequestStats.RESULTS_FIELD);
@ -65,14 +67,16 @@ public class AutodetectResult implements ToXContentObject, Writeable {
private final ModelSnapshot modelSnapshot;
private final ModelSizeStats modelSizeStats;
private final ModelPlot modelPlot;
private final Annotation annotation;
private final Forecast forecast;
private final ForecastRequestStats forecastRequestStats;
private final CategoryDefinition categoryDefinition;
private final FlushAcknowledgement flushAcknowledgement;
public AutodetectResult(Bucket bucket, List<AnomalyRecord> records, List<Influencer> influencers, Quantiles quantiles,
ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats, ModelPlot modelPlot, Forecast forecast,
ForecastRequestStats forecastRequestStats, CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) {
ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats, ModelPlot modelPlot, Annotation annotation,
Forecast forecast, ForecastRequestStats forecastRequestStats, CategoryDefinition categoryDefinition,
FlushAcknowledgement flushAcknowledgement) {
this.bucket = bucket;
this.records = records;
this.influencers = influencers;
@ -80,6 +84,7 @@ public class AutodetectResult implements ToXContentObject, Writeable {
this.modelSnapshot = modelSnapshot;
this.modelSizeStats = modelSizeStats;
this.modelPlot = modelPlot;
this.annotation = annotation;
this.forecast = forecast;
this.forecastRequestStats = forecastRequestStats;
this.categoryDefinition = categoryDefinition;
@ -122,6 +127,15 @@ public class AutodetectResult implements ToXContentObject, Writeable {
} else {
this.modelPlot = null;
}
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
if (in.readBoolean()) {
this.annotation = new Annotation(in);
} else {
this.annotation = null;
}
} else {
this.annotation = null;
}
if (in.readBoolean()) {
this.categoryDefinition = new CategoryDefinition(in);
} else {
@ -159,6 +173,9 @@ public class AutodetectResult implements ToXContentObject, Writeable {
writeNullable(modelSnapshot, out);
writeNullable(modelSizeStats, out);
writeNullable(modelPlot, out);
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
writeNullable(annotation, out);
}
writeNullable(categoryDefinition, out);
writeNullable(flushAcknowledgement, out);
@ -194,6 +211,7 @@ public class AutodetectResult implements ToXContentObject, Writeable {
addNullableField(ModelSnapshot.TYPE, modelSnapshot, builder);
addNullableField(ModelSizeStats.RESULT_TYPE_FIELD, modelSizeStats, builder);
addNullableField(ModelPlot.RESULTS_FIELD, modelPlot, builder);
addNullableField(Annotation.RESULTS_FIELD, annotation, builder);
addNullableField(Forecast.RESULTS_FIELD, forecast, builder);
addNullableField(ForecastRequestStats.RESULTS_FIELD, forecastRequestStats, builder);
addNullableField(CategoryDefinition.TYPE, categoryDefinition, builder);
@ -242,6 +260,10 @@ public class AutodetectResult implements ToXContentObject, Writeable {
return modelPlot;
}
public Annotation getAnnotation() {
return annotation;
}
public Forecast getForecast() {
return forecast;
}
@ -260,7 +282,7 @@ public class AutodetectResult implements ToXContentObject, Writeable {
@Override
public int hashCode() {
return Objects.hash(bucket, records, influencers, categoryDefinition, flushAcknowledgement, modelPlot, forecast,
return Objects.hash(bucket, records, influencers, categoryDefinition, flushAcknowledgement, modelPlot, annotation, forecast,
forecastRequestStats, modelSizeStats, modelSnapshot, quantiles);
}
@ -279,6 +301,7 @@ public class AutodetectResult implements ToXContentObject, Writeable {
Objects.equals(categoryDefinition, other.categoryDefinition) &&
Objects.equals(flushAcknowledgement, other.flushAcknowledgement) &&
Objects.equals(modelPlot, other.modelPlot) &&
Objects.equals(annotation, other.annotation) &&
Objects.equals(forecast, other.forecast) &&
Objects.equals(forecastRequestStats, other.forecastRequestStats) &&
Objects.equals(modelSizeStats, other.modelSizeStats) &&

View File

@ -157,6 +157,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(bulkPersisterBuilder);
jobDataCountsPersister = mock(JobDataCountsPersister.class);
annotationPersister = mock(AnnotationPersister.class);
AnnotationPersister.Builder bulkAnnotationsBuilder = mock(AnnotationPersister.Builder.class);
when(bulkAnnotationsBuilder.shouldRetry(any())).thenReturn(bulkAnnotationsBuilder);
when(annotationPersister.bulkPersisterBuilder(any())).thenReturn(bulkAnnotationsBuilder);
autodetectCommunicator = mock(AutodetectCommunicator.class);
autodetectFactory = mock(AutodetectProcessFactory.class);
normalizerFactory = mock(NormalizerFactory.class);

View File

@ -24,6 +24,7 @@ import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationTests;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
@ -87,8 +88,9 @@ public class AutodetectResultProcessorTests extends ESTestCase {
private AnomalyDetectionAuditor auditor;
private Renormalizer renormalizer;
private JobResultsPersister persister;
private JobResultsPersister.Builder bulkBuilder;
private JobResultsPersister.Builder bulkResultsPersister;
private AnnotationPersister annotationPersister;
private AnnotationPersister.Builder bulkAnnotationsPersister;
private AutodetectProcess process;
private FlushListener flushListener;
private AutodetectResultProcessor processorUnderTest;
@ -104,10 +106,13 @@ public class AutodetectResultProcessorTests extends ESTestCase {
auditor = mock(AnomalyDetectionAuditor.class);
renormalizer = mock(Renormalizer.class);
persister = mock(JobResultsPersister.class);
bulkBuilder = mock(JobResultsPersister.Builder.class);
when(bulkBuilder.shouldRetry(any())).thenReturn(bulkBuilder);
bulkResultsPersister = mock(JobResultsPersister.Builder.class);
when(bulkResultsPersister.shouldRetry(any())).thenReturn(bulkResultsPersister);
when(persister.bulkPersisterBuilder(eq(JOB_ID))).thenReturn(bulkResultsPersister);
annotationPersister = mock(AnnotationPersister.class);
when(persister.bulkPersisterBuilder(eq(JOB_ID))).thenReturn(bulkBuilder);
bulkAnnotationsPersister = mock(AnnotationPersister.Builder.class);
when(bulkAnnotationsPersister.shouldRetry(any())).thenReturn(bulkAnnotationsPersister);
when(annotationPersister.bulkPersisterBuilder(eq(JOB_ID))).thenReturn(bulkAnnotationsPersister);
process = mock(AutodetectProcess.class);
flushListener = mock(FlushListener.class);
processorUnderTest = new AutodetectResultProcessor(
@ -126,6 +131,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
@After
public void cleanup() {
verify(annotationPersister).bulkPersisterBuilder(eq(JOB_ID));
verifyNoMoreInteractions(auditor, renormalizer, persister, annotationPersister);
executor.shutdown();
}
@ -145,8 +151,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_bucket() {
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
when(bulkResultsPersister.persistTimingStats(any(TimingStats.class))).thenReturn(bulkResultsPersister);
when(bulkResultsPersister.persistBucket(any(Bucket.class))).thenReturn(bulkResultsPersister);
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = new Bucket(JOB_ID, new Date(), BUCKET_SPAN_MS);
when(result.getBucket()).thenReturn(bucket);
@ -154,16 +160,16 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder).persistBucket(bucket);
verify(bulkBuilder).executeRequest();
verify(bulkResultsPersister).persistTimingStats(any(TimingStats.class));
verify(bulkResultsPersister).persistBucket(bucket);
verify(bulkResultsPersister).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister, never()).deleteInterimResults(JOB_ID);
}
public void testProcessResult_bucket_deleteInterimRequired() {
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
when(bulkResultsPersister.persistTimingStats(any(TimingStats.class))).thenReturn(bulkResultsPersister);
when(bulkResultsPersister.persistBucket(any(Bucket.class))).thenReturn(bulkResultsPersister);
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = new Bucket(JOB_ID, new Date(), BUCKET_SPAN_MS);
when(result.getBucket()).thenReturn(bucket);
@ -171,9 +177,9 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.processResult(result);
assertFalse(processorUnderTest.isDeleteInterimRequired());
verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder).persistBucket(bucket);
verify(bulkBuilder).executeRequest();
verify(bulkResultsPersister).persistTimingStats(any(TimingStats.class));
verify(bulkResultsPersister).persistBucket(bucket);
verify(bulkResultsPersister).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).deleteInterimResults(JOB_ID);
}
@ -189,8 +195,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(bulkBuilder).persistRecords(records);
verify(bulkBuilder, never()).executeRequest();
verify(bulkResultsPersister).persistRecords(records);
verify(bulkResultsPersister, never()).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
}
@ -205,8 +211,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(bulkBuilder).persistInfluencers(influencers);
verify(bulkBuilder, never()).executeRequest();
verify(bulkResultsPersister).persistInfluencers(influencers);
verify(bulkResultsPersister, never()).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
}
@ -219,7 +225,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(bulkBuilder, never()).executeRequest();
verify(bulkResultsPersister, never()).executeRequest();
verify(persister).persistCategoryDefinition(eq(categoryDefinition), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
}
@ -237,7 +243,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
verify(persister).commitResultWrites(JOB_ID);
verify(bulkBuilder).executeRequest();
verify(bulkResultsPersister).executeRequest();
}
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
@ -253,10 +259,10 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.processResult(result);
assertTrue(processorUnderTest.isDeleteInterimRequired());
InOrder inOrder = inOrder(persister, bulkBuilder, flushListener);
InOrder inOrder = inOrder(persister, bulkResultsPersister, flushListener);
inOrder.verify(persister).bulkPersisterBuilder(eq(JOB_ID));
inOrder.verify(persister).persistCategoryDefinition(eq(categoryDefinition), any());
inOrder.verify(bulkBuilder).executeRequest();
inOrder.verify(bulkResultsPersister).executeRequest();
inOrder.verify(persister).commitResultWrites(JOB_ID);
inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
}
@ -270,7 +276,18 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(bulkBuilder).persistModelPlot(modelPlot);
verify(bulkResultsPersister).persistModelPlot(modelPlot);
}
public void testProcessResult_annotation() {
AutodetectResult result = mock(AutodetectResult.class);
Annotation annotation = AnnotationTests.randomAnnotation(JOB_ID);
when(result.getAnnotation()).thenReturn(annotation);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(bulkAnnotationsPersister).persistAnnotation(annotation);
}
public void testProcessResult_modelSizeStats() {
@ -380,12 +397,6 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).persistModelSnapshot(eq(modelSnapshot), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any());
ArgumentCaptor<Annotation> annotationCaptor = ArgumentCaptor.forClass(Annotation.class);
verify(annotationPersister).persistAnnotation(eq(ModelSnapshot.annotationDocumentId(modelSnapshot)), annotationCaptor.capture());
Annotation annotation = annotationCaptor.getValue();
Annotation expectedAnnotation =
new Annotation.Builder()
.setAnnotation("Job model snapshot with id [a_snapshot_id] stored")
@ -399,11 +410,12 @@ public class AutodetectResultProcessorTests extends ESTestCase {
.setType(Annotation.Type.ANNOTATION)
.setEvent(Annotation.Event.MODEL_SNAPSHOT_STORED)
.build();
assertThat(annotation, is(equalTo(expectedAnnotation)));
UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID,
new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build());
new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).persistModelSnapshot(eq(modelSnapshot), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any());
verify(bulkAnnotationsPersister).persistAnnotation(ModelSnapshot.annotationDocumentId(modelSnapshot), expectedAnnotation);
verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
}
@ -418,7 +430,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).persistQuantiles(eq(quantiles), any());
verify(bulkBuilder).executeRequest();
verify(bulkResultsPersister).executeRequest();
verify(persister).commitResultWrites(JOB_ID);
verify(renormalizer).isEnabled();
verify(renormalizer).renormalize(quantiles);
@ -435,7 +447,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).persistQuantiles(eq(quantiles), any());
verify(bulkBuilder).executeRequest();
verify(bulkResultsPersister).executeRequest();
verify(renormalizer).isEnabled();
}
@ -509,7 +521,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
}
public void testProcessingOpenedForecasts() {
when(bulkBuilder.persistForecastRequestStats(any(ForecastRequestStats.class))).thenReturn(bulkBuilder);
when(bulkResultsPersister.persistForecastRequestStats(any(ForecastRequestStats.class))).thenReturn(bulkResultsPersister);
AutodetectResult result = mock(AutodetectResult.class);
ForecastRequestStats forecastRequestStats = new ForecastRequestStats("foo", "forecast");
forecastRequestStats.setStatus(ForecastRequestStats.ForecastRequestStatus.OK);
@ -522,8 +534,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.handleOpenForecasts();
verify(bulkBuilder, times(2)).persistForecastRequestStats(argument.capture());
verify(bulkBuilder, times(1)).executeRequest();
verify(bulkResultsPersister, times(2)).persistForecastRequestStats(argument.capture());
verify(bulkResultsPersister, times(1)).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister, never()).deleteInterimResults(JOB_ID);
@ -534,7 +546,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
}
public void testProcessingForecasts() {
when(bulkBuilder.persistForecastRequestStats(any(ForecastRequestStats.class))).thenReturn(bulkBuilder);
when(bulkResultsPersister.persistForecastRequestStats(any(ForecastRequestStats.class))).thenReturn(bulkResultsPersister);
AutodetectResult result = mock(AutodetectResult.class);
ForecastRequestStats forecastRequestStats = new ForecastRequestStats("foo", "forecast");
forecastRequestStats.setStatus(ForecastRequestStats.ForecastRequestStatus.OK);
@ -555,8 +567,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
// There shouldn't be any opened forecasts. This call should do nothing
processorUnderTest.handleOpenForecasts();
verify(bulkBuilder, times(2)).persistForecastRequestStats(argument.capture());
verify(bulkBuilder, times(1)).executeRequest();
verify(bulkResultsPersister, times(2)).persistForecastRequestStats(argument.capture());
verify(bulkResultsPersister, times(1)).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister, never()).deleteInterimResults(JOB_ID);

View File

@ -8,6 +8,8 @@ package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationTests;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
@ -41,6 +43,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
ModelSnapshot modelSnapshot;
ModelSizeStats.Builder modelSizeStats;
ModelPlot modelPlot;
Annotation annotation;
Forecast forecast;
ForecastRequestStats forecastRequestStats;
CategoryDefinition categoryDefinition;
@ -92,6 +95,11 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
} else {
modelPlot = null;
}
if (randomBoolean()) {
annotation = AnnotationTests.randomAnnotation(jobId);
} else {
annotation = null;
}
if (randomBoolean()) {
forecast = new Forecast(jobId, randomAlphaOfLength(20), randomDate(),
randomNonNegativeLong(), randomInt());
@ -116,8 +124,8 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
flushAcknowledgement = null;
}
return new AutodetectResult(bucket, records, influencers, quantiles, modelSnapshot,
modelSizeStats == null ? null : modelSizeStats.build(), modelPlot, forecast, forecastRequestStats, categoryDefinition,
flushAcknowledgement);
modelSizeStats == null ? null : modelSizeStats.build(), modelPlot, annotation, forecast, forecastRequestStats,
categoryDefinition, flushAcknowledgement);
}
@Override