Provide target allocation id as part of start recovery request (#24333)

This makes it possible for the recovery source to verify that it is talking to the shard it thinks it is talking to.

Closes #24167
This commit is contained in:
Yannick Welsch 2017-04-27 14:45:44 +02:00 committed by GitHub
parent 65f90b25e0
commit 2fa1c9fff1
10 changed files with 46 additions and 41 deletions

View File

@ -113,7 +113,14 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]");
}
RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, targetShardRouting.allocationId().getId(), shard);
if (request.targetAllocationId().equals(targetShardRouting.allocationId().getId()) == false) {
logger.debug("delaying recovery of {} due to target allocation id mismatch (expected: [{}], but was: [{}])",
request.shardId(), request.targetAllocationId(), targetShardRouting.allocationId().getId());
throw new DelayRecoveryException("source node has the state of the target shard to have allocation id [" +
targetShardRouting.allocationId().getId() + "], expecting to be [" + request.targetAllocationId() + "]");
}
RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode());
try {
return handler.recoverToTarget();
@ -133,9 +140,9 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
private final class OngoingRecoveries {
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, String targetAllocationId, IndexShard shard) {
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
RecoverySourceHandler handler = shardContext.addNewRecovery(request, targetAllocationId, shard);
RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
shard.recoveryStats().incCurrentAsSource();
return handler;
}
@ -181,20 +188,19 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
* Adds recovery source handler if recoveries are not delayed from starting (see also {@link #delayNewRecoveries}.
* Throws {@link DelayRecoveryException} if new recoveries are delayed from starting.
*/
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, String targetAllocationId, IndexShard shard) {
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
if (onNewRecoveryException != null) {
throw onNewRecoveryException;
}
RecoverySourceHandler handler = createRecoverySourceHandler(request, targetAllocationId, shard);
RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
recoveryHandlers.add(handler);
return handler;
}
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, String targetAllocationId,
IndexShard shard) {
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
RecoverySourceHandler handler;
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), targetAllocationId, transportService,
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
Supplier<Long> currentClusterStateVersionSupplier = () -> clusterService.state().getVersion();
handler = new RecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,

View File

@ -329,6 +329,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
request = new StartRecoveryRequest(
recoveryTarget.shardId(),
recoveryTarget.indexShard().routingEntry().allocationId().getId(),
recoveryTarget.sourceNode(),
clusterService.localNode(),
metadataSnapshot,

View File

@ -78,7 +78,6 @@ public class RecoverySourceHandler {
protected final Logger logger;
// Shard that is going to be recovered (the "source")
private final IndexShard shard;
private final String indexName;
private final int shardId;
// Request containing source and target node information
private final StartRecoveryRequest request;
@ -116,7 +115,6 @@ public class RecoverySourceHandler {
this.request = request;
this.currentClusterStateVersionSupplier = currentClusterStateVersionSupplier;
this.delayNewRecoveries = delayNewRecoveries;
this.indexName = this.request.shardId().getIndex().getName();
this.shardId = this.request.shardId().id();
this.logger = Loggers.getLogger(getClass(), nodeSettings, request.shardId(), "recover to " + request.targetNode().getName());
this.chunkSizeInBytes = fileChunkSizeInBytes;
@ -443,7 +441,7 @@ public class RecoverySourceHandler {
StopWatch stopWatch = new StopWatch().start();
logger.trace("finalizing recovery");
cancellableThreads.execute(() -> {
shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId());
shard.markAllocationIdAsInSync(request.targetAllocationId());
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
});

View File

@ -368,11 +368,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
indexShard.finalizeRecovery();
}
@Override
public String getTargetAllocationId() {
return indexShard().routingEntry().allocationId().getId();
}
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
ensureClusterStateVersionCallback.handle(clusterStateVersion);

View File

@ -77,9 +77,4 @@ public interface RecoveryTargetHandler {
void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) throws IOException;
/***
* @return the allocation id of the target shard.
*/
String getTargetAllocationId();
}

View File

@ -50,14 +50,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final Consumer<Long> onSourceThrottle;
private String targetAllocationId;
public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, String targetAllocationId, TransportService transportService,
public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService,
DiscoveryNode targetNode, RecoverySettings recoverySettings, Consumer<Long> onSourceThrottle) {
this.targetAllocationId = targetAllocationId;
this.transportService = transportService;
this.recoveryId = recoveryId;
this.shardId = shardId;
this.targetNode = targetNode;
@ -164,8 +160,4 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public String getTargetAllocationId() {
return targetAllocationId;
}
}

View File

@ -21,10 +21,8 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@ -39,6 +37,7 @@ public class StartRecoveryRequest extends TransportRequest {
private long recoveryId;
private ShardId shardId;
private String targetAllocationId;
private DiscoveryNode sourceNode;
private DiscoveryNode targetNode;
private Store.MetadataSnapshot metadataSnapshot;
@ -51,15 +50,17 @@ public class StartRecoveryRequest extends TransportRequest {
/**
* Construct a request for starting a peer recovery.
*
* @param shardId the shard ID to recover
* @param sourceNode the source node to remover from
* @param targetNode the target node to recover to
* @param metadataSnapshot the Lucene metadata
* @param primaryRelocation whether or not the recovery is a primary relocation
* @param recoveryId the recovery ID
* @param startingSeqNo the starting sequence number
* @param shardId the shard ID to recover
* @param targetAllocationId the allocation id of the target shard
* @param sourceNode the source node to remover from
* @param targetNode the target node to recover to
* @param metadataSnapshot the Lucene metadata
* @param primaryRelocation whether or not the recovery is a primary relocation
* @param recoveryId the recovery ID
* @param startingSeqNo the starting sequence number
*/
public StartRecoveryRequest(final ShardId shardId,
final String targetAllocationId,
final DiscoveryNode sourceNode,
final DiscoveryNode targetNode,
final Store.MetadataSnapshot metadataSnapshot,
@ -68,6 +69,7 @@ public class StartRecoveryRequest extends TransportRequest {
final long startingSeqNo) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.targetAllocationId = targetAllocationId;
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.metadataSnapshot = metadataSnapshot;
@ -83,6 +85,10 @@ public class StartRecoveryRequest extends TransportRequest {
return shardId;
}
public String targetAllocationId() {
return targetAllocationId;
}
public DiscoveryNode sourceNode() {
return sourceNode;
}
@ -108,6 +114,7 @@ public class StartRecoveryRequest extends TransportRequest {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
targetAllocationId = in.readString();
sourceNode = new DiscoveryNode(in);
targetNode = new DiscoveryNode(in);
metadataSnapshot = new Store.MetadataSnapshot(in);
@ -124,6 +131,7 @@ public class StartRecoveryRequest extends TransportRequest {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeString(targetAllocationId);
sourceNode.writeTo(out);
targetNode.writeTo(out);
metadataSnapshot.writeTo(out);

View File

@ -99,6 +99,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
final StartRecoveryRequest request = new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
@ -155,6 +156,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final long startingSeqNo = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
final StartRecoveryRequest request = new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
@ -223,6 +225,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
@ -292,6 +295,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
@ -358,6 +362,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
@ -417,6 +422,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final StartRecoveryRequest request =
new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
@ -44,6 +45,7 @@ public class StartRecoveryRequestTests extends ESTestCase {
final Version targetNodeVersion = randomVersion(random());
final StartRecoveryRequest outRequest = new StartRecoveryRequest(
new ShardId("test", "_na_", 0),
UUIDs.base64UUID(),
new DiscoveryNode("a", buildNewFakeTransportAddress(), emptyMap(), emptySet(), targetNodeVersion),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), targetNodeVersion),
Store.MetadataSnapshot.EMPTY,
@ -63,6 +65,7 @@ public class StartRecoveryRequestTests extends ESTestCase {
inRequest.readFrom(in);
assertThat(outRequest.shardId(), equalTo(inRequest.shardId()));
assertThat(outRequest.targetAllocationId(), equalTo(inRequest.targetAllocationId()));
assertThat(outRequest.sourceNode(), equalTo(inRequest.sourceNode()));
assertThat(outRequest.targetNode(), equalTo(inRequest.targetNode()));
assertThat(outRequest.metadataSnapshot().asMap(), equalTo(inRequest.metadataSnapshot().asMap()));

View File

@ -405,6 +405,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
replica.prepareForIndexRecovery();
final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
final String targetAllocationId = recoveryTarget.indexShard().routingEntry().allocationId().getId();
final Store.MetadataSnapshot snapshot = getMetadataSnapshotOrEmpty(replica);
final long startingSeqNo;
@ -414,8 +415,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
final StartRecoveryRequest request =
new StartRecoveryRequest(replica.shardId(), pNode, rNode, snapshot, false, 0, startingSeqNo);
final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId,
pNode, rNode, snapshot, false, 0, startingSeqNo);
final RecoverySourceHandler recovery = new RecoverySourceHandler(
primary,
recoveryTarget,