Remove StreamOutput #writeOptionalStreamable and #writeStreamableList (#44602) (#44643)

remove usages of writeOptionalStreamable and writeStreambaleList

relates #34389.
This commit is contained in:
Tal Levy 2019-07-19 15:55:53 -07:00 committed by GitHub
parent 2e303fc5f7
commit 7c84636029
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 76 additions and 123 deletions

View File

@ -127,7 +127,7 @@ public class MultiSearchTemplateRequest extends ActionRequest implements Composi
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeVInt(maxConcurrentSearchRequests); out.writeVInt(maxConcurrentSearchRequests);
out.writeStreamableList(requests); out.writeList(requests);
} }
@Override @Override

View File

@ -232,7 +232,7 @@ public class SearchTemplateRequest extends ActionRequest implements CompositeInd
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeOptionalStreamable(request); out.writeOptionalWriteable(request);
out.writeBoolean(simulate); out.writeBoolean(simulate);
out.writeBoolean(explain); out.writeBoolean(explain);
out.writeBoolean(profile); out.writeBoolean(profile);

View File

@ -82,7 +82,7 @@ public class SearchTemplateResponse extends ActionResponse implements StatusToXC
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalBytesReference(source); out.writeOptionalBytesReference(source);
out.writeOptionalStreamable(response); out.writeOptionalWriteable(response);
} }
public static SearchTemplateResponse fromXContent(XContentParser parser) throws IOException { public static SearchTemplateResponse fromXContent(XContentParser parser) throws IOException {

View File

@ -45,7 +45,7 @@ public class NodesHotThreadsResponse extends BaseNodesResponse<NodeHotThreads> {
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeHotThreads> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeHotThreads> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
} }

View File

@ -52,7 +52,7 @@ public class NodesInfoResponse extends BaseNodesResponse<NodeInfo> implements To
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeInfo> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeInfo> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
@Override @Override

View File

@ -55,7 +55,7 @@ public class NodesReloadSecureSettingsResponse extends BaseNodesResponse<NodesRe
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodesReloadSecureSettingsResponse.NodeResponse> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodesReloadSecureSettingsResponse.NodeResponse> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
@Override @Override

View File

@ -49,7 +49,7 @@ public class NodesStatsResponse extends BaseNodesResponse<NodeStats> implements
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeStats> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeStats> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
@Override @Override

View File

@ -53,7 +53,7 @@ public class NodesUsageResponse extends BaseNodesResponse<NodeUsage> implements
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeUsage> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeUsage> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
@Override @Override

View File

@ -159,7 +159,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeSnapshotStatus> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeSnapshotStatus> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
} }

View File

@ -109,7 +109,7 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
@Override @Override
protected void writeNodesTo(StreamOutput out, List<ClusterStatsNodeResponse> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<ClusterStatsNodeResponse> nodes) throws IOException {
// nodeStats and indicesStats are rebuilt from nodes // nodeStats and indicesStats are rebuilt from nodes
out.writeStreamableList(nodes); out.writeList(nodes);
} }
@Override @Override

View File

