Make TimeValue Writeable instead of Streamable

Writeable is better for immutable objects like TimeValue.

Switch to writeZLong which takes up less space than the original
writeLong in the majority of cases. Since we expect negative
TimeValues we shouldn't use
writeVLong.
This commit is contained in:
Nik Everett 2016-06-10 15:32:30 -04:00
parent 200af0c6a6
commit 387155559e
21 changed files with 52 additions and 87 deletions

View File

@ -33,8 +33,6 @@ import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
/**
*
*/
@ -160,7 +158,7 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
indices[i] = in.readString();
}
}
timeout = readTimeValue(in);
timeout = new TimeValue(in);
if (in.readBoolean()) {
waitForStatus = ClusterHealthStatus.fromValue(in.readByte());
}

View File

@ -187,7 +187,7 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
timedOut = in.readBoolean();
numberOfInFlightFetch = in.readInt();
delayedUnassignedShards= in.readInt();
taskMaxWaitingTime = TimeValue.readTimeValue(in);
taskMaxWaitingTime = new TimeValue(in);
}
@Override

View File

@ -101,7 +101,7 @@ public class NodesHotThreadsRequest extends BaseNodesRequest<NodesHotThreadsRequ
threads = in.readInt();
ignoreIdleThreads = in.readBoolean();
type = in.readString();
interval = TimeValue.readTimeValue(in);
interval = new TimeValue(in);
snapshots = in.readInt();
}

View File

@ -544,7 +544,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
}
}
refreshPolicy = RefreshPolicy.readFrom(in);
timeout = TimeValue.readTimeValue(in);
timeout = new TimeValue(in);
}
@Override

View File

