[7.x] [ML] make waiting for renormalization optional for internally flushing job (#58537) (#58553)

* [ML] make waiting for renormalization optional for internally flushing job (#58537)

When flushing, datafeeds only need the guaruntee that the latest bucket has been handled.

But, in addition to this, the typical call to flush waits for renormalization to complete. For large jobs, this can take a fair bit of time (even longer than a bucket length). This causes unnecessary delays in handling data.

This commit adds a new internal only flag that allows datafeeds (and forecasting) to skip waiting on renormalization.

closes #58395
This commit is contained in:
Benjamin Trent 2020-06-25 12:26:52 -04:00 committed by GitHub
parent 6451187e84
commit c7ba79bc19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 116 additions and 9 deletions

View File

@ -63,6 +63,7 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
}
private boolean calcInterim = false;
private boolean waitForNormalization = true;
private String start;
private String end;
private String advanceTime;
@ -78,6 +79,11 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
end = in.readOptionalString();
advanceTime = in.readOptionalString();
skipTime = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
this.waitForNormalization = in.readBoolean();
} else {
this.waitForNormalization = true;
}
}
@Override
@ -88,6 +94,9 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
out.writeOptionalString(end);
out.writeOptionalString(advanceTime);
out.writeOptionalString(skipTime);
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeBoolean(waitForNormalization);
}
}
public Request(String jobId) {
@ -134,9 +143,22 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
this.skipTime = skipTime;
}
public boolean isWaitForNormalization() {
return waitForNormalization;
}
/**
* Used internally. Datafeeds do not need to wait renormalization to complete before continuing.
*
* For large jobs, renormalization can take minutes, causing datafeeds to needlessly pause execution.
*/
public void setWaitForNormalization(boolean waitForNormalization) {
this.waitForNormalization = waitForNormalization;
}
@Override
public int hashCode() {
return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime);
return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime, waitForNormalization);
}
@Override
@ -150,6 +172,7 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) &&
calcInterim == other.calcInterim &&
waitForNormalization == other.waitForNormalization &&
Objects.equals(start, other.start) &&
Objects.equals(end, other.end) &&
Objects.equals(advanceTime, other.advanceTime) &&

View File

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction.Request;
public class FlushJobActionRequestTests extends AbstractBWCWireSerializationTestCase<Request> {
@Override
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLengthBetween(1, 20));
if (randomBoolean()) {
request.setWaitForNormalization(randomBoolean());
}
if (randomBoolean()) {
request.setCalcInterim(randomBoolean());
}
if (randomBoolean()) {
request.setStart(Long.toString(randomNonNegativeLong()));
}
if (randomBoolean()) {
request.setEnd(Long.toString(randomNonNegativeLong()));
}
if (randomBoolean()) {
request.setAdvanceTime(Long.toString(randomNonNegativeLong()));
}
if (randomBoolean()) {
request.setSkipTime(Long.toString(randomNonNegativeLong()));
}
return request;
}
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request mutateInstanceForVersion(Request instance, Version version) {
if (version.before(Version.V_7_9_0)) {
Request bwcRequest = new Request(instance.jobId);
bwcRequest.setWaitForNormalization(true);
bwcRequest.setCalcInterim(instance.getCalcInterim());
bwcRequest.setStart(instance.getStart());
bwcRequest.setEnd(instance.getEnd());
bwcRequest.setAdvanceTime(instance.getAdvanceTime());
bwcRequest.setSkipTime(instance.getSkipTime());
return bwcRequest;
}
return instance;
}
}

View File

