diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 2bba7cfc17f..6501c5532b0 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -18,7 +18,11 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; +import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -81,17 +85,20 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local } private void processNextUpdate() { + List updates = new ArrayList<>(orderedJobUpdates.size()); try { - UpdateParams updateParams = orderedJobUpdates.poll(); - if (updateParams != null) { - executeRemoteJob(updateParams); - } + orderedJobUpdates.drainTo(updates); + executeProcessUpdates(new VolatileCursorIterator<>(updates)); } catch (Exception e) { - logger.error("Unable while processing next job update", e); + logger.error("Error while processing next job update", e); } } - void executeRemoteJob(UpdateParams update) { + void executeProcessUpdates(Iterator updatesIterator) { + if (updatesIterator.hasNext() == false) { + return; + } + UpdateParams update = updatesIterator.next(); Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(), update.isUpdateScheduledEvents()); @@ -104,6 +111,7 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local } else { logger.error("Failed to update remote job [{}]", update.getJobId()); } + executeProcessUpdates(updatesIterator); } @Override @@ -116,7 +124,9 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local } else { logger.error("Failed to update remote job [" + update.getJobId() + "]", e); } + executeProcessUpdates(updatesIterator); } }); } + }