[7.x][ML] Fix wire serialization for flush acknowledgements (#58443)
There was a discrepancy in the implementation of flush acknowledgements: most of the class was designed on the basis that the "last finalized bucket time" could be null but the wire serialization assumed that it was never null. This works because, the C++ sends zero "last finalized bucket time" when it is not known or not relevant. But then the Java code will print that to XContent as it is assuming null represents not known or not relevant. This change corrects the discrepancies. Internally within the class null represents not known or not relevant, but this is translated from/to 0 for communications from the C++ and old nodes that have the bug. Additionally I switched from Date to Instant for this class and made the member variables final to modernise it a bit. Backport of #58413
This commit is contained in:
parent
52806a8f89
commit
0d6bfd0ac3
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
@ -108,4 +109,11 @@ public abstract class AbstractSerializingTestCase<T extends ToXContent & Writeab
|
||||||
protected Date randomDate() {
|
protected Date randomDate() {
|
||||||
return new Date(randomLongBetween(0, 3000000000000L));
|
return new Date(randomLongBetween(0, 3000000000000L));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a random instant between 1970 and ca 2065
|
||||||
|
*/
|
||||||
|
protected Instant randomInstant() {
|
||||||
|
return Instant.ofEpochSecond(randomLongBetween(0, 3000000000L), randomLongBetween(0, 999999999));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.core.ml.action;
|
package org.elasticsearch.xpack.core.ml.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionType;
|
import org.elasticsearch.action.ActionType;
|
||||||
import org.elasticsearch.action.ActionRequestBuilder;
|
import org.elasticsearch.action.ActionRequestBuilder;
|
||||||
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
|
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
|
||||||
|
@ -22,7 +23,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
|
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Date;
|
import java.time.Instant;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class FlushJobAction extends ActionType<FlushJobAction.Response> {
|
public class FlushJobAction extends ActionType<FlushJobAction.Response> {
|
||||||
|
@ -186,33 +187,47 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
|
||||||
|
|
||||||
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
|
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
|
||||||
|
|
||||||
private boolean flushed;
|
private final boolean flushed;
|
||||||
private Date lastFinalizedBucketEnd;
|
private final Instant lastFinalizedBucketEnd;
|
||||||
|
|
||||||
public Response(boolean flushed, @Nullable Date lastFinalizedBucketEnd) {
|
public Response(boolean flushed, @Nullable Instant lastFinalizedBucketEnd) {
|
||||||
super(null, null);
|
super(null, null);
|
||||||
this.flushed = flushed;
|
this.flushed = flushed;
|
||||||
this.lastFinalizedBucketEnd = lastFinalizedBucketEnd;
|
// Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object
|
||||||
|
this.lastFinalizedBucketEnd =
|
||||||
|
(lastFinalizedBucketEnd != null) ? Instant.ofEpochMilli(lastFinalizedBucketEnd.toEpochMilli()) : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Response(StreamInput in) throws IOException {
|
public Response(StreamInput in) throws IOException {
|
||||||
super(in);
|
super(in);
|
||||||
flushed = in.readBoolean();
|
flushed = in.readBoolean();
|
||||||
lastFinalizedBucketEnd = new Date(in.readVLong());
|
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
|
||||||
|
lastFinalizedBucketEnd = in.readOptionalInstant();
|
||||||
|
} else {
|
||||||
|
long epochMillis = in.readVLong();
|
||||||
|
// Older versions will be storing zero when the desired behaviour was null
|
||||||
|
lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
super.writeTo(out);
|
super.writeTo(out);
|
||||||
out.writeBoolean(flushed);
|
out.writeBoolean(flushed);
|
||||||
out.writeVLong(lastFinalizedBucketEnd.getTime());
|
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
|
||||||
|
out.writeOptionalInstant(lastFinalizedBucketEnd);
|
||||||
|
} else {
|
||||||
|
// Older versions cannot tolerate null on the wire even though the rest of the class is designed to cope with null
|
||||||
|
long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0;
|
||||||
|
out.writeVLong(epochMillis);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isFlushed() {
|
public boolean isFlushed() {
|
||||||
return flushed;
|
return flushed;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Date getLastFinalizedBucketEnd() {
|
public Instant getLastFinalizedBucketEnd() {
|
||||||
return lastFinalizedBucketEnd;
|
return lastFinalizedBucketEnd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,7 +237,8 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
|
||||||
builder.field("flushed", flushed);
|
builder.field("flushed", flushed);
|
||||||
if (lastFinalizedBucketEnd != null) {
|
if (lastFinalizedBucketEnd != null) {
|
||||||
builder.timeField(FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName(),
|
builder.timeField(FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName(),
|
||||||
FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string", lastFinalizedBucketEnd.getTime());
|
FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string",
|
||||||
|
lastFinalizedBucketEnd.toEpochMilli());
|
||||||
}
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
|
@ -242,7 +258,4 @@ public class FlushJobAction extends ActionType<FlushJobAction.Response> {
|
||||||
return Objects.hash(flushed, lastFinalizedBucketEnd);
|
return Objects.hash(flushed, lastFinalizedBucketEnd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -5,18 +5,17 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.core.ml.job.process.autodetect.output;
|
package org.elasticsearch.xpack.core.ml.job.process.autodetect.output;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.common.ParseField;
|
import org.elasticsearch.common.ParseField;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
|
||||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.xpack.core.common.time.TimeUtils;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Date;
|
import java.time.Instant;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -31,39 +30,58 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
|
||||||
public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end");
|
public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end");
|
||||||
|
|
||||||
public static final ConstructingObjectParser<FlushAcknowledgement, Void> PARSER = new ConstructingObjectParser<>(
|
public static final ConstructingObjectParser<FlushAcknowledgement, Void> PARSER = new ConstructingObjectParser<>(
|
||||||
TYPE.getPreferredName(), a -> new FlushAcknowledgement((String) a[0], (Date) a[1]));
|
TYPE.getPreferredName(), a -> new FlushAcknowledgement((String) a[0], (Long) a[1]));
|
||||||
|
|
||||||
static {
|
static {
|
||||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
|
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
|
||||||
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
|
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FINALIZED_BUCKET_END);
|
||||||
p -> TimeUtils.parseTimeField(p, LAST_FINALIZED_BUCKET_END.getPreferredName()),
|
|
||||||
LAST_FINALIZED_BUCKET_END, ObjectParser.ValueType.VALUE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private String id;
|
private final String id;
|
||||||
private Date lastFinalizedBucketEnd;
|
private final Instant lastFinalizedBucketEnd;
|
||||||
|
|
||||||
public FlushAcknowledgement(String id, Date lastFinalizedBucketEnd) {
|
public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.lastFinalizedBucketEnd = lastFinalizedBucketEnd;
|
// The C++ passes 0 when last finalized bucket end is not available, so treat 0 as null
|
||||||
|
this.lastFinalizedBucketEnd =
|
||||||
|
(lastFinalizedBucketEndMs != null && lastFinalizedBucketEndMs > 0) ? Instant.ofEpochMilli(lastFinalizedBucketEndMs) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd) {
|
||||||
|
this.id = id;
|
||||||
|
// Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object
|
||||||
|
long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0;
|
||||||
|
this.lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FlushAcknowledgement(StreamInput in) throws IOException {
|
public FlushAcknowledgement(StreamInput in) throws IOException {
|
||||||
id = in.readString();
|
id = in.readString();
|
||||||
lastFinalizedBucketEnd = new Date(in.readVLong());
|
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
|
||||||
|
lastFinalizedBucketEnd = in.readOptionalInstant();
|
||||||
|
} else {
|
||||||
|
long epochMillis = in.readVLong();
|
||||||
|
// Older versions will be storing zero when the desired behaviour was null
|
||||||
|
lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
out.writeString(id);
|
out.writeString(id);
|
||||||
out.writeVLong(lastFinalizedBucketEnd.getTime());
|
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
|
||||||
|
out.writeOptionalInstant(lastFinalizedBucketEnd);
|
||||||
|
} else {
|
||||||
|
// Older versions cannot tolerate null on the wire even though the rest of the class is designed to cope with null
|
||||||
|
long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0;
|
||||||
|
out.writeVLong(epochMillis);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getId() {
|
public String getId() {
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Date getLastFinalizedBucketEnd() {
|
public Instant getLastFinalizedBucketEnd() {
|
||||||
return lastFinalizedBucketEnd;
|
return lastFinalizedBucketEnd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,7 +91,7 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
|
||||||
builder.field(ID.getPreferredName(), id);
|
builder.field(ID.getPreferredName(), id);
|
||||||
if (lastFinalizedBucketEnd != null) {
|
if (lastFinalizedBucketEnd != null) {
|
||||||
builder.timeField(LAST_FINALIZED_BUCKET_END.getPreferredName(), LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string",
|
builder.timeField(LAST_FINALIZED_BUCKET_END.getPreferredName(), LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string",
|
||||||
lastFinalizedBucketEnd.getTime());
|
lastFinalizedBucketEnd.toEpochMilli());
|
||||||
}
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
|
@ -97,4 +115,3 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable {
|
||||||
Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd);
|
Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,14 +9,13 @@ import org.elasticsearch.common.io.stream.Writeable;
|
||||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||||
import org.elasticsearch.xpack.core.ml.action.FlushJobAction.Response;
|
import org.elasticsearch.xpack.core.ml.action.FlushJobAction.Response;
|
||||||
|
|
||||||
import java.time.ZonedDateTime;
|
import java.time.Instant;
|
||||||
import java.util.Date;
|
|
||||||
|
|
||||||
public class PostDataFlushResponseTests extends AbstractWireSerializingTestCase<Response> {
|
public class PostDataFlushResponseTests extends AbstractWireSerializingTestCase<Response> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Response createTestInstance() {
|
protected Response createTestInstance() {
|
||||||
return new Response(randomBoolean(), Date.from(ZonedDateTime.now(randomZone()).toInstant()));
|
return new Response(randomBoolean(), Instant.now());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -129,7 +129,7 @@ public class InterimResultsIT extends MlNativeAutodetectIntegTestCase {
|
||||||
assertThat(getInterimResults(job.getId()).isEmpty(), is(true));
|
assertThat(getInterimResults(job.getId()).isEmpty(), is(true));
|
||||||
|
|
||||||
// advance time and request interim results
|
// advance time and request interim results
|
||||||
long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().getTime();
|
long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().toEpochMilli();
|
||||||
FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId);
|
FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId);
|
||||||
advanceTimeRequest.setAdvanceTime(String.valueOf(lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 1000));
|
advanceTimeRequest.setAdvanceTime(String.valueOf(lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 1000));
|
||||||
advanceTimeRequest.setCalcInterim(true);
|
advanceTimeRequest.setCalcInterim(true);
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class PersistJobIT extends MlNativeAutodetectIntegTestCase {
|
||||||
openJob(jobId);
|
openJob(jobId);
|
||||||
|
|
||||||
// advance time
|
// advance time
|
||||||
long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().getTime();
|
long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().toEpochMilli();
|
||||||
FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId);
|
FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId);
|
||||||
advanceTimeRequest.setAdvanceTime(String.valueOf(lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 1000));
|
advanceTimeRequest.setAdvanceTime(String.valueOf(lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 1000));
|
||||||
advanceTimeRequest.setCalcInterim(false);
|
advanceTimeRequest.setCalcInterim(false);
|
||||||
|
|
|
@ -453,6 +453,11 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
|
||||||
return new Date(randomLongBetween(0, 3000000000000L));
|
return new Date(randomLongBetween(0, 3000000000000L));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Instant randomInstant() {
|
||||||
|
// between 1970 and 2065
|
||||||
|
return Instant.ofEpochSecond(randomLongBetween(0, 3000000000L), randomLongBetween(0, 999999999));
|
||||||
|
}
|
||||||
|
|
||||||
private static List<AnomalyRecord> createRecords(boolean isInterim) {
|
private static List<AnomalyRecord> createRecords(boolean isInterim) {
|
||||||
List<AnomalyRecord> records = new ArrayList<>();
|
List<AnomalyRecord> records = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -523,7 +528,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static FlushAcknowledgement createFlushAcknowledgement() {
|
private static FlushAcknowledgement createFlushAcknowledgement() {
|
||||||
return new FlushAcknowledgement(randomAlphaOfLength(5), randomDate());
|
return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ResultsBuilder {
|
private static class ResultsBuilder {
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
@ -172,8 +173,8 @@ class DatafeedJob {
|
||||||
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
|
FlushJobAction.Request request = new FlushJobAction.Request(jobId);
|
||||||
request.setSkipTime(String.valueOf(startTime));
|
request.setSkipTime(String.valueOf(startTime));
|
||||||
FlushJobAction.Response flushResponse = flushJob(request);
|
FlushJobAction.Response flushResponse = flushJob(request);
|
||||||
LOGGER.info("[{}] Skipped to time [{}]", jobId, flushResponse.getLastFinalizedBucketEnd().getTime());
|
LOGGER.info("[{}] Skipped to time [{}]", jobId, flushResponse.getLastFinalizedBucketEnd().toEpochMilli());
|
||||||
return flushResponse.getLastFinalizedBucketEnd().getTime();
|
return flushResponse.getLastFinalizedBucketEnd().toEpochMilli();
|
||||||
}
|
}
|
||||||
return startTime;
|
return startTime;
|
||||||
}
|
}
|
||||||
|
@ -382,9 +383,9 @@ class DatafeedJob {
|
||||||
// we call flush the job is closed. Thus, we don't flush unless the
|
// we call flush the job is closed. Thus, we don't flush unless the
|
||||||
// datafeed is still running.
|
// datafeed is still running.
|
||||||
if (isRunning() && !isIsolated) {
|
if (isRunning() && !isIsolated) {
|
||||||
Date lastFinalizedBucketEnd = flushJob(flushRequest).getLastFinalizedBucketEnd();
|
Instant lastFinalizedBucketEnd = flushJob(flushRequest).getLastFinalizedBucketEnd();
|
||||||
if (lastFinalizedBucketEnd != null) {
|
if (lastFinalizedBucketEnd != null) {
|
||||||
this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.getTime();
|
this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.toEpochMilli();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -107,7 +107,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public String flushJob(FlushJobParams params) {
|
public String flushJob(FlushJobParams params) {
|
||||||
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, null);
|
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, 0L);
|
||||||
AutodetectResult result =
|
AutodetectResult result =
|
||||||
new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null, null, flushAcknowledgement);
|
new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null, null, flushAcknowledgement);
|
||||||
results.add(result);
|
results.add(result);
|
||||||
|
|
|
@ -69,7 +69,7 @@ class FlushListener {
|
||||||
private volatile Exception flushException;
|
private volatile Exception flushException;
|
||||||
|
|
||||||
private FlushAcknowledgementHolder(String flushId) {
|
private FlushAcknowledgementHolder(String flushId) {
|
||||||
this.flushAcknowledgement = new FlushAcknowledgement(flushId, null);
|
this.flushAcknowledgement = new FlushAcknowledgement(flushId, 0L);
|
||||||
this.latch = new CountDownLatch(1);
|
this.latch = new CountDownLatch(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
@ -120,7 +121,7 @@ public class DatafeedJobTests extends ESTestCase {
|
||||||
postDataFuture = mock(ActionFuture.class);
|
postDataFuture = mock(ActionFuture.class);
|
||||||
flushJobFuture = mock(ActionFuture.class);
|
flushJobFuture = mock(ActionFuture.class);
|
||||||
annotationDocId = "AnnotationDocId";
|
annotationDocId = "AnnotationDocId";
|
||||||
flushJobResponse = new FlushJobAction.Response(true, new Date());
|
flushJobResponse = new FlushJobAction.Response(true, Instant.now());
|
||||||
delayedDataDetector = mock(DelayedDataDetector.class);
|
delayedDataDetector = mock(DelayedDataDetector.class);
|
||||||
when(delayedDataDetector.getWindow()).thenReturn(DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS);
|
when(delayedDataDetector.getWindow()).thenReturn(DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS);
|
||||||
currentTime = 0;
|
currentTime = 0;
|
||||||
|
@ -216,7 +217,7 @@ public class DatafeedJobTests extends ESTestCase {
|
||||||
long latestFinalBucketEndTimeMs = 5000;
|
long latestFinalBucketEndTimeMs = 5000;
|
||||||
long latestRecordTimeMs = 5000;
|
long latestRecordTimeMs = 5000;
|
||||||
|
|
||||||
FlushJobAction.Response skipTimeResponse = new FlushJobAction.Response(true, new Date(10000L));
|
FlushJobAction.Response skipTimeResponse = new FlushJobAction.Response(true, Instant.ofEpochMilli(10000L));
|
||||||
when(flushJobFuture.actionGet()).thenReturn(skipTimeResponse);
|
when(flushJobFuture.actionGet()).thenReturn(skipTimeResponse);
|
||||||
|
|
||||||
long frequencyMs = 1000;
|
long frequencyMs = 1000;
|
||||||
|
@ -241,7 +242,7 @@ public class DatafeedJobTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRealtimeRun() throws Exception {
|
public void testRealtimeRun() throws Exception {
|
||||||
flushJobResponse = new FlushJobAction.Response(true, new Date(2000));
|
flushJobResponse = new FlushJobAction.Response(true, Instant.ofEpochMilli(2000));
|
||||||
Bucket bucket = mock(Bucket.class);
|
Bucket bucket = mock(Bucket.class);
|
||||||
when(bucket.getTimestamp()).thenReturn(new Date(2000));
|
when(bucket.getTimestamp()).thenReturn(new Date(2000));
|
||||||
when(bucket.getEpoch()).thenReturn(2L);
|
when(bucket.getEpoch()).thenReturn(2L);
|
||||||
|
|
|
@ -10,7 +10,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
|
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
|
||||||
|
|
||||||
import java.util.Date;
|
import java.time.Instant;
|
||||||
|
|
||||||
public class FlushAcknowledgementTests extends AbstractSerializingTestCase<FlushAcknowledgement> {
|
public class FlushAcknowledgementTests extends AbstractSerializingTestCase<FlushAcknowledgement> {
|
||||||
|
|
||||||
|
@ -21,7 +21,11 @@ public class FlushAcknowledgementTests extends AbstractSerializingTestCase<Flush
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected FlushAcknowledgement createTestInstance() {
|
protected FlushAcknowledgement createTestInstance() {
|
||||||
return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), new Date(randomNonNegativeLong()));
|
if (randomBoolean()) {
|
||||||
|
return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomFrom(randomNonNegativeLong(), 0L, null));
|
||||||
|
} else {
|
||||||
|
return new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomFrom(randomInstant(), Instant.EPOCH, null));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -9,8 +9,8 @@ import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
|
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ public class FlushListenerTests extends ESTestCase {
|
||||||
}).start();
|
}).start();
|
||||||
assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
|
assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
|
||||||
assertNull(flushAcknowledgementHolder.get());
|
assertNull(flushAcknowledgementHolder.get());
|
||||||
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", new Date(12345678L));
|
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", 12345678L);
|
||||||
listener.acknowledgeFlush(flushAcknowledgement, null);
|
listener.acknowledgeFlush(flushAcknowledgement, null);
|
||||||
assertBusy(() -> assertNotNull(flushAcknowledgementHolder.get()));
|
assertBusy(() -> assertNotNull(flushAcknowledgementHolder.get()));
|
||||||
assertEquals(1, listener.awaitingFlushed.size());
|
assertEquals(1, listener.awaitingFlushed.size());
|
||||||
|
@ -58,7 +58,7 @@ public class FlushListenerTests extends ESTestCase {
|
||||||
}).start();
|
}).start();
|
||||||
assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
|
assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
|
||||||
assertNull(flushExceptionHolder.get());
|
assertNull(flushExceptionHolder.get());
|
||||||
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", new Date(12345678L));
|
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", Instant.ofEpochMilli(12345678L));
|
||||||
listener.acknowledgeFlush(flushAcknowledgement, new Exception("BOOM"));
|
listener.acknowledgeFlush(flushAcknowledgement, new Exception("BOOM"));
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
assertNotNull(flushExceptionHolder.get());
|
assertNotNull(flushExceptionHolder.get());
|
||||||
|
|
|
@ -124,7 +124,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
|
||||||
categorizerStats = null;
|
categorizerStats = null;
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
flushAcknowledgement = new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomDate());
|
flushAcknowledgement = new FlushAcknowledgement(randomAlphaOfLengthBetween(1, 20), randomInstant());
|
||||||
} else {
|
} else {
|
||||||
flushAcknowledgement = null;
|
flushAcknowledgement = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -108,7 +108,7 @@ setup:
|
||||||
ml.flush_job:
|
ml.flush_job:
|
||||||
job_id: post-data-job
|
job_id: post-data-job
|
||||||
- match: { flushed: true }
|
- match: { flushed: true }
|
||||||
- match: { last_finalized_bucket_end: 0 }
|
- is_false: last_finalized_bucket_end
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
ml.close_job:
|
ml.close_job:
|
||||||
|
|
Loading…
Reference in New Issue