[RECOVERY] Increment Store refcount on RecoveryTarget
We should make sure we have incremented the store refcount before we start the recovery on the recovyer target. Closes #6844
This commit is contained in:
parent
ab11c6821d
commit
e8ff007852
|
@ -169,16 +169,36 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
|
||||||
*
|
*
|
||||||
* Note: Close can safely be called multiple times.
|
* Note: Close can safely be called multiple times.
|
||||||
* @see #decRef
|
* @see #decRef
|
||||||
|
* @see #tryIncRef()
|
||||||
|
* @throws AlreadyClosedException iff the reference counter can not be incremented.
|
||||||
*/
|
*/
|
||||||
public final void incRef() {
|
public final void incRef() {
|
||||||
|
if (tryIncRef() == false) {
|
||||||
|
throw new AlreadyClosedException("Store is already closed can't increment refCount current count [" + refCount.get() + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tries to increment the refCount of this Store instance. This method will return <tt>true</tt> iff the refCount was
|
||||||
|
* incremented successfully otherwise <tt>false</tt>. RefCounts are used to determine when a
|
||||||
|
* Store can be closed safely, i.e. as soon as there are no more references. Be sure to always call a
|
||||||
|
* corresponding {@link #decRef}, in a finally clause; otherwise the store may never be closed. Note that
|
||||||
|
* {@link #close} simply calls decRef(), which means that the Store will not really be closed until {@link
|
||||||
|
* #decRef} has been called for all outstanding references.
|
||||||
|
*
|
||||||
|
* Note: Close can safely be called multiple times.
|
||||||
|
* @see #decRef()
|
||||||
|
* @see #incRef()
|
||||||
|
*/
|
||||||
|
public final boolean tryIncRef() {
|
||||||
do {
|
do {
|
||||||
int i = refCount.get();
|
int i = refCount.get();
|
||||||
if (i > 0) {
|
if (i > 0) {
|
||||||
if (refCount.compareAndSet(i, i + 1)) {
|
if (refCount.compareAndSet(i, i + 1)) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new AlreadyClosedException("Store is already closed can't increment refCount current count [" + i + "]");
|
return false;
|
||||||
}
|
}
|
||||||
} while (true);
|
} while (true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -172,7 +172,7 @@ public class RecoveryTarget extends AbstractComponent {
|
||||||
threadPool.generic().execute(new Runnable() {
|
threadPool.generic().execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
doRecovery(request, recoveryStatus, listener);
|
doRecovery(request, recoveryStatus, listener);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -188,7 +188,6 @@ public class RecoveryTarget extends AbstractComponent {
|
||||||
|
|
||||||
private void doRecovery(final StartRecoveryRequest request, final RecoveryStatus recoveryStatus, final RecoveryListener listener) {
|
private void doRecovery(final StartRecoveryRequest request, final RecoveryStatus recoveryStatus, final RecoveryListener listener) {
|
||||||
assert request.sourceNode() != null : "can't do a recovery without a source node";
|
assert request.sourceNode() != null : "can't do a recovery without a source node";
|
||||||
|
|
||||||
final InternalIndexShard shard = recoveryStatus.indexShard;
|
final InternalIndexShard shard = recoveryStatus.indexShard;
|
||||||
if (shard == null) {
|
if (shard == null) {
|
||||||
listener.onIgnoreRecovery(false, "shard missing locally, stop recovery");
|
listener.onIgnoreRecovery(false, "shard missing locally, stop recovery");
|
||||||
|
@ -205,106 +204,111 @@ public class RecoveryTarget extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
recoveryStatus.recoveryThread = Thread.currentThread();
|
recoveryStatus.recoveryThread = Thread.currentThread();
|
||||||
|
if (shard.store().tryIncRef()) {
|
||||||
|
try {
|
||||||
|
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());
|
||||||
|
|
||||||
try {
|
StopWatch stopWatch = new StopWatch().start();
|
||||||
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());
|
RecoveryResponse recoveryResponse = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
|
||||||
|
@Override
|
||||||
StopWatch stopWatch = new StopWatch().start();
|
public RecoveryResponse newInstance() {
|
||||||
RecoveryResponse recoveryResponse = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
|
return new RecoveryResponse();
|
||||||
@Override
|
}
|
||||||
public RecoveryResponse newInstance() {
|
}).txGet();
|
||||||
return new RecoveryResponse();
|
if (shard.state() == IndexShardState.CLOSED) {
|
||||||
|
removeAndCleanOnGoingRecovery(recoveryStatus);
|
||||||
|
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
stopWatch.stop();
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] ");
|
||||||
|
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n");
|
||||||
|
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
|
||||||
|
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
|
||||||
|
.append("\n");
|
||||||
|
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
|
||||||
|
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
|
||||||
|
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log operations")
|
||||||
|
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
|
||||||
|
.append("\n");
|
||||||
|
sb.append(" phase3: recovered [").append(recoveryResponse.phase3Operations).append("]").append(" transaction log operations")
|
||||||
|
.append(", took [").append(timeValueMillis(recoveryResponse.phase3Time)).append("]");
|
||||||
|
logger.trace(sb.toString());
|
||||||
|
} else if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("{} recovery completed from [{}], took [{}]", request.shardId(), request.sourceNode(), stopWatch.totalTime());
|
||||||
}
|
}
|
||||||
}).txGet();
|
|
||||||
if (shard.state() == IndexShardState.CLOSED) {
|
|
||||||
removeAndCleanOnGoingRecovery(recoveryStatus);
|
removeAndCleanOnGoingRecovery(recoveryStatus);
|
||||||
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
|
listener.onRecoveryDone();
|
||||||
return;
|
} catch (Throwable e) {
|
||||||
}
|
if (logger.isTraceEnabled()) {
|
||||||
stopWatch.stop();
|
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
|
||||||
if (logger.isTraceEnabled()) {
|
}
|
||||||
StringBuilder sb = new StringBuilder();
|
if (recoveryStatus.isCanceled()) {
|
||||||
sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] ");
|
// don't remove it, the cancellation code will remove it...
|
||||||
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n");
|
listener.onIgnoreRecovery(false, "canceled recovery");
|
||||||
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
|
return;
|
||||||
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
|
}
|
||||||
.append("\n");
|
if (shard.state() == IndexShardState.CLOSED) {
|
||||||
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
|
removeAndCleanOnGoingRecovery(recoveryStatus);
|
||||||
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
|
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
|
||||||
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log operations")
|
return;
|
||||||
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
|
}
|
||||||
.append("\n");
|
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||||
sb.append(" phase3: recovered [").append(recoveryResponse.phase3Operations).append("]").append(" transaction log operations")
|
if (cause instanceof RecoveryEngineException) {
|
||||||
.append(", took [").append(timeValueMillis(recoveryResponse.phase3Time)).append("]");
|
// unwrap an exception that was thrown as part of the recovery
|
||||||
logger.trace(sb.toString());
|
cause = cause.getCause();
|
||||||
} else if (logger.isDebugEnabled()) {
|
}
|
||||||
logger.debug("{} recovery completed from [{}], took [{}]", request.shardId(), request.sourceNode(), stopWatch.totalTime());
|
// do it twice, in case we have double transport exception
|
||||||
}
|
cause = ExceptionsHelper.unwrapCause(cause);
|
||||||
removeAndCleanOnGoingRecovery(recoveryStatus);
|
if (cause instanceof RecoveryEngineException) {
|
||||||
listener.onRecoveryDone();
|
// unwrap an exception that was thrown as part of the recovery
|
||||||
} catch (Throwable e) {
|
cause = cause.getCause();
|
||||||
if (logger.isTraceEnabled()) {
|
}
|
||||||
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
|
|
||||||
}
|
// here, we would add checks against exception that need to be retried (and not removeAndClean in this case)
|
||||||
if (recoveryStatus.isCanceled()) {
|
|
||||||
// don't remove it, the cancellation code will remove it...
|
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
|
||||||
listener.onIgnoreRecovery(false, "canceled recovery");
|
// if the target is not ready yet, retry
|
||||||
return;
|
listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
|
||||||
}
|
return;
|
||||||
if (shard.state() == IndexShardState.CLOSED) {
|
}
|
||||||
|
|
||||||
|
if (cause instanceof DelayRecoveryException) {
|
||||||
|
listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// here, we check against ignore recovery options
|
||||||
|
|
||||||
|
// in general, no need to clean the shard on ignored recovery, since we want to try and reuse it later
|
||||||
|
// it will get deleted in the IndicesStore if all are allocated and no shard exists on this node...
|
||||||
|
|
||||||
removeAndCleanOnGoingRecovery(recoveryStatus);
|
removeAndCleanOnGoingRecovery(recoveryStatus);
|
||||||
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
|
|
||||||
return;
|
if (cause instanceof ConnectTransportException) {
|
||||||
|
listener.onIgnoreRecovery(true, "source node disconnected (" + request.sourceNode() + ")");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cause instanceof IndexShardClosedException) {
|
||||||
|
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cause instanceof AlreadyClosedException) {
|
||||||
|
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.warn("[{}][{}] recovery from [{}] failed", e, request.shardId().index().name(), request.shardId().id(), request.sourceNode());
|
||||||
|
listener.onRecoveryFailure(new RecoveryFailedException(request, e), true);
|
||||||
|
} finally {
|
||||||
|
shard.store().decRef();
|
||||||
}
|
}
|
||||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
} else {
|
||||||
if (cause instanceof RecoveryEngineException) {
|
listener.onIgnoreRecovery(false, "local store closed, stop recovery");
|
||||||
// unwrap an exception that was thrown as part of the recovery
|
|
||||||
cause = cause.getCause();
|
|
||||||
}
|
|
||||||
// do it twice, in case we have double transport exception
|
|
||||||
cause = ExceptionsHelper.unwrapCause(cause);
|
|
||||||
if (cause instanceof RecoveryEngineException) {
|
|
||||||
// unwrap an exception that was thrown as part of the recovery
|
|
||||||
cause = cause.getCause();
|
|
||||||
}
|
|
||||||
|
|
||||||
// here, we would add checks against exception that need to be retried (and not removeAndClean in this case)
|
|
||||||
|
|
||||||
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
|
|
||||||
// if the target is not ready yet, retry
|
|
||||||
listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cause instanceof DelayRecoveryException) {
|
|
||||||
listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// here, we check against ignore recovery options
|
|
||||||
|
|
||||||
// in general, no need to clean the shard on ignored recovery, since we want to try and reuse it later
|
|
||||||
// it will get deleted in the IndicesStore if all are allocated and no shard exists on this node...
|
|
||||||
|
|
||||||
removeAndCleanOnGoingRecovery(recoveryStatus);
|
|
||||||
|
|
||||||
if (cause instanceof ConnectTransportException) {
|
|
||||||
listener.onIgnoreRecovery(true, "source node disconnected (" + request.sourceNode() + ")");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cause instanceof IndexShardClosedException) {
|
|
||||||
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cause instanceof AlreadyClosedException) {
|
|
||||||
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.warn("[{}][{}] recovery from [{}] failed", e, request.shardId().index().name(), request.shardId().id(), request.sourceNode());
|
|
||||||
listener.onRecoveryFailure(new RecoveryFailedException(request, e), true);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue