From 5b7c38da7f9a5d5b3e0aa6206b379b8f7428daee Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 30 Jan 2018 16:35:02 +0000 Subject: [PATCH] [ML] UpdateProcessNotifier should drain the update queue (elastic/x-pack-elasticsearch#3774) The notifier is scheduled to run once per second. Currently, it simply polls for the next update in the queue. However, when there are multiple updates queued up, there is no reason to wait for subsequent runs in order to execute the rest of the updates. This commit changes the notifier to drain the queue each time it runs. It then serially executes the updates. relates elastic/x-pack-elasticsearch#3769 Original commit: elastic/x-pack-elasticsearch@7a433c17f272852b08f355785bc0762fca101295 --- .../ml/job/UpdateJobProcessNotifier.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 2bba7cfc17f..6501c5532b0 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -18,7 +18,11 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; +import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -81,17 +85,20 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local } private void processNextUpdate() { + List updates = new ArrayList<>(orderedJobUpdates.size()); try { - UpdateParams updateParams = orderedJobUpdates.poll(); - if (updateParams != null) { - executeRemoteJob(updateParams); - } + orderedJobUpdates.drainTo(updates); + executeProcessUpdates(new VolatileCursorIterator<>(updates)); } catch (Exception e) { - logger.error("Unable while processing next job update", e); + logger.error("Error while processing next job update", e); } } - void executeRemoteJob(UpdateParams update) { + void executeProcessUpdates(Iterator updatesIterator) { + if (updatesIterator.hasNext() == false) { + return; + } + UpdateParams update = updatesIterator.next(); Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(), update.isUpdateScheduledEvents()); @@ -104,6 +111,7 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local } else { logger.error("Failed to update remote job [{}]", update.getJobId()); } + executeProcessUpdates(updatesIterator); } @Override @@ -116,7 +124,9 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local } else { logger.error("Failed to update remote job [" + update.getJobId() + "]", e); } + executeProcessUpdates(updatesIterator); } }); } + }