From c7ba79bc192710bcc59b62f703ae85d7ea310bbe Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 25 Jun 2020 12:26:52 -0400 Subject: [PATCH] [7.x] [ML] make waiting for renormalization optional for internally flushing job (#58537) (#58553) * [ML] make waiting for renormalization optional for internally flushing job (#58537) When flushing, datafeeds only need the guaruntee that the latest bucket has been handled. But, in addition to this, the typical call to flush waits for renormalization to complete. For large jobs, this can take a fair bit of time (even longer than a bucket length). This causes unnecessary delays in handling data. This commit adds a new internal only flag that allows datafeeds (and forecasting) to skip waiting on renormalization. closes #58395 --- .../xpack/core/ml/action/FlushJobAction.java | 25 +++++++- .../ml/action/FlushJobActionRequestTests.java | 58 +++++++++++++++++++ .../ml/action/TransportFlushJobAction.java | 1 + .../xpack/ml/datafeed/DatafeedJob.java | 1 + .../autodetect/AutodetectCommunicator.java | 13 +++-- .../autodetect/params/FlushJobParams.java | 24 +++++++- .../xpack/ml/datafeed/DatafeedJobTests.java | 1 + .../AutodetectCommunicatorTests.java | 2 +- 8 files changed, 116 insertions(+), 9 deletions(-) create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java index 1eacdb562a5..c1d8ec39907 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java @@ -63,6 +63,7 @@ public class FlushJobAction extends ActionType { } private boolean calcInterim = false; + private boolean waitForNormalization = true; private String start; private String end; private String advanceTime; @@ -78,6 +79,11 @@ public class FlushJobAction extends ActionType { end = in.readOptionalString(); advanceTime = in.readOptionalString(); skipTime = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + this.waitForNormalization = in.readBoolean(); + } else { + this.waitForNormalization = true; + } } @Override @@ -88,6 +94,9 @@ public class FlushJobAction extends ActionType { out.writeOptionalString(end); out.writeOptionalString(advanceTime); out.writeOptionalString(skipTime); + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeBoolean(waitForNormalization); + } } public Request(String jobId) { @@ -134,9 +143,22 @@ public class FlushJobAction extends ActionType { this.skipTime = skipTime; } + public boolean isWaitForNormalization() { + return waitForNormalization; + } + + /** + * Used internally. Datafeeds do not need to wait renormalization to complete before continuing. + * + * For large jobs, renormalization can take minutes, causing datafeeds to needlessly pause execution. + */ + public void setWaitForNormalization(boolean waitForNormalization) { + this.waitForNormalization = waitForNormalization; + } + @Override public int hashCode() { - return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime); + return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization); } @Override @@ -150,6 +172,7 @@ public class FlushJobAction extends ActionType { Request other = (Request) obj; return Objects.equals(jobId, other.jobId) && calcInterim == other.calcInterim && + waitForNormalization == other.waitForNormalization && Objects.equals(start, other.start) && Objects.equals(end, other.end) && Objects.equals(advanceTime, other.advanceTime) && diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java new file mode 100644 index 00000000000..67eb1770031 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FlushJobActionRequestTests.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; +import org.elasticsearch.xpack.core.ml.action.FlushJobAction.Request; + +public class FlushJobActionRequestTests extends AbstractBWCWireSerializationTestCase { + + @Override + protected Request createTestInstance() { + Request request = new Request(randomAlphaOfLengthBetween(1, 20)); + if (randomBoolean()) { + request.setWaitForNormalization(randomBoolean()); + } + if (randomBoolean()) { + request.setCalcInterim(randomBoolean()); + } + if (randomBoolean()) { + request.setStart(Long.toString(randomNonNegativeLong())); + } + if (randomBoolean()) { + request.setEnd(Long.toString(randomNonNegativeLong())); + } + if (randomBoolean()) { + request.setAdvanceTime(Long.toString(randomNonNegativeLong())); + } + if (randomBoolean()) { + request.setSkipTime(Long.toString(randomNonNegativeLong())); + } + return request; + } + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request mutateInstanceForVersion(Request instance, Version version) { + if (version.before(Version.V_7_9_0)) { + Request bwcRequest = new Request(instance.jobId); + bwcRequest.setWaitForNormalization(true); + bwcRequest.setCalcInterim(instance.getCalcInterim()); + bwcRequest.setStart(instance.getStart()); + bwcRequest.setEnd(instance.getEnd()); + bwcRequest.setAdvanceTime(instance.getAdvanceTime()); + bwcRequest.setSkipTime(instance.getSkipTime()); + return bwcRequest; + } + return instance; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java index 25404d17a92..a96406dd84c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java @@ -31,6 +31,7 @@ public class TransportFlushJobAction extends TransportJobTaskAction listener) { FlushJobParams.Builder paramsBuilder = FlushJobParams.builder(); paramsBuilder.calcInterim(request.getCalcInterim()); + paramsBuilder.waitForNormalization(request.isWaitForNormalization()); if (request.getAdvanceTime() != null) { paramsBuilder.advanceTime(request.getAdvanceTime()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index f9c745d4996..bb9e91a0982 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -184,6 +184,7 @@ class DatafeedJob { long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs; long end = toIntervalStartEpochMs(nowMinusQueryDelay); FlushJobAction.Request request = new FlushJobAction.Request(jobId); + request.setWaitForNormalization(false); request.setCalcInterim(true); request.setAdvanceTime(String.valueOf(end)); run(start, end, request); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 6b98a164aac..4cad2a8468f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -250,14 +250,15 @@ public class AutodetectCommunicator implements Closeable { public void flushJob(FlushJobParams params, BiConsumer handler) { submitOperation(() -> { String flushId = autodetectProcess.flushJob(params); - return waitFlushToCompletion(flushId); + return waitFlushToCompletion(flushId, params.isWaitForNormalization()); }, handler); } public void forecastJob(ForecastParams params, BiConsumer handler) { BiConsumer forecastConsumer = (aVoid, e) -> { if (e == null) { - FlushJobParams flushParams = FlushJobParams.builder().build(); + // Forecasting does not care about normalization of the local data as it is not being queried + FlushJobParams flushParams = FlushJobParams.builder().waitForNormalization(false).build(); flushJob(flushParams, (flushAcknowledgement, flushException) -> { if (flushException != null) { String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", job.getId()); @@ -284,7 +285,7 @@ public class AutodetectCommunicator implements Closeable { } @Nullable - FlushAcknowledgement waitFlushToCompletion(String flushId) throws Exception { + FlushAcknowledgement waitFlushToCompletion(String flushId, boolean waitForNormalization) throws Exception { LOGGER.debug("[{}] waiting for flush", job.getId()); FlushAcknowledgement flushAcknowledgement; @@ -300,10 +301,12 @@ public class AutodetectCommunicator implements Closeable { } if (processKilled == false) { - LOGGER.debug("[{}] Initial flush completed, waiting until renormalizer is idle.", job.getId()); // We also have to wait for the normalizer to become idle so that we block // clients from querying results in the middle of normalization. - autodetectResultProcessor.waitUntilRenormalizerIsIdle(); + if (waitForNormalization) { + LOGGER.debug("[{}] Initial flush completed, waiting until renormalizer is idle.", job.getId()); + autodetectResultProcessor.waitUntilRenormalizerIsIdle(); + } LOGGER.debug("[{}] Flush completed", job.getId()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java index 354b2d4c1b6..fa79a50310d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/FlushJobParams.java @@ -35,11 +35,21 @@ public class FlushJobParams { */ private final Long skipTimeSeconds; - private FlushJobParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds, Long skipTimeSeconds) { + /** + * Should the flush request wait for normalization or not. + */ + private final boolean waitForNormalization; + + private FlushJobParams(boolean calcInterim, + TimeRange timeRange, + Long advanceTimeSeconds, + Long skipTimeSeconds, + boolean waitForNormalization) { this.calcInterim = calcInterim; this.timeRange = Objects.requireNonNull(timeRange); this.advanceTimeSeconds = advanceTimeSeconds; this.skipTimeSeconds = skipTimeSeconds; + this.waitForNormalization = waitForNormalization; } public boolean shouldCalculateInterim() { @@ -76,6 +86,10 @@ public class FlushJobParams { return skipTimeSeconds; } + public boolean isWaitForNormalization() { + return waitForNormalization; + } + public static Builder builder() { return new Builder(); } @@ -101,6 +115,7 @@ public class FlushJobParams { private TimeRange timeRange = TimeRange.builder().build(); private String advanceTime; private String skipTime; + private boolean waitForNormalization = true; public Builder calcInterim(boolean value) { calcInterim = value; @@ -122,6 +137,11 @@ public class FlushJobParams { return this; } + public Builder waitForNormalization(boolean waitForNormalization) { + this.waitForNormalization = waitForNormalization; + return this; + } + public FlushJobParams build() { checkValidFlushArgumentsCombination(); Long advanceTimeSeconds = parseTimeParam("advance_time", advanceTime); @@ -130,7 +150,7 @@ public class FlushJobParams { throw ExceptionsHelper.badRequestException("advance_time [" + advanceTime + "] must be later than skip_time [" + skipTime + "]"); } - return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds); + return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization); } private void checkValidFlushArgumentsCombination() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index e11eac7be2b..16b3656bfd9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -262,6 +262,7 @@ public class DatafeedJobTests extends ESTestCase { FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId); flushRequest.setCalcInterim(true); flushRequest.setAdvanceTime("59000"); + flushRequest.setWaitForNormalization(false); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 465a9501e6d..4b128d8fb70 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -128,7 +128,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { when(processor.isFailed()).thenReturn(true); when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(null); AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor); - expectThrows(ElasticsearchException.class, () -> communicator.waitFlushToCompletion("foo")); + expectThrows(ElasticsearchException.class, () -> communicator.waitFlushToCompletion("foo", true)); } public void testFlushJob_throwsIfProcessIsDead() throws IOException {