[ML] Fix calendar and filter updates from non-master nodes ()

Job updates or changes to calendars or filters may
result into updating the job process if it has been
running. To preserve the order of updates, process
updates are queued through the UpdateJobProcessNotifier
which is only running on the master node. All actions
performing such updates must run on the master node.

However, the CRUD actions for calendars and filters
are not master node actions. They have been submitting
the updates to the UpdateJobProcessNotifier even though
it might have not been running (given the action was
run on a non-master node). When that happens, the update
never reaches the process.

This commit fixes this problem by ensuring the notifier
runs on all nodes and by ensuring the process update action
gets the resources again before updating the process
(instead of having those resources passed in the request).

This ensures that even if the order of the updates
gets messed up, the latest update will read the latest
state of those resource and the process will get back
in sync.

This leaves us with 2 types of updates:

  1. updates to the job config should happen on the master
  node. This is because we cannot refetch the entire job
  and update it. We need to know the parts that have been changed.

  2. updates to resources the job uses. Those can be handled
  on non-master nodes but they should be re-fetched by the
  update process action.

Closes 
This commit is contained in:
Dimitris Athanasiou 2018-07-05 13:14:12 +01:00 committed by GitHub
parent f40581caa0
commit 9c11bf1e12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 211 additions and 60 deletions
x-pack/plugin/ml/src

@ -5,14 +5,15 @@
*/
package org.elasticsearch.xpack.ml.job;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
@ -31,9 +32,26 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Request;
import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Response;
public class UpdateJobProcessNotifier extends AbstractComponent implements LocalNodeMasterListener {
/**
* This class serves as a queue for updates to the job process.
* Queueing is important for 2 reasons: first, it throttles the updates
* to the process, and second and most important, it preserves the order of the updates
* for actions that run on the master node. For preserving the order of the updates
* to the job config, it's necessary to handle the whole update chain on the master
* node. However, for updates to resources the job uses (e.g. calendars, filters),
* they can be handled on non-master nodes as long as the update process action
* is fetching the latest version of those resources from the index instead of
* using the version that existed while the handling action was at work. This makes
* sure that even if the order of updates gets reversed, the final process update
* will fetch the valid state of those external resources ensuring the process is
* in sync.
*/
public class UpdateJobProcessNotifier extends AbstractComponent {
private static final Logger LOGGER = Loggers.getLogger(UpdateJobProcessNotifier.class);
private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final LinkedBlockingQueue<UpdateHolder> orderedJobUpdates = new LinkedBlockingQueue<>(1000);
@ -42,9 +60,15 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
public UpdateJobProcessNotifier(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStart() {
start();
}
@Override
public void beforeStop() {
stop();
@ -56,16 +80,6 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
return orderedJobUpdates.offer(new UpdateHolder(update, listener));
}
@Override
public void onMaster() {
start();
}
@Override
public void offMaster() {
stop();
}
private void start() {
cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate, TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC);
}
@ -79,12 +93,6 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
}
}
@Override
public String executorName() {
// SAME is ok here, because both start() and stop() are inexpensive:
return ThreadPool.Names.SAME;
}
private void processNextUpdate() {
List<UpdateHolder> updates = new ArrayList<>(orderedJobUpdates.size());
try {
@ -101,6 +109,15 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
}
UpdateHolder updateHolder = updatesIterator.next();
UpdateParams update = updateHolder.update;
if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) {
assert clusterService.localNode().isMasterNode();
LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job ["
+ update.getJobId() + "] will be ignored");
executeProcessUpdates(updatesIterator);
return;
}
Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(),
update.isUpdateScheduledEvents());

