[ML] UpdateProcessNotifier should drain the update queue (elastic/x-pack-elasticsearch#3774)

The notifier is scheduled to run once per second. Currently,
it simply polls for the next update in the queue. However,
when there are multiple updates queued up, there is no
reason to wait for subsequent runs in order to execute the
rest of the updates.

This commit changes the notifier to drain the queue each time
it runs. It then serially executes the updates.

relates elastic/x-pack-elasticsearch#3769

Original commit: elastic/x-pack-elasticsearch@7a433c17f2
This commit is contained in:
Dimitris Athanasiou 2018-01-30 16:35:02 +00:00 committed by GitHub
parent ce77a3dd6d
commit 5b7c38da7f
1 changed files with 16 additions and 6 deletions

View File

@ -18,7 +18,11 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@ -81,17 +85,20 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
} }
private void processNextUpdate() { private void processNextUpdate() {
List<UpdateParams> updates = new ArrayList<>(orderedJobUpdates.size());
try { try {
UpdateParams updateParams = orderedJobUpdates.poll(); orderedJobUpdates.drainTo(updates);
if (updateParams != null) { executeProcessUpdates(new VolatileCursorIterator<>(updates));
executeRemoteJob(updateParams);
}
} catch (Exception e) { } catch (Exception e) {
logger.error("Unable while processing next job update", e); logger.error("Error while processing next job update", e);
} }
} }
void executeRemoteJob(UpdateParams update) { void executeProcessUpdates(Iterator<UpdateParams> updatesIterator) {
if (updatesIterator.hasNext() == false) {
return;
}
UpdateParams update = updatesIterator.next();
Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(), Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(),
update.isUpdateScheduledEvents()); update.isUpdateScheduledEvents());
@ -104,6 +111,7 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
} else { } else {
logger.error("Failed to update remote job [{}]", update.getJobId()); logger.error("Failed to update remote job [{}]", update.getJobId());
} }
executeProcessUpdates(updatesIterator);
} }
@Override @Override
@ -116,7 +124,9 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
} else { } else {
logger.error("Failed to update remote job [" + update.getJobId() + "]", e); logger.error("Failed to update remote job [" + update.getJobId() + "]", e);
} }
executeProcessUpdates(updatesIterator);
} }
}); });
} }
} }