From 0be4082ad758da68c43b65a1829ae6fd47fb1ee9 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 5 Apr 2017 16:43:14 +0100 Subject: [PATCH] [ML] Add notification for loading snapshot (elastic/x-pack-elasticsearch#970) As the snapshot that is loaded is an important operational aspect of a job, this change adds a notification that displays the loaded snapshot with its latest_record_timestamp and the job's latest_record_timestamp. Having both allows us to discover when a job is recovering after a node failure. relates elastic/x-pack-elasticsearch#872 Original commit: elastic/x-pack-elasticsearch@c2dee495a21f5be87cd24dea4f4d766b89916072 --- .../xpack/ml/MachineLearning.java | 2 +- .../autodetect/AutodetectProcessManager.java | 47 ++++++++++-- .../AutodetectProcessManagerTests.java | 74 ++++++++++++++++++- 3 files changed, 112 insertions(+), 11 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index b6d6ea02264..2c160593fa6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -296,7 +296,7 @@ public class MachineLearning implements ActionPlugin { threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME)); AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, xContentRegistry); + normalizerFactory, xContentRegistry, auditor); PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient); DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, jobProvider, System::currentTimeMillis, auditor, persistentTasksService); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 7ca2f44cc06..1eaf0fb8408 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; @@ -39,10 +40,12 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater; import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -50,6 +53,7 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.ZonedDateTime; +import java.util.Date; import java.util.List; import java.util.Locale; import java.util.Map; @@ -62,7 +66,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -90,13 +93,15 @@ public class AutodetectProcessManager extends AbstractComponent { private final int maxAllowedRunningJobs; - private NamedXContentRegistry xContentRegistry; + private final NamedXContentRegistry xContentRegistry; + + private final Auditor auditor; public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister, JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, - NamedXContentRegistry xContentRegistry) { + NamedXContentRegistry xContentRegistry, Auditor auditor) { super(settings); this.client = client; this.threadPool = threadPool; @@ -106,11 +111,10 @@ public class AutodetectProcessManager extends AbstractComponent { this.normalizerFactory = normalizerFactory; this.jobManager = jobManager; this.jobProvider = jobProvider; - this.jobResultsPersister = jobResultsPersister; this.jobDataCountsPersister = jobDataCountsPersister; - this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>(); + this.auditor = auditor; } public synchronized void closeAllJobs(String reason) throws IOException { @@ -239,12 +243,18 @@ public class AutodetectProcessManager extends AbstractComponent { RestStatus.TOO_MANY_REQUESTS); } + notifyLoadingSnapshot(jobId, autodetectParams); + if (autodetectParams.dataCounts().getProcessedRecordCount() > 0) { if (autodetectParams.modelSnapshot() == null) { - logger.warn("[{}] No model snapshot could be found for a job with processed records", jobId); + String msg = "No model snapshot could be found for a job with processed records"; + logger.warn("[{}] {}", jobId, msg); + auditor.warning(jobId, "No model snapshot could be found for a job with processed records"); } if (autodetectParams.quantiles() == null) { - logger.warn("[{}] No quantiles could be found for a job with processed records", jobId); + String msg = "No quantiles could be found for a job with processed records"; + logger.warn("[{}] {}", jobId); + auditor.warning(jobId, msg); } } @@ -284,6 +294,29 @@ public class AutodetectProcessManager extends AbstractComponent { } + private void notifyLoadingSnapshot(String jobId, AutodetectParams autodetectParams) { + ModelSnapshot modelSnapshot = autodetectParams.modelSnapshot(); + StringBuilder msgBuilder = new StringBuilder("Loading model snapshot ["); + if (modelSnapshot == null) { + msgBuilder.append("N/A"); + } else { + msgBuilder.append(modelSnapshot.getSnapshotId()); + msgBuilder.append("] with latest_record_timestamp ["); + Date snapshotLatestRecordTimestamp = modelSnapshot.getLatestRecordTimeStamp(); + msgBuilder.append(snapshotLatestRecordTimestamp == null ? "N/A" : + XContentBuilder.DEFAULT_DATE_PRINTER.print( + snapshotLatestRecordTimestamp.getTime())); + } + msgBuilder.append("], job latest_record_timestamp ["); + Date jobLatestRecordTimestamp = autodetectParams.dataCounts().getLatestRecordTimeStamp(); + msgBuilder.append(jobLatestRecordTimestamp == null ? "N/A" + : XContentBuilder.DEFAULT_DATE_PRINTER.print(jobLatestRecordTimestamp.getTime())); + msgBuilder.append("]"); + String msg = msgBuilder.toString(); + logger.info("[{}] {}", jobId, msg); + auditor.info(jobId, msg); + } + /** * Stop the running job and mark it as finished.
* diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 3da2859e6cd..e6f318e08fa 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; import org.mockito.Mockito; @@ -62,6 +63,7 @@ import static org.elasticsearch.mock.orig.Mockito.doReturn; import static org.elasticsearch.mock.orig.Mockito.doThrow; import static org.elasticsearch.mock.orig.Mockito.times; import static org.elasticsearch.mock.orig.Mockito.verify; +import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; import static org.elasticsearch.mock.orig.Mockito.when; import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; @@ -85,6 +87,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private JobResultsPersister jobResultsPersister; private JobDataCountsPersister jobDataCountsPersister; private NormalizerFactory normalizerFactory; + private Auditor auditor; private DataCounts dataCounts = new DataCounts("foo"); private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build(); @@ -100,6 +103,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class)); jobDataCountsPersister = mock(JobDataCountsPersister.class); normalizerFactory = mock(NormalizerFactory.class); + auditor = mock(Auditor.class); when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); doAnswer(invocationOnMock -> { @@ -147,7 +151,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3); AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, new NamedXContentRegistry(Collections.emptyList()))); + normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor)); doReturn(executorService).when(manager).createAutodetectExecutorService(any()); doAnswer(invocationOnMock -> { @@ -328,7 +332,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { (j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess; AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, new NamedXContentRegistry(Collections.emptyList())); + normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); JobTask jobTask = mock(JobTask.class); expectThrows(EsRejectedExecutionException.class, @@ -336,6 +340,69 @@ public class AutodetectProcessManagerTests extends ESTestCase { verify(autodetectProcess, times(1)).close(); } + public void testCreate_givenFirstTime() throws IOException { + modelSnapshot = null; + AutodetectProcessManager manager = createNonSpyManager("foo"); + + JobTask jobTask = mock(JobTask.class); + manager.create("foo", jobTask, buildAutodetectParams(), false, e -> {}); + + String expectedNotification = "Loading model snapshot [N/A], job latest_record_timestamp [N/A]"; + verify(auditor).info("foo", expectedNotification); + verifyNoMoreInteractions(auditor); + } + + public void testCreate_givenExistingModelSnapshot() throws IOException { + modelSnapshot = new ModelSnapshot.Builder("foo").setSnapshotId("snapshot-1") + .setLatestRecordTimeStamp(new Date(0L)).build(); + dataCounts = new DataCounts("foo"); + dataCounts.setLatestRecordTimeStamp(new Date(1L)); + AutodetectProcessManager manager = createNonSpyManager("foo"); + + JobTask jobTask = mock(JobTask.class); + manager.create("foo", jobTask, buildAutodetectParams(), false, e -> {}); + + String expectedNotification = "Loading model snapshot [snapshot-1] with " + + "latest_record_timestamp [1970-01-01T00:00:00.000Z], " + + "job latest_record_timestamp [1970-01-01T00:00:00.001Z]"; + verify(auditor).info("foo", expectedNotification); + verifyNoMoreInteractions(auditor); + } + + public void testCreate_givenNonZeroCountsAndNoModelSnapshotNorQuantiles() throws IOException { + modelSnapshot = null; + quantiles = null; + dataCounts = new DataCounts("foo"); + dataCounts.setLatestRecordTimeStamp(new Date(0L)); + dataCounts.incrementProcessedRecordCount(42L); + AutodetectProcessManager manager = createNonSpyManager("foo"); + + JobTask jobTask = mock(JobTask.class); + manager.create("foo", jobTask, buildAutodetectParams(), false, e -> {}); + + String expectedNotification = "Loading model snapshot [N/A], " + + "job latest_record_timestamp [1970-01-01T00:00:00.000Z]"; + verify(auditor).info("foo", expectedNotification); + verify(auditor).warning("foo", "No model snapshot could be found for a job with processed records"); + verify(auditor).warning("foo", "No quantiles could be found for a job with processed records"); + verifyNoMoreInteractions(auditor); + } + + private AutodetectProcessManager createNonSpyManager(String jobId) { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + when(threadPool.executor(anyString())).thenReturn(executorService); + when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mock(ThreadPool.Cancellable.class)); + when(jobManager.getJobOrThrowIfUnknown(jobId)).thenReturn(createJobDetails(jobId)); + AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); + AutodetectProcessFactory autodetectProcessFactory = + (j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess; + return new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, + jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, + normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor); + } + private AutodetectParams buildAutodetectParams() { return new AutodetectParams.Builder("foo") .setDataCounts(dataCounts) @@ -357,7 +424,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, - autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList())); + autodetectProcessFactory, normalizerFactory, + new NamedXContentRegistry(Collections.emptyList()), auditor); manager = spy(manager); doReturn(communicator).when(manager).create(any(), any(), eq(buildAutodetectParams()), anyBoolean(), any()); return manager;