Clean up ShardId usage of Streamable (#41843)

ShardId already implements Writeable so there is no need for it to implement Streamable too. Also the readShardId static method can be
easily replaced with direct usages of the constructor that takes a
StreamInput as argument.
This commit is contained in:
Luca Cavanna 2019-05-09 11:47:36 +02:00
parent 96ba0b13e0
commit 29c9bb9181
40 changed files with 63 additions and 76 deletions

View File

@ -260,7 +260,7 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
type = in.readString();
id = in.readString();
version = in.readZLong();

View File

@ -40,7 +40,7 @@ public class ClusterSearchShardsGroup implements Writeable, ToXContentObject {
}
ClusterSearchShardsGroup(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
shards = new ShardRouting[in.readVInt()];
for (int i = 0; i < shards.length; i++) {
shards[i] = new ShardRouting(shardId, in);

View File

@ -214,7 +214,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
int numberOfShards = in.readVInt();
Map<ShardId, SnapshotIndexShardStatus> shardMapBuilder = new HashMap<>(numberOfShards);
for (int j = 0; j < numberOfShards; j++) {
ShardId shardId = ShardId.readShardId(in);
ShardId shardId = new ShardId(in);
SnapshotIndexShardStatus status = SnapshotIndexShardStatus.readShardSnapshotStatus(in);
shardMapBuilder.put(shardId, status);
}

View File

@ -90,7 +90,7 @@ public class IndexShardStats implements Iterable<ShardStats>, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
int shardSize = in.readVInt();
shards = new ShardStats[shardSize];
for (int i = 0; i < shardSize; i++) {

View File

@ -68,7 +68,7 @@ class ShardUpgradeResult implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
primary = in.readBoolean();
upgradeVersion = Version.readVersion(in);
try {

View File

@ -67,7 +67,7 @@ public class BulkShardResponse extends ReplicationResponse implements WriteRespo
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
responses = new BulkItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = BulkItemResponse.readBulkItem(in);

View File

@ -65,7 +65,7 @@ public abstract class BroadcastShardRequest extends TransportRequest implements
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}

View File

@ -49,7 +49,7 @@ public abstract class BroadcastShardResponse extends TransportResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
}
@Override

View File

@ -69,7 +69,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
public ReplicationRequest(StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
} else {
shardId = null;
}

View File

@ -269,7 +269,7 @@ public class ReplicationResponse extends ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
super.shardId = shardId.getId();
index = shardId.getIndexName();
nodeId = in.readOptionalString();

View File

@ -114,7 +114,7 @@ public abstract class InstanceShardOperationRequest<Request extends InstanceShar
super.readFrom(in);
index = in.readString();
if (in.readBoolean()) {
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
} else {
shardId = null;
}
@ -126,7 +126,7 @@ public abstract class InstanceShardOperationRequest<Request extends InstanceShar
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeOptionalStreamable(shardId);
out.writeOptionalWriteable(shardId);
out.writeTimeValue(timeout);
out.writeOptionalString(concreteIndex);
}

View File

@ -97,7 +97,7 @@ public abstract class SingleShardRequest<Request extends SingleShardRequest<Requ
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
internalShardId = ShardId.readShardId(in);
internalShardId = new ShardId(in);
}
index = in.readOptionalString();
// no need to pass threading over the network, they are always false when coming throw a thread pool
@ -106,9 +106,8 @@ public abstract class SingleShardRequest<Request extends SingleShardRequest<Requ
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStreamable(internalShardId);
out.writeOptionalWriteable(internalShardId);
out.writeOptionalString(index);
}
}

View File

@ -451,7 +451,7 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> builder = ImmutableOpenMap.builder();
int shards = in.readVInt();
for (int j = 0; j < shards; j++) {
ShardId shardId = ShardId.readShardId(in);
ShardId shardId = new ShardId(in);
ShardRestoreStatus shardState = ShardRestoreStatus.readShardRestoreStatus(in);
builder.put(shardId, shardState);
}