@ -31,6 +31,7 @@ public class TransportFlushJobAction extends TransportJobTaskAction<FlushJobActi
ActionListener<FlushJobAction.Response> listener) {
FlushJobParams.Builder paramsBuilder = FlushJobParams.builder();
paramsBuilder.calcInterim(request.getCalcInterim());
paramsBuilder.waitForNormalization(request.isWaitForNormalization());
if (request.getAdvanceTime() != null) {
paramsBuilder.advanceTime(request.getAdvanceTime());
}

View File

@ -184,6 +184,7 @@ class DatafeedJob {
long nowMinusQueryDelay = currentTimeSupplier.get() - queryDelayMs;
long end = toIntervalStartEpochMs(nowMinusQueryDelay);
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setWaitForNormalization(false);
request.setCalcInterim(true);
request.setAdvanceTime(String.valueOf(end));
run(start, end, request);

View File

@ -250,14 +250,15 @@ public class AutodetectCommunicator implements Closeable {
public void flushJob(FlushJobParams params, BiConsumer<FlushAcknowledgement, Exception> handler) {
submitOperation(() -> {
String flushId = autodetectProcess.flushJob(params);
return waitFlushToCompletion(flushId);
return waitFlushToCompletion(flushId, params.isWaitForNormalization());
}, handler);
}
public void forecastJob(ForecastParams params, BiConsumer<Void, Exception> handler) {
BiConsumer<Void, Exception> forecastConsumer = (aVoid, e) -> {
if (e == null) {
FlushJobParams flushParams = FlushJobParams.builder().build();
// Forecasting does not care about normalization of the local data as it is not being queried
FlushJobParams flushParams = FlushJobParams.builder().waitForNormalization(false).build();
flushJob(flushParams, (flushAcknowledgement, flushException) -> {
if (flushException != null) {
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", job.getId());
@ -284,7 +285,7 @@ public class AutodetectCommunicator implements Closeable {
}
@Nullable
FlushAcknowledgement waitFlushToCompletion(String flushId) throws Exception {
FlushAcknowledgement waitFlushToCompletion(String flushId, boolean waitForNormalization) throws Exception {
LOGGER.debug("[{}] waiting for flush", job.getId());
FlushAcknowledgement flushAcknowledgement;
@ -300,10 +301,12 @@ public class AutodetectCommunicator implements Closeable {
}
if (processKilled == false) {
LOGGER.debug("[{}] Initial flush completed, waiting until renormalizer is idle.", job.getId());
// We also have to wait for the normalizer to become idle so that we block
// clients from querying results in the middle of normalization.
autodetectResultProcessor.waitUntilRenormalizerIsIdle();
if (waitForNormalization) {
LOGGER.debug("[{}] Initial flush completed, waiting until renormalizer is idle.", job.getId());
autodetectResultProcessor.waitUntilRenormalizerIsIdle();
}
LOGGER.debug("[{}] Flush completed", job.getId());
}

View File

@ -35,11 +35,21 @@ public class FlushJobParams {
*/
private final Long skipTimeSeconds;
private FlushJobParams(boolean calcInterim, TimeRange timeRange, Long advanceTimeSeconds, Long skipTimeSeconds) {
/**
* Should the flush request wait for normalization or not.
*/
private final boolean waitForNormalization;
private FlushJobParams(boolean calcInterim,
TimeRange timeRange,
Long advanceTimeSeconds,
Long skipTimeSeconds,
boolean waitForNormalization) {
this.calcInterim = calcInterim;
this.timeRange = Objects.requireNonNull(timeRange);
this.advanceTimeSeconds = advanceTimeSeconds;
this.skipTimeSeconds = skipTimeSeconds;
this.waitForNormalization = waitForNormalization;
}
public boolean shouldCalculateInterim() {
@ -76,6 +86,10 @@ public class FlushJobParams {
return skipTimeSeconds;
}
public boolean isWaitForNormalization() {
return waitForNormalization;
}
public static Builder builder() {
return new Builder();
}
@ -101,6 +115,7 @@ public class FlushJobParams {
private TimeRange timeRange = TimeRange.builder().build();
private String advanceTime;
private String skipTime;
private boolean waitForNormalization = true;
public Builder calcInterim(boolean value) {
calcInterim = value;
@ -122,6 +137,11 @@ public class FlushJobParams {
return this;
}
public Builder waitForNormalization(boolean waitForNormalization) {
this.waitForNormalization = waitForNormalization;
return this;
}
public FlushJobParams build() {
checkValidFlushArgumentsCombination();
Long advanceTimeSeconds = parseTimeParam("advance_time", advanceTime);
@ -130,7 +150,7 @@ public class FlushJobParams {
throw ExceptionsHelper.badRequestException("advance_time [" + advanceTime + "] must be later than skip_time ["
+ skipTime + "]");
}
return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds);
return new FlushJobParams(calcInterim, timeRange, advanceTimeSeconds, skipTimeSeconds, waitForNormalization);
}
private void checkValidFlushArgumentsCombination() {

View File

@ -262,6 +262,7 @@ public class DatafeedJobTests extends ESTestCase {
FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);
flushRequest.setCalcInterim(true);
flushRequest.setAdvanceTime("59000");
flushRequest.setWaitForNormalization(false);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());

View File

@ -128,7 +128,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(processor.isFailed()).thenReturn(true);
when(processor.waitForFlushAcknowledgement(anyString(), any())).thenReturn(null);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, processor);
expectThrows(ElasticsearchException.class, () -> communicator.waitFlushToCompletion("foo"));
expectThrows(ElasticsearchException.class, () -> communicator.waitFlushToCompletion("foo", true));
}
public void testFlushJob_throwsIfProcessIsDead() throws IOException {