More info stored on the shard recovery process: start time and recovery time

kimchy 2010-08-16 18:22:19 +03:00
parent c7f765d025
commit 92fd9af2b9
2 changed files with 28 additions and 7 deletions
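
In outline, the commit keeps one OnGoingRecovery record per shard alive across retries, records when the recovery was first created, and stamps the elapsed time whenever a retry is scheduled. Below is a minimal sketch of that bookkeeping, using a stand-in Recovery class reduced to the fields and stage enum visible in the diffs; markRetry() is a hypothetical helper (the commit inlines the two assignments at each retry site):

// Stand-in for OnGoingRecovery, reduced to what this commit's diff adds.
public class Recovery {
    enum Stage { INIT, RETRY, FILES, TRANSLOG, FINALIZE }

    volatile Stage stage = Stage.INIT;
    // set once when the recovery is first created; not reset on retry
    final long startTimeImMillis = System.currentTimeMillis();
    // elapsed time, relative to the start, at which the latest retry was scheduled
    volatile long retryTimeInMillis = 0;

    // hypothetical helper: the diff inlines these two assignments instead
    void markRetry() {
        stage = Stage.RETRY;
        retryTimeInMillis = System.currentTimeMillis() - startTimeImMillis;
    }
}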

OnGoingRecovery.java

@@ -33,6 +33,7 @@ public class OnGoingRecovery {
     public static enum Stage {
         INIT,
+        RETRY,
         FILES,
         TRANSLOG,
         FINALIZE
@@ -40,6 +41,8 @@ public class OnGoingRecovery {
     ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
+    final long startTimeImMillis = System.currentTimeMillis();
+    volatile long retryTimeInMillis = 0;
     List<String> phase1FileNames;
     List<Long> phase1FileSizes;
     List<String> phase1ExistingFileNames;
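
The doRecovery() change below hinges on reusing this record across retries: when fromRetry is true, the existing entry is fetched from the onGoingRecoveries map, so startTimeImMillis keeps measuring from the first attempt. A sketch of that lookup-or-create step, with the shard id simplified to a String key and Recovery taken from the sketch above; RecoveryRegistry and its method names are assumptions for illustration:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the lookup-or-create step from doRecovery below.
public class RecoveryRegistry {
    private final ConcurrentMap<String, Recovery> onGoingRecoveries =
            new ConcurrentHashMap<String, Recovery>();

    Recovery recoveryFor(String shardId, boolean fromRetry) {
        if (fromRetry) {
            // a retry reuses the record created on the first attempt, so
            // startTimeImMillis still measures from the original start
            return onGoingRecoveries.get(shardId);
        }
        Recovery recovery = new Recovery();
        onGoingRecoveries.put(shardId, recovery);
        return recovery;
    }

    void removeAndClean(String shardId) {
        // terminal outcomes drop the entry; retry paths leave it in place
        onGoingRecoveries.remove(shardId);
    }
}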

RecoveryTarget.java

@@ -102,18 +102,20 @@ public class RecoveryTarget extends AbstractComponent {
         });
     }
-    public void startRecovery(final StartRecoveryRequest request, boolean fromRetry, final RecoveryListener listener) {
+    public void startRecovery(final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) {
         if (request.sourceNode() == null) {
             listener.onIgnoreRecovery(false, "No node to recovery from, retry on next cluster state update");
             return;
         }
         IndexService indexService = indicesService.indexService(request.shardId().index().name());
         if (indexService == null) {
+            removeAndCleanOnGoingRecovery(request.shardId());
             listener.onIgnoreRecovery(false, "index missing, stop recovery");
             return;
         }
         final InternalIndexShard shard = (InternalIndexShard) indexService.shard(request.shardId().id());
         if (shard == null) {
+            removeAndCleanOnGoingRecovery(request.shardId());
             listener.onIgnoreRecovery(false, "shard missing, stop recovery");
             return;
         }
@@ -127,30 +129,41 @@ public class RecoveryTarget extends AbstractComponent {
             }
         }
         if (shard.state() == IndexShardState.CLOSED) {
+            removeAndCleanOnGoingRecovery(request.shardId());
             listener.onIgnoreRecovery(false, "shard closed, stop recovery");
             return;
         }
         threadPool.cached().execute(new Runnable() {
             @Override public void run() {
-                doRecovery(shard, request, listener);
+                doRecovery(shard, request, fromRetry, listener);
             }
         });
     }
-    private void doRecovery(final InternalIndexShard shard, final StartRecoveryRequest request, final RecoveryListener listener) {
+    private void doRecovery(final InternalIndexShard shard, final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) {
         if (shard.state() == IndexShardState.CLOSED) {
+            removeAndCleanOnGoingRecovery(request.shardId());
             listener.onIgnoreRecovery(false, "shard closed, stop recovery");
             return;
         }
+        OnGoingRecovery recovery;
+        if (fromRetry) {
+            recovery = onGoingRecoveries.get(request.shardId());
+        } else {
+            recovery = new OnGoingRecovery();
+            onGoingRecoveries.put(request.shardId(), recovery);
+        }
         if (!recoveryThrottler.tryRecovery(shard.shardId(), "peer recovery target")) {
+            recovery.stage = OnGoingRecovery.Stage.RETRY;
+            recovery.retryTimeInMillis = System.currentTimeMillis() - recovery.startTimeImMillis;
             listener.onRetryRecovery(recoveryThrottler.throttleInterval());
             return;
         }
         try {
             logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());
-            onGoingRecoveries.put(request.shardId(), new OnGoingRecovery());
             StopWatch stopWatch = new StopWatch().start();
             RecoveryResponse recoveryStatus = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@@ -164,7 +177,8 @@ public class RecoveryTarget extends AbstractComponent {
                     return;
                 }
                 logger.trace("[{}][{}] retrying recovery in [{}], source shard is busy", request.shardId().index().name(), request.shardId().id(), recoveryThrottler.throttleInterval());
-                removeAndCleanOnGoingRecovery(request.shardId());
+                recovery.stage = OnGoingRecovery.Stage.RETRY;
+                recovery.retryTimeInMillis = System.currentTimeMillis() - recovery.startTimeImMillis;
                 listener.onRetryRecovery(recoveryThrottler.throttleInterval());
                 return;
             }
@@ -187,12 +201,11 @@ public class RecoveryTarget extends AbstractComponent {
             removeAndCleanOnGoingRecovery(request.shardId());
             listener.onRecoveryDone();
         } catch (Exception e) {
-            removeAndCleanOnGoingRecovery(request.shardId());
             if (shard.state() == IndexShardState.CLOSED) {
+                removeAndCleanOnGoingRecovery(request.shardId());
                 listener.onIgnoreRecovery(false, "shard closed, stop recovery");
                 return;
             }
-            logger.trace("[{}][{}] recovery from [{}] failed", e, request.shardId().index().name(), request.shardId().id(), request.sourceNode());
             Throwable cause = ExceptionsHelper.unwrapCause(e);
             if (cause instanceof RecoveryEngineException) {
                 // unwrap an exception that was thrown as part of the recovery
@@ -206,10 +219,15 @@ public class RecoveryTarget extends AbstractComponent {
             }
             if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
+                recovery.stage = OnGoingRecovery.Stage.RETRY;
+                recovery.retryTimeInMillis = System.currentTimeMillis() - recovery.startTimeImMillis;
                 listener.onRetryRecovery(recoveryThrottler.throttleInterval());
                 return;
             }
+            removeAndCleanOnGoingRecovery(request.shardId());
+            logger.trace("[{}][{}] recovery from [{}] failed", e, request.shardId().index().name(), request.shardId().id(), request.sourceNode());
             if (cause instanceof ConnectTransportException) {
                 listener.onIgnoreRecovery(true, "source node disconnected");
                 return;
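
One way to read the diff as a whole: removeAndCleanOnGoingRecovery() now runs only on terminal outcomes (index or shard gone, shard closed, recovery done, unrecoverable failure), while every retry path leaves the map entry alive and stamps stage and retryTimeInMillis instead. A condensed sketch of that control flow, building on the hypothetical Recovery and RecoveryRegistry sketches above; shardClosed and throttled stand in for the real checks:

// Condensed, hypothetical view of doRecovery's retry-vs-terminal flow.
public class RecoveryFlowSketch {
    void doRecoverySketch(RecoveryRegistry registry, String shardId,
                          boolean fromRetry, boolean shardClosed, boolean throttled) {
        if (shardClosed) {
            registry.removeAndClean(shardId);  // terminal outcome: drop the entry
            return;
        }
        Recovery recovery = registry.recoveryFor(shardId, fromRetry);
        if (throttled) {
            recovery.markRetry();  // non-terminal: the entry survives for the retry
            return;                // caller reschedules with fromRetry = true
        }
        // ... run the actual recovery here; a retryable failure would also
        // call recovery.markRetry(), any other outcome is terminal ...
        registry.removeAndClean(shardId);  // success or unrecoverable failure
    }
}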