From 295665b1ea941e65b6d2f2b40797e618c51b2229 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 17 Jan 2020 16:25:06 +0000 Subject: [PATCH] [ML] Add audit warning for 1000 categories found early in job (#51146) If 1000 different category definitions are created for a job in the first 100 buckets it processes then an audit warning will now be created. (This will cause a yellow warning triangle in the ML UI's jobs list.) Such a large number of categories suggests that the field that categorization is working on is not well suited to the ML categorization functionality. --- .../xpack/core/ml/job/messages/Messages.java | 2 + .../output/AutodetectResultProcessor.java | 34 ++++++-- .../AutodetectResultProcessorTests.java | 87 ++++++++++++++++--- 3 files changed, 104 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index ef0fcd4fdb1..dbebc580b57 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -135,6 +135,8 @@ public final class Messages { "Adjust the analysis_limits.model_memory_limit setting to ensure all data is analyzed"; public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2 = "Job memory status changed to hard_limit at {0}; adjust the " + "analysis_limits.model_memory_limit setting to ensure all data is analyzed"; + public static final String JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES = "{0} categories observed in the first [{1}] buckets." + + " This suggests an inappropriate categorization_field_name has been chosen."; public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates"; public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY = diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index 422f13926d4..51043d4da42 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -74,6 +74,9 @@ public class AutodetectResultProcessor { private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class); + static final long EARLY_BUCKET_THRESHOLD = 100; + static final int EXCESSIVE_EARLY_CATEGORY_COUNT = 1000; + private final Client client; private final AnomalyDetectionAuditor auditor; private final String jobId; @@ -87,7 +90,9 @@ public class AutodetectResultProcessor { private final FlushListener flushListener; private volatile boolean processKilled; private volatile boolean failed; - private int bucketCount; // only used from the process() thread, so doesn't need to be volatile + private long priorRunsBucketCount; + private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile + private boolean excessiveCategoryWarningIssued; // only used from the process() thread, so doesn't need to be volatile private final JobResultsPersister.Builder bulkResultsPersister; private boolean deleteInterimRequired; @@ -122,6 +127,7 @@ public class AutodetectResultProcessor { this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive); this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister); this.deleteInterimRequired = true; + this.priorRunsBucketCount = timingStats.getBucketCount(); } public void process() { @@ -140,7 +146,7 @@ public class AutodetectResultProcessor { } catch (Exception e) { LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e); } - LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); + LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, currentRunBucketCount); } catch (Exception e) { failed = true; @@ -166,7 +172,7 @@ public class AutodetectResultProcessor { } private void readResults() { - bucketCount = 0; + currentRunBucketCount = 0; try { Iterator iterator = process.readAutodetectResults(); while (iterator.hasNext()) { @@ -174,7 +180,7 @@ public class AutodetectResultProcessor { AutodetectResult result = iterator.next(); processResult(result); if (result.getBucket() != null) { - LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); + LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount); } } catch (Exception e) { if (isAlive() == false) { @@ -212,7 +218,7 @@ public class AutodetectResultProcessor { // results are also interim timingStatsReporter.reportBucket(bucket); bulkResultsPersister.persistBucket(bucket).executeRequest(); - ++bucketCount; + ++currentRunBucketCount; } List records = result.getRecords(); if (records != null && !records.isEmpty()) { @@ -224,7 +230,7 @@ public class AutodetectResultProcessor { } CategoryDefinition categoryDefinition = result.getCategoryDefinition(); if (categoryDefinition != null) { - persister.persistCategoryDefinition(categoryDefinition, this::isAlive); + processCategoryDefinition(categoryDefinition); } ModelPlot modelPlot = result.getModelPlot(); if (modelPlot != null) { @@ -308,6 +314,22 @@ public class AutodetectResultProcessor { } } + private void processCategoryDefinition(CategoryDefinition categoryDefinition) { + persister.persistCategoryDefinition(categoryDefinition, this::isAlive); + if (categoryDefinition.getCategoryId() == EXCESSIVE_EARLY_CATEGORY_COUNT && + priorRunsBucketCount + currentRunBucketCount < EARLY_BUCKET_THRESHOLD && + excessiveCategoryWarningIssued == false) { + auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, EXCESSIVE_EARLY_CATEGORY_COUNT, + // Add 1 because category definitions are written before buckets + 1L + priorRunsBucketCount + currentRunBucketCount)); + // This flag won't be retained if the job is closed and reopened, or if the job migrates to another node. + // This means it's possible the audit message is generated multiple times. However, that's not a + // disaster, and is also very unlikely in the the (best practice) cases where initial lookback covers + // more than 100 buckets. + excessiveCategoryWarningIssued = true; + } + } + private void processModelSizeStats(ModelSizeStats modelSizeStats) { LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}", jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(), diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index cf3a915e36c..6cf130f7967 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -133,7 +133,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).commitStateWrites(JOB_ID); } - public void testProcessResult_bucket() throws Exception { + public void testProcessResult_bucket() { when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); AutodetectResult result = mock(AutodetectResult.class); @@ -150,7 +150,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister, never()).deleteInterimResults(JOB_ID); } - public void testProcessResult_bucket_deleteInterimRequired() throws Exception { + public void testProcessResult_bucket_deleteInterimRequired() { when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); AutodetectResult result = mock(AutodetectResult.class); @@ -167,7 +167,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).deleteInterimResults(JOB_ID); } - public void testProcessResult_records() throws Exception { + public void testProcessResult_records() { AutodetectResult result = mock(AutodetectResult.class); List records = Arrays.asList( @@ -183,7 +183,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); } - public void testProcessResult_influencers() throws Exception { + public void testProcessResult_influencers() { AutodetectResult result = mock(AutodetectResult.class); List influencers = Arrays.asList( @@ -199,9 +199,10 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); } - public void testProcessResult_categoryDefinition() throws Exception { + public void testProcessResult_categoryDefinition() { AutodetectResult result = mock(AutodetectResult.class); CategoryDefinition categoryDefinition = mock(CategoryDefinition.class); + when(categoryDefinition.getCategoryId()).thenReturn(1L); when(result.getCategoryDefinition()).thenReturn(categoryDefinition); processorUnderTest.setDeleteInterimRequired(false); @@ -212,7 +213,66 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); } - public void testProcessResult_flushAcknowledgement() throws Exception { + public void testProcessResult_excessiveCategoryDefinitionCountEarly() { + int iterations = 3; + int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2; + + processorUnderTest.setDeleteInterimRequired(false); + + AutodetectResult result = mock(AutodetectResult.class); + for (int iteration = 1; iteration <= iterations; ++iteration) { + for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) { + CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID); + categoryDefinition.setCategoryId(categoryId); + when(result.getCategoryDefinition()).thenReturn(categoryDefinition); + + processorUnderTest.processResult(result); + } + } + + verify(bulkBuilder, never()).executeRequest(); + verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(auditor).warning(eq(JOB_ID), eq(Messages.getMessage(Messages.JOB_AUDIT_EXCESSIVE_EARLY_CATEGORIES, + AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT, 1))); + } + + public void testProcessResult_highCategoryDefinitionCountLateOn() { + int iterations = 3; + int categoryCount = AutodetectResultProcessor.EXCESSIVE_EARLY_CATEGORY_COUNT * 2; + + processorUnderTest.setDeleteInterimRequired(false); + + when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); + when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); + + AutodetectResult bucketResult = mock(AutodetectResult.class); + final int numPriorBuckets = (int) AutodetectResultProcessor.EARLY_BUCKET_THRESHOLD + 1; + for (int i = 0; i < numPriorBuckets; ++i) { + Bucket bucket = new Bucket(JOB_ID, new Date(i * 1000 + 1000000), BUCKET_SPAN_MS); + when(bucketResult.getBucket()).thenReturn(bucket); + processorUnderTest.processResult(bucketResult); + } + + AutodetectResult categoryResult = mock(AutodetectResult.class); + for (int iteration = 1; iteration <= iterations; ++iteration) { + for (int categoryId = 1; categoryId <= categoryCount; ++categoryId) { + CategoryDefinition categoryDefinition = new CategoryDefinition(JOB_ID); + categoryDefinition.setCategoryId(categoryId); + when(categoryResult.getCategoryDefinition()).thenReturn(categoryDefinition); + processorUnderTest.processResult(categoryResult); + } + } + + verify(bulkBuilder).persistTimingStats(any(TimingStats.class)); + verify(bulkBuilder, times(numPriorBuckets)).persistBucket(any(Bucket.class)); + verify(bulkBuilder, times(numPriorBuckets)).executeRequest(); + verify(persister, times(iterations * categoryCount)).persistCategoryDefinition(any(CategoryDefinition.class), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(auditor, never()).warning(eq(JOB_ID), anyString()); + } + + public void testProcessResult_flushAcknowledgement() { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(flushAcknowledgement.getId()).thenReturn(JOB_ID); @@ -228,12 +288,13 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(bulkBuilder).executeRequest(); } - public void testProcessResult_flushAcknowledgementMustBeProcessedLast() throws Exception { + public void testProcessResult_flushAcknowledgementMustBeProcessedLast() { AutodetectResult result = mock(AutodetectResult.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(flushAcknowledgement.getId()).thenReturn(JOB_ID); when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement); CategoryDefinition categoryDefinition = mock(CategoryDefinition.class); + when(categoryDefinition.getCategoryId()).thenReturn(1L); when(result.getCategoryDefinition()).thenReturn(categoryDefinition); processorUnderTest.setDeleteInterimRequired(false); @@ -248,7 +309,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null); } - public void testProcessResult_modelPlot() throws Exception { + public void testProcessResult_modelPlot() { AutodetectResult result = mock(AutodetectResult.class); ModelPlot modelPlot = mock(ModelPlot.class); when(result.getModelPlot()).thenReturn(modelPlot); @@ -260,7 +321,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(bulkBuilder).persistModelPlot(modelPlot); } - public void testProcessResult_modelSizeStats() throws Exception { + public void testProcessResult_modelSizeStats() { AutodetectResult result = mock(AutodetectResult.class); ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); when(result.getModelSizeStats()).thenReturn(modelSizeStats); @@ -273,7 +334,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).persistModelSizeStats(eq(modelSizeStats), any()); } - public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() throws Exception { + public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { TimeValue delay = TimeValue.timeValueSeconds(5); // Set up schedule delay time when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString())) @@ -313,7 +374,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb")); } - public void testProcessResult_modelSnapshot() throws Exception { + public void testProcessResult_modelSnapshot() { AutodetectResult result = mock(AutodetectResult.class); ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID) .setSnapshotId("a_snapshot_id") @@ -337,7 +398,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any()); } - public void testProcessResult_quantiles_givenRenormalizationIsEnabled() throws Exception { + public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { AutodetectResult result = mock(AutodetectResult.class); Quantiles quantiles = mock(Quantiles.class); when(result.getQuantiles()).thenReturn(quantiles); @@ -354,7 +415,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(renormalizer).renormalize(quantiles); } - public void testProcessResult_quantiles_givenRenormalizationIsDisabled() throws Exception { + public void testProcessResult_quantiles_givenRenormalizationIsDisabled() { AutodetectResult result = mock(AutodetectResult.class); Quantiles quantiles = mock(Quantiles.class); when(result.getQuantiles()).thenReturn(quantiles);