View File

@ -432,7 +432,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
int shards = in.readVInt();
for (int j = 0; j < shards; j++) {
ShardId shardId = ShardId.readShardId(in);
ShardId shardId = new ShardId(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
builder.put(shardId, new ShardSnapshotStatus(in));
} else {

View File

@ -397,7 +397,7 @@ public class ShardStateAction {
FailedShardEntry(StreamInput in) throws IOException {
super(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
allocationId = in.readString();
primaryTerm = in.readVLong();
message = in.readString();
@ -601,7 +601,7 @@ public class ShardStateAction {
StartedShardEntry(StreamInput in) throws IOException {
super(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
allocationId = in.readString();
if (in.getVersion().before(Version.V_6_3_0)) {
primaryTerm = in.readVLong();

View File

@ -279,7 +279,7 @@ public final class ShardRouting implements Writeable, ToXContentObject {
}
public ShardRouting(StreamInput in) throws IOException {
this(ShardId.readShardId(in), in);
this(new ShardId(in), in);
}
/**

View File

@ -186,7 +186,7 @@ public class TransportNodesListGatewayStartedShards extends
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
}
@Override
@ -230,7 +230,7 @@ public class TransportNodesListGatewayStartedShards extends
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
}
@Override

View File

@ -301,7 +301,7 @@ public class RetentionLeaseActions {
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
id = in.readString();
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.shard;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -33,20 +32,12 @@ import java.io.IOException;
/**
* Allows for shard level components to be injected with the shard id.
*/
public class ShardId implements Streamable, Comparable<ShardId>, ToXContentFragment, Writeable {
public class ShardId implements Comparable<ShardId>, ToXContentFragment, Writeable {
private final Index index;
private final int shardId;
private final int hashCode;
public ShardId(StreamInput in) throws IOException {
index = new Index(in);
shardId = in.readVInt();
hashCode = computeHashCode();
}
public ShardId(Index index, int shardId) {
this.index = index;
this.shardId = shardId;
@ -57,6 +48,18 @@ public class ShardId implements Streamable, Comparable<ShardId>, ToXContentFragm
this(new Index(index, indexUUID), shardId);
}
public ShardId(StreamInput in) throws IOException {
index = new Index(in);
shardId = in.readVInt();
hashCode = computeHashCode();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
index.writeTo(out);
out.writeVInt(shardId);
}
public Index getIndex() {
return index;
}
@ -113,21 +116,6 @@ public class ShardId implements Streamable, Comparable<ShardId>, ToXContentFragm
return result;
}
public static ShardId readShardId(StreamInput in) throws IOException {
return new ShardId(in);
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
index.writeTo(out);
out.writeVInt(shardId);
}
@Override
public int compareTo(ShardId o) {
if (o.getId() == shardId) {

View File

@ -149,7 +149,7 @@ public class ShardsSyncedFlushResult implements Streamable {
shardResponses.put(shardRouting, response);
}
syncId = in.readOptionalString();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
totalShards = in.readInt();
}

View File

@ -567,7 +567,7 @@ public class SyncedFlushService implements IndexEventListener {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.shardId = ShardId.readShardId(in);
this.shardId = new ShardId(in);
}
public ShardId shardId() {
@ -647,7 +647,7 @@ public class SyncedFlushService implements IndexEventListener {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
expectedCommitId = new Engine.CommitId(in);
syncId = in.readString();
}
@ -749,7 +749,7 @@ public class SyncedFlushService implements IndexEventListener {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
}
@Override

View File

@ -49,7 +49,7 @@ public class RecoveryCleanFilesRequest extends TransportRequest {
RecoveryCleanFilesRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
snapshotFiles = new Store.MetadataSnapshot(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_2_0)) {

View File

@ -96,7 +96,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
String name = in.readString();
position = in.readVLong();
long length = in.readVLong();

View File

@ -66,7 +66,7 @@ public class RecoveryFilesInfoRequest extends TransportRequest {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
int size = in.readVInt();
phase1FileNames = new ArrayList<>(size);
for (int i = 0; i < size; i++) {

View File

@ -59,7 +59,7 @@ public class RecoveryFinalizeRecoveryRequest extends TransportRequest {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
globalCheckpoint = in.readZLong();
} else {

View File

@ -72,7 +72,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
primaryContext = new ReplicationTracker.PrimaryContext(in);
}

View File

@ -45,7 +45,7 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readLong(); // maxUnsafeAutoIdTimestamp

View File

@ -136,7 +136,7 @@ public class RecoveryState implements ToXContentFragment, Streamable, Writeable
public RecoveryState(StreamInput in) throws IOException {
timer = new Timer(in);
stage = Stage.fromId(in.readByte());
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
recoverySource = RecoverySource.readFrom(in);
targetNode = new DiscoveryNode(in);
sourceNode = in.readOptionalWriteable(DiscoveryNode::new);

View File

@ -102,7 +102,7 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
RecoveryTranslogOperationsRequest(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
operations = Translog.readOperations(in, "recovery");
totalTranslogOps = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {

View File

@ -56,7 +56,7 @@ public class RecoveryWaitForClusterStateRequest extends TransportRequest {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
clusterStateVersion = in.readVLong();
}

View File

@ -115,7 +115,7 @@ public class StartRecoveryRequest extends TransportRequest {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
targetAllocationId = in.readString();
sourceNode = new DiscoveryNode(in);
targetNode = new DiscoveryNode(in);

View File

@ -412,7 +412,7 @@ public class IndicesStore implements ClusterStateListener, Closeable {
super.readFrom(in);
clusterName = new ClusterName(in);
indexUUID = in.readString();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
timeout = new TimeValue(in.readLong(), TimeUnit.MILLISECONDS);
}

View File

@ -213,7 +213,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
@Override
public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
this.metadataSnapshot = new Store.MetadataSnapshot(in);
}
@ -255,7 +255,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
}
@Override
@ -298,7 +298,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
}
@Override

View File

@ -49,7 +49,7 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
} else {
nodeId = null;
}
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
this.originalIndices = null;
clusterAlias = in.readOptionalString();
}

View File

@ -108,7 +108,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
}
ShardSearchLocalRequest(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
searchType = SearchType.fromId(in.readByte());
numberOfShards = in.readVInt();
scroll = in.readOptionalWriteable(Scroll::new);

View File

@ -99,7 +99,7 @@ public class SnapshotShardFailure extends ShardOperationFailedException {
@Override
public void readFrom(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
super.shardId = shardId.getId();
index = shardId.getIndexName();
reason = in.readString();

View File

@ -447,7 +447,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
snapshot = new Snapshot(in);
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
status = new ShardSnapshotStatus(in);
}

View File

@ -152,7 +152,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
super.readFrom(in);
fromSeqNo = in.readVLong();
maxOperationCount = in.readVInt();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
expectedHistoryUUID = in.readString();
pollTimeout = in.readTimeValue();
maxBatchSize = new ByteSizeValue(in);

View File

@ -94,8 +94,8 @@ public class ShardFollowTask extends ImmutableFollowParameters implements XPackP
public static ShardFollowTask readFrom(StreamInput in) throws IOException {
String remoteCluster = in.readString();
ShardId followShardId = ShardId.readShardId(in);
ShardId leaderShardId = ShardId.readShardId(in);
ShardId followShardId = new ShardId(in);
ShardId leaderShardId = new ShardId(in);
return new ShardFollowTask(remoteCluster, followShardId, leaderShardId, in);
}

View File

@ -37,7 +37,7 @@ public class PutCcrRestoreSessionRequest extends SingleShardRequest<PutCcrRestor
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sessionUUID = in.readString();
shardId = ShardId.readShardId(in);
shardId = new ShardId(in);
}
@Override