From 42eae8b3be6f4cbec4fb98682d174f3140ae1179 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 22 Mar 2018 14:04:37 +0000 Subject: [PATCH] [ML] Notify job memory status changes (elastic/x-pack-elasticsearch#4187) This commit adds job notifications when the memory status changes. This ensures a job reaching its memory limit is communicated more visibly to the user so action can be taken. relates elastic/x-pack-elasticsearch#4173 Original commit: elastic/x-pack-elasticsearch@c7362bd4bce6002a73908de5807a306b1b2e2a47 --- .../xpack/core/ml/job/messages/Messages.java | 4 ++ .../xpack/ml/MachineLearning.java | 6 +-- .../TransportDeleteExpiredDataAction.java | 2 +- .../autodetect/AutodetectProcessManager.java | 2 +- .../output/AutoDetectResultProcessor.java | 28 +++++++++-- .../xpack/ml/notifications/Auditor.java | 13 +++--- .../AutodetectResultProcessorIT.java | 10 ++-- .../AutoDetectResultProcessorTests.java | 46 ++++++++++++++++++- .../xpack/ml/notifications/AuditorTests.java | 13 ++---- 9 files changed, 92 insertions(+), 32 deletions(-) diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 18f3f7e1988..2db647b3cc9 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -71,6 +71,10 @@ public final class Messages { public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted"; public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process"; public static final String JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS = "Updated calendars in running process"; + public static final String JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT = "Job memory status changed to soft_limit; memory pruning will now be " + + "more aggressive"; + public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT = "Job memory status changed to hard_limit at {0}; adjust the " + + "analysis_limits.model_memory_limit setting to ensure all data is analyzed"; public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates"; public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY = diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 4af6d4f530c..c5402152fea 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -75,7 +75,6 @@ import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetFiltersAction; import org.elasticsearch.xpack.core.ml.action.GetInfluencersAction; -import org.elasticsearch.xpack.core.ml.action.MlInfoAction; import org.elasticsearch.xpack.core.ml.action.GetJobsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; @@ -83,6 +82,7 @@ import org.elasticsearch.xpack.core.ml.action.GetOverallBucketsAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; +import org.elasticsearch.xpack.core.ml.action.MlInfoAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction; @@ -126,7 +126,6 @@ import org.elasticsearch.xpack.ml.action.TransportGetDatafeedsAction; import org.elasticsearch.xpack.ml.action.TransportGetDatafeedsStatsAction; import org.elasticsearch.xpack.ml.action.TransportGetFiltersAction; import org.elasticsearch.xpack.ml.action.TransportGetInfluencersAction; -import org.elasticsearch.xpack.ml.action.TransportMlInfoAction; import org.elasticsearch.xpack.ml.action.TransportGetJobsAction; import org.elasticsearch.xpack.ml.action.TransportGetJobsStatsAction; import org.elasticsearch.xpack.ml.action.TransportGetModelSnapshotsAction; @@ -134,6 +133,7 @@ import org.elasticsearch.xpack.ml.action.TransportGetOverallBucketsAction; import org.elasticsearch.xpack.ml.action.TransportGetRecordsAction; import org.elasticsearch.xpack.ml.action.TransportIsolateDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportKillProcessAction; +import org.elasticsearch.xpack.ml.action.TransportMlInfoAction; import org.elasticsearch.xpack.ml.action.TransportOpenJobAction; import org.elasticsearch.xpack.ml.action.TransportPersistJobAction; import org.elasticsearch.xpack.ml.action.TransportPostCalendarEventsAction; @@ -356,7 +356,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu return emptyList(); } - Auditor auditor = new Auditor(client, clusterService); + Auditor auditor = new Auditor(client, clusterService.nodeName()); JobProvider jobProvider = new JobProvider(client, settings); UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool); JobManager jobManager = new JobManager(env, settings, jobProvider, clusterService, auditor, client, notifier); diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index e938cb5c264..6cf06695c7c 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -51,7 +51,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction listener) { - Auditor auditor = new Auditor(client, clusterService); + Auditor auditor = new Auditor(client, clusterService.nodeName()); List dataRemovers = Arrays.asList( new ExpiredResultsRemover(client, clusterService, auditor), new ExpiredForecastsRemover(client), diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 15b4778249b..1f4f83025b5 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -417,7 +417,7 @@ public class AutodetectProcessManager extends AbstractComponent { AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService, onProcessCrash(jobTask)); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( - client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(), + client, auditor, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(), autodetectParams.modelSnapshot() != null); ExecutorService autodetectWorkerExecutor; try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 0ac9c4ec969..d707af422c0 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -12,10 +12,13 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -32,6 +35,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; +import org.elasticsearch.xpack.ml.notifications.Auditor; import java.time.Duration; import java.util.Date; @@ -68,6 +72,7 @@ public class AutoDetectResultProcessor { private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class); private final Client client; + private final Auditor auditor; private final String jobId; private final Renormalizer renormalizer; private final JobResultsPersister persister; @@ -88,15 +93,16 @@ public class AutoDetectResultProcessor { private volatile long latestEstablishedModelMemory; private volatile boolean haveNewLatestModelSizeStats; - public AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister, + public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) { - this(client, jobId, renormalizer, persister, jobProvider, latestModelSizeStats, restoredSnapshot, new FlushListener()); + this(client, auditor, jobId, renormalizer, persister, jobProvider, latestModelSizeStats, restoredSnapshot, new FlushListener()); } - AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister, + AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot, FlushListener flushListener) { this.client = Objects.requireNonNull(client); + this.auditor = Objects.requireNonNull(auditor); this.jobId = Objects.requireNonNull(jobId); this.renormalizer = Objects.requireNonNull(renormalizer); this.persister = Objects.requireNonNull(persister); @@ -284,9 +290,11 @@ public class AutoDetectResultProcessor { modelSizeStats.getTotalOverFieldCount(), modelSizeStats.getTotalPartitionFieldCount(), modelSizeStats.getBucketAllocationFailuresCount(), modelSizeStats.getMemoryStatus()); + persister.persistModelSizeStats(modelSizeStats); + notifyModelMemoryStatusChange(context, modelSizeStats); latestModelSizeStats = modelSizeStats; haveNewLatestModelSizeStats = true; - persister.persistModelSizeStats(modelSizeStats); + // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and // we'll NEVER consider memory usage to be established during this period @@ -297,6 +305,18 @@ public class AutoDetectResultProcessor { } } + private void notifyModelMemoryStatusChange(Context context, ModelSizeStats modelSizeStats) { + ModelSizeStats.MemoryStatus memoryStatus = modelSizeStats.getMemoryStatus(); + if (memoryStatus != latestModelSizeStats.getMemoryStatus()) { + if (memoryStatus == ModelSizeStats.MemoryStatus.SOFT_LIMIT) { + auditor.warning(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT)); + } else if (memoryStatus == ModelSizeStats.MemoryStatus.HARD_LIMIT) { + auditor.error(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, + new ByteSizeValue(modelSizeStats.getModelBytes(), ByteSizeUnit.BYTES).toString())); + } + } + } + protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { JobUpdate update = new JobUpdate.Builder(jobId) .setModelSnapshotId(modelSnapshot.getSnapshotId()) diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java index 483d5230acb..458ad2a8fa3 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/Auditor.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; @@ -31,23 +30,23 @@ public class Auditor { private static final Logger LOGGER = Loggers.getLogger(Auditor.class); private final Client client; - private final ClusterService clusterService; + private final String nodeName; - public Auditor(Client client, ClusterService clusterService) { + public Auditor(Client client, String nodeName) { this.client = Objects.requireNonNull(client); - this.clusterService = clusterService; + this.nodeName = Objects.requireNonNull(nodeName); } public void info(String jobId, String message) { - indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newInfo(jobId, message, clusterService.localNode().getName())); + indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newInfo(jobId, message, nodeName)); } public void warning(String jobId, String message) { - indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newWarning(jobId, message, clusterService.localNode().getName())); + indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newWarning(jobId, message, nodeName)); } public void error(String jobId, String message) { - indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newError(jobId, message, clusterService.localNode().getName())); + indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newError(jobId, message, nodeName)); } private void indexDoc(String type, ToXContent toXContent) { diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index a6eb3355f9c..484d1648fbb 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -22,9 +22,6 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; -import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; -import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; -import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -37,8 +34,11 @@ import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ml.LocalStateMachineLearning; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; +import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; +import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; @@ -46,6 +46,7 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.BucketTests; import org.elasticsearch.xpack.ml.job.results.CategoryDefinitionTests; import org.elasticsearch.xpack.ml.job.results.ModelPlotTests; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.After; import org.junit.Before; @@ -96,10 +97,11 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { public void createComponents() throws Exception { Settings.Builder builder = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1)); + Auditor auditor = new Auditor(client(), "test_node"); jobProvider = new JobProvider(client(), builder.build()); renormalizer = mock(Renormalizer.class); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); - resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, + resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer, new JobResultsPersister(nodeSettings(), client()), jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) { @Override protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 2d44bd86127..1221f85e61d 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -11,11 +11,14 @@ import org.elasticsearch.Version; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -30,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; import org.mockito.InOrder; @@ -62,6 +66,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { private static final String JOB_ID = "_id"; private Client client; + private Auditor auditor; private Renormalizer renormalizer; private JobResultsPersister persister; private JobProvider jobProvider; @@ -71,6 +76,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { @Before public void setUpMocks() { client = mock(Client.class); + auditor = mock(Auditor.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); @@ -78,7 +84,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { persister = mock(JobResultsPersister.class); jobProvider = mock(JobProvider.class); flushListener = mock(FlushListener.class); - processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister, jobProvider, + processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false, flushListener); } @@ -276,10 +282,46 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verify(persister, times(1)).persistModelSizeStats(modelSizeStats); verifyNoMoreInteractions(persister); // No interactions with the jobProvider confirms that the established memory calculation did not run - verifyNoMoreInteractions(jobProvider); + verifyNoMoreInteractions(jobProvider, auditor); assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); } + public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + context.deleteInterimRequired = false; + AutodetectResult result = mock(AutodetectResult.class); + + // First one with soft_limit + ModelSizeStats modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.SOFT_LIMIT).build(); + when(result.getModelSizeStats()).thenReturn(modelSizeStats); + processorUnderTest.processResult(context, result); + + // Another with soft_limit + modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.SOFT_LIMIT).build(); + when(result.getModelSizeStats()).thenReturn(modelSizeStats); + processorUnderTest.processResult(context, result); + + // Now with hard_limit + modelSizeStats = new ModelSizeStats.Builder(JOB_ID) + .setMemoryStatus(ModelSizeStats.MemoryStatus.HARD_LIMIT) + .setModelBytes(new ByteSizeValue(512, ByteSizeUnit.MB).getBytes()) + .build(); + when(result.getModelSizeStats()).thenReturn(modelSizeStats); + processorUnderTest.processResult(context, result); + + // And another with hard_limit + modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.HARD_LIMIT).build(); + when(result.getModelSizeStats()).thenReturn(modelSizeStats); + processorUnderTest.processResult(context, result); + + // We should have only fired to notifications: one for soft_limit and one for hard_limit + verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT)); + verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb")); + verifyNoMoreInteractions(auditor); + } + public void testProcessResult_modelSizeStatsAfterManyBuckets() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditorTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditorTests.java index dba42f7c927..e74d1a7f2de 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditorTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/notifications/AuditorTests.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.ml.notifications; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -34,7 +32,6 @@ import static org.mockito.Mockito.when; public class AuditorTests extends ESTestCase { private Client client; - private ClusterService clusterService; private ArgumentCaptor indexRequestCaptor; @Before @@ -43,16 +40,12 @@ public class AuditorTests extends ESTestCase { ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - clusterService = mock(ClusterService.class); - DiscoveryNode dNode = mock(DiscoveryNode.class); - when(dNode.getName()).thenReturn("this_node_has_a_name"); - when(clusterService.localNode()).thenReturn(dNode); indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class); } public void testInfo() throws IOException { - Auditor auditor = new Auditor(client, clusterService); + Auditor auditor = new Auditor(client, "node_1"); auditor.info("foo", "Here is my info"); verify(client).index(indexRequestCaptor.capture(), any()); @@ -67,7 +60,7 @@ public class AuditorTests extends ESTestCase { } public void testWarning() throws IOException { - Auditor auditor = new Auditor(client, clusterService); + Auditor auditor = new Auditor(client, "node_1"); auditor.warning("bar", "Here is my warning"); verify(client).index(indexRequestCaptor.capture(), any()); @@ -82,7 +75,7 @@ public class AuditorTests extends ESTestCase { } public void testError() throws IOException { - Auditor auditor = new Auditor(client, clusterService); + Auditor auditor = new Auditor(client, "node_1"); auditor.error("foobar", "Here is my error"); verify(client).index(indexRequestCaptor.capture(), any());