[ML] Drive categorization warning notifications from annotations (#59393)

With the introduction of per-partition categorization, the old
logic for creating a job notification for categorization status
"warn" no longer works.  However, the C++ code is already writing
annotations for categorization status "warn" that take into
account whether per-partition categorization is being used and
which partition(s) the warnings relate to.  Therefore, this
change alters the Java results processor to create notifications
based on the annotations the C++ code writes.  (It is arguable that
we don't need both annotations and notifications, but they show
up in different ways in the UI: only annotations are visible in
results and only notifications set the warning symbol in the
jobs list.  This means it's best to have both.)

Backport of #59377
This commit is contained in:
David Roberts 2020-07-13 15:28:57 +01:00 committed by GitHub
parent c228532ebd
commit b5e8250a4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 46 deletions

View File

@ -145,8 +145,6 @@ public final class Messages {
"Adjust the analysis_limits.model_memory_limit setting to ensure all data is analyzed";
public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2 = "Job memory status changed to hard_limit at {0}; adjust the " +
"analysis_limits.model_memory_limit setting to ensure all data is analyzed";
public static final String JOB_AUDIT_CATEGORIZATION_STATUS_WARN = "categorization_status changed to [{0}] after [{1}] buckets." +
" This suggests an inappropriate categorization_field_name has been chosen.";
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY =

View File

@ -21,7 +21,6 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizationStatus;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerStats;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
@ -282,6 +281,7 @@ public class AutodetectResultProcessor {
Annotation annotation = result.getAnnotation();
if (annotation != null) {
bulkAnnotationsPersister.persistAnnotation(annotation);
notifyCategorizationStatusChange(annotation);
}
Forecast forecast = result.getForecast();
if (forecast != null) {
@ -396,7 +396,6 @@ public class AutodetectResultProcessor {
persister.persistModelSizeStats(modelSizeStats, this::isAlive);
notifyModelMemoryStatusChange(modelSizeStats);
notifyCategorizationStatusChange(modelSizeStats);
latestModelSizeStats = modelSizeStats;
}
@ -419,13 +418,11 @@ public class AutodetectResultProcessor {
}
}
private void notifyCategorizationStatusChange(ModelSizeStats modelSizeStats) {
CategorizationStatus categorizationStatus = modelSizeStats.getCategorizationStatus();
if (categorizationStatus != latestModelSizeStats.getCategorizationStatus()) {
if (categorizationStatus == CategorizationStatus.WARN) {
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, categorizationStatus,
priorRunsBucketCount + currentRunBucketCount));
}
/**
 * Raises a job warning notification when the given annotation reports a
 * categorization status change. Annotations with any other event are ignored.
 *
 * The notification text is the annotation's own text followed by
 * " after N bucket(s)", where N is the sum of the prior-run and current-run
 * bucket counters.
 *
 * @param annotation the annotation to inspect; must not be null, as it is
 *                   dereferenced without a check
 */
private void notifyCategorizationStatusChange(Annotation annotation) {
if (annotation.getEvent() == Annotation.Event.CATEGORIZATION_STATUS_CHANGE) {
// Sum of the prior-run and current-run bucket counters.
long bucketCount = priorRunsBucketCount + currentRunBucketCount;
// Use the singular "bucket" only when the count is exactly 1.
auditor.warning(jobId, annotation.getAnnotation() + " after "
+ bucketCount + ((bucketCount == 1) ? " bucket" : " buckets"));
}
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.xpack.core.ml.annotations.AnnotationTests;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizationStatus;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
@ -68,6 +67,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doThrow;
@ -293,6 +293,9 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(bulkAnnotationsPersister).persistAnnotation(annotation);
if (annotation.getEvent() == Annotation.Event.CATEGORIZATION_STATUS_CHANGE) {
verify(auditor).warning(eq(JOB_ID), anyString());
}
}
public void testProcessResult_modelSizeStats() {
@ -343,46 +346,27 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
}
public void testProcessResult_modelSizeStatsWithCategorizationStatusChanges() {
public void testProcessResult_categorizationStatusChangeAnnotationCausesNotification() {
AutodetectResult result = mock(AutodetectResult.class);
processorUnderTest.setDeleteInterimRequired(false);
// First one with ok
ModelSizeStats modelSizeStats =
new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(CategorizationStatus.OK).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(result);
// Now one with warn
modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(CategorizationStatus.WARN).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(result);
// Another with warn
modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(CategorizationStatus.WARN).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
Annotation annotation = new Annotation.Builder()
.setType(Annotation.Type.ANNOTATION)
.setJobId(JOB_ID)
.setAnnotation("Categorization status changed to 'warn' for partition 'foo'")
.setEvent(Annotation.Event.CATEGORIZATION_STATUS_CHANGE)
.setCreateTime(new Date())
.setCreateUsername(XPackUser.NAME)
.setTimestamp(new Date())
.setPartitionFieldName("part")
.setPartitionFieldValue("foo")
.build();
when(result.getAnnotation()).thenReturn(annotation);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister, times(3)).persistModelSizeStats(any(ModelSizeStats.class), any());
// We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn
verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0));
}
public void testProcessResult_modelSizeStatsWithFirstCategorizationStatusWarn() {
AutodetectResult result = mock(AutodetectResult.class);
processorUnderTest.setDeleteInterimRequired(false);
// First one with warn - this works because a default constructed ModelSizeStats has CategorizationStatus.OK
ModelSizeStats modelSizeStats =
new ModelSizeStats.Builder(JOB_ID).setCategorizationStatus(CategorizationStatus.WARN).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).persistModelSizeStats(any(ModelSizeStats.class), any());
// We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn
verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0));
verify(bulkAnnotationsPersister).persistAnnotation(annotation);
verify(auditor).warning(JOB_ID, "Categorization status changed to 'warn' for partition 'foo' after 0 buckets");
}
public void testProcessResult_modelSnapshot() {