[RECOVERY] Allow to cancel recovery sources when shards are closed

Today recovery sources are not canceled if a shard is closed. The recovery target
is already canceled when shards are closed, but we should also clean up and cancel
the source side since it holds on to shard locks / references until it's closed.
Simon Willnauer committed 2014-11-19 17:22:57 +01:00
parent edc48f39c5
commit 043f18d5ff
4 changed files with 277 additions and 70 deletions
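In miniature, the lifecycle this commit introduces looks like the hedged sketch below. Names are simplified stand-ins (a String shard id for IndexShard, Handler for ShardRecoveryHandler); the real registry is the OngoingRecoveries class added to RecoverySource further down.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hedged sketch, not the actual diff: every source-side recovery is
// registered before it starts and deregistered in a finally block, so a
// shard-close listener can always cancel whatever is still in flight.
public final class RecoveryLifecycleSketch {
    interface Handler { void cancel(String reason); }

    private final Map<String, Set<Handler>> ongoing = new HashMap<>();

    void recover(String shardId, Handler handler, Runnable doRecover) {
        synchronized (this) {
            Set<Handler> handlers = ongoing.get(shardId);
            if (handlers == null) {
                handlers = new HashSet<>();
                ongoing.put(shardId, handlers);
            }
            handlers.add(handler);
        }
        try {
            doRecover.run(); // may block for a long time
        } finally {
            synchronized (this) { // always deregister, even on failure
                Set<Handler> handlers = ongoing.get(shardId);
                if (handlers != null && handlers.remove(handler) && handlers.isEmpty()) {
                    ongoing.remove(shardId);
                }
            }
        }
    }

    // what the new IndicesLifecycle.Listener does in beforeIndexShardClosed
    synchronized void onShardClosed(String shardId) {
        Set<Handler> handlers = ongoing.remove(shardId);
        if (handlers != null) {
            for (Handler handler : handlers) {
                handler.cancel("shard is closed");
            }
        }
    }
}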

src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java

@@ -39,9 +39,19 @@ public abstract class AbstractRunnable implements Runnable {
onFailure(ex);
} catch (Throwable t) {
onFailure(t);
} finally {
onAfter();
}
}
/**
* This method is called in a finally block after successful execution
* or on a rejection.
*/
public void onAfter() {
// nothing by default
}
/**
* This method is invoked for all exceptions thrown by {@link #doRun()}
*/
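A hedged usage sketch of the new hook, assuming only the contract documented above (onAfter() runs after doRun() and on rejection): a task that must release a latch no matter how it ends, which is exactly the file-transfer pattern ShardRecoveryHandler adopts later in this commit.

import java.util.concurrent.CountDownLatch;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;

// Hedged sketch: the latch is released whether doRun() succeeds, throws,
// or the task never runs because the pool rejected it.
final class LatchedTask extends AbstractRunnable {
    private final CountDownLatch latch;

    LatchedTask(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    protected void doRun() {
        // the actual work, e.g. sending one file chunk
    }

    @Override
    public void onFailure(Throwable t) {
        // record the failure; run() already swallowed it for us
    }

    @Override
    public void onAfter() {
        latch.countDown(); // guaranteed cleanup on success, failure, and rejection
    }
}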
*/

src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

@@ -81,7 +81,12 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
if (command instanceof AbstractRunnable) {
// If we are an abstract runnable we can handle the rejection
// directly and don't need to rethrow it.
((AbstractRunnable)command).onRejection(ex);
try {
((AbstractRunnable) command).onRejection(ex);
} finally {
((AbstractRunnable) command).onAfter();
}
} else {
throw ex;
}
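A hedged, self-contained demonstration of the guarantee this hunk adds (a minimal stand-in abstract class is used so the snippet compiles on its own): when the pool rejects an AbstractRunnable, onRejection runs and the finally block ensures onAfter still follows, even if onRejection itself throws.

import java.util.concurrent.RejectedExecutionException;

// Hedged demo of the patched rejection path, using a stand-in for
// org.elasticsearch.common.util.concurrent.AbstractRunnable.
public final class RejectionContractDemo {

    static abstract class AbstractRunnableLike {
        abstract void doRun();
        abstract void onRejection(Throwable t);
        abstract void onAfter();
    }

    public static void main(String[] args) {
        AbstractRunnableLike task = new AbstractRunnableLike() {
            @Override
            void doRun() {
                // never runs in this demo: the task is rejected
            }
            @Override
            void onRejection(Throwable t) {
                System.out.println("rejected: " + t.getMessage());
            }
            @Override
            void onAfter() {
                System.out.println("cleanup still ran");
            }
        };

        // what EsThreadPoolExecutor now does when the pool throws on execute():
        RejectedExecutionException ex = new RejectedExecutionException("pool full");
        try {
            task.onRejection(ex);
        } finally {
            task.onAfter(); // guaranteed, mirroring the try/finally above
        }
    }
}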

src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java

@@ -19,22 +19,31 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* The source recovery accepts recovery requests from other peer shards and starts the recovery process from this
* source shard to the target shard.
@@ -54,6 +63,7 @@ public class RecoverySource extends AbstractComponent {
private final TimeValue internalActionTimeout;
private final TimeValue internalActionLongTimeout;
private final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries();
@Inject
@@ -64,6 +74,14 @@ public class RecoverySource extends AbstractComponent {
this.indicesService = indicesService;
this.mappingUpdatedAction = mappingUpdatedAction;
this.clusterService = clusterService;
this.indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
if (indexShard != null) {
ongoingRecoveries.cancel(indexShard, "shard is closed");
}
}
});
this.recoverySettings = recoverySettings;
@@ -102,10 +120,14 @@ public class RecoverySource extends AbstractComponent {
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, internalActionTimeout,
final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, internalActionTimeout,
internalActionLongTimeout, clusterService, indicesService, mappingUpdatedAction, logger);
ongoingRecoveries.add(shard, handler);
try {
shard.recover(handler);
} finally {
ongoingRecoveries.remove(shard, handler);
}
return handler.getResponse();
}
@@ -127,5 +149,45 @@ public class RecoverySource extends AbstractComponent {
channel.sendResponse(response);
}
}
private static final class OngoingRecoveries {
private final Map<IndexShard, Set<ShardRecoveryHandler>> ongoingRecoveries = new HashMap<>();
synchronized void add(IndexShard shard, ShardRecoveryHandler handler) {
Set<ShardRecoveryHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
if (shardRecoveryHandlers == null) {
shardRecoveryHandlers = new HashSet<>();
ongoingRecoveries.put(shard, shardRecoveryHandlers);
}
assert shardRecoveryHandlers.contains(handler) == false : "Handler was already registered [" + handler + "]";
shardRecoveryHandlers.add(handler);
}
synchronized void remove(IndexShard shard, ShardRecoveryHandler handler) {
final Set<ShardRecoveryHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
assert shardRecoveryHandlers != null : "Shard was not registered [" + shard + "]";
final boolean removed = shardRecoveryHandlers.remove(handler);
assert removed : "Handler was not registered [" + handler + "]";
if (shardRecoveryHandlers.isEmpty()) {
ongoingRecoveries.remove(shard);
}
}
synchronized void cancel(IndexShard shard, String reason) {
final Set<ShardRecoveryHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
if (shardRecoveryHandlers != null) {
final List<Exception> failures = new ArrayList<>();
for (ShardRecoveryHandler handler : shardRecoveryHandlers) {
try {
handler.cancel(reason);
} catch (Exception ex) {
failures.add(ex);
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(failures);
}
}
}
}
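A hedged demo of the cancel loop above, assuming ExceptionsHelper.maybeThrowRuntimeAndSuppress rethrows the first failure with the rest attached as suppressed exceptions: one misbehaving handler cannot stop the others from being cancelled.

import java.util.ArrayList;
import java.util.List;

// Hedged, self-contained demo of the failure-accumulation pattern in
// OngoingRecoveries.cancel: every handler is cancelled even if an earlier
// one throws, and the collected failures are rethrown together at the end.
public final class CancelAllDemo {
    public static void main(String[] args) {
        List<Runnable> handlers = new ArrayList<>();
        handlers.add(new Runnable() {
            @Override
            public void run() {
                throw new RuntimeException("handler 1 is broken");
            }
        });
        handlers.add(new Runnable() {
            @Override
            public void run() {
                System.out.println("handler 2 cancelled cleanly");
            }
        });

        List<RuntimeException> failures = new ArrayList<>();
        for (Runnable handler : handlers) {
            try {
                handler.run();
            } catch (RuntimeException ex) {
                failures.add(ex); // keep going; cancel the rest first
            }
        }
        if (!failures.isEmpty()) {
            RuntimeException first = failures.get(0);
            for (int i = 1; i < failures.size(); i++) {
                first.addSuppressed(failures.get(i));
            }
            throw first; // "handler 2 cancelled cleanly" is printed before this
        }
    }
}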

src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java

@@ -41,6 +41,7 @@ import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
@@ -59,6 +60,7 @@ import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
@@ -87,6 +89,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
private final MappingUpdatedAction mappingUpdatedAction;
private final RecoveryResponse response;
private final CancelableThreads cancelableThreads = new CancelableThreads();
public ShardRecoveryHandler(final InternalIndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
final TransportService transportService, final TimeValue internalActionTimeout,
@@ -129,6 +132,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
*/
@Override
public void phase1(final SnapshotIndexCommit snapshot) throws ElasticsearchException {
cancelableThreads.failIfCanceled();
// Total size of segment files that are recovered
long totalSize = 0;
// Total size of segment files that were able to be re-used
@@ -178,13 +182,18 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
indexName, shardId, request.targetNode(), response.phase1FileNames.size(),
new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
cancelableThreads.run(new Interruptable() {
@Override
public void run() throws InterruptedException {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.options().withTimeout(internalActionTimeout),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
// This latch will be used to wait until all files have been transferred to the target node
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
@@ -213,9 +222,21 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
pool = recoverySettings.concurrentSmallFileStreamPool();
}
pool.execute(new Runnable() {
pool.execute(new AbstractRunnable() {
@Override
public void run() {
public void onFailure(Throwable t) {
// we were either rejected by the pool, the store reference could not be incremented, or we were canceled
logger.debug("Failed to transfer file [" + name + "] on recovery", t);
}
@Override
public void onAfter() {
// Signify this file has completed by decrementing the latch
latch.countDown();
}
@Override
protected void doRun() {
cancelableThreads.failIfCanceled();
store.incRef();
final StoreFileMetaData md = recoverySourceMetadata.get(name);
try (final IndexInput indexInput = store.directory().openInput(name, IOContext.READONCE)) {
@@ -226,9 +247,9 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
shouldCompressRequest = false;
}
long len = indexInput.length();
final long len = indexInput.length();
long readCount = 0;
TransportRequestOptions requestOptions = TransportRequestOptions.options()
final TransportRequestOptions requestOptions = TransportRequestOptions.options()
.withCompress(shouldCompressRequest)
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(internalActionTimeout);
@@ -238,7 +259,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
throw new IndexShardClosedException(shard.shardId());
}
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
final long position = indexInput.getFilePointer();
// Pause using the rate limiter, if desired, to throttle the recovery
if (recoverySettings.rateLimiter() != null) {
@@ -246,13 +267,20 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
}
indexInput.readBytes(buf, 0, toRead, false);
BytesArray content = new BytesArray(buf, 0, toRead);
final BytesArray content = new BytesArray(buf, 0, toRead);
readCount += toRead;
final boolean lastChunk = readCount == len;
cancelableThreads.run(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Actually send the file chunk to the target node, waiting for it to complete
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, readCount == len),
new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, lastChunk),
requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
}
} catch (Throwable e) {
final Throwable corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
@@ -274,27 +302,32 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
exceptions.add(0, e); // last exceptions first
}
} finally {
try {
store.decRef();
} finally {
// Signify this file has completed by decrementing the latch
latch.countDown();
}
}
}
});
fileIndex++;
}
cancelableThreads.run(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Wait for all files that need to be transferred to finish transferring
latch.await();
}
});
if (corruptedEngine.get() != null) {
throw corruptedEngine.get();
} else {
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
cancelableThreads.run(new Interruptable() {
@Override
public void run() throws InterruptedException {
final Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
// names to the actual file names. It also writes checksums for
@@ -307,6 +340,8 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles),
TransportRequestOptions.options().withTimeout(internalActionTimeout),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
@@ -334,14 +369,21 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancelableThreads.failIfCanceled();
logger.trace("{} recovery [phase2] to {}: start", request.shardId(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
cancelableThreads.run(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Send a request preparing the new shard's translog to receive
// operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()),
TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
stopWatch.stop();
response.startTime = stopWatch.totalTime().millis();
logger.trace("{} recovery [phase2] to {}: start took [{}]",
@@ -378,12 +420,16 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancelableThreads.failIfCanceled();
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode());
StopWatch stopWatch = new StopWatch().start();
// Send the translog operations to the target node
int totalOperations = sendSnapshot(snapshot);
cancelableThreads.run(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Send the FINALIZE request to the target node. The finalize request
// clears unreferenced translog files, refreshes the engine now that
// new segments are available, and enables garbage collection of
@@ -393,6 +439,9 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()),
TransportRequestOptions.options().withTimeout(internalActionLongTimeout),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
if (request.markAsRelocated()) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
@@ -455,11 +504,12 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
latch.countDown();
}
});
try {
cancelableThreads.run(new Interruptable() {
@Override
public void run() throws InterruptedException {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
if (documentMappersToUpdate.isEmpty()) {
return;
}
@@ -503,10 +553,10 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
int ops = 0;
long size = 0;
int totalOperations = 0;
List<Translog.Operation> operations = Lists.newArrayList();
final List<Translog.Operation> operations = Lists.newArrayList();
Translog.Operation operation = snapshot.next();
TransportRequestOptions recoveryOptions = TransportRequestOptions.options()
final TransportRequestOptions recoveryOptions = TransportRequestOptions.options()
.withCompress(recoverySettings.compress())
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(internalActionLongTimeout);
@@ -515,7 +565,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancelableThreads.failIfCanceled();
operations.add(operation);
ops += 1;
size += operation.estimateSize();
@@ -534,9 +584,15 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
// recoverySettings.rateLimiter().pause(size);
// }
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
cancelableThreads.run(new Interruptable() {
@Override
public void run() throws InterruptedException {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
ops = 0;
size = 0;
operations.clear();
@@ -545,10 +601,84 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
}
// send the leftover
if (!operations.isEmpty()) {
cancelableThreads.run(new Interruptable() {
@Override
public void run() throws InterruptedException {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
}
return totalOperations;
}
/**
* Cancels the recovery and interrupts all eligible threads.
*/
public void cancel(String reason) {
cancelableThreads.cancel(reason);
}
private static final class CancelableThreads {
private final Set<Thread> threads = new HashSet<>();
private boolean canceled = false;
private String reason;
public synchronized boolean isCanceled() {
return canceled;
}
public synchronized void failIfCanceled() {
if (isCanceled()) {
throw new ElasticsearchException("recovery was canceled, reason [" + reason + "]");
}
}
private synchronized void add() {
failIfCanceled();
threads.add(Thread.currentThread());
}
public void run(Interruptable interruptable) {
add();
try {
interruptable.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
remove();
}
}
private synchronized void remove() {
threads.remove(Thread.currentThread());
failIfCanceled();
}
public synchronized void cancel(String reason) {
canceled = true;
this.reason = reason;
for (Thread thread : threads) {
thread.interrupt();
}
threads.clear();
}
}
interface Interruptable {
void run() throws InterruptedException;
}
@Override
public String toString() {
return "ShardRecoveryHandler{" +
"shardId=" + request.shardId() +
", sourceNode=" + request.sourceNode() +
", targetNode=" + request.targetNode() +
'}';
}
}
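To close, a hedged, runnable sketch of the CancelableThreads idea above, outside Elasticsearch: a worker blocks inside an interruptible section, cancel(reason) interrupts it from another thread, and the worker surfaces the cancellation as an exception when it deregisters (the real class does this via failIfCanceled() in remove()).

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

// Hedged, self-contained sketch of the CancelableThreads pattern: threads
// register while they run interruptible work, cancel(reason) interrupts all
// of them, and an interrupted thread reports the cancellation on its way out.
public final class CancelableThreadsDemo {
    private final Set<Thread> threads = new HashSet<>();
    private boolean canceled = false;
    private String reason;

    synchronized void failIfCanceled() {
        if (canceled) {
            throw new RuntimeException("recovery was canceled, reason [" + reason + "]");
        }
    }

    void run(Runnable interruptable) {
        synchronized (this) {
            failIfCanceled(); // refuse to start new work once canceled
            threads.add(Thread.currentThread());
        }
        try {
            interruptable.run();
        } finally {
            synchronized (this) {
                threads.remove(Thread.currentThread());
                failIfCanceled(); // turn the interrupt into a hard failure
            }
        }
    }

    synchronized void cancel(String reason) {
        this.canceled = true;
        this.reason = reason;
        for (Thread thread : threads) {
            thread.interrupt(); // unblocks calls like txGet() or latch.await()
        }
        threads.clear();
    }

    public static void main(String[] args) throws InterruptedException {
        final CancelableThreadsDemo cancelable = new CancelableThreadsDemo();
        final CountDownLatch never = new CountDownLatch(1);
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cancelable.run(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                never.await(); // stands in for a blocking transport call
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    });
                } catch (RuntimeException e) {
                    System.out.println(e.getMessage()); // recovery was canceled, reason [shard is closed]
                }
            }
        });
        worker.start();
        Thread.sleep(100); // give the worker time to block
        cancelable.cancel("shard is closed"); // interrupts and fails the worker
        worker.join();
    }
}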