@ -17,28 +17,27 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.Closeable;
import java.io.IOException;
@ -46,7 +45,6 @@ import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
@ -206,30 +204,29 @@ public class AutodetectCommunicator implements Closeable {
}
}
public void writeUpdateProcessMessage(UpdateParams updateParams, List<ScheduledEvent> scheduledEvents,
BiConsumer<Void, Exception> handler) {
public void writeUpdateProcessMessage(UpdateProcessMessage update, BiConsumer<Void, Exception> handler) {
submitOperation(() -> {
if (updateParams.getModelPlotConfig() != null) {
autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig());
if (update.getModelPlotConfig() != null) {
autodetectProcess.writeUpdateModelPlotMessage(update.getModelPlotConfig());
}
// Filters have to be written before detectors
if (updateParams.getFilter() != null) {
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(updateParams.getFilter()));
if (update.getFilter() != null) {
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter()));
}
// Add detector rules
if (updateParams.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) {
if (update.getRules() != null) {
autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules());
if (update.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
if (detectorUpdate.getRules() != null) {
autodetectProcess.writeUpdateDetectorRulesMessage(detectorUpdate.getDetectorIndex(), detectorUpdate.getRules());
}
}
}
// Add scheduled events; null means there's no update but an empty list means we should clear any events in the process
if (scheduledEvents != null) {
autodetectProcess.writeUpdateScheduledEventsMessage(scheduledEvents, job.getAnalysisConfig().getBucketSpan());
if (update.getScheduledEvents() != null) {
autodetectProcess.writeUpdateScheduledEventsMessage(update.getScheduledEvents(), job.getAnalysisConfig().getBucketSpan());
}
return null;

@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
@ -22,24 +20,26 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.job.JobManager;
@ -47,10 +47,12 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
@ -82,6 +84,8 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.elasticsearch.common.settings.Setting.Property;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class AutodetectProcessManager extends AbstractComponent {
@ -156,7 +160,7 @@ public class AutodetectProcessManager extends AbstractComponent {
}
}
public synchronized void closeAllJobsOnThisNode(String reason) throws IOException {
public synchronized void closeAllJobsOnThisNode(String reason) {
int numJobs = processByAllocation.size();
if (numJobs != 0) {
logger.info("Closing [{}] jobs, because [{}]", numJobs, reason);
@ -322,8 +326,7 @@ public class AutodetectProcessManager extends AbstractComponent {
});
}
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams,
Consumer<Exception> handler) {
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer<Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
@ -332,25 +335,59 @@ public class AutodetectProcessManager extends AbstractComponent {
return;
}
UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder();
updateProcessMessage.setModelPlotConfig(updateParams.getModelPlotConfig());
updateProcessMessage.setDetectorUpdates(updateParams.getDetectorUpdates());
// Step 3. Set scheduled events on message and write update process message
ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(
events -> {
communicator.writeUpdateProcessMessage(updateParams, events == null ? null : events.results(), (aVoid, e) -> {
updateProcessMessage.setScheduledEvents(events == null ? null : events.results());
communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
});
},
handler::accept);
}, handler
);
if (updateParams.isUpdateScheduledEvents()) {
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
DataCounts dataCounts = getStatistics(jobTask).get().v1();
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
// Step 2. Set the filter on the message and get scheduled events
ActionListener<MlFilter> filterListener = ActionListener.wrap(
filter -> {
updateProcessMessage.setFilter(filter);
if (updateParams.isUpdateScheduledEvents()) {
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
DataCounts dataCounts = getStatistics(jobTask).get().v1();
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
} else {
eventsListener.onResponse(null);
}
}, handler
);
// Step 1. Get the filter
if (updateParams.getFilter() == null) {
filterListener.onResponse(null);
} else {
eventsListener.onResponse(null);
GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request();
getFilterRequest.setFilterId(updateParams.getFilter().getId());
executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest,
new ActionListener<GetFiltersAction.Response>() {
@Override
public void onResponse(GetFiltersAction.Response response) {
filterListener.onResponse(response.getFilters().results().get(0));
}
@Override
public void onFailure(Exception e) {
handler.accept(e);
}
});
}
}

@ -49,6 +49,15 @@ public final class UpdateParams {
return filter;
}
/**
* Returns true if the update params include a job update,
* ie an update to the job config directly rather than an
* update to external resources a job uses (e.g. calendars, filters).
*/
public boolean isJobUpdate() {
return modelPlotConfig != null || detectorUpdates != null;
}
public boolean isUpdateScheduledEvents() {
return updateScheduledEvents;
}

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import java.util.List;
public final class UpdateProcessMessage {
@Nullable private final ModelPlotConfig modelPlotConfig;
@Nullable private final List<JobUpdate.DetectorUpdate> detectorUpdates;
@Nullable private final MlFilter filter;
@Nullable private final List<ScheduledEvent> scheduledEvents;
private UpdateProcessMessage(@Nullable ModelPlotConfig modelPlotConfig, @Nullable List<JobUpdate.DetectorUpdate> detectorUpdates,
@Nullable MlFilter filter, List<ScheduledEvent> scheduledEvents) {
this.modelPlotConfig = modelPlotConfig;
this.detectorUpdates = detectorUpdates;
this.filter = filter;
this.scheduledEvents = scheduledEvents;
}
@Nullable
public ModelPlotConfig getModelPlotConfig() {
return modelPlotConfig;
}
@Nullable
public List<JobUpdate.DetectorUpdate> getDetectorUpdates() {
return detectorUpdates;
}
@Nullable
public MlFilter getFilter() {
return filter;
}
@Nullable
public List<ScheduledEvent> getScheduledEvents() {
return scheduledEvents;
}
public static class Builder {
@Nullable private ModelPlotConfig modelPlotConfig;
@Nullable private List<JobUpdate.DetectorUpdate> detectorUpdates;
@Nullable private MlFilter filter;
@Nullable private List<ScheduledEvent> scheduledEvents;
public Builder setModelPlotConfig(ModelPlotConfig modelPlotConfig) {
this.modelPlotConfig = modelPlotConfig;
return this;
}
public Builder setDetectorUpdates(List<JobUpdate.DetectorUpdate> detectorUpdates) {
this.detectorUpdates = detectorUpdates;
return this;
}
public Builder setFilter(MlFilter filter) {
this.filter = filter;
return this;
}
public Builder setScheduledEvents(List<ScheduledEvent> scheduledEvents) {
this.scheduledEvents = scheduledEvents;
return this;
}
public UpdateProcessMessage build() {
return new UpdateProcessMessage(modelPlotConfig, detectorUpdates, filter, scheduledEvents);
}
}
}

@ -95,11 +95,12 @@ public class AutodetectCommunicatorTests extends ESTestCase {
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(
new JobUpdate.DetectorUpdate(0, "updated description", Collections.singletonList(updatedRule)));
UpdateParams updateParams = UpdateParams.builder("foo").detectorUpdates(detectorUpdates).build();
List<ScheduledEvent> events = Collections.singletonList(
ScheduledEventTests.createScheduledEvent(randomAlphaOfLength(10)));
UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder().setDetectorUpdates(detectorUpdates);
updateProcessMessage.setScheduledEvents(events);
communicator.writeUpdateProcessMessage(updateParams, events, ((aVoid, e) -> {}));
communicator.writeUpdateProcessMessage(updateProcessMessage.build(), ((aVoid, e) -> {}));
verify(process).writeUpdateDetectorRulesMessage(eq(0), eq(Collections.singletonList(updatedRule)));
verify(process).writeUpdateScheduledEventsMessage(events, AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN);

@ -48,6 +48,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
@ -489,8 +490,15 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
UpdateParams updateParams = UpdateParams.builder("foo").modelPlotConfig(modelConfig).detectorUpdates(detectorUpdates).build();
manager.writeUpdateProcessMessage(jobTask, updateParams, e -> {});
verify(communicator).writeUpdateProcessMessage(same(updateParams), eq(null), any());
ArgumentCaptor<UpdateProcessMessage> captor = ArgumentCaptor.forClass(UpdateProcessMessage.class);
verify(communicator).writeUpdateProcessMessage(captor.capture(), any());
UpdateProcessMessage updateProcessMessage = captor.getValue();
assertThat(updateProcessMessage.getModelPlotConfig(), equalTo(modelConfig));
assertThat(updateProcessMessage.getDetectorUpdates(), equalTo(detectorUpdates));
}
public void testJobHasActiveAutodetectProcess() {