@ -633,9 +633,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
routing = in.readOptionalString();
parent = in.readOptionalString();
timestamp = in.readOptionalString();
ttl = in.readBoolean() ? TimeValue.readTimeValue(in) : null;
ttl = in.readOptionalWriteable(TimeValue::new);
source = in.readBytesReference();
opType = OpType.fromId(in.readByte());
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
@ -650,12 +649,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
out.writeOptionalString(routing);
out.writeOptionalString(parent);
out.writeOptionalString(timestamp);
if (ttl == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
ttl.writeTo(out);
}
out.writeOptionalWriteable(ttl);
out.writeBytesReference(source);
out.writeByte(opType.id());
out.writeLong(version);

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
@ -75,7 +74,7 @@ public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Requ
* Reads the timeout value
*/
protected void readTimeout(StreamInput in) throws IOException {
timeout = readTimeValue(in);
timeout = new TimeValue(in);
}
/**

View File

@ -61,7 +61,7 @@ public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Reques
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
masterNodeTimeout = TimeValue.readTimeValue(in);
masterNodeTimeout = new TimeValue(in);
}
@Override

View File

@ -82,20 +82,13 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodesIds = in.readStringArray();
if (in.readBoolean()) {
timeout = TimeValue.readTimeValue(in);
}
timeout = in.readOptionalWriteable(TimeValue::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArrayNullable(nodesIds);
if (timeout == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
timeout.writeTo(out);
}
out.writeOptionalWriteable(timeout);
}
}

View File

@ -181,7 +181,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
shardId = null;
}
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
timeout = new TimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
primaryTerm = in.readVLong();

View File

@ -121,7 +121,7 @@ public abstract class InstanceShardOperationRequest<Request extends InstanceShar
} else {
shardId = null;
}
timeout = TimeValue.readTimeValue(in);
timeout = new TimeValue(in);
concreteIndex = in.readOptionalString();
}

View File

@ -144,9 +144,7 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
parentTaskId = TaskId.readFromStream(in);
nodesIds = in.readStringArray();
actions = in.readStringArray();
if (in.readBoolean()) {
timeout = TimeValue.readTimeValue(in);
}
timeout = in.readOptionalWriteable(TimeValue::new);
}
@Override
@ -156,7 +154,7 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
parentTaskId.writeTo(out);
out.writeStringArrayNullable(nodesIds);
out.writeStringArrayNullable(actions);
out.writeOptionalStreamable(timeout);
out.writeOptionalWriteable(timeout);
}
public boolean match(Task task) {

View File

@ -26,6 +26,8 @@ import java.io.IOException;
* across the wire" using Elasticsearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by
* serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged. For
* example, {@link org.elasticsearch.common.unit.TimeValue} converts the time to nanoseconds for serialization.
* {@linkplain org.elasticsearch.common.unit.TimeValue} actually implements {@linkplain Writeable} not {@linkplain Streamable} but it has
* the same contract.
*
* Prefer implementing {@link Writeable} over implementing this interface where possible. Lots of code depends on this interface so this
* isn't always possible.

View File

@ -26,8 +26,6 @@ import java.io.IOException;
* across the wire" using Elasticsearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by
* serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged. For
* example, {@link org.elasticsearch.common.unit.TimeValue} converts the time to nanoseconds for serialization.
* {@linkplain org.elasticsearch.common.unit.TimeValue} actually implements {@linkplain Streamable} not {@linkplain Writeable} but it has
* the same contract.
*
* Prefer implementing this interface over implementing {@link Streamable} where possible. Lots of code depends on {@linkplain Streamable}
* so this isn't always possible.

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.joda.time.Period;
import org.joda.time.PeriodType;
import org.joda.time.format.PeriodFormat;
@ -34,7 +34,7 @@ import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class TimeValue implements Streamable {
public class TimeValue implements Writeable {
/** How many nano-seconds in one milli-second */
public static final long NSEC_PER_MSEC = 1000000;
@ -59,13 +59,8 @@ public class TimeValue implements Streamable {
return new TimeValue(hours, TimeUnit.HOURS);
}
private long duration;
private TimeUnit timeUnit;
private TimeValue() {
}
private final long duration;
private final TimeUnit timeUnit;
public TimeValue(long millis) {
this(millis, TimeUnit.MILLISECONDS);
@ -76,6 +71,19 @@ public class TimeValue implements Streamable {
this.timeUnit = timeUnit;
}
/**
* Read from a stream.
*/
public TimeValue(StreamInput in) throws IOException {
duration = in.readZLong();
timeUnit = TimeUnit.NANOSECONDS;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(nanos());
}
public long nanos() {
return timeUnit.toNanos(duration);
}
@ -304,26 +312,6 @@ public class TimeValue implements Streamable {
static final long C5 = C4 * 60L;
static final long C6 = C5 * 24L;
public static TimeValue readTimeValue(StreamInput in) throws IOException {
TimeValue timeValue = new TimeValue();
timeValue.readFrom(in);
return timeValue;
}
/**
* serialization converts TimeValue internally to NANOSECONDS
*/
@Override
public void readFrom(StreamInput in) throws IOException {
duration = in.readLong();
timeUnit = TimeUnit.NANOSECONDS;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(nanos());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery.zen.ping.unicast;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -79,7 +80,6 @@ import java.util.function.Function;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.readPingResponse;
@ -545,7 +545,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readInt();
timeout = readTimeValue(in);
timeout = new TimeValue(in);
pingResponse = readPingResponse(in);
}

View File

@ -26,8 +26,6 @@ import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
/**
* A scroll enables scrolling of search request. It holds a {@link #keepAlive()} time that
* will control how long to keep the scrolling resources open.
@ -64,18 +62,11 @@ public class Scroll implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
keepAlive = readTimeValue(in);
}
in.readOptionalWriteable(TimeValue::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (keepAlive == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
keepAlive.writeTo(out);
}
out.writeOptionalWriteable(keepAlive);
}
}

View File

@ -579,7 +579,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
min = in.readInt();
max = in.readInt();
if (in.readBoolean()) {
keepAlive = TimeValue.readTimeValue(in);
keepAlive = new TimeValue(in);
}
if (in.readBoolean()) {
queueSize = SizeValue.readSizeValue(in);

View File

@ -28,6 +28,8 @@ import org.joda.time.PeriodType;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
@ -123,20 +125,22 @@ public class TimeValueTests extends ESTestCase {
TimeValue.parseTimeValue("10W", null, "test"));
}
private void assertEqualityAfterSerialize(TimeValue value) throws IOException {
private void assertEqualityAfterSerialize(TimeValue value, int expectedSize) throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
value.writeTo(out);
assertEquals(expectedSize, out.size());
StreamInput in = StreamInput.wrap(out.bytes());
TimeValue inValue = TimeValue.readTimeValue(in);
TimeValue inValue = new TimeValue(in);
assertThat(inValue, equalTo(value));
}
public void testSerialize() throws Exception {
assertEqualityAfterSerialize(new TimeValue(100, TimeUnit.DAYS));
assertEqualityAfterSerialize(new TimeValue(-1));
assertEqualityAfterSerialize(new TimeValue(1, TimeUnit.NANOSECONDS));
assertEqualityAfterSerialize(new TimeValue(100, TimeUnit.DAYS), 8);
assertEqualityAfterSerialize(timeValueNanos(-1), 1);
assertEqualityAfterSerialize(timeValueNanos(1), 1);
assertEqualityAfterSerialize(timeValueSeconds(30), 6);
}
public void testFailOnUnknownUnits() {

View File

@ -299,9 +299,9 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
abortOnVersionConflict = in.readBoolean();
size = in.readVInt();
refresh = in.readBoolean();
timeout = TimeValue.readTimeValue(in);
timeout = new TimeValue(in);
consistency = WriteConsistencyLevel.fromId(in.readByte());
retryBackoffInitialTime = TimeValue.readTimeValue(in);
retryBackoffInitialTime = new TimeValue(in);
maxRetries = in.readVInt();
requestsPerSecond = in.readFloat();
}

View File

@ -168,10 +168,10 @@ public class BulkByScrollTask extends CancellableTask {
noops = in.readVLong();
bulkRetries = in.readVLong();
searchRetries = in.readVLong();
throttled = TimeValue.readTimeValue(in);
throttled = new TimeValue(in);
requestsPerSecond = in.readFloat();
reasonCancelled = in.readOptionalString();
throttledUntil = TimeValue.readTimeValue(in);
throttledUntil = new TimeValue(in);
}
@Override

View File

@ -153,7 +153,7 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
took = TimeValue.readTimeValue(in);
took = new TimeValue(in);
status = new BulkByScrollTask.Status(in);
int indexingFailuresCount = in.readVInt();
List<Failure> indexingFailures = new ArrayList<>(indexingFailuresCount);