From 0d6bfd0ac3e3cee0d9d0afcdcecde17a31e1a74e Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 23 Jun 2020 16:42:06 +0100 Subject: [PATCH] [7.x][ML] Fix wire serialization for flush acknowledgements (#58443) There was a discrepancy in the implementation of flush acknowledgements: most of the class was designed on the basis that the "last finalized bucket time" could be null but the wire serialization assumed that it was never null. This works because, the C++ sends zero "last finalized bucket time" when it is not known or not relevant. But then the Java code will print that to XContent as it is assuming null represents not known or not relevant. This change corrects the discrepancies. Internally within the class null represents not known or not relevant, but this is translated from/to 0 for communications from the C++ and old nodes that have the bug. Additionally I switched from Date to Instant for this class and made the member variables final to modernise it a bit. Backport of #58413 --- .../test/AbstractSerializingTestCase.java | 8 +++ .../xpack/core/ml/action/FlushJobAction.java | 37 +++++++++----- .../output/FlushAcknowledgement.java | 49 +++++++++++++------ .../ml/action/PostDataFlushResponseTests.java | 5 +- .../ml/integration/InterimResultsIT.java | 2 +- .../xpack/ml/integration/PersistJobIT.java | 2 +- .../AutodetectResultProcessorIT.java | 7 ++- .../xpack/ml/datafeed/DatafeedJob.java | 9 ++-- .../BlackHoleAutodetectProcess.java | 2 +- .../autodetect/output/FlushListener.java | 2 +- .../xpack/ml/datafeed/DatafeedJobTests.java | 7 +-- .../output/FlushAcknowledgementTests.java | 8 ++- .../autodetect/output/FlushListenerTests.java | 6 +-- .../ml/job/results/AutodetectResultTests.java | 2 +- .../rest-api-spec/test/ml/post_data.yml | 2 +- 15 files changed, 98 insertions(+), 50 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializingTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializingTestCase.java index 58107cbc731..528c95e4de2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializingTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractSerializingTestCase.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; +import java.time.Instant; import java.util.Date; import java.util.function.Predicate; @@ -108,4 +109,11 @@ public abstract class AbstractSerializingTestCase { @@ -186,33 +187,47 @@ public class FlushJobAction extends ActionType { public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { - private boolean flushed; - private Date lastFinalizedBucketEnd; + private final boolean flushed; + private final Instant lastFinalizedBucketEnd; - public Response(boolean flushed, @Nullable Date lastFinalizedBucketEnd) { + public Response(boolean flushed, @Nullable Instant lastFinalizedBucketEnd) { super(null, null); this.flushed = flushed; - this.lastFinalizedBucketEnd = lastFinalizedBucketEnd; + // Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object + this.lastFinalizedBucketEnd = + (lastFinalizedBucketEnd != null) ? Instant.ofEpochMilli(lastFinalizedBucketEnd.toEpochMilli()) : null; } public Response(StreamInput in) throws IOException { super(in); flushed = in.readBoolean(); - lastFinalizedBucketEnd = new Date(in.readVLong()); + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + lastFinalizedBucketEnd = in.readOptionalInstant(); + } else { + long epochMillis = in.readVLong(); + // Older versions will be storing zero when the desired behaviour was null + lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null; + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeBoolean(flushed); - out.writeVLong(lastFinalizedBucketEnd.getTime()); + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeOptionalInstant(lastFinalizedBucketEnd); + } else { + // Older versions cannot tolerate null on the wire even though the rest of the class is designed to cope with null + long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0; + out.writeVLong(epochMillis); + } } public boolean isFlushed() { return flushed; } - public Date getLastFinalizedBucketEnd() { + public Instant getLastFinalizedBucketEnd() { return lastFinalizedBucketEnd; } @@ -222,7 +237,8 @@ public class FlushJobAction extends ActionType { builder.field("flushed", flushed); if (lastFinalizedBucketEnd != null) { builder.timeField(FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName(), - FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string", lastFinalizedBucketEnd.getTime()); + FlushAcknowledgement.LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string", + lastFinalizedBucketEnd.toEpochMilli()); } builder.endObject(); return builder; @@ -242,7 +258,4 @@ public class FlushJobAction extends ActionType { return Objects.hash(flushed, lastFinalizedBucketEnd); } } - } - - diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java index 932e51a60f1..e09024ff7ba 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/output/FlushAcknowledgement.java @@ -5,18 +5,17 @@ */ package org.elasticsearch.xpack.core.ml.job.process.autodetect.output; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.xpack.core.common.time.TimeUtils; import java.io.IOException; -import java.util.Date; +import java.time.Instant; import java.util.Objects; /** @@ -31,39 +30,58 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable { public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - TYPE.getPreferredName(), a -> new FlushAcknowledgement((String) a[0], (Date) a[1])); + TYPE.getPreferredName(), a -> new FlushAcknowledgement((String) a[0], (Long) a[1])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), ID); - PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), - p -> TimeUtils.parseTimeField(p, LAST_FINALIZED_BUCKET_END.getPreferredName()), - LAST_FINALIZED_BUCKET_END, ObjectParser.ValueType.VALUE); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FINALIZED_BUCKET_END); } - private String id; - private Date lastFinalizedBucketEnd; + private final String id; + private final Instant lastFinalizedBucketEnd; - public FlushAcknowledgement(String id, Date lastFinalizedBucketEnd) { + public FlushAcknowledgement(String id, Long lastFinalizedBucketEndMs) { this.id = id; - this.lastFinalizedBucketEnd = lastFinalizedBucketEnd; + // The C++ passes 0 when last finalized bucket end is not available, so treat 0 as null + this.lastFinalizedBucketEnd = + (lastFinalizedBucketEndMs != null && lastFinalizedBucketEndMs > 0) ? Instant.ofEpochMilli(lastFinalizedBucketEndMs) : null; + } + + public FlushAcknowledgement(String id, Instant lastFinalizedBucketEnd) { + this.id = id; + // Round to millisecond accuracy to ensure round-tripping via XContent results in an equal object + long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0; + this.lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null; } public FlushAcknowledgement(StreamInput in) throws IOException { id = in.readString(); - lastFinalizedBucketEnd = new Date(in.readVLong()); + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + lastFinalizedBucketEnd = in.readOptionalInstant(); + } else { + long epochMillis = in.readVLong(); + // Older versions will be storing zero when the desired behaviour was null + lastFinalizedBucketEnd = (epochMillis > 0) ? Instant.ofEpochMilli(epochMillis) : null; + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); - out.writeVLong(lastFinalizedBucketEnd.getTime()); + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeOptionalInstant(lastFinalizedBucketEnd); + } else { + // Older versions cannot tolerate null on the wire even though the rest of the class is designed to cope with null + long epochMillis = (lastFinalizedBucketEnd != null) ? lastFinalizedBucketEnd.toEpochMilli() : 0; + out.writeVLong(epochMillis); + } } public String getId() { return id; } - public Date getLastFinalizedBucketEnd() { + public Instant getLastFinalizedBucketEnd() { return lastFinalizedBucketEnd; } @@ -73,7 +91,7 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable { builder.field(ID.getPreferredName(), id); if (lastFinalizedBucketEnd != null) { builder.timeField(LAST_FINALIZED_BUCKET_END.getPreferredName(), LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string", - lastFinalizedBucketEnd.getTime()); + lastFinalizedBucketEnd.toEpochMilli()); } builder.endObject(); return builder; @@ -97,4 +115,3 @@ public class FlushAcknowledgement implements ToXContentObject, Writeable { Objects.equals(lastFinalizedBucketEnd, other.lastFinalizedBucketEnd); } } - diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushResponseTests.java index 4c042db3885..d0c17b6be64 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushResponseTests.java @@ -9,14 +9,13 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.ml.action.FlushJobAction.Response; -import java.time.ZonedDateTime; -import java.util.Date; +import java.time.Instant; public class PostDataFlushResponseTests extends AbstractWireSerializingTestCase { @Override protected Response createTestInstance() { - return new Response(randomBoolean(), Date.from(ZonedDateTime.now(randomZone()).toInstant())); + return new Response(randomBoolean(), Instant.now()); } @Override diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsIT.java index 5f203a3e758..c7f4227fcb6 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/InterimResultsIT.java @@ -129,7 +129,7 @@ public class InterimResultsIT extends MlNativeAutodetectIntegTestCase { assertThat(getInterimResults(job.getId()).isEmpty(), is(true)); // advance time and request interim results - long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().getTime(); + long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().toEpochMilli(); FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId); advanceTimeRequest.setAdvanceTime(String.valueOf(lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 1000)); advanceTimeRequest.setCalcInterim(true); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java index b9666306911..6fd776e0650 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java @@ -94,7 +94,7 @@ public class PersistJobIT extends MlNativeAutodetectIntegTestCase { openJob(jobId); // advance time - long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().getTime(); + long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().toEpochMilli(); FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId); advanceTimeRequest.setAdvanceTime(String.valueOf(lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 1000)); advanceTimeRequest.setCalcInterim(false); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 92c7e4a4c89..1401dd8b986 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -453,6 +453,11 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { return new Date(randomLongBetween(0, 3000000000000L)); } + private static Instant randomInstant() { + // between 1970 and 2065 + return Instant.ofEpochSecond(randomLongBetween(0, 3000000000L), randomLongBetween(0, 999999999)); + } + private static List createRecords(boolean isInterim) { List records = new ArrayList<>(); @@ -523,7 +528,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { } private static FlushAcknowledgement createFlushAcknowledgement() { - return new FlushAcknowledgement(randomAlphaOfLength(5), randomDate()); + return new FlushAcknowledgement(randomAlphaOfLength(5), randomInstant()); } private static class ResultsBuilder { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 3836284a101..f9c745d4996 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import java.io.IOException; import java.io.InputStream; +import java.time.Instant; import java.util.Date; import java.util.List; import java.util.Objects; @@ -172,8 +173,8 @@ class DatafeedJob { FlushJobAction.Request request = new FlushJobAction.Request(jobId); request.setSkipTime(String.valueOf(startTime)); FlushJobAction.Response flushResponse = flushJob(request); - LOGGER.info("[{}] Skipped to time [{}]", jobId, flushResponse.getLastFinalizedBucketEnd().getTime()); - return flushResponse.getLastFinalizedBucketEnd().getTime(); + LOGGER.info("[{}] Skipped to time [{}]", jobId, flushResponse.getLastFinalizedBucketEnd().toEpochMilli()); + return flushResponse.getLastFinalizedBucketEnd().toEpochMilli(); } return startTime; } @@ -382,9 +383,9 @@ class DatafeedJob { // we call flush the job is closed. Thus, we don't flush unless the // datafeed is still running. if (isRunning() && !isIsolated) { - Date lastFinalizedBucketEnd = flushJob(flushRequest).getLastFinalizedBucketEnd(); + Instant lastFinalizedBucketEnd = flushJob(flushRequest).getLastFinalizedBucketEnd(); if (lastFinalizedBucketEnd != null) { - this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.getTime(); + this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.toEpochMilli(); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index f6111055367..246280d8c14 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -107,7 +107,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { */ @Override public String flushJob(FlushJobParams params) { - FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, null); + FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, 0L); AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null, null, flushAcknowledgement); results.add(result); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java index 2a349ce8aee..5edbd573b94 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java @@ -69,7 +69,7 @@ class FlushListener { private volatile Exception flushException; private FlushAcknowledgementHolder(String flushId) { - this.flushAcknowledgement = new FlushAcknowledgement(flushId, null); + this.flushAcknowledgement = new FlushAcknowledgement(flushId, 0L); this.latch = new CountDownLatch(1); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index cbb11e7c180..e11eac7be2b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -56,6 +56,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -120,7 +121,7 @@ public class DatafeedJobTests extends ESTestCase { postDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); annotationDocId = "AnnotationDocId"; - flushJobResponse = new FlushJobAction.Response(true, new Date()); + flushJobResponse = new FlushJobAction.Response(true, Instant.now()); delayedDataDetector = mock(DelayedDataDetector.class); when(delayedDataDetector.getWindow()).thenReturn(DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS); currentTime = 0; @@ -216,7 +217,7 @@ public class DatafeedJobTests extends ESTestCase { long latestFinalBucketEndTimeMs = 5000; long latestRecordTimeMs = 5000; - FlushJobAction.Response skipTimeResponse = new FlushJobAction.Response(true, new Date(10000L)); + FlushJobAction.Response skipTimeResponse = new FlushJobAction.Response(true, Instant.ofEpochMilli(10000L)); when(flushJobFuture.actionGet()).thenReturn(skipTimeResponse); long frequencyMs = 1000; @@ -241,7 +242,7 @@ public class DatafeedJobTests extends ESTestCase { } public void testRealtimeRun() throws Exception { - flushJobResponse = new FlushJobAction.Response(true, new Date(2000)); + flushJobResponse = new FlushJobAction.Response(true, Instant.ofEpochMilli(2000)); Bucket bucket = mock(Bucket.class); when(bucket.getTimestamp()).thenReturn(new Date(2000)); when(bucket.getEpoch()).thenReturn(2L); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java index 25af894fd3f..c3bc9459915 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushAcknowledgementTests.java @@ -10,7 +10,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; -import java.util.Date; +import java.time.Instant; public class FlushAcknowledgementTests extends AbstractSerializingTestCase { @@ -21,7 +21,11 @@ public class FlushAcknowledgementTests extends AbstractSerializingTestCase assertTrue(listener.awaitingFlushed.containsKey("_id"))); assertNull(flushAcknowledgementHolder.get()); - FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", new Date(12345678L)); + FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", 12345678L); listener.acknowledgeFlush(flushAcknowledgement, null); assertBusy(() -> assertNotNull(flushAcknowledgementHolder.get())); assertEquals(1, listener.awaitingFlushed.size()); @@ -58,7 +58,7 @@ public class FlushListenerTests extends ESTestCase { }).start(); assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id"))); assertNull(flushExceptionHolder.get()); - FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", new Date(12345678L)); + FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement("_id", Instant.ofEpochMilli(12345678L)); listener.acknowledgeFlush(flushAcknowledgement, new Exception("BOOM")); assertBusy(() -> { assertNotNull(flushExceptionHolder.get()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java index c217525b8d4..e6b90e552e8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java @@ -124,7 +124,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase