From 387155559ead75fbc6d6f330528229be47e75781 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 10 Jun 2016 15:32:30 -0400 Subject: [PATCH] Make TimeValue Writeable instead of Streamable Writeable is better for immutable objects like TimeValue. Switch to writeZLong which takes up less space than the original writeLong in the majority of cases. Since we expect negative TimeValues we shouldn't use writeVLong. --- .../cluster/health/ClusterHealthRequest.java | 4 +- .../cluster/health/ClusterHealthResponse.java | 2 +- .../hotthreads/NodesHotThreadsRequest.java | 2 +- .../action/bulk/BulkRequest.java | 2 +- .../action/index/IndexRequest.java | 10 +--- .../support/master/AcknowledgedRequest.java | 3 +- .../support/master/MasterNodeRequest.java | 2 +- .../support/nodes/BaseNodesRequest.java | 11 +---- .../replication/ReplicationRequest.java | 2 +- .../InstanceShardOperationRequest.java | 2 +- .../support/tasks/BaseTasksRequest.java | 6 +-- .../common/io/stream/Streamable.java | 2 + .../common/io/stream/Writeable.java | 2 - .../elasticsearch/common/unit/TimeValue.java | 46 +++++++------------ .../zen/ping/unicast/UnicastZenPing.java | 4 +- .../java/org/elasticsearch/search/Scroll.java | 13 +----- .../elasticsearch/threadpool/ThreadPool.java | 2 +- .../common/unit/TimeValueTests.java | 14 ++++-- .../reindex/AbstractBulkByScrollRequest.java | 4 +- .../index/reindex/BulkByScrollTask.java | 4 +- .../reindex/BulkIndexByScrollResponse.java | 2 +- 21 files changed, 52 insertions(+), 87 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java index 59b426d8c31..27970f332fc 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java @@ -33,8 +33,6 @@ import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; - /** * */ @@ -160,7 +158,7 @@ public class ClusterHealthRequest extends MasterNodeReadRequest implements Composite } } refreshPolicy = RefreshPolicy.readFrom(in); - timeout = TimeValue.readTimeValue(in); + timeout = new TimeValue(in); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index bc1e631e559..a79f6d83204 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -633,9 +633,8 @@ public class IndexRequest extends ReplicatedWriteRequest implement routing = in.readOptionalString(); parent = in.readOptionalString(); timestamp = in.readOptionalString(); - ttl = in.readBoolean() ? TimeValue.readTimeValue(in) : null; + ttl = in.readOptionalWriteable(TimeValue::new); source = in.readBytesReference(); - opType = OpType.fromId(in.readByte()); version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); @@ -650,12 +649,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement out.writeOptionalString(routing); out.writeOptionalString(parent); out.writeOptionalString(timestamp); - if (ttl == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - ttl.writeTo(out); - } + out.writeOptionalWriteable(ttl); out.writeBytesReference(source); out.writeByte(opType.id()); out.writeLong(version); diff --git a/core/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java b/core/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java index 9e45bccc547..e3f32543bf2 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; /** @@ -75,7 +74,7 @@ public abstract class AcknowledgedRequest public void readFrom(StreamInput in) throws IOException { super.readFrom(in); nodesIds = in.readStringArray(); - if (in.readBoolean()) { - timeout = TimeValue.readTimeValue(in); - } + timeout = in.readOptionalWriteable(TimeValue::new); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeStringArrayNullable(nodesIds); - if (timeout == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - timeout.writeTo(out); - } + out.writeOptionalWriteable(timeout); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 44c420598b5..c444fd2cf39 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -181,7 +181,7 @@ public abstract class ReplicationRequest> extends parentTaskId = TaskId.readFromStream(in); nodesIds = in.readStringArray(); actions = in.readStringArray(); - if (in.readBoolean()) { - timeout = TimeValue.readTimeValue(in); - } + timeout = in.readOptionalWriteable(TimeValue::new); } @Override @@ -156,7 +154,7 @@ public class BaseTasksRequest> extends parentTaskId.writeTo(out); out.writeStringArrayNullable(nodesIds); out.writeStringArrayNullable(actions); - out.writeOptionalStreamable(timeout); + out.writeOptionalWriteable(timeout); } public boolean match(Task task) { diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java b/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java index 4added7df31..a37c6371482 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java @@ -26,6 +26,8 @@ import java.io.IOException; * across the wire" using Elasticsearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by * serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged. For * example, {@link org.elasticsearch.common.unit.TimeValue} converts the time to nanoseconds for serialization. + * {@linkplain org.elasticsearch.common.unit.TimeValue} actually implements {@linkplain Writeable} not {@linkplain Streamable} but it has + * the same contract. * * Prefer implementing {@link Writeable} over implementing this interface where possible. Lots of code depends on this interface so this * isn't always possible. diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java b/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java index 55ab419881f..cf127e5b968 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java @@ -26,8 +26,6 @@ import java.io.IOException; * across the wire" using Elasticsearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by * serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged. For * example, {@link org.elasticsearch.common.unit.TimeValue} converts the time to nanoseconds for serialization. - * {@linkplain org.elasticsearch.common.unit.TimeValue} actually implements {@linkplain Streamable} not {@linkplain Writeable} but it has - * the same contract. * * Prefer implementing this interface over implementing {@link Streamable} where possible. Lots of code depends on {@linkplain Streamable} * so this isn't always possible. diff --git a/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java b/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java index c467a0c18a8..2058355d300 100644 --- a/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java +++ b/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java @@ -23,7 +23,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.joda.time.Period; import org.joda.time.PeriodType; import org.joda.time.format.PeriodFormat; @@ -34,7 +34,7 @@ import java.util.Locale; import java.util.Objects; import java.util.concurrent.TimeUnit; -public class TimeValue implements Streamable { +public class TimeValue implements Writeable { /** How many nano-seconds in one milli-second */ public static final long NSEC_PER_MSEC = 1000000; @@ -59,13 +59,8 @@ public class TimeValue implements Streamable { return new TimeValue(hours, TimeUnit.HOURS); } - private long duration; - - private TimeUnit timeUnit; - - private TimeValue() { - - } + private final long duration; + private final TimeUnit timeUnit; public TimeValue(long millis) { this(millis, TimeUnit.MILLISECONDS); @@ -76,6 +71,19 @@ public class TimeValue implements Streamable { this.timeUnit = timeUnit; } + /** + * Read from a stream. + */ + public TimeValue(StreamInput in) throws IOException { + duration = in.readZLong(); + timeUnit = TimeUnit.NANOSECONDS; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeZLong(nanos()); + } + public long nanos() { return timeUnit.toNanos(duration); } @@ -304,26 +312,6 @@ public class TimeValue implements Streamable { static final long C5 = C4 * 60L; static final long C6 = C5 * 24L; - public static TimeValue readTimeValue(StreamInput in) throws IOException { - TimeValue timeValue = new TimeValue(); - timeValue.readFrom(in); - return timeValue; - } - - /** - * serialization converts TimeValue internally to NANOSECONDS - */ - @Override - public void readFrom(StreamInput in) throws IOException { - duration = in.readLong(); - timeUnit = TimeUnit.NANOSECONDS; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeLong(nanos()); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index d200ca5b07b..1b4f1ba75b0 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.zen.ping.unicast; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -79,7 +80,6 @@ import java.util.function.Function; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.readPingResponse; @@ -545,7 +545,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readInt(); - timeout = readTimeValue(in); + timeout = new TimeValue(in); pingResponse = readPingResponse(in); } diff --git a/core/src/main/java/org/elasticsearch/search/Scroll.java b/core/src/main/java/org/elasticsearch/search/Scroll.java index 2d703bdae90..13298b2cbbe 100644 --- a/core/src/main/java/org/elasticsearch/search/Scroll.java +++ b/core/src/main/java/org/elasticsearch/search/Scroll.java @@ -26,8 +26,6 @@ import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; -import static org.elasticsearch.common.unit.TimeValue.readTimeValue; - /** * A scroll enables scrolling of search request. It holds a {@link #keepAlive()} time that * will control how long to keep the scrolling resources open. @@ -64,18 +62,11 @@ public class Scroll implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { - if (in.readBoolean()) { - keepAlive = readTimeValue(in); - } + in.readOptionalWriteable(TimeValue::new); } @Override public void writeTo(StreamOutput out) throws IOException { - if (keepAlive == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - keepAlive.writeTo(out); - } + out.writeOptionalWriteable(keepAlive); } } diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 5c31323b3d8..f8cb882b083 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -579,7 +579,7 @@ public class ThreadPool extends AbstractComponent implements Closeable { min = in.readInt(); max = in.readInt(); if (in.readBoolean()) { - keepAlive = TimeValue.readTimeValue(in); + keepAlive = new TimeValue(in); } if (in.readBoolean()) { queueSize = SizeValue.readSizeValue(in); diff --git a/core/src/test/java/org/elasticsearch/common/unit/TimeValueTests.java b/core/src/test/java/org/elasticsearch/common/unit/TimeValueTests.java index 3cd68bea038..cc36625e68f 100644 --- a/core/src/test/java/org/elasticsearch/common/unit/TimeValueTests.java +++ b/core/src/test/java/org/elasticsearch/common/unit/TimeValueTests.java @@ -28,6 +28,8 @@ import org.joda.time.PeriodType; import java.io.IOException; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @@ -123,20 +125,22 @@ public class TimeValueTests extends ESTestCase { TimeValue.parseTimeValue("10W", null, "test")); } - private void assertEqualityAfterSerialize(TimeValue value) throws IOException { + private void assertEqualityAfterSerialize(TimeValue value, int expectedSize) throws IOException { BytesStreamOutput out = new BytesStreamOutput(); value.writeTo(out); + assertEquals(expectedSize, out.size()); StreamInput in = StreamInput.wrap(out.bytes()); - TimeValue inValue = TimeValue.readTimeValue(in); + TimeValue inValue = new TimeValue(in); assertThat(inValue, equalTo(value)); } public void testSerialize() throws Exception { - assertEqualityAfterSerialize(new TimeValue(100, TimeUnit.DAYS)); - assertEqualityAfterSerialize(new TimeValue(-1)); - assertEqualityAfterSerialize(new TimeValue(1, TimeUnit.NANOSECONDS)); + assertEqualityAfterSerialize(new TimeValue(100, TimeUnit.DAYS), 8); + assertEqualityAfterSerialize(timeValueNanos(-1), 1); + assertEqualityAfterSerialize(timeValueNanos(1), 1); + assertEqualityAfterSerialize(timeValueSeconds(30), 6); } public void testFailOnUnknownUnits() { diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 7505f490f45..499dfc82457 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -299,9 +299,9 @@ public abstract class AbstractBulkByScrollRequest indexingFailures = new ArrayList<>(indexingFailuresCount);