Remove allocation id from replica replication response (#25488)

The replica replication response object has an extra allocationId field that contains the allocation id of the replica on which the request was executed. As we are sending the allocation id with the actual replica replication request, and check when executing the replica replication action that the allocation id of the replica shard is what we expect, there is no need to communicate back the allocation id as part of the response object.
This commit is contained in:
Yannick Welsch 2017-07-01 11:36:45 +02:00 committed by GitHub
parent 6ae4497c13
commit bb23d3b2c5
8 changed files with 12 additions and 34 deletions

View File

@ -93,7 +93,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}

View File

@ -187,7 +187,7 @@ public class ReplicationOperation<
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint());
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
@ -429,9 +429,6 @@ public class ReplicationOperation<
/** the local check point for the shard. see {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */
long localCheckpoint();
/** the allocation id of the replica shard */
String allocationId();
}
public static class RetryOnPrimaryException extends ElasticsearchException {

View File

@ -53,6 +53,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
@ -523,8 +524,7 @@ public abstract class TransportReplicationAction<
try {
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response =
new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint());
final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint());
replicaResult.respond(new ResponseListener(response));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
@ -1011,14 +1011,12 @@ public abstract class TransportReplicationAction<
public static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse {
private long localCheckpoint;
private String allocationId;
ReplicaResponse() {
}
public ReplicaResponse(String allocationId, long localCheckpoint) {
this.allocationId = allocationId;
public ReplicaResponse(long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
}
@ -1027,9 +1025,9 @@ public abstract class TransportReplicationAction<
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.readFrom(in);
localCheckpoint = in.readZLong();
allocationId = in.readString();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}
@ -1038,7 +1036,6 @@ public abstract class TransportReplicationAction<
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.writeTo(out);
out.writeZLong(localCheckpoint);
out.writeString(allocationId);
} else {
// we use to write empty responses
Empty.INSTANCE.writeTo(out);
@ -1049,11 +1046,6 @@ public abstract class TransportReplicationAction<
public long localCheckpoint() {
return localCheckpoint;
}
@Override
public String allocationId() {
return allocationId;
}
}
/**

View File

@ -89,7 +89,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}

View File

@ -464,11 +464,9 @@ public class ReplicationOperationTests extends ESTestCase {
}
static class ReplicaResponse implements ReplicationOperation.ReplicaResponse {
final String allocationId;
final long localCheckpoint;
ReplicaResponse(String allocationId, long localCheckpoint) {
this.allocationId = allocationId;
ReplicaResponse(long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
}
@ -476,11 +474,6 @@ public class ReplicationOperationTests extends ESTestCase {
public long localCheckpoint() {
return localCheckpoint;
}
@Override
public String allocationId() {
return allocationId;
}
}
static class TestReplicaProxy implements ReplicationOperation.Replicas<Request> {
@ -515,7 +508,7 @@ public class ReplicationOperationTests extends ESTestCase {
final String allocationId = replica.allocationId().getId();
Long existing = generatedLocalCheckpoints.put(allocationId, checkpoint);
assertNull(existing);
listener.onResponse(new ReplicaResponse(allocationId, checkpoint));
listener.onResponse(new ReplicaResponse(checkpoint));
}
}

View File

@ -636,8 +636,7 @@ public class TransportReplicationActionTests extends ESTestCase {
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
assertThat(captures, arrayWithSize(1));
if (randomBoolean()) {
final TransportReplicationAction.ReplicaResponse response =
new TransportReplicationAction.ReplicaResponse(randomAlphaOfLength(10), randomLong());
final TransportReplicationAction.ReplicaResponse response = new TransportReplicationAction.ReplicaResponse(randomLong());
transport.handleResponse(captures[0].requestId, response);
assertTrue(listener.isDone());
assertThat(listener.get(), equalTo(response));

View File

@ -286,8 +286,7 @@ public class TransportWriteActionTests extends ESTestCase {
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
assertThat(captures, arrayWithSize(1));
if (randomBoolean()) {
final TransportReplicationAction.ReplicaResponse response =
new TransportReplicationAction.ReplicaResponse(randomAlphaOfLength(10), randomLong());
final TransportReplicationAction.ReplicaResponse response = new TransportReplicationAction.ReplicaResponse(randomLong());
transport.handleResponse(captures[0].requestId, response);
assertTrue(listener.isDone());
assertThat(listener.get(), equalTo(response));

View File

@ -537,9 +537,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
try {
performOnReplica(request, replica);
releasable.close();
listener.onResponse(
new ReplicaResponse(
replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint()));
listener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint()));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable);
listener.onFailure(e);