Recovery: remove unneeded waits on recovery cancellation

When cancelling recoveries, we wait for up to 10s for the source node to be notified before continuing. This is not needed in two cases:
1) The source node has been disconnected due to node shutdown (recovery is canceled as a response to cluster state processing)
2) The current thread is the one that will be notifying the source node (happens when one of the calls from the source nodes discoveres the local index is closed)

The first one is especially important as it may delay cluster state update processing with 10s.

Closes #7717
This commit is contained in:
Boaz Leskes 2014-09-15 09:35:54 +02:00
parent ede39edbba
commit d228606bab
2 changed files with 8 additions and 6 deletions

View File

@ -91,8 +91,8 @@ public class RecoveryStatus {
}
return outputs.get(key);
}
public synchronized Set<Entry<String, IndexOutput>> cancleAndClearOpenIndexInputs() {
public synchronized Set<Entry<String, IndexOutput>> cancelAndClearOpenIndexInputs() {
cancel();
final ConcurrentMap<String, IndexOutput> outputs = openIndexOutputs;
openIndexOutputs = null;

View File

@ -138,7 +138,9 @@ public class RecoveryTarget extends AbstractComponent {
final long sleepTime = 100;
final long maxSleepTime = 10000;
long rounds = Math.round(maxSleepTime / sleepTime);
while (!recoveryStatus.sentCanceledToSource && rounds > 0) {
while (!recoveryStatus.sentCanceledToSource &&
transportService.nodeConnected(recoveryStatus.sourceNode) &&
rounds > 0) {
rounds--;
try {
Thread.sleep(sleepTime);
@ -172,7 +174,7 @@ public class RecoveryTarget extends AbstractComponent {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
doRecovery(request, recoveryStatus, listener);
doRecovery(request, recoveryStatus, listener);
}
});
}
@ -345,7 +347,7 @@ public class RecoveryTarget extends AbstractComponent {
// coming from the recovery target
status.cancel();
// clean open index outputs
Set<Entry<String, IndexOutput>> entrySet = status.cancleAndClearOpenIndexInputs();
Set<Entry<String, IndexOutput>> entrySet = status.cancelAndClearOpenIndexInputs();
Iterator<Entry<String, IndexOutput>> iterator = entrySet.iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
@ -634,7 +636,7 @@ public class RecoveryTarget extends AbstractComponent {
throw new IndexShardClosedException(shardId);
}
if (onGoingRecovery.indexShard.state() == IndexShardState.CLOSED) {
cancelRecovery(onGoingRecovery.indexShard);
removeAndCleanOnGoingRecovery(onGoingRecovery);
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shardId);
}