[ML] Add notification for loading snapshot (elastic/x-pack-elasticsearch#970)

As the snapshot that is loaded is an important operational
aspect of a job, this change adds a notification that displays
the loaded snapshot with its latest_record_timestamp and the
job's latest_record_timestamp. Having both allows us to discover
when a job is recovering after a node failure.

relates elastic/x-pack-elasticsearch#872

Original commit: elastic/x-pack-elasticsearch@c2dee495a2
This commit is contained in:
Dimitris Athanasiou 2017-04-05 16:43:14 +01:00 committed by GitHub
parent 7b48bac9f4
commit 0be4082ad7
3 changed files with 112 additions and 11 deletions

View File

@ -296,7 +296,7 @@ public class MachineLearning implements ActionPlugin {
threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME)); threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool, AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry); normalizerFactory, xContentRegistry, auditor);
PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient); PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, jobProvider, DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, jobProvider,
System::currentTimeMillis, auditor, persistentTasksService); System::currentTimeMillis, auditor, persistentTasksService);

View File

@ -18,6 +18,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -39,10 +40,12 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater; import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater;
import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -50,6 +53,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.time.Duration; import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -62,7 +66,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -90,13 +93,15 @@ public class AutodetectProcessManager extends AbstractComponent {
private final int maxAllowedRunningJobs; private final int maxAllowedRunningJobs;
private NamedXContentRegistry xContentRegistry; private final NamedXContentRegistry xContentRegistry;
private final Auditor auditor;
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool,
JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister, JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister, JobDataCountsPersister jobDataCountsPersister,
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
NamedXContentRegistry xContentRegistry) { NamedXContentRegistry xContentRegistry, Auditor auditor) {
super(settings); super(settings);
this.client = client; this.client = client;
this.threadPool = threadPool; this.threadPool = threadPool;
@ -106,11 +111,10 @@ public class AutodetectProcessManager extends AbstractComponent {
this.normalizerFactory = normalizerFactory; this.normalizerFactory = normalizerFactory;
this.jobManager = jobManager; this.jobManager = jobManager;
this.jobProvider = jobProvider; this.jobProvider = jobProvider;
this.jobResultsPersister = jobResultsPersister; this.jobResultsPersister = jobResultsPersister;
this.jobDataCountsPersister = jobDataCountsPersister; this.jobDataCountsPersister = jobDataCountsPersister;
this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>(); this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>();
this.auditor = auditor;
} }
public synchronized void closeAllJobs(String reason) throws IOException { public synchronized void closeAllJobs(String reason) throws IOException {
@ -239,12 +243,18 @@ public class AutodetectProcessManager extends AbstractComponent {
RestStatus.TOO_MANY_REQUESTS); RestStatus.TOO_MANY_REQUESTS);
} }
notifyLoadingSnapshot(jobId, autodetectParams);
if (autodetectParams.dataCounts().getProcessedRecordCount() > 0) { if (autodetectParams.dataCounts().getProcessedRecordCount() > 0) {
if (autodetectParams.modelSnapshot() == null) { if (autodetectParams.modelSnapshot() == null) {
logger.warn("[{}] No model snapshot could be found for a job with processed records", jobId); String msg = "No model snapshot could be found for a job with processed records";
logger.warn("[{}] {}", jobId, msg);
auditor.warning(jobId, "No model snapshot could be found for a job with processed records");
} }
if (autodetectParams.quantiles() == null) { if (autodetectParams.quantiles() == null) {
logger.warn("[{}] No quantiles could be found for a job with processed records", jobId); String msg = "No quantiles could be found for a job with processed records";
logger.warn("[{}] {}", jobId);
auditor.warning(jobId, msg);
} }
} }
@ -284,6 +294,29 @@ public class AutodetectProcessManager extends AbstractComponent {
} }
private void notifyLoadingSnapshot(String jobId, AutodetectParams autodetectParams) {
ModelSnapshot modelSnapshot = autodetectParams.modelSnapshot();
StringBuilder msgBuilder = new StringBuilder("Loading model snapshot [");
if (modelSnapshot == null) {
msgBuilder.append("N/A");
} else {
msgBuilder.append(modelSnapshot.getSnapshotId());
msgBuilder.append("] with latest_record_timestamp [");
Date snapshotLatestRecordTimestamp = modelSnapshot.getLatestRecordTimeStamp();
msgBuilder.append(snapshotLatestRecordTimestamp == null ? "N/A" :
XContentBuilder.DEFAULT_DATE_PRINTER.print(
snapshotLatestRecordTimestamp.getTime()));
}
msgBuilder.append("], job latest_record_timestamp [");
Date jobLatestRecordTimestamp = autodetectParams.dataCounts().getLatestRecordTimeStamp();
msgBuilder.append(jobLatestRecordTimestamp == null ? "N/A"
: XContentBuilder.DEFAULT_DATE_PRINTER.print(jobLatestRecordTimestamp.getTime()));
msgBuilder.append("]");
String msg = msgBuilder.toString();
logger.info("[{}] {}", jobId, msg);
auditor.info(jobId, msg);
}
/** /**
* Stop the running job and mark it as finished.<br> * Stop the running job and mark it as finished.<br>
* *

View File

@ -38,6 +38,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before; import org.junit.Before;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -62,6 +63,7 @@ import static org.elasticsearch.mock.orig.Mockito.doReturn;
import static org.elasticsearch.mock.orig.Mockito.doThrow; import static org.elasticsearch.mock.orig.Mockito.doThrow;
import static org.elasticsearch.mock.orig.Mockito.times; import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.verify; import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions;
import static org.elasticsearch.mock.orig.Mockito.when; import static org.elasticsearch.mock.orig.Mockito.when;
import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -85,6 +87,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private JobResultsPersister jobResultsPersister; private JobResultsPersister jobResultsPersister;
private JobDataCountsPersister jobDataCountsPersister; private JobDataCountsPersister jobDataCountsPersister;
private NormalizerFactory normalizerFactory; private NormalizerFactory normalizerFactory;
private Auditor auditor;
private DataCounts dataCounts = new DataCounts("foo"); private DataCounts dataCounts = new DataCounts("foo");
private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build(); private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build();
@ -100,6 +103,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class)); when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class));
jobDataCountsPersister = mock(JobDataCountsPersister.class); jobDataCountsPersister = mock(JobDataCountsPersister.class);
normalizerFactory = mock(NormalizerFactory.class); normalizerFactory = mock(NormalizerFactory.class);
auditor = mock(Auditor.class);
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
doAnswer(invocationOnMock -> { doAnswer(invocationOnMock -> {
@ -147,7 +151,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3); settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider, AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()))); normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor));
doReturn(executorService).when(manager).createAutodetectExecutorService(any()); doReturn(executorService).when(manager).createAutodetectExecutorService(any());
doAnswer(invocationOnMock -> { doAnswer(invocationOnMock -> {
@ -328,7 +332,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
(j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess; (j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider, AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList())); normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
JobTask jobTask = mock(JobTask.class); JobTask jobTask = mock(JobTask.class);
expectThrows(EsRejectedExecutionException.class, expectThrows(EsRejectedExecutionException.class,
@ -336,6 +340,69 @@ public class AutodetectProcessManagerTests extends ESTestCase {
verify(autodetectProcess, times(1)).close(); verify(autodetectProcess, times(1)).close();
} }
public void testCreate_givenFirstTime() throws IOException {
modelSnapshot = null;
AutodetectProcessManager manager = createNonSpyManager("foo");
JobTask jobTask = mock(JobTask.class);
manager.create("foo", jobTask, buildAutodetectParams(), false, e -> {});
String expectedNotification = "Loading model snapshot [N/A], job latest_record_timestamp [N/A]";
verify(auditor).info("foo", expectedNotification);
verifyNoMoreInteractions(auditor);
}
public void testCreate_givenExistingModelSnapshot() throws IOException {
modelSnapshot = new ModelSnapshot.Builder("foo").setSnapshotId("snapshot-1")
.setLatestRecordTimeStamp(new Date(0L)).build();
dataCounts = new DataCounts("foo");
dataCounts.setLatestRecordTimeStamp(new Date(1L));
AutodetectProcessManager manager = createNonSpyManager("foo");
JobTask jobTask = mock(JobTask.class);
manager.create("foo", jobTask, buildAutodetectParams(), false, e -> {});
String expectedNotification = "Loading model snapshot [snapshot-1] with " +
"latest_record_timestamp [1970-01-01T00:00:00.000Z], " +
"job latest_record_timestamp [1970-01-01T00:00:00.001Z]";
verify(auditor).info("foo", expectedNotification);
verifyNoMoreInteractions(auditor);
}
public void testCreate_givenNonZeroCountsAndNoModelSnapshotNorQuantiles() throws IOException {
modelSnapshot = null;
quantiles = null;
dataCounts = new DataCounts("foo");
dataCounts.setLatestRecordTimeStamp(new Date(0L));
dataCounts.incrementProcessedRecordCount(42L);
AutodetectProcessManager manager = createNonSpyManager("foo");
JobTask jobTask = mock(JobTask.class);
manager.create("foo", jobTask, buildAutodetectParams(), false, e -> {});
String expectedNotification = "Loading model snapshot [N/A], " +
"job latest_record_timestamp [1970-01-01T00:00:00.000Z]";
verify(auditor).info("foo", expectedNotification);
verify(auditor).warning("foo", "No model snapshot could be found for a job with processed records");
verify(auditor).warning("foo", "No quantiles could be found for a job with processed records");
verifyNoMoreInteractions(auditor);
}
private AutodetectProcessManager createNonSpyManager(String jobId) {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
when(threadPool.executor(anyString())).thenReturn(executorService);
when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mock(ThreadPool.Cancellable.class));
when(jobManager.getJobOrThrowIfUnknown(jobId)).thenReturn(createJobDetails(jobId));
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory =
(j, modelSnapshot, quantiles, filters, i, e, onProcessCrash) -> autodetectProcess;
return new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
}
private AutodetectParams buildAutodetectParams() { private AutodetectParams buildAutodetectParams() {
return new AutodetectParams.Builder("foo") return new AutodetectParams.Builder("foo")
.setDataCounts(dataCounts) .setDataCounts(dataCounts)
@ -357,7 +424,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client,
threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister,
autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList())); autodetectProcessFactory, normalizerFactory,
new NamedXContentRegistry(Collections.emptyList()), auditor);
manager = spy(manager); manager = spy(manager);
doReturn(communicator).when(manager).create(any(), any(), eq(buildAutodetectParams()), anyBoolean(), any()); doReturn(communicator).when(manager).create(any(), any(), eq(buildAutodetectParams()), anyBoolean(), any());
return manager; return manager;