ThreadPool.Info and SizeValue to implement Writeable rather than Streamable

This commit is contained in:
javanna 2016-09-01 19:05:13 +02:00 committed by Luca Cavanna
parent 84b8c9de19
commit 774244a61f
4 changed files with 45 additions and 91 deletions

View File

@ -23,22 +23,14 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
/**
*
*/
public class SizeValue implements Streamable {
public class SizeValue implements Writeable {
private long size;
private SizeUnit sizeUnit;
private SizeValue() {
}
private final long size;
private final SizeUnit sizeUnit;
public SizeValue(long singles) {
this(singles, SizeUnit.SINGLE);
@ -52,6 +44,16 @@ public class SizeValue implements Streamable {
this.sizeUnit = sizeUnit;
}
public SizeValue(StreamInput in) throws IOException {
size = in.readVLong();
sizeUnit = SizeUnit.SINGLE;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(singles());
}
public long singles() {
return sizeUnit.toSingles(size);
}
@ -194,23 +196,6 @@ public class SizeValue implements Streamable {
return new SizeValue(singles, SizeUnit.SINGLE);
}
public static SizeValue readSizeValue(StreamInput in) throws IOException {
SizeValue sizeValue = new SizeValue();
sizeValue.readFrom(in);
return sizeValue;
}
@Override
public void readFrom(StreamInput in) throws IOException {
size = in.readVLong();
sizeUnit = SizeUnit.SINGLE;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(singles());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -27,7 +27,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
@ -529,18 +529,14 @@ public class ThreadPool extends AbstractComponent implements Closeable {
}
}
public static class Info implements Streamable, ToXContent {
public static class Info implements Writeable, ToXContent {
private String name;
private ThreadPoolType type;
private int min;
private int max;
private TimeValue keepAlive;
private SizeValue queueSize;
Info() {
}
private final String name;
private final ThreadPoolType type;
private final int min;
private final int max;
private final TimeValue keepAlive;
private final SizeValue queueSize;
public Info(String name, ThreadPoolType type) {
this(name, type, -1);
@ -559,6 +555,25 @@ public class ThreadPool extends AbstractComponent implements Closeable {
this.queueSize = queueSize;
}
public Info(StreamInput in) throws IOException {
name = in.readString();
type = ThreadPoolType.fromType(in.readString());
min = in.readInt();
max = in.readInt();
keepAlive = in.readOptionalWriteable(TimeValue::new);
queueSize = in.readOptionalWriteable(SizeValue::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(type.getType());
out.writeInt(min);
out.writeInt(max);
out.writeOptionalWriteable(keepAlive);
out.writeOptionalWriteable(queueSize);
}
public String getName() {
return this.name;
}
@ -585,46 +600,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
return this.queueSize;
}
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
type = ThreadPoolType.fromType(in.readString());
min = in.readInt();
max = in.readInt();
if (in.readBoolean()) {
keepAlive = new TimeValue(in);
}
if (in.readBoolean()) {
queueSize = SizeValue.readSizeValue(in);
}
in.readBoolean(); // here to conform with removed waitTime
in.readBoolean(); // here to conform with removed rejected setting
in.readBoolean(); // here to conform with queue type
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(type.getType());
out.writeInt(min);
out.writeInt(max);
if (keepAlive == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
keepAlive.writeTo(out);
}
if (queueSize == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
queueSize.writeTo(out);
}
out.writeBoolean(false); // here to conform with removed waitTime
out.writeBoolean(false); // here to conform with removed rejected setting
out.writeBoolean(false); // here to conform with queue type
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name);
@ -654,7 +629,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
static final String KEEP_ALIVE = "keep_alive";
static final String QUEUE_SIZE = "queue_size";
}
}
/**

View File

@ -43,9 +43,7 @@ public class ThreadPoolInfo implements Writeable, Iterable<ThreadPool.Info>, ToX
int size = in.readVInt();
List<ThreadPool.Info> infos = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
ThreadPool.Info info = new ThreadPool.Info();
info.readFrom(in);
infos.add(info);
infos.add(new ThreadPool.Info(in));
}
this.infos = Collections.unmodifiableList(infos);
}

View File

@ -59,8 +59,7 @@ public class ThreadPoolSerializationTests extends ESTestCase {
info.writeTo(output);
StreamInput input = output.bytes().streamInput();
ThreadPool.Info newInfo = new ThreadPool.Info();
newInfo.readFrom(input);
ThreadPool.Info newInfo = new ThreadPool.Info(input);
assertThat(newInfo.getQueueSize().singles(), is(10000L));
}
@ -71,8 +70,7 @@ public class ThreadPoolSerializationTests extends ESTestCase {
info.writeTo(output);
StreamInput input = output.bytes().streamInput();
ThreadPool.Info newInfo = new ThreadPool.Info();
newInfo.readFrom(input);
ThreadPool.Info newInfo = new ThreadPool.Info(input);
assertThat(newInfo.getQueueSize(), is(nullValue()));
}
@ -126,8 +124,7 @@ public class ThreadPoolSerializationTests extends ESTestCase {
info.writeTo(output);
StreamInput input = output.bytes().streamInput();
ThreadPool.Info newInfo = new ThreadPool.Info();
newInfo.readFrom(input);
ThreadPool.Info newInfo = new ThreadPool.Info(input);
assertThat(newInfo.getThreadPoolType(), is(threadPoolType));
}