diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java index 59b426d8c31..27970f332fc 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java @@ -33,8 +33,6 @@ import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; - /** * */ @@ -160,7 +158,7 @@ public class ClusterHealthRequest extends MasterNodeReadRequest implements Composite } } refreshPolicy = RefreshPolicy.readFrom(in); - timeout = TimeValue.readTimeValue(in); + timeout = new TimeValue(in); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index bc1e631e559..a79f6d83204 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -633,9 +633,8 @@ public class IndexRequest extends ReplicatedWriteRequest implement routing = in.readOptionalString(); parent = in.readOptionalString(); timestamp = in.readOptionalString(); - ttl = in.readBoolean() ? TimeValue.readTimeValue(in) : null; + ttl = in.readOptionalWriteable(TimeValue::new); source = in.readBytesReference(); - opType = OpType.fromId(in.readByte()); version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); @@ -650,12 +649,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement out.writeOptionalString(routing); out.writeOptionalString(parent); out.writeOptionalString(timestamp); - if (ttl == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - ttl.writeTo(out); - } + out.writeOptionalWriteable(ttl); out.writeBytesReference(source); out.writeByte(opType.id()); out.writeLong(version); diff --git a/core/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java b/core/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java index 9e45bccc547..e3f32543bf2 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; /** @@ -75,7 +74,7 @@ public abstract class AcknowledgedRequest public void readFrom(StreamInput in) throws IOException { super.readFrom(in); nodesIds = in.readStringArray(); - if (in.readBoolean()) { - timeout = TimeValue.readTimeValue(in); - } + timeout = in.readOptionalWriteable(TimeValue::new); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeStringArrayNullable(nodesIds); - if (timeout == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - timeout.writeTo(out); - } + out.writeOptionalWriteable(timeout); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 44c420598b5..c444fd2cf39 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -181,7 +181,7 @@ public abstract class ReplicationRequest> extends parentTaskId = TaskId.readFromStream(in); nodesIds = in.readStringArray(); actions = in.readStringArray(); - if (in.readBoolean()) { - timeout = TimeValue.readTimeValue(in); - } + timeout = in.readOptionalWriteable(TimeValue::new); } @Override @@ -156,7 +154,7 @@ public class BaseTasksRequest> extends parentTaskId.writeTo(out); out.writeStringArrayNullable(nodesIds); out.writeStringArrayNullable(actions); - out.writeOptionalStreamable(timeout); + out.writeOptionalWriteable(timeout); } public boolean match(Task task) { diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java b/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java index 4added7df31..a37c6371482 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java @@ -26,6 +26,8 @@ import java.io.IOException; * across the wire" using Elasticsearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by * serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged. For * example, {@link org.elasticsearch.common.unit.TimeValue} converts the time to nanoseconds for serialization. + * {@linkplain org.elasticsearch.common.unit.TimeValue} actually implements {@linkplain Writeable} not {@linkplain Streamable} but it has + * the same contract. * * Prefer implementing {@link Writeable} over implementing this interface where possible. Lots of code depends on this interface so this * isn't always possible. diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java b/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java index 55ab419881f..cf127e5b968 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java @@ -26,8 +26,6 @@ import java.io.IOException; * across the wire" using Elasticsearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by * serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged. For * example, {@link org.elasticsearch.common.unit.TimeValue} converts the time to nanoseconds for serialization. - * {@linkplain org.elasticsearch.common.unit.TimeValue} actually implements {@linkplain Streamable} not {@linkplain Writeable} but it has - * the same contract. * * Prefer implementing this interface over implementing {@link Streamable} where possible. Lots of code depends on {@linkplain Streamable} * so this isn't always possible. diff --git a/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java b/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java index c467a0c18a8..2058355d300 100644 --- a/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java +++ b/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java @@ -23,7 +23,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.joda.time.Period; import org.joda.time.PeriodType; import org.joda.time.format.PeriodFormat; @@ -34,7 +34,7 @@ import java.util.Locale; import java.util.Objects; import java.util.concurrent.TimeUnit; -public class TimeValue implements Streamable { +public class TimeValue implements Writeable { /** How many nano-seconds in one milli-second */ public static final long NSEC_PER_MSEC = 1000000; @@ -59,13 +59,8 @@ public class TimeValue implements Streamable { return new TimeValue(hours, TimeUnit.HOURS); } - private long duration; - - private TimeUnit timeUnit; - - private TimeValue() { - - } + private final long duration; + private final TimeUnit timeUnit; public TimeValue(long millis) { this(millis, TimeUnit.MILLISECONDS); @@ -76,6 +71,19 @@ public class TimeValue implements Streamable { this.timeUnit = timeUnit; } + /** + * Read from a stream. + */ + public TimeValue(StreamInput in) throws IOException { + duration = in.readZLong(); + timeUnit = TimeUnit.NANOSECONDS; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeZLong(nanos()); + } + public long nanos() { return timeUnit.toNanos(duration); } @@ -304,26 +312,6 @@ public class TimeValue implements Streamable { static final long C5 = C4 * 60L; static final long C6 = C5 * 24L; - public static TimeValue readTimeValue(StreamInput in) throws IOException { - TimeValue timeValue = new TimeValue(); - timeValue.readFrom(in); - return timeValue; - } - - /** - * serialization converts TimeValue internally to NANOSECONDS - */ - @Override - public void readFrom(StreamInput in) throws IOException { - duration = in.readLong(); - timeUnit = TimeUnit.NANOSECONDS; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeLong(nanos()); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index d200ca5b07b..1b4f1ba75b0 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.zen.ping.unicast; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -79,7 +80,6 @@ import java.util.function.Function; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.readPingResponse; @@ -545,7 +545,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readInt(); - timeout = readTimeValue(in); + timeout = new TimeValue(in); pingResponse = readPingResponse(in); } diff --git a/core/src/main/java/org/elasticsearch/search/Scroll.java b/core/src/main/java/org/elasticsearch/search/Scroll.java index 2d703bdae90..13298b2cbbe 100644 --- a/core/src/main/java/org/elasticsearch/search/Scroll.java +++ b/core/src/main/java/org/elasticsearch/search/Scroll.java @@ -26,8 +26,6 @@ import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; - /** * A scroll enables scrolling of search request. It holds a {@link #keepAlive()} time that * will control how long to keep the scrolling resources open. @@ -64,18 +62,11 @@ public class Scroll implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { - if (in.readBoolean()) { - keepAlive = readTimeValue(in); - } + in.readOptionalWriteable(TimeValue::new); } @Override public void writeTo(StreamOutput out) throws IOException { - if (keepAlive == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - keepAlive.writeTo(out); - } + out.writeOptionalWriteable(keepAlive); } } diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 5c31323b3d8..f8cb882b083 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -579,7 +579,7 @@ public class ThreadPool extends AbstractComponent implements Closeable { min = in.readInt(); max = in.readInt(); if (in.readBoolean()) { - keepAlive = TimeValue.readTimeValue(in); + keepAlive = new TimeValue(in); } if (in.readBoolean()) { queueSize = SizeValue.readSizeValue(in); diff --git a/core/src/test/java/org/elasticsearch/common/unit/TimeValueTests.java b/core/src/test/java/org/elasticsearch/common/unit/TimeValueTests.java index 3cd68bea038..cc36625e68f 100644 --- a/core/src/test/java/org/elasticsearch/common/unit/TimeValueTests.java +++ b/core/src/test/java/org/elasticsearch/common/unit/TimeValueTests.java @@ -28,6 +28,8 @@ import org.joda.time.PeriodType; import java.io.IOException; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @@ -123,20 +125,22 @@ public class TimeValueTests extends ESTestCase { TimeValue.parseTimeValue("10W", null, "test")); } - private void assertEqualityAfterSerialize(TimeValue value) throws IOException { + private void assertEqualityAfterSerialize(TimeValue value, int expectedSize) throws IOException { BytesStreamOutput out = new BytesStreamOutput(); value.writeTo(out); + assertEquals(expectedSize, out.size()); StreamInput in = StreamInput.wrap(out.bytes()); - TimeValue inValue = TimeValue.readTimeValue(in); + TimeValue inValue = new TimeValue(in); assertThat(inValue, equalTo(value)); } public void testSerialize() throws Exception { - assertEqualityAfterSerialize(new TimeValue(100, TimeUnit.DAYS)); - assertEqualityAfterSerialize(new TimeValue(-1)); - assertEqualityAfterSerialize(new TimeValue(1, TimeUnit.NANOSECONDS)); + assertEqualityAfterSerialize(new TimeValue(100, TimeUnit.DAYS), 8); + assertEqualityAfterSerialize(timeValueNanos(-1), 1); + assertEqualityAfterSerialize(timeValueNanos(1), 1); + assertEqualityAfterSerialize(timeValueSeconds(30), 6); } public void testFailOnUnknownUnits() { diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 7505f490f45..499dfc82457 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -299,9 +299,9 @@ public abstract class AbstractBulkByScrollRequest indexingFailures = new ArrayList<>(indexingFailuresCount);