[ML] update Instant serialization (#44765) (#44954)

* [ML] update Instant serialization

* addressing PR comments

* removing unused import
This commit is contained in:
Benjamin Trent 2019-07-29 13:06:56 -05:00 committed by GitHub
parent 0914c04b8e
commit 3b514f0dae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 79 additions and 61 deletions

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.core.ml.calendars; package org.elasticsearch.xpack.core.ml.calendars;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -14,7 +15,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.Operator; import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.RuleAction; import org.elasticsearch.xpack.core.ml.job.config.RuleAction;
@ -51,25 +51,14 @@ public class ScheduledEvent implements ToXContentObject, Writeable {
ObjectParser<ScheduledEvent.Builder, Void> parser = new ObjectParser<>("scheduled_event", ignoreUnknownFields, Builder::new); ObjectParser<ScheduledEvent.Builder, Void> parser = new ObjectParser<>("scheduled_event", ignoreUnknownFields, Builder::new);
parser.declareString(ScheduledEvent.Builder::description, DESCRIPTION); parser.declareString(ScheduledEvent.Builder::description, DESCRIPTION);
parser.declareField(ScheduledEvent.Builder::startTime, p -> { parser.declareField(ScheduledEvent.Builder::startTime,
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) { p -> TimeUtils.parseTimeFieldToInstant(p, START_TIME.getPreferredName()),
return Instant.ofEpochMilli(p.longValue()); START_TIME,
} else if (p.currentToken() == XContentParser.Token.VALUE_STRING) { ObjectParser.ValueType.VALUE);
return Instant.ofEpochMilli(TimeUtils.dateStringToEpoch(p.text())); parser.declareField(ScheduledEvent.Builder::endTime,
} p -> TimeUtils.parseTimeFieldToInstant(p, END_TIME.getPreferredName()),
throw new IllegalArgumentException( END_TIME,
"unexpected token [" + p.currentToken() + "] for [" + START_TIME.getPreferredName() + "]"); ObjectParser.ValueType.VALUE);
}, START_TIME, ObjectParser.ValueType.VALUE);
parser.declareField(ScheduledEvent.Builder::endTime, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return Instant.ofEpochMilli(p.longValue());
} else if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return Instant.ofEpochMilli(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + END_TIME.getPreferredName() + "]");
}, END_TIME, ObjectParser.ValueType.VALUE);
parser.declareString(ScheduledEvent.Builder::calendarId, Calendar.ID); parser.declareString(ScheduledEvent.Builder::calendarId, Calendar.ID);
parser.declareString((builder, s) -> {}, TYPE); parser.declareString((builder, s) -> {}, TYPE);
@ -88,16 +77,21 @@ public class ScheduledEvent implements ToXContentObject, Writeable {
ScheduledEvent(String description, Instant startTime, Instant endTime, String calendarId, @Nullable String eventId) { ScheduledEvent(String description, Instant startTime, Instant endTime, String calendarId, @Nullable String eventId) {
this.description = Objects.requireNonNull(description); this.description = Objects.requireNonNull(description);
this.startTime = Objects.requireNonNull(startTime); this.startTime = Instant.ofEpochMilli(Objects.requireNonNull(startTime).toEpochMilli());
this.endTime = Objects.requireNonNull(endTime); this.endTime = Instant.ofEpochMilli(Objects.requireNonNull(endTime).toEpochMilli());
this.calendarId = Objects.requireNonNull(calendarId); this.calendarId = Objects.requireNonNull(calendarId);
this.eventId = eventId; this.eventId = eventId;
} }
public ScheduledEvent(StreamInput in) throws IOException { public ScheduledEvent(StreamInput in) throws IOException {
description = in.readString(); description = in.readString();
startTime = Instant.ofEpochMilli(in.readVLong()); if (in.getVersion().onOrAfter(Version.CURRENT)) {
endTime = Instant.ofEpochMilli(in.readVLong()); startTime = in.readInstant();
endTime = in.readInstant();
} else {
startTime = Instant.ofEpochMilli(in.readVLong());
endTime = Instant.ofEpochMilli(in.readVLong());
}
calendarId = in.readString(); calendarId = in.readString();
eventId = in.readOptionalString(); eventId = in.readOptionalString();
} }
@ -152,8 +146,13 @@ public class ScheduledEvent implements ToXContentObject, Writeable {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeString(description); out.writeString(description);
out.writeVLong(startTime.toEpochMilli()); if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeVLong(endTime.toEpochMilli()); out.writeInstant(startTime);
out.writeInstant(endTime);
} else {
out.writeVLong(startTime.toEpochMilli());
out.writeVLong(endTime.toEpochMilli());
}
out.writeString(calendarId); out.writeString(calendarId);
out.writeOptionalString(eventId); out.writeOptionalString(eventId);
} }
@ -186,17 +185,9 @@ public class ScheduledEvent implements ToXContentObject, Writeable {
} }
ScheduledEvent other = (ScheduledEvent) obj; ScheduledEvent other = (ScheduledEvent) obj;
// In Java 8 the tests pass with ZonedDateTime.isEquals() or ZonedDateTime.toInstant.equals()
// but in Java 9 & 10 the same tests fail.
// Both isEquals() and toInstant.equals() work the same; convert to epoch seconds and
// compare seconds and nanos are equal. For some reason the nanos are different in Java 9 & 10.
// It's sufficient to compare just the epoch seconds for the purpose of establishing equality
// which only occurs in testing.
// Note ZonedDataTime.equals() fails because the time zone and date-time must be the same
// which isn't the case in tests where the time zone is randomised.
return description.equals(other.description) return description.equals(other.description)
&& Objects.equals(startTime.getEpochSecond(), other.startTime.getEpochSecond()) && Objects.equals(startTime, other.startTime)
&& Objects.equals(endTime.getEpochSecond(), other.endTime.getEpochSecond()) && Objects.equals(endTime, other.endTime)
&& calendarId.equals(other.calendarId); && calendarId.equals(other.calendarId);
} }
@ -218,12 +209,12 @@ public class ScheduledEvent implements ToXContentObject, Writeable {
} }
public Builder startTime(Instant startTime) { public Builder startTime(Instant startTime) {
this.startTime = startTime; this.startTime = Instant.ofEpochMilli(Objects.requireNonNull(startTime, START_TIME.getPreferredName()).toEpochMilli());
return this; return this;
} }
public Builder endTime(Instant endTime) { public Builder endTime(Instant endTime) {
this.endTime = endTime; this.endTime = Instant.ofEpochMilli(Objects.requireNonNull(endTime, END_TIME.getPreferredName()).toEpochMilli());
return this; return this;
} }

View File

@ -127,7 +127,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
this.analyzedFields = analyzedFields; this.analyzedFields = analyzedFields;
this.modelMemoryLimit = modelMemoryLimit; this.modelMemoryLimit = modelMemoryLimit;
this.headers = Collections.unmodifiableMap(headers); this.headers = Collections.unmodifiableMap(headers);
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());; this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
this.version = version; this.version = version;
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.core.ml.job.results; package org.elasticsearch.xpack.core.ml.job.results;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -129,11 +130,20 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
messages = null; messages = null;
} }
timestamp = Instant.ofEpochMilli(in.readVLong()); if (in.getVersion().onOrAfter(Version.CURRENT)) {
startTime = Instant.ofEpochMilli(in.readVLong()); timestamp = in.readInstant();
endTime = Instant.ofEpochMilli(in.readVLong()); startTime = in.readInstant();
createTime = Instant.ofEpochMilli(in.readVLong()); endTime = in.readInstant();
expiryTime = Instant.ofEpochMilli(in.readVLong()); createTime = in.readInstant();
expiryTime = in.readInstant();
} else {
timestamp = Instant.ofEpochMilli(in.readVLong());
startTime = Instant.ofEpochMilli(in.readVLong());
endTime = Instant.ofEpochMilli(in.readVLong());
createTime = Instant.ofEpochMilli(in.readVLong());
expiryTime = Instant.ofEpochMilli(in.readVLong());
}
progress = in.readDouble(); progress = in.readDouble();
processingTime = in.readLong(); processingTime = in.readLong();
setMemoryUsage(in.readLong()); setMemoryUsage(in.readLong());
@ -151,11 +161,19 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
} else { } else {
out.writeBoolean(false); out.writeBoolean(false);
} }
out.writeVLong(timestamp.toEpochMilli()); if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeVLong(startTime.toEpochMilli()); out.writeInstant(timestamp);
out.writeVLong(endTime.toEpochMilli()); out.writeInstant(startTime);
out.writeVLong(createTime.toEpochMilli()); out.writeInstant(endTime);
out.writeVLong(expiryTime.toEpochMilli()); out.writeInstant(createTime);
out.writeInstant(expiryTime);
} else {
out.writeVLong(timestamp.toEpochMilli());
out.writeVLong(startTime.toEpochMilli());
out.writeVLong(endTime.toEpochMilli());
out.writeVLong(createTime.toEpochMilli());
out.writeVLong(expiryTime.toEpochMilli());
}
out.writeDouble(progress); out.writeDouble(progress);
out.writeLong(processingTime); out.writeLong(processingTime);
out.writeLong(getMemoryUsage()); out.writeLong(getMemoryUsage());
@ -234,7 +252,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
} }
public void setTimeStamp(Instant timestamp) { public void setTimeStamp(Instant timestamp) {
this.timestamp = timestamp; this.timestamp = Instant.ofEpochMilli(timestamp.toEpochMilli());
} }
public Instant getTimestamp() { public Instant getTimestamp() {
@ -242,7 +260,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
} }
public void setStartTime(Instant startTime) { public void setStartTime(Instant startTime) {
this.startTime = startTime; this.startTime = Instant.ofEpochMilli(startTime.toEpochMilli());
} }
public Instant getStartTime() { public Instant getStartTime() {
@ -254,11 +272,11 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
} }
public void setEndTime(Instant endTime) { public void setEndTime(Instant endTime) {
this.endTime = endTime; this.endTime = Instant.ofEpochMilli(endTime.toEpochMilli());
} }
public void setCreateTime(Instant createTime) { public void setCreateTime(Instant createTime) {
this.createTime = createTime; this.createTime = Instant.ofEpochMilli(createTime.toEpochMilli());
} }
public Instant getCreateTime() { public Instant getCreateTime() {
@ -266,7 +284,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
} }
public void setExpiryTime(Instant expiryTime) { public void setExpiryTime(Instant expiryTime) {
this.expiryTime = expiryTime; this.expiryTime = Instant.ofEpochMilli(expiryTime.toEpochMilli());
} }
public Instant getExpiryTime() { public Instant getExpiryTime() {

View File

@ -26,7 +26,7 @@ import static org.hamcrest.Matchers.containsString;
public class ScheduledEventTests extends AbstractSerializingTestCase<ScheduledEvent> { public class ScheduledEventTests extends AbstractSerializingTestCase<ScheduledEvent> {
public static ScheduledEvent createScheduledEvent(String calendarId) { public static ScheduledEvent createScheduledEvent(String calendarId) {
Instant start = Instant.ofEpochMilli(Instant.now().toEpochMilli()); Instant start = Instant.now();
return new ScheduledEvent(randomAlphaOfLength(10), start, start.plusSeconds(randomIntBetween(1, 10000)), return new ScheduledEvent(randomAlphaOfLength(10), start, start.plusSeconds(randomIntBetween(1, 10000)),
calendarId, null); calendarId, null);
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.process.logging; package org.elasticsearch.xpack.ml.process.logging;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -69,12 +70,16 @@ public class CppLogMessage implements ToXContentObject, Writeable {
private long line = 0; private long line = 0;
public CppLogMessage(Instant timestamp) { public CppLogMessage(Instant timestamp) {
this.timestamp = timestamp; this.timestamp = Instant.ofEpochMilli(timestamp.toEpochMilli());
} }
public CppLogMessage(StreamInput in) throws IOException { public CppLogMessage(StreamInput in) throws IOException {
logger = in.readString(); logger = in.readString();
timestamp = Instant.ofEpochMilli(in.readVLong()); if (in.getVersion().onOrAfter(Version.CURRENT)) {
timestamp = in.readInstant();
} else {
timestamp = Instant.ofEpochMilli(in.readVLong());
}
level = in.readString(); level = in.readString();
pid = in.readVLong(); pid = in.readVLong();
thread = in.readString(); thread = in.readString();
@ -88,7 +93,11 @@ public class CppLogMessage implements ToXContentObject, Writeable {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeString(logger); out.writeString(logger);
out.writeVLong(timestamp.toEpochMilli()); if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeInstant(timestamp);
} else {
out.writeVLong(timestamp.toEpochMilli());
}
out.writeString(level); out.writeString(level);
out.writeVLong(pid); out.writeVLong(pid);
out.writeString(thread); out.writeString(thread);
@ -129,7 +138,7 @@ public class CppLogMessage implements ToXContentObject, Writeable {
} }
public void setTimestamp(Instant d) { public void setTimestamp(Instant d) {
this.timestamp = d; this.timestamp = Instant.ofEpochMilli(d.toEpochMilli());
} }
public String getLevel() { public String getLevel() {

View File

@ -35,7 +35,7 @@ public class CppLogMessageTests extends AbstractSerializingTestCase<CppLogMessag
public void testParseWithMissingTimestamp() throws IOException { public void testParseWithMissingTimestamp() throws IOException {
XContent xContent = XContentFactory.xContent(XContentType.JSON); XContent xContent = XContentFactory.xContent(XContentType.JSON);
Instant before = Instant.now(); Instant before = Instant.ofEpochMilli(Instant.now().toEpochMilli());
String input = "{\"logger\":\"controller\",\"level\":\"INFO\"," String input = "{\"logger\":\"controller\",\"level\":\"INFO\","
+ "\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 1\",\"class\":\"ml\"," + "\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 1\",\"class\":\"ml\","
@ -43,7 +43,7 @@ public class CppLogMessageTests extends AbstractSerializingTestCase<CppLogMessag
XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, input); XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, input);
CppLogMessage msg = CppLogMessage.PARSER.apply(parser, null); CppLogMessage msg = CppLogMessage.PARSER.apply(parser, null);
Instant after = Instant.now(); Instant after = Instant.ofEpochMilli(Instant.now().toEpochMilli());
assertTrue(before.isBefore(msg.getTimestamp()) || before.equals(msg.getTimestamp())); assertTrue(before.isBefore(msg.getTimestamp()) || before.equals(msg.getTimestamp()));
assertTrue(after.isAfter(msg.getTimestamp()) || after.equals(msg.getTimestamp())); assertTrue(after.isAfter(msg.getTimestamp()) || after.equals(msg.getTimestamp()));
} }