Failure to recover properly on node(s) restart

When a node restarts, it might cancel the recovery of a shard id only to be assigned a different instance of that shard in the next cluster state cycle. We should detect this case and handle it properly.

This also fixes the annoying warning seen by users: "suspect illegal state: trying to move shard from primary mode to replica mode".
Shay Banon 2012-06-22 17:46:57 +02:00
parent cc3fab45ff
commit 1780a2a067
7 changed files with 129 additions and 16 deletions
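
For orientation before the per-file hunks: the sketch below condenses the detection logic this commit adds to IndicesClusterStateService (see the corresponding hunk further down). It is a paraphrase rather than the exact Elasticsearch code; the method name applyShardRouting and its parameter list are illustrative assumptions.

    // Sketch only: detect that a different instance of an already-allocated shard id
    // has been assigned to this node, and restart its recovery from scratch.
    private void applyShardRouting(IndexService indexService, ShardRouting shardRouting, RecoveryTarget recoveryTarget) {
        int shardId = shardRouting.id();
        if (!indexService.hasShard(shardId)) {
            return; // no local shard yet, the normal shard-creation path applies
        }
        InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId);
        ShardRouting currentRoutingEntry = indexShard.routingEntry();
        if (currentRoutingEntry.equals(shardRouting)) {
            return; // same allocation instance, nothing to reconcile
        }
        boolean needToDeleteCurrentShard = false;
        if (currentRoutingEntry.initializing() && shardRouting.initializing()) {
            // the shard was switched from primary to replica while still initializing
            if (currentRoutingEntry.primary() && !shardRouting.primary()) {
                needToDeleteCurrentShard = true;
            }
            // the shard is now recovering (relocating) from a different source node
            if (currentRoutingEntry.relocatingNodeId() != null && shardRouting.relocatingNodeId() != null
                    && !currentRoutingEntry.relocatingNodeId().equals(shardRouting.relocatingNodeId())) {
                needToDeleteCurrentShard = true;
            }
        }
        if (needToDeleteCurrentShard) {
            // cancel the in-flight recovery first, then drop the local shard; the next
            // cluster state application re-creates it and starts a clean recovery
            recoveryTarget.cancelRecovery(shardRouting.shardId());
            indexService.removeShard(shardId, "removing shard (different instance of it allocated on this node)");
        }
    }

Canceling before removal matters because the new RecoveryTarget.cancelRecovery below interrupts the recovery thread and waits, with a bounded grace period, until the source node has been told the recovery was canceled, avoiding the dangling recovery that produced the warning above.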

View File: AllocationService.java

@@ -294,6 +294,7 @@ public class AllocationService extends AbstractComponent {
// we know this since it has a relocating node id (the node we relocate from) and our state is INITIALIZING (and not RELOCATING)
boolean isRelocationDestinationShard = relocatingNodeId != null && shardRoutingEntry.initializing();
boolean remove = false;
boolean currentNodeIsDead = false;
if (!liveNodeIds.contains(shardRoutingEntry.currentNodeId())) {
changed = true;
@@ -305,7 +306,7 @@
shardRoutingEntry.deassignNode();
currentNodeIsDead = true;
shardsIterator.remove();
remove = true;
}
// move source shard back to active state and cancel relocation mode.
@@ -319,6 +320,10 @@
if (isRelocationDestinationShard && !liveNodeIds.contains(relocatingNodeId)) {
changed = true;
remove = true;
}
if (remove) {
shardsIterator.remove();
}
}

View File: ShardId.java

@@ -102,6 +102,7 @@ public class ShardId implements Serializable, Streamable {
public void readFrom(StreamInput in) throws IOException {
index = Index.readIndexName(in);
shardId = in.readVInt();
hashCode = computeHashCode();
}
@Override

View File: InternalIndexShard.java

@@ -217,7 +217,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
if (currentRouting != null) {
if (!shardRouting.primary() && currentRouting.primary()) {
logger.warn("suspect illegal state: trying to move shard from primary mode to backup mode");
logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
}
// if its the same routing, return
if (currentRouting.equals(shardRouting)) {

View File: IndicesClusterStateService.java

@@ -493,6 +493,32 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
continue;
}
if (indexService.hasShard(shardId)) {
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId);
if (!shardRouting.equals(indexShard.routingEntry())) {
ShardRouting currentRoutingEntry = indexShard.routingEntry();
boolean needToDeleteCurrentShard = false;
if (currentRoutingEntry.initializing() && shardRouting.initializing()) {
// both are initializing, see if they are different instances of the shard routing, i.e. it got switched on us
if (currentRoutingEntry.primary() && !shardRouting.primary()) {
needToDeleteCurrentShard = true;
}
// recovering from different nodes..., restart recovery
if (currentRoutingEntry.relocatingNodeId() != null && shardRouting.relocatingNodeId() != null &&
!currentRoutingEntry.relocatingNodeId().equals(shardRouting.relocatingNodeId())) {
needToDeleteCurrentShard = true;
}
}
if (needToDeleteCurrentShard) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] removing shard (different instance of it allocated on this node)", shardRouting.index(), shardRouting.id());
}
recoveryTarget.cancelRecovery(shardRouting.shardId());
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
}
}
}
if (indexService.hasShard(shardId)) {
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId);
if (!shardRouting.equals(indexShard.routingEntry())) {

View File: RecoveryStatus.java

@@ -39,6 +39,10 @@ public class RecoveryStatus {
DONE
}
volatile Thread recoveryThread;
volatile boolean canceled;
volatile boolean sentCanceledToSource;
ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
ConcurrentMap<String, String> checksums = ConcurrentCollections.newConcurrentMap();

View File: RecoveryTarget.java

@@ -117,6 +117,34 @@ public class RecoveryTarget extends AbstractComponent {
return peerRecoveryStatus;
}
public void cancelRecovery(ShardId shardId) {
RecoveryStatus recoveryStatus = onGoingRecoveries.get(shardId);
// it might be null if the recovery source got canceled first
if (recoveryStatus == null) {
return;
}
if (recoveryStatus.sentCanceledToSource) {
return;
}
recoveryStatus.canceled = true;
if (recoveryStatus.recoveryThread != null) {
recoveryStatus.recoveryThread.interrupt();
}
long time = System.currentTimeMillis();
// give the recovery thread a grace period to actually notify the source that the recovery was canceled
while (!recoveryStatus.sentCanceledToSource) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
if (System.currentTimeMillis() - time > 10000) {
break;
}
}
removeAndCleanOnGoingRecovery(shardId);
}
public void startRecovery(final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) {
if (request.sourceNode() == null) {
listener.onIgnoreRecovery(false, "No node to recover from, retry on next cluster state update");
@@ -170,6 +198,7 @@
recovery = new RecoveryStatus();
onGoingRecoveries.put(request.shardId(), recovery);
}
recovery.recoveryThread = Thread.currentThread();
try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());
@@ -207,6 +236,11 @@
listener.onRecoveryDone();
} catch (Exception e) {
// logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
if (recovery.canceled) {
// don't remove it, the cancellation code will remove it...
listener.onIgnoreRecovery(false, "canceled recovery");
return;
}
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(request.shardId());
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
@@ -274,6 +308,9 @@
// clean it from the on going recoveries since it is being closed
RecoveryStatus peerRecoveryStatus = onGoingRecoveries.remove(shardId);
if (peerRecoveryStatus != null) {
// just mark it as canceled as well, just in case there are in flight requests
// coming from the recovery target
peerRecoveryStatus.canceled = true;
// clean open index outputs
for (Map.Entry<String, IndexOutput> entry : peerRecoveryStatus.openIndexOutputs.entrySet()) {
synchronized (entry.getValue()) {
@@ -310,6 +347,10 @@
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shard.shardId());
}
onGoingRecovery.stage = RecoveryStatus.Stage.TRANSLOG;
shard.performRecoveryPrepareForTranslog();
@@ -332,15 +373,19 @@
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
RecoveryStatus peerRecoveryStatus = onGoingRecoveries.get(shard.shardId());
if (peerRecoveryStatus == null) {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
peerRecoveryStatus.stage = RecoveryStatus.Stage.FINALIZE;
shard.performRecoveryFinalization(false, peerRecoveryStatus);
peerRecoveryStatus.time = System.currentTimeMillis() - peerRecoveryStatus.startTime;
peerRecoveryStatus.stage = RecoveryStatus.Stage.DONE;
if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shard.shardId());
}
onGoingRecovery.stage = RecoveryStatus.Stage.FINALIZE;
shard.performRecoveryFinalization(false, onGoingRecovery);
onGoingRecovery.time = System.currentTimeMillis() - onGoingRecovery.startTime;
onGoingRecovery.stage = RecoveryStatus.Stage.DONE;
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
@@ -360,17 +405,35 @@
@Override
public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
for (Translog.Operation operation : request.operations()) {
shard.performRecoveryOperation(operation);
}
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
for (Translog.Operation operation : request.operations()) {
if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
shard.performRecoveryOperation(operation);
onGoingRecovery.currentTranslogOperations++;
}
onGoingRecovery = onGoingRecoveries.get(request.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
onGoingRecovery.currentTranslogOperations += request.operations().size();
channel.sendResponse(VoidStreamable.INSTANCE);
}
@@ -396,6 +459,10 @@
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shard.shardId());
}
onGoingRecovery.phase1FileNames = request.phase1FileNames;
onGoingRecovery.phase1FileSizes = request.phase1FileSizes;
onGoingRecovery.phase1ExistingFileNames = request.phase1ExistingFileNames;
@@ -427,6 +494,10 @@
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shard.shardId());
}
// first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas
@@ -495,6 +566,10 @@
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shard.shardId());
}
IndexOutput indexOutput;
if (request.position() == 0) {
// first request

View File: QuickRollingRestartStressTest.java

@@ -91,7 +91,9 @@ public class QuickRollingRestartStressTest {
if (clusterHealthResponse.timedOut()) {
System.err.println("--> timed out waiting for green state...");
ClusterState state = client.client().admin().cluster().prepareState().execute().actionGet().state();
System.out.println(state.nodes().prettyPrint());
System.out.println(state.routingTable().prettyPrint());
System.out.println(state.routingNodes().prettyPrint());
throw new ElasticSearchException("timed out waiting for green state");
} else {
System.out.println("--> got green status");