Convert Transport Request/Response to Writeable (#44636) (#44654)

This commit converts all remaining TransportRequest and
TransportResponse classes to implement Writeable, and disallows
Streamable implementations.

relates #34389
This commit is contained in:
Ryan Ernst 2019-07-20 11:25:58 -07:00 committed by GitHub
parent f4ee2e9e91
commit 4c05d25ec7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
90 changed files with 407 additions and 871 deletions

View File

@ -191,7 +191,7 @@ public class SyncedFlushResponseTests extends ESTestCase {
);
} else {
successful++;
shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse());
shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse((String) null));
}
}
shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses));

View File

@ -47,11 +47,6 @@ public abstract class ActionRequest extends TransportRequest {
return false;
}
@Override
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable " + getClass().getName());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -35,9 +35,4 @@ public abstract class ActionResponse extends TransportResponse {
public ActionResponse(StreamInput in) throws IOException {
super(in);
}
@Override
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}

View File

@ -95,7 +95,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
super(in);
timestamp = in.readVLong();
if (in.readBoolean()) {
indices = NodeIndicesStats.readIndicesStats(in);
indices = new NodeIndicesStats(in);
}
os = in.readOptionalWriteable(OsStats::new);
process = in.readOptionalWriteable(ProcessStats::new);

View File

@ -50,7 +50,12 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
private String failure;
private SnapshotIndexShardStatus() {
public SnapshotIndexShardStatus(StreamInput in) throws IOException {
super(in);
stage = SnapshotIndexShardStage.fromValue(in.readByte());
stats = new SnapshotStats(in);
nodeId = in.readOptionalString();
failure = in.readOptionalString();
}
SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage) {
@ -127,13 +132,6 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
return failure;
}
public static SnapshotIndexShardStatus readShardSnapshotStatus(StreamInput in) throws IOException {
SnapshotIndexShardStatus shardStatus = new SnapshotIndexShardStatus();
shardStatus.readFrom(in);
return shardStatus;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -143,15 +141,6 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
out.writeOptionalString(failure);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
stage = SnapshotIndexShardStage.fromValue(in.readByte());
stats = new SnapshotStats(in);
nodeId = in.readOptionalString();
failure = in.readOptionalString();
}
static final class Fields {
static final String STAGE = "stage";
static final String REASON = "reason";

View File

@ -78,7 +78,7 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
int size = in.readVInt();
List<SnapshotIndexShardStatus> builder = new ArrayList<>();
for (int i = 0; i < size; i++) {
builder.add(SnapshotIndexShardStatus.readShardSnapshotStatus(in));
builder.add(new SnapshotIndexShardStatus(in));
}
shards = Collections.unmodifiableList(builder);
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {

View File

@ -198,7 +198,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
Map<ShardId, SnapshotIndexShardStatus> shardMapBuilder = new HashMap<>(numberOfShards);
for (int j = 0; j < numberOfShards; j++) {
ShardId shardId = new ShardId(in);
SnapshotIndexShardStatus status = SnapshotIndexShardStatus.readShardSnapshotStatus(in);
SnapshotIndexShardStatus status = new SnapshotIndexShardStatus(in);
shardMapBuilder.put(shardId, status);
}
snapshotMapBuilder.put(snapshot, unmodifiableMap(shardMapBuilder));

View File

@ -65,7 +65,7 @@ public class SyncedFlushResponse extends ActionResponse implements ToXContentFra
List<ShardsSyncedFlushResult> shardsSyncedFlushResults = new ArrayList<>();
int numShards = in.readInt();
for (int j =0; j< numShards; j++) {
shardsSyncedFlushResults.add(ShardsSyncedFlushResult.readShardsSyncedFlushResult(in));
shardsSyncedFlushResults.add(new ShardsSyncedFlushResult(in));
}
tmpShardsResultPerIndex.put(index, shardsSyncedFlushResults);
}

View File

@ -50,7 +50,7 @@ public class ShardSegments implements Writeable, Iterable<Segment> {
} else {
segments = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
segments.add(Segment.readSegment(in));
segments.add(new Segment(in));
}
}
}

View File

