remove usages of #readOptionalStreamable, #readStreamableList. (#44578) (#44598)

This commit removes references to Streamable from StreamInput.

This is all a part of the effort to remove Streamable usage.

relates #34389.
This commit is contained in:
Tal Levy 2019-07-18 16:19:02 -07:00 committed by GitHub
parent 5a05bd76b4
commit 03f5084ac7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 79 additions and 133 deletions

View File

@ -69,7 +69,7 @@ public class RestoreSnapshotResponse extends ActionResponse implements ToXConten
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalStreamable(restoreInfo);
out.writeOptionalWriteable(restoreInfo);
}
public RestStatus status() {

View File

@ -139,7 +139,7 @@ public class ShardStats implements Streamable, Writeable, ToXContentFragment {
public void writeTo(StreamOutput out) throws IOException {
shardRouting.writeTo(out);
commonStats.writeTo(out);
out.writeOptionalStreamable(commitStats);
out.writeOptionalWriteable(commitStats);
out.writeString(statePath);
out.writeString(dataPath);
out.writeBoolean(isCustomDataPath);

View File

@ -75,7 +75,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import static org.elasticsearch.ElasticsearchException.readStackTrace;
@ -825,20 +824,6 @@ public abstract class StreamInput extends InputStream {
return readBoolean() ? readArray(reader, arraySupplier) : null;
}
/**
* Serializes a potential null value.
*/
@Nullable
public <T extends Streamable> T readOptionalStreamable(Supplier<T> supplier) throws IOException {
if (readBoolean()) {
T streamable = supplier.get();
streamable.readFrom(this);
return streamable;
} else {
return null;
}
}
@Nullable
public <T extends Writeable> T readOptionalWriteable(Writeable.Reader<T> reader) throws IOException {
if (readBoolean()) {
@ -991,29 +976,6 @@ public abstract class StreamInput extends InputStream {
return null;
}
/**
* Read a {@link List} of {@link Streamable} objects, using the {@code constructor} to instantiate each instance.
* <p>
* This is expected to take the form:
* <code>
* List&lt;MyStreamableClass&gt; list = in.readStreamList(MyStreamableClass::new);
* </code>
*
* @param constructor Streamable instance creator
* @return Never {@code null}.
* @throws IOException if any step fails
*/
public <T extends Streamable> List<T> readStreamableList(Supplier<T> constructor) throws IOException {
int count = readArraySize();
List<T> builder = new ArrayList<>(count);
for (int i=0; i<count; i++) {
T instance = constructor.get();
instance.readFrom(this);
builder.add(instance);
}
return builder;
}
/**
* Reads a list of objects. The list is expected to have been written using {@link StreamOutput#writeList(List)} or
* {@link StreamOutput#writeStreamableList(List)}.

View File

@ -22,7 +22,7 @@ import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -32,12 +32,12 @@ import java.util.Base64;
import java.util.Map;
/** a class the returns dynamic information with respect to the last commit point of this shard */
public final class CommitStats implements Streamable, ToXContentFragment {
public final class CommitStats implements Writeable, ToXContentFragment {
private Map<String, String> userData;
private long generation;
private String id; // lucene commit id in base 64;
private int numDocs;
private final Map<String, String> userData;
private final long generation;
private final String id; // lucene commit id in base 64;
private final int numDocs;
public CommitStats(SegmentInfos segmentInfos) {
// clone the map to protect against concurrent changes
@ -48,11 +48,19 @@ public final class CommitStats implements Streamable, ToXContentFragment {
numDocs = Lucene.getNumDocs(segmentInfos);
}
private CommitStats() {
CommitStats(StreamInput in) throws IOException {
MapBuilder<String, String> builder = MapBuilder.newMapBuilder();
for (int i = in.readVInt(); i > 0; i--) {
builder.put(in.readString(), in.readString());
}
userData = builder.immutableMap();
generation = in.readLong();
id = in.readOptionalString();
numDocs = in.readInt();
}
public static CommitStats readOptionalCommitStatsFrom(StreamInput in) throws IOException {
return in.readOptionalStreamable(CommitStats::new);
return in.readOptionalWriteable(CommitStats::new);
}
@ -90,18 +98,6 @@ public final class CommitStats implements Streamable, ToXContentFragment {
return numDocs;
}
@Override
public void readFrom(StreamInput in) throws IOException {
MapBuilder<String, String> builder = MapBuilder.newMapBuilder();
for (int i = in.readVInt(); i > 0; i--) {
builder.put(in.readString(), in.readString());
}
userData = builder.immutableMap();
generation = in.readLong();
id = in.readOptionalString();
numDocs = in.readInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(userData.size());

View File

@ -22,7 +22,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -40,7 +40,7 @@ import java.util.Objects;
* <p>
* Returned as part of {@link org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse}
*/
public class RestoreInfo implements ToXContentObject, Streamable {
public class RestoreInfo implements ToXContentObject, Writeable {
private String name;
@ -61,6 +61,18 @@ public class RestoreInfo implements ToXContentObject, Streamable {
this.successfulShards = successfulShards;
}
public RestoreInfo(StreamInput in) throws IOException {
name = in.readString();
int size = in.readVInt();
List<String> indicesListBuilder = new ArrayList<>();
for (int i = 0; i < size; i++) {
indicesListBuilder.add(in.readString());
}
indices = Collections.unmodifiableList(indicesListBuilder);
totalShards = in.readVInt();
successfulShards = in.readVInt();
}
/**
* Snapshot name
*
@ -160,19 +172,6 @@ public class RestoreInfo implements ToXContentObject, Streamable {
return PARSER.parse(parser, null);
}
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
int size = in.readVInt();
List<String> indicesListBuilder = new ArrayList<>();
for (int i = 0; i < size; i++) {
indicesListBuilder.add(in.readString());
}
indices = Collections.unmodifiableList(indicesListBuilder);
totalShards = in.readVInt();
successfulShards = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
@ -191,7 +190,7 @@ public class RestoreInfo implements ToXContentObject, Streamable {
* @return restore info
*/
public static RestoreInfo readOptionalRestoreInfo(StreamInput in) throws IOException {
return in.readOptionalStreamable(RestoreInfo::new);
return in.readOptionalWriteable(RestoreInfo::new);
}
@Override

View File

@ -432,18 +432,18 @@ public class BytesStreamsTests extends ESTestCase {
public void testWriteStreamableList() throws IOException {
final int size = randomIntBetween(0, 5);
final List<TestStreamable> expected = new ArrayList<>(size);
final List<TestWriteable> expected = new ArrayList<>(size);
for (int i = 0; i < size; ++i) {
expected.add(new TestStreamable(randomBoolean()));
expected.add(new TestWriteable(randomBoolean()));
}
final BytesStreamOutput out = new BytesStreamOutput();
out.writeStreamableList(expected);
out.writeList(expected);
final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes()));
final List<TestStreamable> loaded = in.readStreamableList(TestStreamable::new);
final List<TestWriteable> loaded = in.readList(TestWriteable::new);
assertThat(loaded, hasSize(expected.size()));
@ -587,18 +587,15 @@ public class BytesStreamsTests extends ESTestCase {
}
}
private static class TestStreamable implements Streamable {
private static class TestWriteable implements Writeable {
private boolean value;
TestStreamable() { }
TestStreamable(boolean value) {
TestWriteable(boolean value) {
this.value = value;
}
@Override
public void readFrom(StreamInput in) throws IOException {
TestWriteable(StreamInput in) throws IOException {
value = in.readBoolean();
}

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.core.watcher.execution;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -16,16 +16,13 @@ import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
public class QueuedWatch implements Streamable, ToXContentObject {
public class QueuedWatch implements Writeable, ToXContentObject {
private String watchId;
private String watchRecordId;
private ZonedDateTime triggeredTime;
private ZonedDateTime executionTime;
public QueuedWatch() {
}
public QueuedWatch(WatchExecutionContext ctx) {
this.watchId = ctx.id().watchId();
this.watchRecordId = ctx.id().value();
@ -33,6 +30,13 @@ public class QueuedWatch implements Streamable, ToXContentObject {
this.executionTime = ctx.executionTime();
}
public QueuedWatch(StreamInput in) throws IOException {
watchId = in.readString();
watchRecordId = in.readString();
triggeredTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
executionTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
}
public String watchId() {
return watchId;
}
@ -53,14 +57,6 @@ public class QueuedWatch implements Streamable, ToXContentObject {
this.executionTime = executionTime;
}
@Override
public void readFrom(StreamInput in) throws IOException {
watchId = in.readString();
watchRecordId = in.readString();
triggeredTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
executionTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(watchId);

View File

@ -7,7 +7,7 @@ package org.elasticsearch.xpack.core.watcher.execution;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
@ -18,18 +18,15 @@ import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Map;
public class WatchExecutionSnapshot implements Streamable, ToXContentObject {
public class WatchExecutionSnapshot implements Writeable, ToXContentObject {
private String watchId;
private String watchRecordId;
private ZonedDateTime triggeredTime;
private ZonedDateTime executionTime;
private ExecutionPhase phase;
private final String watchId;
private final String watchRecordId;
private final ZonedDateTime triggeredTime;
private final ZonedDateTime executionTime;
private final ExecutionPhase phase;
private final StackTraceElement[] executionStackTrace;
private String[] executedActions;
private StackTraceElement[] executionStackTrace;
public WatchExecutionSnapshot() {
}
public WatchExecutionSnapshot(WatchExecutionContext context, StackTraceElement[] executionStackTrace) {
watchId = context.id().watchId();
@ -48,6 +45,23 @@ public class WatchExecutionSnapshot implements Streamable, ToXContentObject {
this.executionStackTrace = executionStackTrace;
}
public WatchExecutionSnapshot(StreamInput in) throws IOException {
watchId = in.readString();
watchRecordId = in.readString();
triggeredTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
executionTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
phase = ExecutionPhase.resolve(in.readString());
int size = in.readVInt();
executionStackTrace = new StackTraceElement[size];
for (int i = 0; i < size; i++) {
String declaringClass = in.readString();
String methodName = in.readString();
String fileName = in.readOptionalString();
int lineNumber = in.readInt();
executionStackTrace[i] = new StackTraceElement(declaringClass, methodName, fileName, lineNumber);
}
}
public String watchId() {
return watchId;
}
@ -72,24 +86,6 @@ public class WatchExecutionSnapshot implements Streamable, ToXContentObject {
return executionStackTrace;
}
@Override
public void readFrom(StreamInput in) throws IOException {
watchId = in.readString();
watchRecordId = in.readString();
triggeredTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
executionTime = Instant.ofEpochMilli(in.readVLong()).atZone(ZoneOffset.UTC);
phase = ExecutionPhase.resolve(in.readString());
int size = in.readVInt();
executionStackTrace = new StackTraceElement[size];
for (int i = 0; i < size; i++) {
String declaringClass = in.readString();
String methodName = in.readString();
String fileName = in.readOptionalString();
int lineNumber = in.readInt();
executionStackTrace[i] = new StackTraceElement(declaringClass, methodName, fileName, lineNumber);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(watchId);

View File

@ -100,10 +100,10 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
watcherState = WatcherState.fromId(in.readByte());
if (in.readBoolean()) {
snapshots = in.readStreamableList(WatchExecutionSnapshot::new);
snapshots = in.readList(WatchExecutionSnapshot::new);
}
if (in.readBoolean()) {
queuedWatches = in.readStreamableList(QueuedWatch::new);
queuedWatches = in.readList(QueuedWatch::new);
}
if (in.readBoolean()) {
stats = Counters.read(in);
@ -194,11 +194,11 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
out.writeBoolean(snapshots != null);
if (snapshots != null) {
out.writeStreamableList(snapshots);
out.writeList(snapshots);
}
out.writeBoolean(queuedWatches != null);
if (queuedWatches != null) {
out.writeStreamableList(queuedWatches);
out.writeList(queuedWatches);
}
out.writeBoolean(stats != null);
if (stats != null) {
@ -240,4 +240,4 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
return builder;
}
}
}
}