@ -52,7 +52,7 @@ public class IndicesSegmentResponse extends BroadcastResponse {
super(in); super(in);
shards = new ShardSegments[in.readVInt()]; shards = new ShardSegments[in.readVInt()];
for (int i = 0; i < shards.length; i++) { for (int i = 0; i < shards.length; i++) {
shards[i] = ShardSegments.readShardSegments(in); shards[i] = new ShardSegments(in);
} }
} }

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action.admin.indices.segments;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.Segment;
import java.io.IOException; import java.io.IOException;
@ -31,20 +31,30 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
public class ShardSegments implements Streamable, Iterable<Segment> { public class ShardSegments implements Writeable, Iterable<Segment> {
private ShardRouting shardRouting; private ShardRouting shardRouting;
private List<Segment> segments; private List<Segment> segments;
ShardSegments() {
}
ShardSegments(ShardRouting shardRouting, List<Segment> segments) { ShardSegments(ShardRouting shardRouting, List<Segment> segments) {
this.shardRouting = shardRouting; this.shardRouting = shardRouting;
this.segments = segments; this.segments = segments;
} }
ShardSegments(StreamInput in) throws IOException {
shardRouting = new ShardRouting(in);
int size = in.readVInt();
if (size == 0) {
segments = Collections.emptyList();
} else {
segments = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
segments.add(Segment.readSegment(in));
}
}
}
@Override @Override
public Iterator<Segment> iterator() { public Iterator<Segment> iterator() {
return segments.iterator(); return segments.iterator();
@ -78,26 +88,6 @@ public class ShardSegments implements Streamable, Iterable<Segment> {
return count; return count;
} }
public static ShardSegments readShardSegments(StreamInput in) throws IOException {
ShardSegments shard = new ShardSegments();
shard.readFrom(in);
return shard;
}
@Override
public void readFrom(StreamInput in) throws IOException {
shardRouting = new ShardRouting(in);
int size = in.readVInt();
if (size == 0) {
segments = Collections.emptyList();
} else {
segments = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
segments.add(Segment.readSegment(in));
}
}
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
shardRouting.writeTo(out); shardRouting.writeTo(out);

View File

@ -74,7 +74,7 @@ public class TransportIndicesSegmentsAction
@Override @Override
protected ShardSegments readShardResult(StreamInput in) throws IOException { protected ShardSegments readShardResult(StreamInput in) throws IOException {
return ShardSegments.readShardSegments(in); return new ShardSegments(in);
} }
@Override @Override

View File

@ -248,22 +248,22 @@ public class CommonStats implements Writeable, ToXContentFragment {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalStreamable(docs); out.writeOptionalWriteable(docs);
out.writeOptionalStreamable(store); out.writeOptionalWriteable(store);
out.writeOptionalStreamable(indexing); out.writeOptionalWriteable(indexing);
out.writeOptionalStreamable(get); out.writeOptionalWriteable(get);
out.writeOptionalWriteable(search); out.writeOptionalWriteable(search);
out.writeOptionalStreamable(merge); out.writeOptionalWriteable(merge);
out.writeOptionalStreamable(refresh); out.writeOptionalWriteable(refresh);
out.writeOptionalStreamable(flush); out.writeOptionalWriteable(flush);
out.writeOptionalStreamable(warmer); out.writeOptionalWriteable(warmer);
out.writeOptionalStreamable(queryCache); out.writeOptionalWriteable(queryCache);
out.writeOptionalStreamable(fieldData); out.writeOptionalWriteable(fieldData);
out.writeOptionalWriteable(completion); out.writeOptionalWriteable(completion);
out.writeOptionalStreamable(segments); out.writeOptionalWriteable(segments);
out.writeOptionalStreamable(translog); out.writeOptionalWriteable(translog);
out.writeOptionalStreamable(requestCache); out.writeOptionalWriteable(requestCache);
out.writeOptionalStreamable(recoveryStats); out.writeOptionalWriteable(recoveryStats);
} }
public void add(CommonStats stats) { public void add(CommonStats stats) {

View File

@ -22,13 +22,13 @@ package org.elasticsearch.action.admin.indices.upgrade.post;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import java.io.IOException; import java.io.IOException;
import java.text.ParseException; import java.text.ParseException;
class ShardUpgradeResult implements Streamable { class ShardUpgradeResult implements Writeable {
private ShardId shardId; private ShardId shardId;
@ -38,10 +38,6 @@ class ShardUpgradeResult implements Streamable {
private boolean primary; private boolean primary;
ShardUpgradeResult() {
}
ShardUpgradeResult(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) { ShardUpgradeResult(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) {
this.shardId = shardId; this.shardId = shardId;
this.primary = primary; this.primary = primary;
@ -49,6 +45,17 @@ class ShardUpgradeResult implements Streamable {
this.oldestLuceneSegment = oldestLuceneSegment; this.oldestLuceneSegment = oldestLuceneSegment;
} }
ShardUpgradeResult(StreamInput in) throws IOException {
shardId = new ShardId(in);
primary = in.readBoolean();
upgradeVersion = Version.readVersion(in);
try {
oldestLuceneSegment = org.apache.lucene.util.Version.parse(in.readString());
} catch (ParseException ex) {
throw new IOException("failed to parse lucene version [" + oldestLuceneSegment + "]", ex);
}
}
public ShardId getShardId() { public ShardId getShardId() {
return shardId; return shardId;
} }
@ -65,20 +72,6 @@ class ShardUpgradeResult implements Streamable {
return primary; return primary;
} }
@Override
public void readFrom(StreamInput in) throws IOException {
shardId = new ShardId(in);
primary = in.readBoolean();
upgradeVersion = Version.readVersion(in);
try {
oldestLuceneSegment = org.apache.lucene.util.Version.parse(in.readString());
} catch (ParseException ex) {
throw new IOException("failed to parse lucene version [" + oldestLuceneSegment + "]", ex);
}
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out); shardId.writeTo(out);

View File

@ -130,9 +130,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
@Override @Override
protected ShardUpgradeResult readShardResult(StreamInput in) throws IOException { protected ShardUpgradeResult readShardResult(StreamInput in) throws IOException {
ShardUpgradeResult result = new ShardUpgradeResult(); return new ShardUpgradeResult(in);
result.readFrom(in);
return result;
} }
@Override @Override

View File

@ -42,7 +42,6 @@ import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -76,7 +75,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
*/ */
public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRequest<Request>, public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRequest<Request>,
Response extends BroadcastResponse, Response extends BroadcastResponse,
ShardOperationResult extends Streamable> extends HandledTransportAction<Request, Response> { ShardOperationResult extends Writeable> extends HandledTransportAction<Request, Response> {
private final ClusterService clusterService; private final ClusterService clusterService;
private final TransportService transportService; private final TransportService transportService;
@ -547,7 +546,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
out.writeVInt(totalShards); out.writeVInt(totalShards);
out.writeVInt(results.size()); out.writeVInt(results.size());
for (ShardOperationResult result : results) { for (ShardOperationResult result : results) {
out.writeOptionalStreamable(result); out.writeOptionalWriteable(result);
} }
out.writeBoolean(exceptions != null); out.writeBoolean(exceptions != null);
if (exceptions != null) { if (exceptions != null) {
@ -560,19 +559,15 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
* Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting) shardOperation} for * Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting) shardOperation} for
* which there is no shard-level return value. * which there is no shard-level return value.
*/ */
public static final class EmptyResult implements Streamable { public static final class EmptyResult implements Writeable {
public static EmptyResult INSTANCE = new EmptyResult(); public static EmptyResult INSTANCE = new EmptyResult();
private EmptyResult() { private EmptyResult() {}
}
private EmptyResult(StreamInput in) {}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) { }
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}
public static EmptyResult readEmptyResultFrom(StreamInput in) { public static EmptyResult readEmptyResultFrom(StreamInput in) {
return INSTANCE; return INSTANCE;

View File

@ -977,8 +977,7 @@ public abstract class StreamInput extends InputStream {
} }
/** /**
* Reads a list of objects. The list is expected to have been written using {@link StreamOutput#writeList(List)} or * Reads a list of objects. The list is expected to have been written using {@link StreamOutput#writeList(List)}.
* {@link StreamOutput#writeStreamableList(List)}.
* *
* @return the list of objects * @return the list of objects
* @throws IOException if an I/O exception occurs reading the list * @throws IOException if an I/O exception occurs reading the list

View File

@ -874,18 +874,6 @@ public abstract class StreamOutput extends OutputStream {
writeOptionalArray((out, value) -> value.writeTo(out), array); writeOptionalArray((out, value) -> value.writeTo(out), array);
} }
/**
* Serializes a potential null value.
*/
public void writeOptionalStreamable(@Nullable Streamable streamable) throws IOException {
if (streamable != null) {
writeBoolean(true);
streamable.writeTo(this);
} else {
writeBoolean(false);
}
}
public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException { public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
if (writeable != null) { if (writeable != null) {
writeBoolean(true); writeBoolean(true);
@ -1078,16 +1066,6 @@ public abstract class StreamOutput extends OutputStream {
} }
} }
/**
* Writes a list of {@link Streamable} objects
*/
public void writeStreamableList(List<? extends Streamable> list) throws IOException {
writeVInt(list.size());
for (Streamable obj: list) {
obj.writeTo(this);
}
}
/** /**
* Writes a collection to this stream. The corresponding collection can be read from a stream input using * Writes a collection to this stream. The corresponding collection can be read from a stream input using
* {@link StreamInput#readList(Writeable.Reader)}. * {@link StreamInput#readList(Writeable.Reader)}.

View File

@ -116,7 +116,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeGatewayMetaState> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeGatewayMetaState> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
} }

View File

@ -214,7 +214,7 @@ public class TransportNodesListGatewayStartedShards extends
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeGatewayStartedShards> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeGatewayStartedShards> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
} }

View File

@ -280,7 +280,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeStoreFilesMetaData> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeStoreFilesMetaData> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
} }

View File

@ -123,7 +123,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
public int failureCount() { public int failureCount() {

View File

@ -153,7 +153,7 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
public int getFailureCount() { public int getFailureCount() {

View File

@ -316,7 +316,7 @@ public class TransportNodesActionTests extends ESTestCase {
@Override @Override
protected void writeNodesTo(StreamOutput out, List<TestNodeResponse> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<TestNodeResponse> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
} }

View File

@ -24,7 +24,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -93,7 +93,7 @@ public class TransportReloadAnalyzersAction
return new ReloadResult(shardRouting.index().getName(), shardRouting.currentNodeId(), reloadedSearchAnalyzers); return new ReloadResult(shardRouting.index().getName(), shardRouting.currentNodeId(), reloadedSearchAnalyzers);
} }
static final class ReloadResult implements Streamable { static final class ReloadResult implements Writeable {
String index; String index;
String nodeId; String nodeId;
List<String> reloadedSearchAnalyzers; List<String> reloadedSearchAnalyzers;

View File

@ -35,7 +35,7 @@ public class NodesDeprecationCheckResponse extends BaseNodesResponse<NodesDeprec
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodesDeprecationCheckAction.NodeResponse> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodesDeprecationCheckAction.NodeResponse> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
@Override @Override

View File

@ -37,7 +37,7 @@ public class ClearRealmCacheResponse extends BaseNodesResponse<ClearRealmCacheRe
@Override @Override
protected void writeNodesTo(StreamOutput out, List<ClearRealmCacheResponse.Node> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<ClearRealmCacheResponse.Node> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
@Override @Override

View File

@ -40,7 +40,7 @@ public class ClearRolesCacheResponse extends BaseNodesResponse<ClearRolesCacheRe
@Override @Override
protected void writeNodesTo(StreamOutput out, List<ClearRolesCacheResponse.Node> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<ClearRolesCacheResponse.Node> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
@Override @Override

View File

@ -54,7 +54,7 @@ public class WatcherStatsResponse extends BaseNodesResponse<WatcherStatsResponse
@Override @Override
protected void writeNodesTo(StreamOutput out, List<Node> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<Node> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
@Override @Override

View File

@ -37,7 +37,7 @@ public class SqlStatsResponse extends BaseNodesResponse<SqlStatsResponse.NodeSta
@Override @Override
protected void writeNodesTo(StreamOutput out, List<NodeStatsResponse> nodes) throws IOException { protected void writeNodesTo(StreamOutput out, List<NodeStatsResponse> nodes) throws IOException {
out.writeStreamableList(nodes); out.writeList(nodes);
} }
@Override @Override