[ML] Notify job memory status changes (elastic/x-pack-elasticsearch#4187)

This commit adds job notifications when the memory status
changes. This ensures that a job reaching its memory limit is
communicated more visibly to the user, so that action can be taken.

relates elastic/x-pack-elasticsearch#4173

Original commit: elastic/x-pack-elasticsearch@c7362bd4bc
Dimitris Athanasiou 2018-03-22 14:04:37 +00:00 committed by GitHub
parent d12ee3898d
commit 42eae8b3be
9 changed files with 92 additions and 32 deletions
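
The behavioural core of the change is a new status-transition check in AutoDetectResultProcessor: each incoming model size stats document is compared against the previous one, and an audit notification is written only when the memory status actually changes. The condensed sketch below restates that check for orientation; it is not standalone code, and the full version (imports, constructor wiring and tests) is in the diff that follows.

// Condensed restatement of the check added to AutoDetectResultProcessor (see the diff below).
// Assumes the enclosing class holds the previous stats in latestModelSizeStats and an injected auditor.
private void notifyModelMemoryStatusChange(Context context, ModelSizeStats modelSizeStats) {
    ModelSizeStats.MemoryStatus memoryStatus = modelSizeStats.getMemoryStatus();
    // Audit only on a transition, so repeated soft_limit/hard_limit buckets do not flood the notifications index.
    if (memoryStatus != latestModelSizeStats.getMemoryStatus()) {
        if (memoryStatus == ModelSizeStats.MemoryStatus.SOFT_LIMIT) {
            auditor.warning(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
        } else if (memoryStatus == ModelSizeStats.MemoryStatus.HARD_LIMIT) {
            // Include the current model size so the user knows how far to raise analysis_limits.model_memory_limit.
            auditor.error(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT,
                    new ByteSizeValue(modelSizeStats.getModelBytes(), ByteSizeUnit.BYTES).toString()));
        }
    }
}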

Messages.java

@ -71,6 +71,10 @@ public final class Messages {
public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted";
public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process";
public static final String JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS = "Updated calendars in running process";
public static final String JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT = "Job memory status changed to soft_limit; memory pruning will now be " +
"more aggressive";
public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT = "Job memory status changed to hard_limit at {0}; adjust the " +
"analysis_limits.model_memory_limit setting to ensure all data is analyzed";
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY =

MachineLearning.java

@ -75,7 +75,6 @@ import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
import org.elasticsearch.xpack.core.ml.action.GetInfluencersAction;
import org.elasticsearch.xpack.core.ml.action.MlInfoAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
@ -83,6 +82,7 @@ import org.elasticsearch.xpack.core.ml.action.GetOverallBucketsAction;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.action.MlInfoAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction;
@ -126,7 +126,6 @@ import org.elasticsearch.xpack.ml.action.TransportGetDatafeedsAction;
import org.elasticsearch.xpack.ml.action.TransportGetDatafeedsStatsAction;
import org.elasticsearch.xpack.ml.action.TransportGetFiltersAction;
import org.elasticsearch.xpack.ml.action.TransportGetInfluencersAction;
import org.elasticsearch.xpack.ml.action.TransportMlInfoAction;
import org.elasticsearch.xpack.ml.action.TransportGetJobsAction;
import org.elasticsearch.xpack.ml.action.TransportGetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.TransportGetModelSnapshotsAction;
@ -134,6 +133,7 @@ import org.elasticsearch.xpack.ml.action.TransportGetOverallBucketsAction;
import org.elasticsearch.xpack.ml.action.TransportGetRecordsAction;
import org.elasticsearch.xpack.ml.action.TransportIsolateDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportKillProcessAction;
import org.elasticsearch.xpack.ml.action.TransportMlInfoAction;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction;
import org.elasticsearch.xpack.ml.action.TransportPersistJobAction;
import org.elasticsearch.xpack.ml.action.TransportPostCalendarEventsAction;
@ -356,7 +356,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
return emptyList();
}
Auditor auditor = new Auditor(client, clusterService);
Auditor auditor = new Auditor(client, clusterService.nodeName());
JobProvider jobProvider = new JobProvider(client, settings);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
JobManager jobManager = new JobManager(env, settings, jobProvider, clusterService, auditor, client, notifier);

TransportDeleteExpiredDataAction.java

@ -51,7 +51,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
}
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
Auditor auditor = new Auditor(client, clusterService);
Auditor auditor = new Auditor(client, clusterService.nodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredForecastsRemover(client),

AutodetectProcessManager.java

@ -417,7 +417,7 @@ public class AutodetectProcessManager extends AbstractComponent {
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService,
onProcessCrash(jobTask));
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(),
client, auditor, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(),
autodetectParams.modelSnapshot() != null);
ExecutorService autodetectWorkerExecutor;
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {

AutoDetectResultProcessor.java

@ -12,10 +12,13 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
@ -32,6 +35,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.time.Duration;
import java.util.Date;
@ -68,6 +72,7 @@ public class AutoDetectResultProcessor {
private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class);
private final Client client;
private final Auditor auditor;
private final String jobId;
private final Renormalizer renormalizer;
private final JobResultsPersister persister;
@ -88,15 +93,16 @@ public class AutoDetectResultProcessor {
private volatile long latestEstablishedModelMemory;
private volatile boolean haveNewLatestModelSizeStats;
public AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) {
this(client, jobId, renormalizer, persister, jobProvider, latestModelSizeStats, restoredSnapshot, new FlushListener());
this(client, auditor, jobId, renormalizer, persister, jobProvider, latestModelSizeStats, restoredSnapshot, new FlushListener());
}
AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot,
FlushListener flushListener) {
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
this.jobId = Objects.requireNonNull(jobId);
this.renormalizer = Objects.requireNonNull(renormalizer);
this.persister = Objects.requireNonNull(persister);
@ -284,9 +290,11 @@ public class AutoDetectResultProcessor {
modelSizeStats.getTotalOverFieldCount(), modelSizeStats.getTotalPartitionFieldCount(),
modelSizeStats.getBucketAllocationFailuresCount(), modelSizeStats.getMemoryStatus());
persister.persistModelSizeStats(modelSizeStats);
notifyModelMemoryStatusChange(context, modelSizeStats);
latestModelSizeStats = modelSizeStats;
haveNewLatestModelSizeStats = true;
persister.persistModelSizeStats(modelSizeStats);
// This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets
// because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and
// we'll NEVER consider memory usage to be established during this period
@ -297,6 +305,18 @@ public class AutoDetectResultProcessor {
}
}
private void notifyModelMemoryStatusChange(Context context, ModelSizeStats modelSizeStats) {
ModelSizeStats.MemoryStatus memoryStatus = modelSizeStats.getMemoryStatus();
if (memoryStatus != latestModelSizeStats.getMemoryStatus()) {
if (memoryStatus == ModelSizeStats.MemoryStatus.SOFT_LIMIT) {
auditor.warning(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
} else if (memoryStatus == ModelSizeStats.MemoryStatus.HARD_LIMIT) {
auditor.error(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT,
new ByteSizeValue(modelSizeStats.getModelBytes(), ByteSizeUnit.BYTES).toString()));
}
}
}
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
JobUpdate update = new JobUpdate.Builder(jobId)
.setModelSnapshotId(modelSnapshot.getSnapshotId())

Auditor.java

@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
@ -31,23 +30,23 @@ public class Auditor {
private static final Logger LOGGER = Loggers.getLogger(Auditor.class);
private final Client client;
private final ClusterService clusterService;
private final String nodeName;
public Auditor(Client client, ClusterService clusterService) {
public Auditor(Client client, String nodeName) {
this.client = Objects.requireNonNull(client);
this.clusterService = clusterService;
this.nodeName = Objects.requireNonNull(nodeName);
}
public void info(String jobId, String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newInfo(jobId, message, clusterService.localNode().getName()));
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newInfo(jobId, message, nodeName));
}
public void warning(String jobId, String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newWarning(jobId, message, clusterService.localNode().getName()));
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newWarning(jobId, message, nodeName));
}
public void error(String jobId, String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newError(jobId, message, clusterService.localNode().getName()));
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newError(jobId, message, nodeName));
}
private void indexDoc(String type, ToXContent toXContent) {

AutodetectResultProcessorIT.java

@ -22,9 +22,6 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
@ -37,8 +34,11 @@ import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
@ -46,6 +46,7 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.job.results.BucketTests;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinitionTests;
import org.elasticsearch.xpack.ml.job.results.ModelPlotTests;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.After;
import org.junit.Before;
@ -96,10 +97,11 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
public void createComponents() throws Exception {
Settings.Builder builder = Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
Auditor auditor = new Auditor(client(), "test_node");
jobProvider = new JobProvider(client(), builder.build());
renormalizer = mock(Renormalizer.class);
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer,
resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
new JobResultsPersister(nodeSettings(), client()), jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) {
@Override
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {

AutoDetectResultProcessorTests.java

@ -11,11 +11,14 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
@ -30,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import org.mockito.InOrder;
@ -62,6 +66,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
private static final String JOB_ID = "_id";
private Client client;
private Auditor auditor;
private Renormalizer renormalizer;
private JobResultsPersister persister;
private JobProvider jobProvider;
@ -71,6 +76,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
@Before
public void setUpMocks() {
client = mock(Client.class);
auditor = mock(Auditor.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
@ -78,7 +84,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
persister = mock(JobResultsPersister.class);
jobProvider = mock(JobProvider.class);
flushListener = mock(FlushListener.class);
processorUnderTest = new AutoDetectResultProcessor(client, JOB_ID, renormalizer, persister, jobProvider,
processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobProvider,
new ModelSizeStats.Builder(JOB_ID).build(), false, flushListener);
}
@ -276,10 +282,46 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verify(persister, times(1)).persistModelSizeStats(modelSizeStats);
verifyNoMoreInteractions(persister);
// No interactions with the jobProvider confirms that the established memory calculation did not run
verifyNoMoreInteractions(jobProvider);
verifyNoMoreInteractions(jobProvider, auditor);
assertEquals(modelSizeStats, processorUnderTest.modelSizeStats());
}
public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
// First one with soft_limit
ModelSizeStats modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.SOFT_LIMIT).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(context, result);
// Another with soft_limit
modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.SOFT_LIMIT).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(context, result);
// Now with hard_limit
modelSizeStats = new ModelSizeStats.Builder(JOB_ID)
.setMemoryStatus(ModelSizeStats.MemoryStatus.HARD_LIMIT)
.setModelBytes(new ByteSizeValue(512, ByteSizeUnit.MB).getBytes())
.build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(context, result);
// And another with hard_limit
modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.HARD_LIMIT).build();
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(context, result);
// We should have only fired two notifications: one for soft_limit and one for hard_limit
verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb"));
verifyNoMoreInteractions(auditor);
}
public void testProcessResult_modelSizeStatsAfterManyBuckets() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);

AuditorTests.java

@ -7,8 +7,6 @@ package org.elasticsearch.xpack.ml.notifications;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -34,7 +32,6 @@ import static org.mockito.Mockito.when;
public class AuditorTests extends ESTestCase {
private Client client;
private ClusterService clusterService;
private ArgumentCaptor<IndexRequest> indexRequestCaptor;
@Before
@ -43,16 +40,12 @@ public class AuditorTests extends ESTestCase {
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
clusterService = mock(ClusterService.class);
DiscoveryNode dNode = mock(DiscoveryNode.class);
when(dNode.getName()).thenReturn("this_node_has_a_name");
when(clusterService.localNode()).thenReturn(dNode);
indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
}
public void testInfo() throws IOException {
Auditor auditor = new Auditor(client, clusterService);
Auditor auditor = new Auditor(client, "node_1");
auditor.info("foo", "Here is my info");
verify(client).index(indexRequestCaptor.capture(), any());
@ -67,7 +60,7 @@ public class AuditorTests extends ESTestCase {
}
public void testWarning() throws IOException {
Auditor auditor = new Auditor(client, clusterService);
Auditor auditor = new Auditor(client, "node_1");
auditor.warning("bar", "Here is my warning");
verify(client).index(indexRequestCaptor.capture(), any());
@ -82,7 +75,7 @@ public class AuditorTests extends ESTestCase {
}
public void testError() throws IOException {
Auditor auditor = new Auditor(client, clusterService);
Auditor auditor = new Auditor(client, "node_1");
auditor.error("foobar", "Here is my error");
verify(client).index(indexRequestCaptor.capture(), any());