[ML] Add audit warning for 1000 categories found early in job (#51146)
If 1000 different category definitions are created for a job in the first 100 buckets it processes then an audit warning will now be created. (This will cause a yellow warning triangle in the ML UI's jobs list.) Such a large number of categories suggests that the field that categorization is working on is not well suited to the ML categorization functionality.
This commit is contained in:
parent
da73c9104e
commit
295665b1ea
|
@ -135,6 +135,8 @@ public final class Messages {
|
|||
"Adjust the analysis_limits.model_memory_limit setting to ensure all data is analyzed";
|
||||
public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2 = "Job memory status changed to hard_limit at {0}; adjust the " +
|
||||
"analysis_limits.model_memory_limit setting to ensure all data is analyzed";
|
||||
public static final String JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES = "{0} categories observed in the first [{1}] buckets." +
|
||||
" This suggests an inappropriate categorization_field_name has been chosen.";
|
||||
|
||||
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";
|
||||
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY =
|
||||
|
|
|
@ -74,6 +74,9 @@ public class AutodetectResultProcessor {
|
|||
|
||||
private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);
|
||||
|
||||
static final long EARLY_BUCKET_THRESHOLD = 100;
|
||||
static final int EXCESSIVE_EARLY_CATEGORY_COUNT = 1000;
|
||||
|
||||
private final Client client;
|
||||
private final AnomalyDetectionAuditor auditor;
|
||||
private final String jobId;
|
||||
|
@ -87,7 +90,9 @@ public class AutodetectResultProcessor {
|
|||
private final FlushListener flushListener;
|
||||
private volatile boolean processKilled;
|
||||
private volatile boolean failed;
|
||||
private int bucketCount; // only used from the process() thread, so doesn't need to be volatile
|
||||
private long priorRunsBucketCount;
|
||||
private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile
|
||||
private boolean excessiveCategoryWarningIssued; // only used from the process() thread, so doesn't need to be volatile
|
||||
private final JobResultsPersister.Builder bulkResultsPersister;
|
||||
private boolean deleteInterimRequired;
|
||||
|
||||
|
@ -122,6 +127,7 @@ public class AutodetectResultProcessor {
|
|||
this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive);
|
||||
this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
|
||||
this.deleteInterimRequired = true;
|
||||
this.priorRunsBucketCount = timingStats.getBucketCount();
|
||||
}
|
||||
|
||||
public void process() {
|
||||
|
@ -140,7 +146,7 @@ public class AutodetectResultProcessor {
|
|||
} catch (Exception e) {
|
||||
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
|
||||
}
|
||||
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
|
||||
LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, currentRunBucketCount);
|
||||
|
||||
} catch (Exception e) {
|
||||
failed = true;
|
||||
|
@ -166,7 +172,7 @@ public class AutodetectResultProcessor {
|
|||
}
|
||||
|
||||
private void readResults() {
|
||||
bucketCount = 0;
|
||||
currentRunBucketCount = 0;
|
||||
try {
|
||||
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
|
||||
while (iterator.hasNext()) {
|
||||
|
@ -174,7 +180,7 @@ public class AutodetectResultProcessor {
|
|||
AutodetectResult result = iterator.next();
|
||||
processResult(result);
|
||||
if (result.getBucket() != null) {
|
||||
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
|
||||
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (isAlive() == false) {
|
||||
|
@ -212,7 +218,7 @@ public class AutodetectResultProcessor {
|
|||
// results are also interim
|
||||
timingStatsReporter.reportBucket(bucket);
|
||||
bulkResultsPersister.persistBucket(bucket).executeRequest();
|
||||
++bucketCount;
|
||||
++currentRunBucketCount;
|
||||
}
|
||||
List<AnomalyRecord> records = result.getRecords();
|
||||
if (records != null && !records.isEmpty()) {
|
||||
|
@ -224,7 +230,7 @@ public class AutodetectResultProcessor {
|
|||
}
|
||||
CategoryDefinition categoryDefinition = result.getCategoryDefinition();
|
||||
if (categoryDefinition != null) {
|
||||
persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
|
||||
processCategoryDefinition(categoryDefinition);
|
||||
}
|
||||
ModelPlot modelPlot = result.getModelPlot();
|
||||
if (modelPlot != null) {
|
||||
|
@ -308,6 +314,22 @@ public class AutodetectResultProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
private void processCategoryDefinition(CategoryDefinition categoryDefinition) {
|
||||
persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
|
||||
if (categoryDefinition.getCategoryId() == EXCESSIVE_EARLY_CATEGORY_COUNT &&
|
||||
priorRunsBucketCount + currentRunBucketCount < EARLY_BUCKET_THRESHOLD &&
|
||||
excessiveCategoryWarningIssued == false) {
|
||||
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, EXCESSIVE_EARLY_CATEGORY_COUNT,
|
||||
// Add 1 because category definitions are written before buckets
|
||||
1L + priorRunsBucketCount + currentRunBucketCount));
|
||||
// This flag won't be retained if the job is closed and reopened, or if the job migrates to another node.
|
||||
// This means it's possible the audit message is generated multiple times. However, that's not a
|
||||
// disaster, and is also very unlikely in the the (best practice) cases where initial lookback covers
|
||||
// more than 100 buckets.
|
||||
excessiveCategoryWarningIssued = true;
|
||||
}
|
||||
}
|
||||
|
||||
private void processModelSizeStats(ModelSizeStats modelSizeStats) {
|
||||
LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
|
||||
jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
|
||||
|
|
|
@ -133,7 +133,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(persister).commitStateWrites(JOB_ID);
|
||||
}
|
||||
|
||||
public void testProcessResult_bucket() throws Exception {
|
||||
public void testProcessResult_bucket() {
|
||||
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
|
||||
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
|
@ -150,7 +150,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(persister, never()).deleteInterimResults(JOB_ID);
|
||||
}
|
||||
|
||||
public void testProcessResult_bucket_deleteInterimRequired() throws Exception {
|
||||
public void testProcessResult_bucket_deleteInterimRequired() {
|
||||
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
|
||||
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
|
@ -167,7 +167,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(persister).deleteInterimResults(JOB_ID);
|
||||
}
|
||||
|
||||
public void testProcessResult_records() throws Exception {
|
||||
public void testProcessResult_records() {
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
List<AnomalyRecord> records =
|
||||
Arrays.asList(
|
||||
|
@ -183,7 +183,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
|
||||
}
|
||||
|
||||
public void testProcessResult_influencers() throws Exception {
|
||||
public void testProcessResult_influencers() {
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
List<Influencer> influencers =
|
||||
Arrays.asList(
|
||||
|
@ -199,9 +199,10 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
|
||||
}
|
||||
|
||||
public void testProcessResult_categoryDefinition() throws Exception {
|
||||
public void testProcessResult_categoryDefinition() {
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
|
||||
when(categoryDefinition.getCategoryId()).thenReturn(1L);
|
||||
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
|
||||
|
||||
processorUnderTest.setDeleteInterimRequired(false);
|
||||
|
@ -212,7 +213,66 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
|
||||
}
|
||||
|
||||
public void testProcessResult_flushAcknowledgement() throws Exception {
|
||||
public void testProcessResult_excessiveCategoryDefinitionCountEarly() {
|
||||
int iterations = 3;
|
||||
int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
|
||||
|
||||
processorUnderTest.setDeleteInterimRequired(false);
|
||||
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
for (int iteration = 1; iteration <= iterations; ++iteration) {
|
||||
for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
|
||||
CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
|
||||
categoryDefinition.setCategoryId(categoryId);
|
||||
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
|
||||
|
||||
processorUnderTest.processResult(result);
|
||||
}
|
||||
}
|
||||
|
||||
verify(bulkBuilder, never()).executeRequest();
|
||||
verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
|
||||
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
|
||||
verify(auditor).warning(eq(JOB_ID), eq(Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES,
|
||||
AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT, 1)));
|
||||
}
|
||||
|
||||
public void testProcessResult_highCategoryDefinitionCountLateOn() {
|
||||
int iterations = 3;
|
||||
int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2;
|
||||
|
||||
processorUnderTest.setDeleteInterimRequired(false);
|
||||
|
||||
when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
|
||||
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
|
||||
|
||||
AutodetectResult bucketResult = mock(AutodetectResult.class);
|
||||
final int numPriorBuckets = (int) AutodetectResultProcessor.EARLY_BUCKET_THRESHOLD + 1;
|
||||
for (int i = 0; i < numPriorBuckets; ++i) {
|
||||
Bucket bucket = new Bucket(JOB_ID, new Date(i * 1000 + 1000000), BUCKET_SPAN_MS);
|
||||
when(bucketResult.getBucket()).thenReturn(bucket);
|
||||
processorUnderTest.processResult(bucketResult);
|
||||
}
|
||||
|
||||
AutodetectResult categoryResult = mock(AutodetectResult.class);
|
||||
for (int iteration = 1; iteration <= iterations; ++iteration) {
|
||||
for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) {
|
||||
CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID);
|
||||
categoryDefinition.setCategoryId(categoryId);
|
||||
when(categoryResult.getCategoryDefinition()).thenReturn(categoryDefinition);
|
||||
processorUnderTest.processResult(categoryResult);
|
||||
}
|
||||
}
|
||||
|
||||
verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
|
||||
verify(bulkBuilder, times(numPriorBuckets)).persistBucket(any(Bucket.class));
|
||||
verify(bulkBuilder, times(numPriorBuckets)).executeRequest();
|
||||
verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any());
|
||||
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
|
||||
verify(auditor, never()).warning(eq(JOB_ID), anyString());
|
||||
}
|
||||
|
||||
public void testProcessResult_flushAcknowledgement() {
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
|
||||
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
|
||||
|
@ -228,12 +288,13 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(bulkBuilder).executeRequest();
|
||||
}
|
||||
|
||||
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() throws Exception {
|
||||
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
|
||||
when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
|
||||
when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
|
||||
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
|
||||
when(categoryDefinition.getCategoryId()).thenReturn(1L);
|
||||
when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
|
||||
|
||||
processorUnderTest.setDeleteInterimRequired(false);
|
||||
|
@ -248,7 +309,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
|
||||
}
|
||||
|
||||
public void testProcessResult_modelPlot() throws Exception {
|
||||
public void testProcessResult_modelPlot() {
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
ModelPlot modelPlot = mock(ModelPlot.class);
|
||||
when(result.getModelPlot()).thenReturn(modelPlot);
|
||||
|
@ -260,7 +321,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(bulkBuilder).persistModelPlot(modelPlot);
|
||||
}
|
||||
|
||||
public void testProcessResult_modelSizeStats() throws Exception {
|
||||
public void testProcessResult_modelSizeStats() {
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
|
||||
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
|
||||
|
@ -273,7 +334,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(persister).persistModelSizeStats(eq(modelSizeStats), any());
|
||||
}
|
||||
|
||||
public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() throws Exception {
|
||||
public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
|
||||
TimeValue delay = TimeValue.timeValueSeconds(5);
|
||||
// Set up schedule delay time
|
||||
when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
|
||||
|
@ -313,7 +374,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
|
||||
}
|
||||
|
||||
public void testProcessResult_modelSnapshot() throws Exception {
|
||||
public void testProcessResult_modelSnapshot() {
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
|
||||
.setSnapshotId("a_snapshot_id")
|
||||
|
@ -337,7 +398,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
|
||||
}
|
||||
|
||||
public void testProcessResult_quantiles_givenRenormalizationIsEnabled() throws Exception {
|
||||
public void testProcessResult_quantiles_givenRenormalizationIsEnabled() {
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
Quantiles quantiles = mock(Quantiles.class);
|
||||
when(result.getQuantiles()).thenReturn(quantiles);
|
||||
|
@ -354,7 +415,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
|
|||
verify(renormalizer).renormalize(quantiles);
|
||||
}
|
||||
|
||||
public void testProcessResult_quantiles_givenRenormalizationIsDisabled() throws Exception {
|
||||
public void testProcessResult_quantiles_givenRenormalizationIsDisabled() {
|
||||
AutodetectResult result = mock(AutodetectResult.class);
|
||||
Quantiles quantiles = mock(Quantiles.class);
|
||||
when(result.getQuantiles()).thenReturn(quantiles);
|
||||
|
|
Loading…
Reference in New Issue