@ -36,7 +36,12 @@ public class ShardUpgradeStatus extends BroadcastShardResponse {
private long toUpgradeBytesAncient;
ShardUpgradeStatus() {
public ShardUpgradeStatus(StreamInput in) throws IOException {
super(in);
shardRouting = new ShardRouting(in);
totalBytes = in.readLong();
toUpgradeBytes = in.readLong();
toUpgradeBytesAncient = in.readLong();
}
ShardUpgradeStatus(ShardRouting shardRouting, long totalBytes, long toUpgradeBytes, long upgradeBytesAncient) {
@ -64,21 +69,6 @@ public class ShardUpgradeStatus extends BroadcastShardResponse {
return toUpgradeBytesAncient;
}
public static ShardUpgradeStatus readShardUpgradeStatus(StreamInput in) throws IOException {
ShardUpgradeStatus shard = new ShardUpgradeStatus();
shard.readFrom(in);
return shard;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardRouting = new ShardRouting(in);
totalBytes = in.readLong();
toUpgradeBytes = in.readLong();
toUpgradeBytesAncient = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -76,7 +76,7 @@ public class TransportUpgradeStatusAction
@Override
protected ShardUpgradeStatus readShardResult(StreamInput in) throws IOException {
return ShardUpgradeStatus.readShardUpgradeStatus(in);
return new ShardUpgradeStatus(in);
}
@Override

View File

@ -43,7 +43,7 @@ public class UpgradeStatusResponse extends BroadcastResponse {
super(in);
shards = new ShardUpgradeStatus[in.readVInt()];
for (int i = 0; i < shards.length; i++) {
shards[i] = ShardUpgradeStatus.readShardUpgradeStatus(in);
shards[i] = new ShardUpgradeStatus(in);
}
}

View File

@ -29,9 +29,11 @@ import java.io.IOException;
public final class ShardUpgradeRequest extends BroadcastShardRequest {
private UpgradeRequest request = new UpgradeRequest();
private UpgradeRequest request;
public ShardUpgradeRequest() {
public ShardUpgradeRequest(StreamInput in) throws IOException {
super(in);
request = new UpgradeRequest(in);
}
ShardUpgradeRequest(ShardId shardId, UpgradeRequest request) {
@ -39,12 +41,6 @@ public final class ShardUpgradeRequest extends BroadcastShardRequest {
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -39,8 +39,11 @@ class ShardValidateQueryResponse extends BroadcastShardResponse {
private String error;
ShardValidateQueryResponse() {
ShardValidateQueryResponse(StreamInput in) throws IOException {
super(in);
valid = in.readBoolean();
explanation = in.readOptionalString();
error = in.readOptionalString();
}
ShardValidateQueryResponse(ShardId shardId, boolean valid, String explanation, String error) {
@ -62,14 +65,6 @@ class ShardValidateQueryResponse extends BroadcastShardResponse {
return error;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
valid = in.readBoolean();
explanation = in.readOptionalString();
error = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.ParsedQuery;
@ -122,8 +123,8 @@ public class TransportValidateQueryAction extends TransportBroadcastAction<
}
@Override
protected ShardValidateQueryResponse newShardResponse() {
return new ShardValidateQueryResponse();
protected ShardValidateQueryResponse readShardResponse(StreamInput in) throws IOException {
return new ShardValidateQueryResponse(in);
}
@Override

View File

@ -91,7 +91,7 @@ public class ExplainResponse extends ActionResponse implements StatusToXContentO
explanation = readExplanation(in);
}
if (in.readBoolean()) {
getResult = GetResult.readGetResult(in);
getResult = new GetResult(in);
}
}

View File

@ -50,7 +50,7 @@ public class GetResponse extends ActionResponse implements Iterable<DocumentFiel
GetResponse(StreamInput in) throws IOException {
super(in);
getResult = GetResult.readGetResult(in);
getResult = new GetResult(in);
}
public GetResponse(GetResult getResult) {

View File

@ -266,9 +266,6 @@ public class SearchTransportService {
private boolean freed;
SearchFreeContextResponse() {
}
SearchFreeContextResponse(StreamInput in) throws IOException {
freed = in.readBoolean();
}
@ -281,12 +278,6 @@ public class SearchTransportService {
return freed;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
freed = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(freed);
@ -306,8 +297,9 @@ public class SearchTransportService {
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new);
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE,
ThreadPool.Names.SAME, (request, channel, task) -> {
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ThreadPool.Names.SAME,
TransportRequest.Empty::new,
(request, channel, task) -> {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});

View File

@ -30,8 +30,9 @@ public abstract class BroadcastShardResponse extends TransportResponse {
ShardId shardId;
protected BroadcastShardResponse() {
protected BroadcastShardResponse(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
}
protected BroadcastShardResponse(ShardId shardId) {
@ -46,12 +47,6 @@ public abstract class BroadcastShardResponse extends TransportResponse {
return this.shardId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);

View File

@ -87,7 +87,7 @@ public abstract class TransportBroadcastAction<
protected abstract ShardRequest newShardRequest(int numShards, ShardRouting shard, Request request);
protected abstract ShardResponse newShardResponse();
protected abstract ShardResponse readShardResponse(StreamInput in) throws IOException;
protected abstract ShardResponse shardOperation(ShardRequest request, Task task) throws IOException;
@ -180,9 +180,7 @@ public abstract class TransportBroadcastAction<
new TransportResponseHandler<ShardResponse>() {
@Override
public ShardResponse read(StreamInput in) throws IOException {
ShardResponse response = newShardResponse();
response.readFrom(in);
return response;
return readShardResponse(in);
}
@Override

View File

@ -111,7 +111,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
transportNodeBroadcastAction = actionName + "[n]";
transportService.registerRequestHandler(transportNodeBroadcastAction, NodeRequest::new, executor, false, canTripCircuitBreaker,
transportService.registerRequestHandler(transportNodeBroadcastAction, executor, false, canTripCircuitBreaker, NodeRequest::new,
new BroadcastByNodeTransportRequestHandler());
}
@ -452,7 +452,11 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
protected Request indicesLevelRequest;
public NodeRequest() {
public NodeRequest(StreamInput in) throws IOException {
super(in);
indicesLevelRequest = readRequestFrom(in);
shards = in.readList(ShardRouting::new);
nodeId = in.readString();
}
public NodeRequest(String nodeId, Request request, List<ShardRouting> shards) {
@ -479,14 +483,6 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
return indicesLevelRequest.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indicesLevelRequest = readRequestFrom(in);
shards = in.readList(ShardRouting::new);
nodeId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -38,11 +38,6 @@ public abstract class BaseNodeRequest extends TransportRequest {
}
}
@Override
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -50,11 +50,6 @@ public abstract class BaseNodeResponse extends TransportResponse {
return node;
}
@Override
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);

View File

@ -1101,11 +1101,6 @@ public abstract class TransportReplicationAction<
return "[" + request.getDescription() + "] for aID [" + targetAllocationID + "] and term [" + primaryTerm + "]";
}
@Override
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(targetAllocationID);
@ -1159,11 +1154,6 @@ public abstract class TransportReplicationAction<
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
}
@Override
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -265,9 +265,7 @@ public abstract class TransportTasksAction<
new TransportResponseHandler<NodeTasksResponse>() {
@Override
public NodeTasksResponse read(StreamInput in) throws IOException {
NodeTasksResponse response = new NodeTasksResponse();
response.readFrom(in);
return response;
return new NodeTasksResponse(in);
}
@Override
@ -368,28 +366,8 @@ public abstract class TransportTasksAction<
protected List<TaskOperationFailure> exceptions;
protected List<TaskResponse> results;
NodeTasksResponse() {
}
NodeTasksResponse(String nodeId,
List<TaskResponse> results,
List<TaskOperationFailure> exceptions) {
this.nodeId = nodeId;
this.results = results;
this.exceptions = exceptions;
}
public String getNodeId() {
return nodeId;
}
public List<TaskOperationFailure> getExceptions() {
return exceptions;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
NodeTasksResponse(StreamInput in) throws IOException {
super(in);
nodeId = in.readString();
int resultsSize = in.readVInt();
results = new ArrayList<>(resultsSize);
@ -408,6 +386,22 @@ public abstract class TransportTasksAction<
}
}
NodeTasksResponse(String nodeId,
List<TaskResponse> results,
List<TaskOperationFailure> exceptions) {
this.nodeId = nodeId;
this.results = results;
this.exceptions = exceptions;
}
public String getNodeId() {
return nodeId;
}
public List<TaskOperationFailure> getExceptions() {
return exceptions;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -312,12 +311,12 @@ public class UpdateHelper {
public static class Result {
private final Streamable action;
private final Writeable action;
private final DocWriteResponse.Result result;
private final Map<String, Object> updatedSourceAsMap;
private final XContentType updateSourceContentType;
public Result(Streamable action, DocWriteResponse.Result result, Map<String, Object> updatedSourceAsMap,
public Result(Writeable action, DocWriteResponse.Result result, Map<String, Object> updatedSourceAsMap,
XContentType updateSourceContentType) {
this.action = action;
this.result = result;

View File

@ -42,7 +42,7 @@ public class UpdateResponse extends DocWriteResponse {
public UpdateResponse(StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
getResult = GetResult.readGetResult(in);
getResult = new GetResult(in);
}
}

View File

@ -54,7 +54,7 @@ public class NodeMappingRefreshAction {
this.transportService = transportService;
this.metaDataMappingService = metaDataMappingService;
transportService.registerRequestHandler(ACTION_NAME,
NodeMappingRefreshRequest::new, ThreadPool.Names.SAME, new NodeMappingRefreshTransportHandler());
ThreadPool.Names.SAME, NodeMappingRefreshRequest::new, new NodeMappingRefreshTransportHandler());
}
public void nodeMappingRefresh(final DiscoveryNode masterNode, final NodeMappingRefreshRequest request) {
@ -80,7 +80,11 @@ public class NodeMappingRefreshAction {
private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
private String nodeId;
public NodeMappingRefreshRequest() {
public NodeMappingRefreshRequest(StreamInput in) throws IOException {
super(in);
index = in.readString();
nodeId = in.readString();
indexUUID = in.readString();
}
public NodeMappingRefreshRequest(String index, String indexUUID, String nodeId) {
@ -118,13 +122,5 @@ public class NodeMappingRefreshAction {
out.writeString(nodeId);
out.writeString(indexUUID);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
nodeId = in.readString();
indexUUID = in.readString();
}
}
}

View File

@ -115,7 +115,7 @@ public class FollowersChecker {
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, false, false, FollowerCheckRequest::new,
(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel));
transportService.registerRequestHandler(
NodesFaultDetection.PING_ACTION_NAME, NodesFaultDetection.PingRequest::new, Names.SAME, false, false,
NodesFaultDetection.PING_ACTION_NAME, Names.SAME, false, false, NodesFaultDetection.PingRequest::new,
(request, channel, task) -> // TODO: check that we're a follower of the requesting node?
channel.sendResponse(new NodesFaultDetection.PingResponse()));
transportService.addConnectionListener(new TransportConnectionListener() {

View File

@ -124,8 +124,8 @@ public class JoinHelper {
transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new,
(request, channel, task) -> joinHandler.accept(request, transportJoinCallback(request, channel)));
transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_ACTION_NAME, MembershipAction.JoinRequest::new,
ThreadPool.Names.GENERIC, false, false,
transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_ACTION_NAME,
ThreadPool.Names.GENERIC, false, false, MembershipAction.JoinRequest::new,
(request, channel, task) -> joinHandler.accept(new JoinRequest(request.getNode(), Optional.empty()), // treat as non-voting join
transportJoinCallback(request, channel)));
@ -138,7 +138,7 @@ public class JoinHelper {
});
transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME,
ValidateJoinRequest::new, ThreadPool.Names.GENERIC,
ThreadPool.Names.GENERIC, ValidateJoinRequest::new,
(request, channel, task) -> {
final ClusterState localState = currentStateSupplier.get();
if (localState.metaData().clusterUUIDCommitted() &&
@ -152,7 +152,7 @@ public class JoinHelper {
});
transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
ValidateJoinRequest::new, ThreadPool.Names.GENERIC,
ThreadPool.Names.GENERIC, ValidateJoinRequest::new,
(request, channel, task) -> {
final ClusterState localState = currentStateSupplier.get();
if (localState.metaData().clusterUUIDCommitted() &&
@ -167,11 +167,11 @@ public class JoinHelper {
});
transportService.registerRequestHandler(
ZenDiscovery.DISCOVERY_REJOIN_ACTION_NAME, ZenDiscovery.RejoinClusterRequest::new, ThreadPool.Names.SAME,
ZenDiscovery.DISCOVERY_REJOIN_ACTION_NAME, ThreadPool.Names.SAME, ZenDiscovery.RejoinClusterRequest::new,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here?
transportService.registerRequestHandler(
MembershipAction.DISCOVERY_LEAVE_ACTION_NAME, MembershipAction.LeaveRequest::new, ThreadPool.Names.SAME,
MembershipAction.DISCOVERY_LEAVE_ACTION_NAME, ThreadPool.Names.SAME, MembershipAction.LeaveRequest::new,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here?
}

View File

@ -106,8 +106,8 @@ public class LeaderChecker {
channel.sendResponse(Empty.INSTANCE);
});
transportService.registerRequestHandler(MasterFaultDetection.MASTER_PING_ACTION_NAME, MasterFaultDetection.MasterPingRequest::new,
Names.SAME, false, false, (request, channel, task) -> {
transportService.registerRequestHandler(MasterFaultDetection.MASTER_PING_ACTION_NAME,
Names.SAME, false, false, MasterFaultDetection.MasterPingRequest::new, (request, channel, task) -> {
try {
handleLeaderCheck(new LeaderCheckRequest(request.sourceNode));
} catch (CoordinationStateRejectedException e) {

View File

@ -96,12 +96,11 @@ public class PublicationTransportHandler {
this.namedWriteableRegistry = namedWriteableRegistry;
this.handlePublishRequest = handlePublishRequest;
transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.GENERIC,
false, false, (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request)));
transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
BytesTransportRequest::new, (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request)));
transportService.registerRequestHandler(PublishClusterStateAction.SEND_ACTION_NAME, BytesTransportRequest::new,
ThreadPool.Names.GENERIC,
false, false, (request, channel, task) -> {
transportService.registerRequestHandler(PublishClusterStateAction.SEND_ACTION_NAME, ThreadPool.Names.GENERIC,
false, false, BytesTransportRequest::new, (request, channel, task) -> {
handleIncomingPublishRequest(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
@ -111,8 +110,7 @@ public class PublicationTransportHandler {
(request, channel, task) -> handleApplyCommit.accept(request, transportCommitCallback(channel)));
transportService.registerRequestHandler(PublishClusterStateAction.COMMIT_ACTION_NAME,
PublishClusterStateAction.CommitClusterStateRequest::new,
ThreadPool.Names.GENERIC, false, false,
ThreadPool.Names.GENERIC, false, false, PublishClusterStateAction.CommitClusterStateRequest::new,
(request, channel, task) -> {
final Optional<ClusterState> matchingClusterState = Optional.ofNullable(lastSeenClusterState.get()).filter(
cs -> cs.stateUUID().equals(request.stateUUID));

View File

@ -28,18 +28,15 @@ import java.io.IOException;
public class ValidateJoinRequest extends TransportRequest {
private ClusterState state;
public ValidateJoinRequest() {}
public ValidateJoinRequest(StreamInput in) throws IOException {
super(in);
this.state = ClusterState.readFrom(in, null);
}
public ValidateJoinRequest(ClusterState state) {
this.state = state;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.state = ClusterState.readFrom(in, null);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -93,7 +93,7 @@ public class MasterFaultDetection extends FaultDetection {
pingRetryCount);
transportService.registerRequestHandler(
MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, false, false, new MasterPingRequestHandler());
MASTER_PING_ACTION_NAME, ThreadPool.Names.SAME, false, false, MasterPingRequest::new, new MasterPingRequestHandler());
}
public DiscoveryNode masterNode() {
@ -406,7 +406,11 @@ public class MasterFaultDetection extends FaultDetection {
private DiscoveryNode masterNode;
private ClusterName clusterName;
public MasterPingRequest() {
public MasterPingRequest(StreamInput in) throws IOException {
super(in);
sourceNode = new DiscoveryNode(in);
masterNode = new DiscoveryNode(in);
clusterName = new ClusterName(in);
}
public MasterPingRequest(DiscoveryNode sourceNode, DiscoveryNode masterNode, ClusterName clusterName) {
@ -415,14 +419,6 @@ public class MasterFaultDetection extends FaultDetection {
this.clusterName = clusterName;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sourceNode = new DiscoveryNode(in);
masterNode = new DiscoveryNode(in);
clusterName = new ClusterName(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -72,13 +72,13 @@ public class MembershipAction {
this.listener = listener;
transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME,
ThreadPool.Names.GENERIC, JoinRequest::new, new JoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
ThreadPool.Names.GENERIC, ValidateJoinRequest::new,
new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME,
ThreadPool.Names.GENERIC, LeaveRequest::new, new LeaveRequestRequestHandler());
}
public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
@ -112,19 +112,15 @@ public class MembershipAction {
return node;
}
public JoinRequest() {
public JoinRequest(StreamInput in) throws IOException {
super(in);
node = new DiscoveryNode(in);
}
public JoinRequest(DiscoveryNode node) {
this.node = node;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
node = new DiscoveryNode(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -183,19 +179,15 @@ public class MembershipAction {
private DiscoveryNode node;
public LeaveRequest() {
public LeaveRequest(StreamInput in) throws IOException {
super(in);
node = new DiscoveryNode(in);
}
private LeaveRequest(DiscoveryNode node) {
this.node = node;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
node = new DiscoveryNode(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -86,7 +86,7 @@ public class NodesFaultDetection extends FaultDetection {
pingRetryCount);
transportService.registerRequestHandler(
PING_ACTION_NAME, PingRequest::new, ThreadPool.Names.SAME, false, false, new PingRequestHandler());
PING_ACTION_NAME, ThreadPool.Names.SAME, false, false, PingRequest::new, new PingRequestHandler());
}
public void setLocalNode(DiscoveryNode localNode) {
@ -313,7 +313,12 @@ public class NodesFaultDetection extends FaultDetection {
private long clusterStateVersion = ClusterState.UNKNOWN_VERSION;
public PingRequest() {
public PingRequest(StreamInput in) throws IOException {
super(in);
targetNode = new DiscoveryNode(in);
clusterName = new ClusterName(in);
masterNode = new DiscoveryNode(in);
clusterStateVersion = in.readLong();
}
public PingRequest(DiscoveryNode targetNode, ClusterName clusterName, DiscoveryNode masterNode, long clusterStateVersion) {
@ -339,15 +344,6 @@ public class NodesFaultDetection extends FaultDetection {
return clusterStateVersion;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
targetNode = new DiscoveryNode(in);
clusterName = new ClusterName(in);
masterNode = new DiscoveryNode(in);
clusterStateVersion = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -113,9 +113,9 @@ public class PublishClusterStateAction {
this.namedWriteableRegistry = namedWriteableRegistry;
this.incomingClusterStateListener = incomingClusterStateListener;
this.discoverySettings = discoverySettings;
transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.SAME, false, false,
transportService.registerRequestHandler(SEND_ACTION_NAME, ThreadPool.Names.SAME, false, false, BytesTransportRequest::new,
new SendClusterStateRequestHandler());
transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, false, false,
transportService.registerRequestHandler(COMMIT_ACTION_NAME, ThreadPool.Names.SAME, false, false, CommitClusterStateRequest::new,
new CommitClusterStateRequestHandler());
}
@ -462,19 +462,15 @@ public class PublishClusterStateAction {
public String stateUUID;
public CommitClusterStateRequest() {
public CommitClusterStateRequest(StreamInput in) throws IOException {
super(in);
stateUUID = in.readString();
}
public CommitClusterStateRequest(String stateUUID) {
this.stateUUID = stateUUID;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
stateUUID = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -533,11 +533,6 @@ public class UnicastZenPing implements ZenPing {
pingResponse = new PingResponse(in);
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -571,11 +566,6 @@ public class UnicastZenPing implements ZenPing {
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(id);

View File

@ -233,7 +233,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
masterService.setClusterStateSupplier(this::clusterState);
transportService.registerRequestHandler(
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
DISCOVERY_REJOIN_ACTION_NAME, ThreadPool.Names.SAME, RejoinClusterRequest::new, new RejoinClusterRequestHandler());
if (clusterApplier instanceof ClusterApplierService) {
((ClusterApplierService) clusterApplier).addLowPriorityApplier(gatewayMetaState);
@ -1097,13 +1097,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
this.fromNodeId = fromNodeId;
}
public RejoinClusterRequest() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public RejoinClusterRequest(StreamInput in) throws IOException {
super(in);
fromNodeId = in.readOptionalString();
}
@Override

View File

@ -73,7 +73,7 @@ public class LocalAllocateDangledIndices {
this.clusterService = clusterService;
this.allocationService = allocationService;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
transportService.registerRequestHandler(ACTION_NAME, AllocateDangledRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SAME, AllocateDangledRequest::new,
new AllocateDangledRequestHandler());
}
@ -89,9 +89,7 @@ public class LocalAllocateDangledIndices {
transportService.sendRequest(masterNode, ACTION_NAME, request, new TransportResponseHandler<AllocateDangledResponse>() {
@Override
public AllocateDangledResponse read(StreamInput in) throws IOException {
final AllocateDangledResponse response = new AllocateDangledResponse();
response.readFrom(in);
return response;
return new AllocateDangledResponse(in);
}
@Override
@ -212,17 +210,8 @@ public class LocalAllocateDangledIndices {
DiscoveryNode fromNode;
IndexMetaData[] indices;
public AllocateDangledRequest() {
}
AllocateDangledRequest(DiscoveryNode fromNode, IndexMetaData[] indices) {
this.fromNode = fromNode;
this.indices = indices;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public AllocateDangledRequest(StreamInput in) throws IOException {
super(in);
fromNode = new DiscoveryNode(in);
indices = new IndexMetaData[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
@ -230,6 +219,11 @@ public class LocalAllocateDangledIndices {
}
}
AllocateDangledRequest(DiscoveryNode fromNode, IndexMetaData[] indices) {
this.fromNode = fromNode;
this.indices = indices;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -245,19 +239,15 @@ public class LocalAllocateDangledIndices {
private boolean ack;
AllocateDangledResponse() {
public AllocateDangledResponse(StreamInput in) throws IOException {
super(in);
ack = in.readBoolean();
}
AllocateDangledResponse(boolean ack) {
this.ack = ack;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
ack = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(ack);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.cache.query;
import org.apache.lucene.search.DocIdSet;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
@ -31,7 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class QueryCacheStats implements Streamable, Writeable, ToXContentFragment {
public class QueryCacheStats implements Writeable, ToXContentFragment {
private long ramBytesUsed;
private long hitCount;
@ -116,11 +115,6 @@ public class QueryCacheStats implements Streamable, Writeable, ToXContentFragmen
return cacheCount - cacheSize;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(ramBytesUsed);

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.cache.request;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -29,7 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class RequestCacheStats implements Streamable, Writeable, ToXContentFragment {
public class RequestCacheStats implements Writeable, ToXContentFragment {
private long memorySize;
private long evictions;
@ -80,11 +79,6 @@ public class RequestCacheStats implements Streamable, Writeable, ToXContentFragm
return this.missCount;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(memorySize);

View File

@ -31,7 +31,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -42,7 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
public class Segment implements Streamable {
public class Segment implements Writeable {
private String name;
private long generation;
@ -59,7 +59,32 @@ public class Segment implements Streamable {
public Accountable ramTree = null;
public Map<String, String> attributes;
Segment() {
public Segment(StreamInput in) throws IOException {
name = in.readString();
generation = Long.parseLong(name.substring(1), Character.MAX_RADIX);
committed = in.readBoolean();
search = in.readBoolean();
docCount = in.readInt();
delDocCount = in.readInt();
sizeInBytes = in.readLong();
version = Lucene.parseVersionLenient(in.readOptionalString(), null);
compound = in.readOptionalBoolean();
mergeId = in.readOptionalString();
memoryInBytes = in.readLong();
if (in.readBoolean()) {
// verbose mode
ramTree = readRamTree(in);
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
segmentSort = readSegmentSort(in);
} else {
segmentSort = null;
}
if (in.getVersion().onOrAfter(Version.V_6_1_0) && in.readBoolean()) {
attributes = in.readMap(StreamInput::readString, StreamInput::readString);
} else {
attributes = null;
}
}
public Segment(String name) {
@ -151,41 +176,6 @@ public class Segment implements Streamable {
return name != null ? name.hashCode() : 0;
}
public static Segment readSegment(StreamInput in) throws IOException {
Segment segment = new Segment();
segment.readFrom(in);
return segment;
}
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
generation = Long.parseLong(name.substring(1), Character.MAX_RADIX);
committed = in.readBoolean();
search = in.readBoolean();
docCount = in.readInt();
delDocCount = in.readInt();
sizeInBytes = in.readLong();
version = Lucene.parseVersionLenient(in.readOptionalString(), null);
compound = in.readOptionalBoolean();
mergeId = in.readOptionalString();
memoryInBytes = in.readLong();
if (in.readBoolean()) {
// verbose mode
ramTree = readRamTree(in);
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
segmentSort = readSegmentSort(in);
} else {
segmentSort = null;
}
if (in.getVersion().onOrAfter(Version.V_6_1_0) && in.readBoolean()) {
attributes = in.readMap(StreamInput::readString, StreamInput::readString);
} else {
attributes = null;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);

View File

@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -31,7 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class SegmentsStats implements Streamable, Writeable, ToXContentFragment {
public class SegmentsStats implements Writeable, ToXContentFragment {
private long count;
private long memoryInBytes;
@ -365,11 +364,6 @@ public class SegmentsStats implements Streamable, Writeable, ToXContentFragment
static final String DESCRIPTION = "description";
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);

View File

@ -23,7 +23,6 @@ import org.elasticsearch.common.FieldMemoryStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -32,7 +31,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
public class FieldDataStats implements Streamable, Writeable, ToXContentFragment {
public class FieldDataStats implements Writeable, ToXContentFragment {
private static final String FIELDDATA = "fielddata";
private static final String MEMORY_SIZE = "memory_size";
@ -89,11 +88,6 @@ public class FieldDataStats implements Streamable, Writeable, ToXContentFragment
return fields;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(memorySize);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.flush;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -30,7 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class FlushStats implements Streamable, Writeable, ToXContentFragment {
public class FlushStats implements Writeable, ToXContentFragment {
private long total;
private long periodic;
@ -119,11 +118,6 @@ public class FlushStats implements Streamable, Writeable, ToXContentFragment {
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);

View File

@ -27,7 +27,7 @@ import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -48,7 +48,7 @@ import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpect
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
public class GetResult implements Streamable, Iterable<DocumentField>, ToXContentObject {
public class GetResult implements Writeable, Iterable<DocumentField>, ToXContentObject {
public static final String _INDEX = "_index";
public static final String _TYPE = "_type";
@ -72,7 +72,34 @@ public class GetResult implements Streamable, Iterable<DocumentField>, ToXConten
private BytesReference source;
private byte[] sourceAsBytes;
GetResult() {
public GetResult(StreamInput in) throws IOException {
index = in.readString();
type = in.readOptionalString();
id = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = UNASSIGNED_SEQ_NO;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
version = in.readLong();
exists = in.readBoolean();
if (exists) {
source = in.readBytesReference();
if (source.length() == 0) {
source = null;
}
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
documentFields = readFields(in);
metaFields = readFields(in);
} else {
Map<String, DocumentField> fields = readFields(in);
documentFields = new HashMap<>();
metaFields = new HashMap<>();
splitFieldsByMetadata(fields, documentFields, metaFields);
}
}
}
public GetResult(String index, String type, String id, long seqNo, long primaryTerm, long version, boolean exists,
@ -376,12 +403,6 @@ public class GetResult implements Streamable, Iterable<DocumentField>, ToXConten
return fromXContentEmbedded(parser);
}
public static GetResult readGetResult(StreamInput in) throws IOException {
GetResult result = new GetResult();
result.readFrom(in);
return result;
}
private Map<String, DocumentField> readFields(StreamInput in) throws IOException {
Map<String, DocumentField> fields = null;
int size = in.readVInt();
@ -411,37 +432,6 @@ public class GetResult implements Streamable, Iterable<DocumentField>, ToXConten
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readString();
type = in.readOptionalString();
id = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = UNASSIGNED_SEQ_NO;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
version = in.readLong();
exists = in.readBoolean();
if (exists) {
source = in.readBytesReference();
if (source.length() == 0) {
source = null;
}
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
documentFields = readFields(in);
metaFields = readFields(in);
} else {
Map<String, DocumentField> fields = readFields(in);
documentFields = new HashMap<>();
metaFields = new HashMap<>();
splitFieldsByMetadata(fields, documentFields, metaFields);
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.get;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -29,7 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class GetStats implements Streamable, Writeable, ToXContentFragment {
public class GetStats implements Writeable, ToXContentFragment {
private long existsCount;
private long existsTimeInMillis;
@ -143,11 +142,6 @@ public class GetStats implements Streamable, Writeable, ToXContentFragment {
static final String CURRENT = "current";
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(existsCount);

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.merge;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -30,7 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class MergeStats implements Streamable, Writeable, ToXContentFragment {
public class MergeStats implements Writeable, ToXContentFragment {
private long total;
private long totalTimeInMillis;
@ -238,11 +237,6 @@ public class MergeStats implements Streamable, Writeable, ToXContentFragment {
static final String TOTAL_THROTTLE_BYTES_PER_SEC = "total_auto_throttle";
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);

View File

@ -20,7 +20,6 @@ package org.elasticsearch.index.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -34,7 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
* Recovery related statistics, starting at the shard level and allowing aggregation to
* indices and node level
*/
public class RecoveryStats implements ToXContentFragment, Writeable, Streamable {
public class RecoveryStats implements ToXContentFragment, Writeable {
private final AtomicInteger currentAsSource = new AtomicInteger();
private final AtomicInteger currentAsTarget = new AtomicInteger();
@ -122,11 +121,6 @@ public class RecoveryStats implements ToXContentFragment, Writeable, Streamable
static final String THROTTLE_TIME_IN_MILLIS = "throttle_time_in_millis";
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(currentAsSource.get());

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.refresh;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -31,7 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
public class RefreshStats implements Streamable, Writeable, ToXContentFragment {
public class RefreshStats implements Writeable, ToXContentFragment {
private long total;
@ -151,11 +150,6 @@ public class RefreshStats implements Streamable, Writeable, ToXContentFragment {
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != RefreshStats.class) {

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.shard;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -30,7 +29,7 @@ import org.elasticsearch.index.store.StoreStats;
import java.io.IOException;
public class DocsStats implements Streamable, Writeable, ToXContentFragment {
public class DocsStats implements Writeable, ToXContentFragment {
private long count = 0;
private long deleted = 0;
@ -93,11 +92,6 @@ public class DocsStats implements Streamable, Writeable, ToXContentFragment {
return totalDocs == 0 ? 0 : totalSizeInBytes / totalDocs;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.shard;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
@ -33,9 +32,9 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class IndexingStats implements Streamable, Writeable, ToXContentFragment {
public class IndexingStats implements Writeable, ToXContentFragment {
public static class Stats implements Streamable, Writeable, ToXContentFragment {
public static class Stats implements Writeable, ToXContentFragment {
private long indexCount;
private long indexTimeInMillis;
@ -147,11 +146,6 @@ public class IndexingStats implements Streamable, Writeable, ToXContentFragment
return noopUpdateCount;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(indexCount);
@ -282,11 +276,6 @@ public class IndexingStats implements Streamable, Writeable, ToXContentFragment
static final String THROTTLED_TIME = "throttle_time";
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
totalStats.writeTo(out);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.store;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -30,7 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class StoreStats implements Streamable, Writeable, ToXContentFragment {
public class StoreStats implements Writeable, ToXContentFragment {
private long sizeInBytes;
@ -73,11 +72,6 @@ public class StoreStats implements Streamable, Writeable, ToXContentFragment {
return size();
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(sizeInBytes);

View File

@ -22,7 +22,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -30,7 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class TranslogStats implements Streamable, Writeable, ToXContentFragment {
public class TranslogStats implements Writeable, ToXContentFragment {
private long translogSizeInBytes;
private int numberOfOperations;
@ -130,11 +129,6 @@ public class TranslogStats implements Streamable, Writeable, ToXContentFragment
return Strings.toString(this, true, true);
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numberOfOperations);

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.warmer;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -29,7 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class WarmerStats implements Streamable, Writeable, ToXContentFragment {
public class WarmerStats implements Writeable, ToXContentFragment {
private long current;
@ -111,11 +110,6 @@ public class WarmerStats implements Streamable, Writeable, ToXContentFragment {
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(current);

View File

@ -25,7 +25,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
@ -55,12 +55,26 @@ import java.util.Map;
/**
* Global information on indices stats running on a specific node.
*/
public class NodeIndicesStats implements Streamable, ToXContentFragment {
public class NodeIndicesStats implements Writeable, ToXContentFragment {
private CommonStats stats;
private Map<Index, List<IndexShardStats>> statsByShard;
NodeIndicesStats() {
public NodeIndicesStats(StreamInput in) throws IOException {
stats = new CommonStats(in);
if (in.readBoolean()) {
int entries = in.readVInt();
statsByShard = new HashMap<>();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}
}
}
public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>> statsByShard) {
@ -158,30 +172,6 @@ public class NodeIndicesStats implements Streamable, ToXContentFragment {
return stats.getRecoveryStats();
}
public static NodeIndicesStats readIndicesStats(StreamInput in) throws IOException {
NodeIndicesStats stats = new NodeIndicesStats();
stats.readFrom(in);
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
stats = new CommonStats(in);
if (in.readBoolean()) {
int entries = in.readVInt();
statsByShard = new HashMap<>();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
stats.writeTo(out);

View File

@ -21,7 +21,7 @@ package org.elasticsearch.indices.flush;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
@ -34,7 +34,7 @@ import static java.util.Collections.unmodifiableMap;
/**
* Result for all copies of a shard
*/
public class ShardsSyncedFlushResult implements Streamable {
public class ShardsSyncedFlushResult implements Writeable {
private String failureReason;
private Map<ShardRouting, SyncedFlushService.ShardSyncedFlushResponse> shardResponses;
private String syncId;
@ -42,7 +42,18 @@ public class ShardsSyncedFlushResult implements Streamable {
// some shards may be unassigned, so we need this as state
private int totalShards;
private ShardsSyncedFlushResult() {
public ShardsSyncedFlushResult(StreamInput in) throws IOException {
failureReason = in.readOptionalString();
int numResponses = in.readInt();
shardResponses = new HashMap<>();
for (int i = 0; i < numResponses; i++) {
ShardRouting shardRouting = new ShardRouting(in);
SyncedFlushService.ShardSyncedFlushResponse response = SyncedFlushService.ShardSyncedFlushResponse.readSyncedFlushResponse(in);
shardResponses.put(shardRouting, response);
}
syncId = in.readOptionalString();
shardId = new ShardId(in);
totalShards = in.readInt();
}
public ShardId getShardId() {
@ -138,21 +149,6 @@ public class ShardsSyncedFlushResult implements Streamable {
return shardId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
failureReason = in.readOptionalString();
int numResponses = in.readInt();
shardResponses = new HashMap<>();
for (int i = 0; i < numResponses; i++) {
ShardRouting shardRouting = new ShardRouting(in);
SyncedFlushService.ShardSyncedFlushResponse response = SyncedFlushService.ShardSyncedFlushResponse.readSyncedFlushResponse(in);
shardResponses.put(shardRouting, response);
}
syncId = in.readOptionalString();
shardId = new ShardId(in);
totalShards = in.readInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(failureReason);
@ -165,10 +161,4 @@ public class ShardsSyncedFlushResult implements Streamable {
shardId.writeTo(out);
out.writeInt(totalShards);
}
public static ShardsSyncedFlushResult readShardsSyncedFlushResult(StreamInput in) throws IOException {
ShardsSyncedFlushResult shardsSyncedFlushResult = new ShardsSyncedFlushResult();
shardsSyncedFlushResult.readFrom(in);
return shardsSyncedFlushResult;
}
}

View File

@ -94,11 +94,11 @@ public class SyncedFlushService implements IndexEventListener {
this.clusterService = clusterService;
this.transportService = transportService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH,
transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, ThreadPool.Names.FLUSH, PreShardSyncedFlushRequest::new,
new PreSyncedFlushTransportHandler());
transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, ShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH,
transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, ThreadPool.Names.FLUSH, ShardSyncedFlushRequest::new,
new SyncedFlushTransportHandler());
transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, InFlightOpsRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, ThreadPool.Names.SAME, InFlightOpsRequest::new,
new InFlightOpCountTransportHandler());
}
@ -283,7 +283,7 @@ public class SyncedFlushService implements IndexEventListener {
final Map<ShardRouting, ShardSyncedFlushResponse> results = new HashMap<>();
for (final ShardRouting shard : shards) {
if (preSyncResponses.containsKey(shard.currentNodeId())) {
results.put(shard, new ShardSyncedFlushResponse());
results.put(shard, new ShardSyncedFlushResponse((String) null));
}
}
listener.onResponse(new ShardsSyncedFlushResult(shardId, existingSyncId, totalShards, results));
@ -323,9 +323,7 @@ public class SyncedFlushService implements IndexEventListener {
new TransportResponseHandler<InFlightOpsResponse>() {
@Override
public InFlightOpsResponse read(StreamInput in) throws IOException {
InFlightOpsResponse response = new InFlightOpsResponse();
response.readFrom(in);
return response;
return new InFlightOpsResponse(in);
}
@Override
@ -403,9 +401,7 @@ public class SyncedFlushService implements IndexEventListener {
new TransportResponseHandler<ShardSyncedFlushResponse>() {
@Override
public ShardSyncedFlushResponse read(StreamInput in) throws IOException {
ShardSyncedFlushResponse response = new ShardSyncedFlushResponse();
response.readFrom(in);
return response;
return new ShardSyncedFlushResponse(in);
}
@Override
@ -469,9 +465,7 @@ public class SyncedFlushService implements IndexEventListener {
new TransportResponseHandler<PreSyncedFlushResponse>() {
@Override
public PreSyncedFlushResponse read(StreamInput in) throws IOException {
PreSyncedFlushResponse response = new PreSyncedFlushResponse();
response.readFrom(in);
return response;
return new PreSyncedFlushResponse(in);
}
@Override
@ -521,7 +515,7 @@ public class SyncedFlushService implements IndexEventListener {
logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
switch (result) {
case SUCCESS:
return new ShardSyncedFlushResponse();
return new ShardSyncedFlushResponse((String) null);
case COMMIT_MISMATCH:
return new ShardSyncedFlushResponse("commit has changed");
case PENDING_OPERATIONS:
@ -544,7 +538,9 @@ public class SyncedFlushService implements IndexEventListener {
public static final class PreShardSyncedFlushRequest extends TransportRequest {
private ShardId shardId;
public PreShardSyncedFlushRequest() {
public PreShardSyncedFlushRequest(StreamInput in) throws IOException {
super(in);
this.shardId = new ShardId(in);
}
public PreShardSyncedFlushRequest(ShardId shardId) {
@ -564,12 +560,6 @@ public class SyncedFlushService implements IndexEventListener {
shardId.writeTo(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.shardId = new ShardId(in);
}
public ShardId shardId() {
return shardId;
}
@ -585,7 +575,17 @@ public class SyncedFlushService implements IndexEventListener {
int numDocs;
@Nullable String existingSyncId = null;
PreSyncedFlushResponse() {
PreSyncedFlushResponse(StreamInput in) throws IOException {
super(in);
commitId = new Engine.CommitId(in);
if (includeNumDocs(in.getVersion())) {
numDocs = in.readInt();
} else {
numDocs = UNKNOWN_NUM_DOCS;
}
if (includeExistingSyncId(in.getVersion())) {
existingSyncId = in.readOptionalString();
}
}
PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs, String existingSyncId) {
@ -602,20 +602,6 @@ public class SyncedFlushService implements IndexEventListener {
return version.onOrAfter(Version.V_6_3_0);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
commitId = new Engine.CommitId(in);
if (includeNumDocs(in.getVersion())) {
numDocs = in.readInt();
} else {
numDocs = UNKNOWN_NUM_DOCS;
}
if (includeExistingSyncId(in.getVersion())) {
existingSyncId = in.readOptionalString();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
commitId.writeTo(out);
@ -634,7 +620,11 @@ public class SyncedFlushService implements IndexEventListener {
private Engine.CommitId expectedCommitId;
private ShardId shardId;
public ShardSyncedFlushRequest() {
public ShardSyncedFlushRequest(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
expectedCommitId = new Engine.CommitId(in);
syncId = in.readString();
}
public ShardSyncedFlushRequest(ShardId shardId, String syncId, Engine.CommitId expectedCommitId) {
@ -643,14 +633,6 @@ public class SyncedFlushService implements IndexEventListener {
this.syncId = syncId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
expectedCommitId = new Engine.CommitId(in);
syncId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -690,20 +672,15 @@ public class SyncedFlushService implements IndexEventListener {
*/
String failureReason;
public ShardSyncedFlushResponse() {
failureReason = null;
public ShardSyncedFlushResponse(StreamInput in) throws IOException {
super(in);
failureReason = in.readOptionalString();
}
public ShardSyncedFlushResponse(String failureReason) {
this.failureReason = failureReason;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failureReason = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(failureReason);
@ -726,9 +703,7 @@ public class SyncedFlushService implements IndexEventListener {
}
public static ShardSyncedFlushResponse readSyncedFlushResponse(StreamInput in) throws IOException {
ShardSyncedFlushResponse shardSyncedFlushResponse = new ShardSyncedFlushResponse();
shardSyncedFlushResponse.readFrom(in);
return shardSyncedFlushResponse;
return new ShardSyncedFlushResponse(in);
}
}
@ -737,19 +712,15 @@ public class SyncedFlushService implements IndexEventListener {
private ShardId shardId;
public InFlightOpsRequest() {
public InFlightOpsRequest(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
}
public InFlightOpsRequest(ShardId shardId) {
this.shardId = shardId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -775,7 +746,9 @@ public class SyncedFlushService implements IndexEventListener {
int opCount;
InFlightOpsResponse() {
InFlightOpsResponse(StreamInput in) throws IOException {
super(in);
opCount = in.readVInt();
}
InFlightOpsResponse(int opCount) {
@ -783,12 +756,6 @@ public class SyncedFlushService implements IndexEventListener {
this.opCount = opCount;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
opCount = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(opCount);

View File

@ -71,7 +71,7 @@ public class PeerRecoverySourceService implements IndexEventListener {
this.transportService = transportService;
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC,
transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new,
new StartRecoveryTransportRequestHandler());
}

View File

@ -112,22 +112,22 @@ public class PeerRecoveryTargetService implements IndexEventListener {
this.clusterService = clusterService;
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new
FilesInfoRequestHandler());
transportService.registerRequestHandler(Actions.FILE_CHUNK, RecoveryFileChunkRequest::new, ThreadPool.Names.GENERIC, new
FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.FILES_INFO, ThreadPool.Names.GENERIC, RecoveryFilesInfoRequest::new,
new FilesInfoRequestHandler());
transportService.registerRequestHandler(Actions.FILE_CHUNK, ThreadPool.Names.GENERIC, RecoveryFileChunkRequest::new,
new FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, ThreadPool.Names.GENERIC,
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, ThreadPool.Names.GENERIC, RecoveryTranslogOperationsRequest::new,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, ThreadPool.Names.GENERIC, RecoveryFinalizeRecoveryRequest::new,
new FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(
Actions.HANDOFF_PRIMARY_CONTEXT,
RecoveryHandoffPrimaryContextRequest::new,
ThreadPool.Names.GENERIC,
RecoveryHandoffPrimaryContextRequest::new,
new HandoffPrimaryContextRequestHandler());
}

View File

@ -41,7 +41,21 @@ public final class RecoveryFileChunkRequest extends TransportRequest {
private int totalTranslogOps;
public RecoveryFileChunkRequest() {
public RecoveryFileChunkRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
String name = in.readString();
position = in.readVLong();
long length = in.readVLong();
String checksum = in.readString();
content = in.readBytesReference();
Version writtenBy = Lucene.parseVersionLenient(in.readString(), null);
assert writtenBy != null;
metaData = new StoreFileMetaData(name, length, checksum, writtenBy);
lastChunk = in.readBoolean();
totalTranslogOps = in.readVInt();
sourceThrottleTimeInNanos = in.readLong();
}
public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaData metaData, long position, BytesReference content,
@ -92,24 +106,6 @@ public final class RecoveryFileChunkRequest extends TransportRequest {
return sourceThrottleTimeInNanos;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
String name = in.readString();
position = in.readVLong();
long length = in.readVLong();
String checksum = in.readString();
content = in.readBytesReference();
Version writtenBy = Lucene.parseVersionLenient(in.readString(), null);
assert writtenBy != null;
metaData = new StoreFileMetaData(name, length, checksum, writtenBy);
lastChunk = in.readBoolean();
totalTranslogOps = in.readVInt();
sourceThrottleTimeInNanos = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -40,31 +40,8 @@ public class RecoveryFilesInfoRequest extends TransportRequest {
int totalTranslogOps;
public RecoveryFilesInfoRequest() {
}
RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes,
List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.phase1FileNames = phase1FileNames;
this.phase1FileSizes = phase1FileSizes;
this.phase1ExistingFileNames = phase1ExistingFileNames;
this.phase1ExistingFileSizes = phase1ExistingFileSizes;
this.totalTranslogOps = totalTranslogOps;
}
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() {
return shardId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public RecoveryFilesInfoRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
int size = in.readVInt();
@ -93,6 +70,25 @@ public class RecoveryFilesInfoRequest extends TransportRequest {
totalTranslogOps = in.readVInt();
}
RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes,
List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.phase1FileNames = phase1FileNames;
this.phase1FileSizes = phase1FileSizes;
this.phase1ExistingFileNames = phase1ExistingFileNames;
this.phase1ExistingFileSizes = phase1ExistingFileSizes;
this.totalTranslogOps = totalTranslogOps;
}
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() {
return shardId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -34,7 +34,15 @@ public class RecoveryFinalizeRecoveryRequest extends TransportRequest {
private ShardId shardId;
private long globalCheckpoint;
public RecoveryFinalizeRecoveryRequest() {
public RecoveryFinalizeRecoveryRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}
RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint) {
@ -55,18 +63,6 @@ public class RecoveryFinalizeRecoveryRequest extends TransportRequest {
return globalCheckpoint;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -39,7 +39,11 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
/**
* Initialize an empty request (used to serialize into when reading from a stream).
*/
RecoveryHandoffPrimaryContextRequest() {
RecoveryHandoffPrimaryContextRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
primaryContext = new ReplicationTracker.PrimaryContext(in);
}
/**
@ -68,14 +72,6 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
return primaryContext;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
primaryContext = new ReplicationTracker.PrimaryContext(in);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -43,7 +43,7 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
}
RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
super.readFrom(in);
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
totalTranslogOps = in.readVInt();

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -47,7 +46,7 @@ import java.util.Map;
/**
* Keeps track of state related to shard recovery.
*/
public class RecoveryState implements ToXContentFragment, Streamable, Writeable {
public class RecoveryState implements ToXContentFragment, Writeable {
public enum Stage {
INIT((byte) 0),
@ -255,11 +254,6 @@ public class RecoveryState implements ToXContentFragment, Streamable, Writeable
return new RecoveryState(in);
}
@Override
public synchronized void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

View File

@ -100,7 +100,7 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
}
RecoveryTranslogOperationsRequest(StreamInput in) throws IOException {
super.readFrom(in);
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
operations = Translog.readOperations(in, "recovery");

View File

@ -31,7 +31,11 @@ public class RecoveryWaitForClusterStateRequest extends TransportRequest {
private ShardId shardId;
private long clusterStateVersion;
public RecoveryWaitForClusterStateRequest() {
public RecoveryWaitForClusterStateRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
clusterStateVersion = in.readVLong();
}
RecoveryWaitForClusterStateRequest(long recoveryId, ShardId shardId, long clusterStateVersion) {
@ -52,14 +56,6 @@ public class RecoveryWaitForClusterStateRequest extends TransportRequest {
return clusterStateVersion;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
clusterStateVersion = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -44,7 +44,20 @@ public class StartRecoveryRequest extends TransportRequest {
private boolean primaryRelocation;
private long startingSeqNo;
public StartRecoveryRequest() {
public StartRecoveryRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
targetAllocationId = in.readString();
sourceNode = new DiscoveryNode(in);
targetNode = new DiscoveryNode(in);
metadataSnapshot = new Store.MetadataSnapshot(in);
primaryRelocation = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
startingSeqNo = in.readLong();
} else {
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}
/**
@ -111,23 +124,6 @@ public class StartRecoveryRequest extends TransportRequest {
return startingSeqNo;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
targetAllocationId = in.readString();
sourceNode = new DiscoveryNode(in);
targetNode = new DiscoveryNode(in);
metadataSnapshot = new Store.MetadataSnapshot(in);
primaryRelocation = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
startingSeqNo = in.readLong();
} else {
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -100,7 +100,7 @@ public class IndicesStore implements ClusterStateListener, Closeable {
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ThreadPool.Names.SAME, ShardActiveRequest::new,
new ShardActiveRequestHandler());
this.deleteShardTimeout = INDICES_STORE_DELETE_SHARD_TIMEOUT.get(settings);
// Doesn't make sense to delete shards on non-data nodes
@ -397,7 +397,12 @@ public class IndicesStore implements ClusterStateListener, Closeable {
private String indexUUID;
private ShardId shardId;
ShardActiveRequest() {
ShardActiveRequest(StreamInput in) throws IOException {
super(in);
clusterName = new ClusterName(in);
indexUUID = in.readString();
shardId = new ShardId(in);
timeout = new TimeValue(in.readLong(), TimeUnit.MILLISECONDS);
}
ShardActiveRequest(ClusterName clusterName, String indexUUID, ShardId shardId, TimeValue timeout) {
@ -407,15 +412,6 @@ public class IndicesStore implements ClusterStateListener, Closeable {
this.timeout = timeout;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = new ClusterName(in);
indexUUID = in.readString();
shardId = new ShardId(in);
timeout = new TimeValue(in.readLong(), TimeUnit.MILLISECONDS);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -37,7 +37,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -174,11 +174,13 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
}
}
public static class StoreFilesMetaData implements Iterable<StoreFileMetaData>, Streamable {
public static class StoreFilesMetaData implements Iterable<StoreFileMetaData>, Writeable {
private ShardId shardId;
Store.MetadataSnapshot metadataSnapshot;
StoreFilesMetaData() {
public StoreFilesMetaData(StreamInput in) throws IOException {
this.shardId = new ShardId(in);
this.metadataSnapshot = new Store.MetadataSnapshot(in);
}
public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot) {
@ -207,18 +209,6 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
return metadataSnapshot.asMap().get(name);
}
public static StoreFilesMetaData readStoreFilesMetaData(StreamInput in) throws IOException {
StoreFilesMetaData md = new StoreFilesMetaData();
md.readFrom(in);
return md;
}
@Override
public void readFrom(StreamInput in) throws IOException {
shardId = new ShardId(in);
this.metadataSnapshot = new Store.MetadataSnapshot(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
@ -311,7 +301,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
public NodeStoreFilesMetaData(StreamInput in) throws IOException {
super(in);
storeFilesMetaData = StoreFilesMetaData.readStoreFilesMetaData(in);
storeFilesMetaData = new StoreFilesMetaData(in);
}
public NodeStoreFilesMetaData(DiscoveryNode node, StoreFilesMetaData storeFilesMetaData) {

View File

@ -63,7 +63,7 @@ public class VerifyNodeRepositoryAction {
this.transportService = transportService;
this.clusterService = clusterService;
this.repositoriesService = repositoriesService;
transportService.registerRequestHandler(ACTION_NAME, VerifyNodeRepositoryRequest::new, ThreadPool.Names.SNAPSHOT,
transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SNAPSHOT, VerifyNodeRepositoryRequest::new,
new VerifyNodeRepositoryRequestHandler());
}
@ -131,7 +131,10 @@ public class VerifyNodeRepositoryAction {
private String repository;
private String verificationToken;
public VerifyNodeRepositoryRequest() {
public VerifyNodeRepositoryRequest(StreamInput in) throws IOException {
super(in);
repository = in.readString();
verificationToken = in.readString();
}
VerifyNodeRepositoryRequest(String repository, String verificationToken) {
@ -139,13 +142,6 @@ public class VerifyNodeRepositoryAction {
this.verificationToken = verificationToken;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
repository = in.readString();
verificationToken = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -90,11 +90,6 @@ public abstract class SearchPhaseResult extends TransportResponse {
*/
public FetchSearchResult fetchResult() { return null; }
@Override
public final void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
// TODO: this seems wrong, SearchPhaseResult should have a writeTo?

View File

@ -35,8 +35,10 @@ public class BytesTransportRequest extends TransportRequest {
BytesReference bytes;
Version version;
public BytesTransportRequest() {
public BytesTransportRequest(StreamInput in) throws IOException {
super(in);
bytes = in.readBytesReference();
version = in.getVersion();
}
public BytesTransportRequest(BytesReference bytes, Version version) {
@ -52,13 +54,6 @@ public class BytesTransportRequest extends TransportRequest {
return this.bytes;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
bytes = in.readBytesReference();
version = in.getVersion();
}
/**
* Writes the data in a "thin" manner, without the actual bytes, assumes
* the actual bytes will be appended right after this content.

View File

@ -171,7 +171,7 @@ final class TransportHandshaker {
}
HandshakeRequest(StreamInput streamInput) throws IOException {
super.readFrom(streamInput);
super(streamInput);
BytesReference remainingMessage;
try {
remainingMessage = streamInput.readBytesReference();
@ -187,11 +187,6 @@ final class TransportHandshaker {
}
}
@Override
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
super.writeTo(streamOutput);
@ -217,11 +212,6 @@ final class TransportHandshaker {
responseVersion = Version.readVersion(in);
}
@Override
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
assert responseVersion != null;

View File

@ -20,8 +20,7 @@
package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
import java.util.function.Supplier;
import org.elasticsearch.common.io.stream.Writeable.Reader;
/**
* This interface allows plugins to intercept requests on both the sender and the receiver side.
@ -29,8 +28,8 @@ import java.util.function.Supplier;
public interface TransportInterceptor {
/**
* This is called for each handler that is registered via
* {@link TransportService#registerRequestHandler(String, Supplier, String, boolean, boolean, TransportRequestHandler)} or
* {@link TransportService#registerRequestHandler(String, Supplier, String, TransportRequestHandler)}. The returned handler is
* {@link TransportService#registerRequestHandler(String, String, boolean, boolean, Reader, TransportRequestHandler)} or
* {@link TransportService#registerRequestHandler(String, String, Reader, TransportRequestHandler)}. The returned handler is
* used instead of the passed in handler. By default the provided handler is returned.
*/
default <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,

View File

@ -52,7 +52,7 @@ public abstract class TransportMessage implements Streamable, Writeable {
}
@Override
public void readFrom(StreamInput in) throws IOException {
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}

View File

@ -65,12 +65,6 @@ public abstract class TransportRequest extends TransportMessage implements TaskA
return parentTaskId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
parentTaskId = TaskId.readFromStream(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
parentTaskId.writeTo(out);

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
@ -184,9 +183,9 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
}
registerRequestHandler(
HANDSHAKE_ACTION_NAME,
() -> HandshakeRequest.INSTANCE,
ThreadPool.Names.SAME,
false, false,
HandshakeRequest::new,
(request, channel, task) -> channel.sendResponse(
new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
}
@ -485,6 +484,10 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
public static final HandshakeRequest INSTANCE = new HandshakeRequest();
HandshakeRequest(StreamInput in) throws IOException {
super(in);
}
private HandshakeRequest() {
}
@ -838,23 +841,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
return false;
}
/**
* Registers a new request handler
*
* @param action The action the request handler is associated with
* @param requestFactory a callable to be used construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory,
String executor, TransportRequestHandler<Request> handler) {
validateActionName(action);
handler = interceptor.interceptHandler(action, executor, false, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, Streamable.newWriteableReader(requestFactory), taskManager, handler, executor, false, true);
transport.registerRequestHandler(reg);
}
/**
* Registers a new request handler
*
@ -873,27 +859,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
transport.registerRequestHandler(reg);
}
/**
* Registers a new request handler
*
* @param action The action the request handler is associated with
* @param request The request class that will be used to construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request,
String executor, boolean forceExecution,
boolean canTripCircuitBreaker,
TransportRequestHandler<Request> handler) {
validateActionName(action);
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, Streamable.newWriteableReader(request), taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
transport.registerRequestHandler(reg);
}
/**
* Registers a new request handler
*

View File

@ -168,7 +168,7 @@ public class SyncedFlushUnitTests extends ESTestCase {
shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse("copy failure " + shardId));
} else {
successful++;
shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse());
shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse((String) null));
}
}
shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses));

View File

@ -95,8 +95,7 @@ public class SegmentTests extends ESTestCase {
segment.writeTo(output);
output.flush();
StreamInput input = output.bytes().streamInput();
Segment deserialized = new Segment();
deserialized.readFrom(input);
Segment deserialized = new Segment(input);
assertTrue(isSegmentEquals(deserialized, segment));
}
}

View File

@ -872,7 +872,7 @@ public class StoreTests extends ESTestCase {
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
TransportNodesListShardStoreMetaData.StoreFilesMetaData inStoreFileMetaData =
TransportNodesListShardStoreMetaData.StoreFilesMetaData.readStoreFilesMetaData(in);
new TransportNodesListShardStoreMetaData.StoreFilesMetaData(in);
Iterator<StoreFileMetaData> outFiles = outStoreFileMetaData.iterator();
for (StoreFileMetaData inFile : inStoreFileMetaData) {
assertThat(inFile.name(), equalTo(outFiles.next().name()));

View File

@ -30,7 +30,7 @@ import static org.hamcrest.object.HasToString.hasToString;
public class NodeIndicesStatsTests extends ESTestCase {
public void testInvalidLevel() {
final NodeIndicesStats stats = new NodeIndicesStats();
final NodeIndicesStats stats = new NodeIndicesStats(null, Collections.emptyMap());
final String level = randomAlphaOfLength(16);
final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level));
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> stats.toXContent(null, params));

View File

@ -65,8 +65,7 @@ public class StartRecoveryRequestTests extends ESTestCase {
final ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
final StartRecoveryRequest inRequest = new StartRecoveryRequest();
inRequest.readFrom(in);
final StartRecoveryRequest inRequest = new StartRecoveryRequest(in);
assertThat(outRequest.shardId(), equalTo(inRequest.shardId()));
assertThat(outRequest.targetAllocationId(), equalTo(inRequest.targetAllocationId()));

View File

@ -170,11 +170,6 @@ public class InboundHandlerTests extends ESTestCase {
this.value = in.readString();
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -195,11 +190,6 @@ public class InboundHandlerTests extends ESTestCase {
this.value = in.readString();
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(value);

View File

@ -224,22 +224,14 @@ public class InboundMessageTests extends ESTestCase {
public String value;
private Message() {
}
private Message(StreamInput in) throws IOException {
readFrom(in);
value = in.readString();
}
private Message(String value) {
this.value = value;
}
@Override
public void readFrom(StreamInput in) throws IOException {
value = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(value);

View File

@ -158,8 +158,7 @@ public class OutboundHandlerTests extends ESTestCase {
InboundMessage.Request inboundRequest = (InboundMessage.Request) inboundMessage;
assertThat(inboundRequest.getFeatures(), contains(feature1, feature2));
Request readMessage = new Request();
readMessage.readFrom(inboundMessage.getStreamInput());
Request readMessage = new Request(inboundMessage.getStreamInput());
assertEquals(value, readMessage.value);
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
@ -226,8 +225,7 @@ public class OutboundHandlerTests extends ESTestCase {
InboundMessage.Response inboundResponse = (InboundMessage.Response) inboundMessage;
assertFalse(inboundResponse.isError());
Response readMessage = new Response();
readMessage.readFrom(inboundMessage.getStreamInput());
Response readMessage = new Response(inboundMessage.getStreamInput());
assertEquals(value, readMessage.value);
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
@ -302,18 +300,14 @@ public class OutboundHandlerTests extends ESTestCase {
public String value;
private Request() {
private Request(StreamInput in) throws IOException {
value = in.readString();
}
private Request(String value) {
this.value = value;
}
@Override
public void readFrom(StreamInput in) throws IOException {
value = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(value);
@ -324,18 +318,15 @@ public class OutboundHandlerTests extends ESTestCase {
public String value;
private Response() {
private Response(StreamInput in) throws IOException {
super(in);
value = in.readString();
}
private Response(String value) {
this.value = value;
}
@Override
public void readFrom(StreamInput in) throws IOException {
value = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(value);

View File

@ -214,11 +214,6 @@ public class TransportActionProxyTests extends ESTestCase {
sourceNode = in.readString();
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -1148,11 +1148,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
return timeout;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -1173,8 +1168,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
this.message = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(message);
@ -1193,11 +1186,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
value1 = in.readInt();
}
@Override
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -1657,11 +1645,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
this.info = info;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -1682,6 +1665,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
final String info;
TestResponse(StreamInput in) throws IOException {
super(in);
this.info = in.readOptionalString();
}
@ -1689,11 +1673,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
this.info = info;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(info);