Decouple recovery source/target logic and transport piping

The current logic for doing recovery from a source to a target shard is tightly coupled with the underlying network pipes. This change decouples the two, making it easier to add unit tests for shard recovery that don't involve the node and network environment.

On top of that, RecoveryTarget is renamed to RecoveryTargetService, leaving room to rename RecoveryStatus to RecoveryTarget (and thus avoiding the confusion we have today with RecoveryState).

Correspondingly, RecoverySource is renamed to RecoverySourceService.
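To make the decoupling concrete, below is a minimal sketch of the target-side seam the source handler is now programmed against. The method signatures are inferred from the recoveryTarget.* call sites in the diff further down; the exact declarations in the committed RecoveryTargetHandler interface may differ.

    package org.elasticsearch.indices.recovery;

    import java.io.IOException;
    import java.util.List;

    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.index.store.Store;
    import org.elasticsearch.index.store.StoreFileMetaData;
    import org.elasticsearch.index.translog.Translog;

    // Sketch only: the abstraction RecoverySourceHandler talks to after this change.
    // RemoteRecoveryTargetHandler implements it over the transport layer; tests can implement it in memory.
    public interface RecoveryTargetHandler {
        void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
        void finalizeRecovery();
        void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps);
        void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes,
                             List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes,
                             int totalTranslogOps);
        void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException;
        void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
                            boolean lastChunk, int totalTranslogOps) throws IOException;
    }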

Closes #16605
Boaz Leskes 2016-02-11 09:45:43 +01:00
parent 52ee4c7027
commit 8bc2332d9a
23 changed files with 1333 additions and 1026 deletions


@ -662,9 +662,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoverySource.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoverySourceHandler.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryStatus.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryTarget.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]SharedFSRecoverySourceHandler.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]StartRecoveryRequest.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStore.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]TransportNodesListShardStoreMetaData.java" checks="LineLength" />


@ -80,14 +80,32 @@ public class CancellableThreads {
* @param interruptable code to run
*/
public void execute(Interruptable interruptable) {
try {
executeIO(interruptable);
} catch (IOException e) {
assert false : "the passed interruptable can not result in an IOException";
throw new RuntimeException("unexpected IO exception", e);
}
}
/**
* run the Interruptable, capturing the executing thread. Concurrent calls to {@link #cancel(String)} will interrupt this thread
* causing the call to prematurely return.
*
* @param interruptable code to run
*/
public void executeIO(IOInterruptable interruptable) throws IOException {
boolean wasInterrupted = add();
RuntimeException throwable = null;
RuntimeException runtimeException = null;
IOException ioException = null;
try {
interruptable.run();
} catch (InterruptedException | ThreadInterruptedException e) {
// assume this is us and ignore
} catch (RuntimeException t) {
throwable = t;
runtimeException = t;
} catch (IOException e) {
ioException = e;
} finally {
remove();
}
@ -101,10 +119,14 @@ public class CancellableThreads {
}
synchronized (this) {
if (isCancelled()) {
onCancel(reason, throwable);
} else if (throwable != null) {
onCancel(reason, ioException != null ? ioException : runtimeException);
} else if (ioException != null) {
// if we're not canceling, we throw the original exception
throw throwable;
throw ioException;
}
if (runtimeException != null) {
// if we're not canceling, we throw the original exception
throw runtimeException;
}
}
}
@ -131,10 +153,14 @@ public class CancellableThreads {
}
public interface Interruptable {
public interface Interruptable extends IOInterruptable {
void run() throws InterruptedException;
}
public interface IOInterruptable {
void run() throws IOException, InterruptedException;
}
public static class ExecutionCancelledException extends ElasticsearchException {
public ExecutionCancelledException(String msg) {

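The new executeIO variant lets callers run IO-throwing work under cancellation without wrapping checked exceptions in RuntimeExceptions. A small usage sketch; the target variable, totalTranslogOps, and handleRecoveryFailure are hypothetical stand-ins for the kind of work shown later in this diff:

    CancellableThreads cancellableThreads = new CancellableThreads();
    try {
        // a concurrent cancel(reason) interrupts the executing thread and typically
        // surfaces as an ExecutionCancelledException instead of the original failure
        cancellableThreads.executeIO(() -> target.prepareForTranslogOperations(totalTranslogOps));
    } catch (IOException e) {
        // when the operation was not cancelled, the original IOException thrown by the lambda is rethrown here
        handleRecoveryFailure(e); // hypothetical error handling
    }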

@ -61,7 +61,7 @@ import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -155,7 +155,7 @@ public class IndicesModule extends AbstractModule {
bind(IndicesService.class).asEagerSingleton();
bind(RecoverySettings.class).asEagerSingleton();
bind(RecoveryTarget.class).asEagerSingleton();
bind(RecoveryTargetService.class).asEagerSingleton();
bind(RecoverySource.class).asEagerSingleton();
bind(IndicesStore.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();


@ -63,7 +63,7 @@ import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.snapshots.RestoreService;
@ -83,7 +83,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final IndicesService indicesService;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final RecoveryTarget recoveryTarget;
private final RecoveryTargetService recoveryTargetService;
private final ShardStateAction shardStateAction;
private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final NodeMappingRefreshAction nodeMappingRefreshAction;
@ -105,7 +105,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Inject
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
ThreadPool threadPool, RecoveryTarget recoveryTarget,
ThreadPool threadPool, RecoveryTargetService recoveryTargetService,
ShardStateAction shardStateAction,
NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingRefreshAction nodeMappingRefreshAction,
@ -113,11 +113,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
SearchService searchService, SyncedFlushService syncedFlushService,
RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider) {
super(settings);
this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService);
this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTargetService, searchService, syncedFlushService);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.recoveryTarget = recoveryTarget;
this.recoveryTargetService = recoveryTargetService;
this.shardStateAction = shardStateAction;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
@ -466,7 +466,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} else if (isPeerRecovery(shardRouting)) {
final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
// check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
if (recoveryTarget.cancelRecoveriesForShard(indexShard.shardId(), "recovery source node changed", status -> !status.sourceNode().equals(sourceNode))) {
if (recoveryTargetService.cancelRecoveriesForShard(indexShard.shardId(), "recovery source node changed", status -> !status.sourceNode().equals(sourceNode))) {
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
@ -609,7 +609,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.PRIMARY_RELOCATION : RecoveryState.Type.REPLICA;
RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(), type, sourceNode, nodes.localNode());
indexShard.markAsRecovering("from " + sourceNode, recoveryState);
recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
recoveryTargetService.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
indexShard.failShard("corrupted preexisting index", e);
handleRecoveryFailure(indexService, shardRouting, true, e);
@ -698,7 +698,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
return !shardRouting.primary() || shardRouting.relocatingNodeId() != null;
}
private class PeerRecoveryListener implements RecoveryTarget.RecoveryListener {
private class PeerRecoveryListener implements RecoveryTargetService.RecoveryListener {
private final ShardRouting shardRouting;
private final IndexService indexService;


@ -37,13 +37,13 @@ import java.util.function.Predicate;
/**
* This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node
* of those recoveries). The class is used to guarantee concurrent semantics such that once a recoveries was done/cancelled/failed
* no other thread will be able to find it. Last, the {@link StatusRef} inner class verifies that recovery temporary files
* no other thread will be able to find it. Last, the {@link RecoveryRef} inner class verifies that recovery temporary files
* and store will only be cleared once on going usage is finished.
*/
public class RecoveriesCollection {
/** This is the single source of truth for ongoing recoveries. If it's not here, it was canceled or done */
private final ConcurrentMap<Long, RecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<Long, RecoveryTarget> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
final private ESLogger logger;
final private ThreadPool threadPool;
@ -59,9 +59,9 @@ public class RecoveriesCollection {
* @return the id of the new recovery.
*/
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode,
RecoveryTarget.RecoveryListener listener, TimeValue activityTimeout) {
RecoveryStatus status = new RecoveryStatus(indexShard, sourceNode, listener);
RecoveryStatus existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status);
RecoveryTargetService.RecoveryListener listener, TimeValue activityTimeout) {
RecoveryTarget status = new RecoveryTarget(indexShard, sourceNode, listener);
RecoveryTarget existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status);
assert existingStatus == null : "found two RecoveryStatus instances with the same id";
logger.trace("{} started recovery from {}, id [{}]", indexShard.shardId(), sourceNode, status.recoveryId());
threadPool.schedule(activityTimeout, ThreadPool.Names.GENERIC,
@ -70,33 +70,33 @@ public class RecoveriesCollection {
}
/**
* gets the {@link RecoveryStatus } for a given id. The RecoveryStatus returned has it's ref count already incremented
* to make sure it's safe to use. However, you must call {@link RecoveryStatus#decRef()} when you are done with it, typically
* gets the {@link RecoveryTarget } for a given id. The RecoveryStatus returned has it's ref count already incremented
* to make sure it's safe to use. However, you must call {@link RecoveryTarget#decRef()} when you are done with it, typically
* by using this method in a try-with-resources clause.
* <p>
* Returns null if recovery is not found
*/
public StatusRef getStatus(long id) {
RecoveryStatus status = onGoingRecoveries.get(id);
public RecoveryRef getRecovery(long id) {
RecoveryTarget status = onGoingRecoveries.get(id);
if (status != null && status.tryIncRef()) {
return new StatusRef(status);
return new RecoveryRef(status);
}
return null;
}
/** Similar to {@link #getStatus(long)} but throws an exception if no recovery is found */
public StatusRef getStatusSafe(long id, ShardId shardId) {
StatusRef statusRef = getStatus(id);
if (statusRef == null) {
/** Similar to {@link #getRecovery(long)} but throws an exception if no recovery is found */
public RecoveryRef getRecoverySafe(long id, ShardId shardId) {
RecoveryRef recoveryRef = getRecovery(id);
if (recoveryRef == null) {
throw new IndexShardClosedException(shardId);
}
assert statusRef.status().shardId().equals(shardId);
return statusRef;
assert recoveryRef.status().shardId().equals(shardId);
return recoveryRef;
}
/** cancel the recovery with the given id (if found) and remove it from the recovery collection */
public boolean cancelRecovery(long id, String reason) {
RecoveryStatus removed = onGoingRecoveries.remove(id);
RecoveryTarget removed = onGoingRecoveries.remove(id);
boolean cancelled = false;
if (removed != null) {
logger.trace("{} canceled recovery from {}, id [{}] (reason [{}])",
@ -115,7 +115,7 @@ public class RecoveriesCollection {
* @param sendShardFailure true a shard failed message should be sent to the master
*/
public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFailure) {
RecoveryStatus removed = onGoingRecoveries.remove(id);
RecoveryTarget removed = onGoingRecoveries.remove(id);
if (removed != null) {
logger.trace("{} failing recovery from {}, id [{}]. Send shard failure: [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId(), sendShardFailure);
removed.fail(e, sendShardFailure);
@ -124,7 +124,7 @@ public class RecoveriesCollection {
/** mark the recovery with the given id as done (if found) */
public void markRecoveryAsDone(long id) {
RecoveryStatus removed = onGoingRecoveries.remove(id);
RecoveryTarget removed = onGoingRecoveries.remove(id);
if (removed != null) {
logger.trace("{} marking recovery from {} as done, id [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId());
removed.markAsDone();
@ -151,9 +151,9 @@ public class RecoveriesCollection {
* already issued outstanding references.
* @return true if a recovery was cancelled
*/
public boolean cancelRecoveriesForShard(ShardId shardId, String reason, Predicate<RecoveryStatus> shouldCancel) {
public boolean cancelRecoveriesForShard(ShardId shardId, String reason, Predicate<RecoveryTarget> shouldCancel) {
boolean cancelled = false;
for (RecoveryStatus status : onGoingRecoveries.values()) {
for (RecoveryTarget status : onGoingRecoveries.values()) {
if (status.shardId().equals(shardId)) {
boolean cancel = false;
// if we can't increment the status, the recovery is not there any more.
@ -174,20 +174,20 @@ public class RecoveriesCollection {
/**
* a reference to {@link RecoveryStatus}, which implements {@link AutoCloseable}. closing the reference
* causes {@link RecoveryStatus#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link RecoveriesCollection.StatusRef#close()} is called.
* a reference to {@link RecoveryTarget}, which implements {@link AutoCloseable}. closing the reference
* causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link RecoveryRef#close()} is called.
*/
public static class StatusRef implements AutoCloseable {
public static class RecoveryRef implements AutoCloseable {
private final RecoveryStatus status;
private final RecoveryTarget status;
private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Important: {@link org.elasticsearch.indices.recovery.RecoveryStatus#tryIncRef()} should
* Important: {@link RecoveryTarget#tryIncRef()} should
* be *successfully* called on status before
*/
public StatusRef(RecoveryStatus status) {
public RecoveryRef(RecoveryTarget status) {
this.status = status;
this.status.setLastAccessTime();
}
@ -199,7 +199,7 @@ public class RecoveriesCollection {
}
}
public RecoveryStatus status() {
public RecoveryTarget status() {
return status;
}
}
@ -223,7 +223,7 @@ public class RecoveriesCollection {
@Override
protected void doRun() throws Exception {
RecoveryStatus status = onGoingRecoveries.get(recoveryId);
RecoveryTarget status = onGoingRecoveries.get(recoveryId);
if (status == null) {
logger.trace("[monitor] no status found for [{}], shutting down", recoveryId);
return;

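As the getRecovery javadoc above notes, the returned reference is meant to be used in a try-with-resources clause so the ref count is always released. A short usage sketch; recoveriesCollection, recoveryId, shardId, and logger are assumed to come from the surrounding service:

    try (RecoveriesCollection.RecoveryRef recoveryRef = recoveriesCollection.getRecoverySafe(recoveryId, shardId)) {
        // the underlying RecoveryTarget (temp files, store reference) stays alive while the ref is held
        RecoveryTarget recoveryTarget = recoveryRef.status();
        logger.trace("{} recovery stage [{}]", recoveryTarget.shardId(), recoveryTarget.stage());
    } // close() calls decRef() on exit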

@ -120,10 +120,13 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode());
final RecoverySourceHandler handler;
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(),
recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
if (shard.indexSettings().isOnSharedFilesystem()) {
handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, logger);
} else {
handler = new RecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
handler = new RecoverySourceHandler(shard, recoveryTarget, request, recoverySettings.getChunkSize().bytesAsInt(), logger);
}
ongoingRecoveries.add(shard, handler);
try {

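RemoteRecoveryTargetHandler is the transport-backed implementation of the new handler abstraction: the request building and submitRequest piping that used to sit inline in RecoverySourceHandler now lives behind it. A rough sketch of one delegating method, assuming the Actions constants moved to RecoveryTargetService and reusing the request class from the pre-change code; fields and constructor are simplified (the real constructor also takes a throttle-time consumer, as the call site above shows):

    // Sketch only, not the committed class: shows how one RecoveryTargetHandler method maps onto
    // the transport call that RecoverySourceHandler previously issued directly.
    public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
        private final long recoveryId;
        private final ShardId shardId;
        private final TransportService transportService;
        private final DiscoveryNode targetNode;
        private final RecoverySettings recoverySettings;

        public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService,
                                           DiscoveryNode targetNode, RecoverySettings recoverySettings) {
            this.recoveryId = recoveryId;
            this.shardId = shardId;
            this.transportService = transportService;
            this.targetNode = targetNode;
            this.recoverySettings = recoverySettings;
        }

        @Override
        public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
            transportService.submitRequest(targetNode, RecoveryTargetService.Actions.PREPARE_TRANSLOG,
                new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
                TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
                EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
        }

        // ... the remaining RecoveryTargetHandler methods delegate over the transport in the same way ...
    }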

@ -38,7 +38,6 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.CancellableThreads.Interruptable;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
@ -47,18 +46,13 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.StreamSupport;
@ -82,9 +76,8 @@ public class RecoverySourceHandler {
private final int shardId;
// Request containing source and target node information
private final StartRecoveryRequest request;
private final RecoverySettings recoverySettings;
private final TransportService transportService;
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
protected final RecoveryResponse response;
@ -104,16 +97,17 @@ public class RecoverySourceHandler {
}
};
public RecoverySourceHandler(final IndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
final TransportService transportService, final ESLogger logger) {
public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget,
final StartRecoveryRequest request,
final int fileChunkSizeInBytes,
final ESLogger logger) {
this.shard = shard;
this.recoveryTarget = recoveryTarget;
this.request = request;
this.recoverySettings = recoverySettings;
this.logger = logger;
this.transportService = transportService;
this.indexName = this.request.shardId().getIndex().getName();
this.shardId = this.request.shardId().id();
this.chunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
this.chunkSizeInBytes = fileChunkSizeInBytes;
this.response = new RecoveryResponse();
}
@ -200,11 +194,14 @@ public class RecoverySourceHandler {
final long numDocsTarget = request.metadataSnapshot().getNumDocs();
final long numDocsSource = recoverySourceMetadata.getNumDocs();
if (numDocsTarget != numDocsSource) {
throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number of docs differ: " + numDocsTarget + " (" + request.sourceNode().getName() + ", primary) vs " + numDocsSource + "(" + request.targetNode().getName() + ")");
throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number " +
"of docs differ: " + numDocsTarget + " (" + request.sourceNode().getName() + ", primary) vs " + numDocsSource
+ "(" + request.targetNode().getName() + ")");
}
// we shortcut recovery here because we have nothing to copy. but we must still start the engine on the target.
// so we don't return here
logger.trace("[{}][{}] skipping [phase1] to {} - identical sync id [{}] found on both source and target", indexName, shardId,
logger.trace("[{}][{}] skipping [phase1] to {} - identical sync id [{}] found on both source and target", indexName,
shardId,
request.targetNode(), recoverySourceSyncId);
} else {
final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
@ -213,7 +210,8 @@ public class RecoverySourceHandler {
response.phase1ExistingFileSizes.add(md.length());
existingTotalSize += md.length();
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}], size [{}]",
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}]," +
" size [{}]",
indexName, shardId, request.targetNode(), md.name(), md.checksum(), md.length());
}
totalSize += md.length();
@ -223,7 +221,8 @@ public class RecoverySourceHandler {
phase1Files.addAll(diff.missing);
for (StoreFileMetaData md : phase1Files) {
if (request.metadataSnapshot().asMap().containsKey(md.name())) {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote " +
"[{}], local [{}]",
indexName, shardId, request.targetNode(), md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
} else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote",
@ -237,20 +236,16 @@ public class RecoverySourceHandler {
response.phase1TotalSize = totalSize;
response.phase1ExistingTotalSize = existingTotalSize;
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with " +
"total_size [{}]",
indexName, shardId, request.targetNode(), response.phase1FileNames.size(),
new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
cancellableThreads.execute(() -> {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
translogView.totalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
cancellableThreads.execute(() ->
recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames,
response.phase1ExistingFileSizes, translogView.totalOperations()));
// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();
final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), chunkSizeInBytes);
final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView), chunkSizeInBytes);
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
@ -261,23 +256,19 @@ public class RecoverySourceHandler {
// related to this recovery (out of date segments, for example)
// are deleted
try {
cancellableThreads.execute(() -> {
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
} catch (RemoteTransportException remoteException) {
cancellableThreads.executeIO(() -> recoveryTarget.cleanFiles(translogView.totalOperations(), recoverySourceMetadata));
} catch (RemoteTransportException | IOException targetException) {
final IOException corruptIndexException;
// we realized that after the index was copied and we wanted to finalize the recovery
// the index was corrupted:
// - maybe due to a broken segments file on an empty index (transferred with no checksum)
// - maybe due to old segments without checksums or length only checks
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(targetException)) != null) {
try {
final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
StoreFileMetaData[] metadata =
StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new StoreFileMetaData[size]);
StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new
StoreFileMetaData[size]);
ArrayUtil.timSort(metadata, (o1, o2) -> {
return Long.compare(o1.length(), o2.length()); // check small files first
});
@ -291,17 +282,18 @@ public class RecoverySourceHandler {
}
}
} catch (IOException ex) {
remoteException.addSuppressed(ex);
throw remoteException;
targetException.addSuppressed(ex);
throw targetException;
}
// corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(remoteException);
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " +
"checksums are ok", null);
exception.addSuppressed(targetException);
logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode());
throw exception;
} else {
throw remoteException;
throw targetException;
}
}
}
@ -318,22 +310,14 @@ public class RecoverySourceHandler {
}
protected void prepareTargetForTranslog(final int totalTranslogOps) {
protected void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode());
final long startEngineStart = stopWatch.totalTime().millis();
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Send a request preparing the new shard's translog to receive
// operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), totalTranslogOps),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
// Send a request preparing the new shard's translog to receive
// operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
stopWatch.stop();
response.startTime = stopWatch.totalTime().millis() - startEngineStart;
@ -378,20 +362,7 @@ public class RecoverySourceHandler {
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode());
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Send the FINALIZE request to the target node. The finalize request
// clears unreferenced translog files, refreshes the engine now that
// new segments are available, and enables garbage collection of
// tombstone files. The shard is also moved to the POST_RECOVERY phase
// during this time
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
cancellableThreads.execute(recoveryTarget::finalizeRecovery);
if (isPrimaryRelocation()) {
/**
@ -408,7 +379,7 @@ public class RecoverySourceHandler {
}
stopWatch.stop();
logger.trace("[{}][{}] finalizing recovery to {}: took [{}]",
indexName, shardId, request.targetNode(), stopWatch.totalTime());
indexName, shardId, request.targetNode(), stopWatch.totalTime());
}
protected boolean isPrimaryRelocation() {
@ -435,12 +406,6 @@ public class RecoverySourceHandler {
throw new ElasticsearchException("failed to get next operation from translog", ex);
}
final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder()
.withCompress(true)
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionLongTimeout())
.build();
if (operation == null) {
logger.trace("[{}][{}] no translog operations to send to {}",
indexName, shardId, request.targetNode());
@ -464,12 +429,7 @@ public class RecoverySourceHandler {
// index docs to replicas while the index files are recovered
// the lock can potentially be removed, in which case, it might
// make sense to re-enable throttling in this phase
cancellableThreads.execute(() -> {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, snapshot.totalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations()));
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}",
indexName, shardId, ops, new ByteSizeValue(size),
@ -489,12 +449,7 @@ public class RecoverySourceHandler {
}
// send the leftover
if (!operations.isEmpty()) {
cancellableThreads.execute(() -> {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, snapshot.totalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations()));
}
if (logger.isTraceEnabled()) {
@ -525,13 +480,11 @@ public class RecoverySourceHandler {
final class RecoveryOutputStream extends OutputStream {
private final StoreFileMetaData md;
private final AtomicLong bytesSinceLastPause;
private final Translog.View translogView;
private long position = 0;
RecoveryOutputStream(StoreFileMetaData md, AtomicLong bytesSinceLastPause, Translog.View translogView) {
RecoveryOutputStream(StoreFileMetaData md, Translog.View translogView) {
this.md = md;
this.bytesSinceLastPause = bytesSinceLastPause;
this.translogView = translogView;
}
@ -548,43 +501,10 @@ public class RecoverySourceHandler {
}
private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
final TransportRequestOptions chunkSendOptions = TransportRequestOptions.builder()
.withCompress(false) // lucene files are already compressed and therefore compressing this won't really help much so we are safing the cpu for other things
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
.build();
cancellableThreads.execute(() -> {
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = recoverySettings.rateLimiter();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
try {
throttleTimeInNanos = rl.pause(bytes);
shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
} catch (IOException e) {
throw new ElasticsearchException("failed to pause recovery", e);
}
} else {
throttleTimeInNanos = 0;
}
} else {
throttleTimeInNanos = 0;
}
// Actually send the file chunk to the target node, waiting for it to complete
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, lastChunk,
translogView.totalOperations(),
/* we send totalOperations with every request since we collect stats on the target and that way we can
* see how many translog ops we accumulate while copying files across the network. A future optimization
* would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
*/
throttleTimeInNanos), chunkSendOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
// Actually send the file chunk to the target node, waiting for it to complete
cancellableThreads.executeIO(() ->
recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogView.totalOperations())
);
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(request.shardId());
}
@ -594,7 +514,7 @@ public class RecoverySourceHandler {
void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Throwable {
store.incRef();
try {
ArrayUtil.timSort(files, (a,b) -> Long.compare(a.length(), b.length())); // send smallest first
ArrayUtil.timSort(files, (a, b) -> Long.compare(a.length(), b.length())); // send smallest first
for (int i = 0; i < files.length; i++) {
final StoreFileMetaData md = files[i];
try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
@ -609,10 +529,11 @@ public class RecoverySourceHandler {
failEngine(corruptIndexException);
throw corruptIndexException;
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " +
"checksums are ok", null);
exception.addSuppressed(t);
logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK",
corruptIndexException, shardId, request.targetNode(), md);
corruptIndexException, shardId, request.targetNode(), md);
throw exception;
}
} else {

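The payoff of this seam is testability: RecoverySourceHandler can now be exercised against an in-memory RecoveryTargetHandler instead of a started node with real transport. A hypothetical sketch, not the tests added in this change; shard, startRecoveryRequest, chunkSizeInBytes, and logger are assumed to be provided by the test setup:

    // Hypothetical test-side stub that records what the source handler sends instead of
    // shipping it over the network; only two methods are sketched, the rest would be similar no-ops.
    class CapturingTargetHandler implements RecoveryTargetHandler {
        final List<String> receivedFileNames = new ArrayList<>();

        @Override
        public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes,
                                    List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes,
                                    int totalTranslogOps) {
            receivedFileNames.addAll(phase1FileNames);
        }

        @Override
        public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
                                   boolean lastChunk, int totalTranslogOps) {
            // keep the bytes in memory or write them to a test directory
        }

        // ... remaining RecoveryTargetHandler methods stubbed out ...
    }

    // matches the new constructor shown in this diff: no TransportService or RecoverySettings needed
    RecoverySourceHandler handler =
        new RecoverySourceHandler(shard, new CapturingTargetHandler(), startRecoveryRequest, chunkSizeInBytes, logger);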

@ -1,288 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
*
*/
public class RecoveryStatus extends AbstractRefCounted {
private final ESLogger logger;
private final static AtomicLong idGenerator = new AtomicLong();
private final String RECOVERY_PREFIX = "recovery.";
private final ShardId shardId;
private final long recoveryId;
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final String tempFilePrefix;
private final Store store;
private final RecoveryTarget.RecoveryListener listener;
private final AtomicBoolean finished = new AtomicBoolean();
private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
private final Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums();
private final CancellableThreads cancellableThreads = new CancellableThreads();
// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();
public RecoveryStatus(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTarget.RecoveryListener listener) {
super("recovery_status");
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;
this.logger = Loggers.getLogger(getClass(), indexShard.indexSettings().getSettings(), indexShard.shardId());
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
this.tempFilePrefix = RECOVERY_PREFIX + indexShard.recoveryState().getTimer().startTime() + ".";
this.store = indexShard.store();
// make sure the store is not released until we are done.
store.incRef();
indexShard.recoveryStats().incCurrentAsTarget();
}
private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
public long recoveryId() {
return recoveryId;
}
public ShardId shardId() {
return shardId;
}
public IndexShard indexShard() {
ensureRefCount();
return indexShard;
}
public DiscoveryNode sourceNode() {
return this.sourceNode;
}
public RecoveryState state() {
return indexShard.recoveryState();
}
public CancellableThreads CancellableThreads() {
return cancellableThreads;
}
/** return the last time this RecoveryStatus was used (based on System.nanoTime() */
public long lastAccessTime() {
return lastAccessTime;
}
/** sets the lasAccessTime flag to now */
public void setLastAccessTime() {
lastAccessTime = System.nanoTime();
}
public Store store() {
ensureRefCount();
return store;
}
public RecoveryState.Stage stage() {
return state().getStage();
}
public Store.LegacyChecksums legacyChecksums() {
return legacyChecksums;
}
/** renames all temporary files to their true name, potentially overriding existing files */
public void renameAllTempFiles() throws IOException {
ensureRefCount();
store.renameTempFilesSafe(tempFileNames);
}
/**
* cancel the recovery. calling this method will clean temporary files and release the store
* unless this object is in use (in which case it will be cleaned once all ongoing users call
* {@link #decRef()}
* <p>
* if {@link #CancellableThreads()} was used, the threads will be interrupted.
*/
public void cancel(String reason) {
if (finished.compareAndSet(false, true)) {
try {
logger.debug("recovery canceled (reason: [{}])", reason);
cancellableThreads.cancel(reason);
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}
/**
* fail the recovery and call listener
*
* @param e exception that encapsulating the failure
* @param sendShardFailure indicates whether to notify the master of the shard failure
*/
public void fail(RecoveryFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
listener.onRecoveryFailure(state(), e, sendShardFailure);
} finally {
try {
cancellableThreads.cancel("failed recovery [" + ExceptionsHelper.stackTrace(e) + "]");
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}
}
/** mark the current recovery as done */
public void markAsDone() {
if (finished.compareAndSet(false, true)) {
assert tempFileNames.isEmpty() : "not all temporary files are renamed";
try {
// this might still throw an exception ie. if the shard is CLOSED due to some other event.
// it's safer to decrement the reference in a try finally here.
indexShard.postRecovery("peer recovery done");
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
listener.onRecoveryDone(state());
}
}
/** Get a temporary name for the provided file name. */
public String getTempNameForFile(String origFile) {
return tempFilePrefix + origFile;
}
public IndexOutput getOpenIndexOutput(String key) {
ensureRefCount();
return openIndexOutputs.get(key);
}
/** remove and {@link org.apache.lucene.store.IndexOutput} for a given file. It is the caller's responsibility to close it */
public IndexOutput removeOpenIndexOutputs(String name) {
ensureRefCount();
return openIndexOutputs.remove(name);
}
/**
* Creates an {@link org.apache.lucene.store.IndexOutput} for the given file name. Note that the
* IndexOutput actually point at a temporary file.
* <p>
* Note: You can use {@link #getOpenIndexOutput(String)} with the same filename to retrieve the same IndexOutput
* at a later stage
*/
public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData metaData, Store store) throws IOException {
ensureRefCount();
String tempFileName = getTempNameForFile(fileName);
if (tempFileNames.containsKey(tempFileName)) {
throw new IllegalStateException("output for file [" + fileName + "] has already been created");
}
// add first, before it's created
tempFileNames.put(tempFileName, fileName);
IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT);
openIndexOutputs.put(fileName, indexOutput);
return indexOutput;
}
public void resetRecovery() throws IOException {
cleanOpenFiles();
indexShard().performRecoveryRestart();
}
@Override
protected void closeInternal() {
try {
cleanOpenFiles();
} finally {
// free store. increment happens in constructor
store.decRef();
indexShard.recoveryStats().decCurrentAsTarget();
}
}
protected void cleanOpenFiles() {
// clean open index outputs
Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
logger.trace("closing IndexOutput file [{}]", entry.getValue());
try {
entry.getValue().close();
} catch (Throwable t) {
logger.debug("error while closing recovery output [{}]", t, entry.getValue());
}
iterator.remove();
}
// trash temporary files
for (String file : tempFileNames.keySet()) {
logger.trace("cleaning temporary file [{}]", file);
store.deleteQuiet(file);
}
legacyChecksums.clear();
}
@Override
public String toString() {
return shardId + " [" + recoveryId + "]";
}
private void ensureRefCount() {
if (refCount() <= 0) {
throw new ElasticsearchException("RecoveryStatus is used but it's refcount is 0. Probably a mismatch between incRef/decRef calls");
}
}
}


@ -22,505 +22,392 @@ package org.elasticsearch.indices.recovery;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
* The recovery target handles recoveries of peer shards of the shard+node to recover to.
* <p>
* Note, it can be safely assumed that there will only be a single recovery per shard (index+id) and
* not several of them (since we don't allocate several shard replicas to the same node).
*
*/
public class RecoveryTarget extends AbstractComponent implements IndexEventListener {
public static class Actions {
public static final String FILES_INFO = "internal:index/shard/recovery/filesInfo";
public static final String FILE_CHUNK = "internal:index/shard/recovery/file_chunk";
public static final String CLEAN_FILES = "internal:index/shard/recovery/clean_files";
public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
private final ESLogger logger;
private final static AtomicLong idGenerator = new AtomicLong();
private final String RECOVERY_PREFIX = "recovery.";
private final ShardId shardId;
private final long recoveryId;
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final String tempFilePrefix;
private final Store store;
private final RecoveryTargetService.RecoveryListener listener;
private final AtomicBoolean finished = new AtomicBoolean();
private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
private final Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums();
private final CancellableThreads cancellableThreads = new CancellableThreads();
// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();
private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener) {
super("recovery_status");
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;
this.logger = Loggers.getLogger(getClass(), indexShard.indexSettings().getSettings(), indexShard.shardId());
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
this.tempFilePrefix = RECOVERY_PREFIX + indexShard.recoveryState().getTimer().startTime() + ".";
this.store = indexShard.store();
indexShard.recoveryStats().incCurrentAsTarget();
// make sure the store is not released until we are done.
store.incRef();
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final RecoverySettings recoverySettings;
private final ClusterService clusterService;
private final RecoveriesCollection onGoingRecoveries;
@Inject
public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, RecoverySettings recoverySettings, ClusterService clusterService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new FilesInfoRequestHandler());
transportService.registerRequestHandler(Actions.FILE_CHUNK, RecoveryFileChunkRequest::new, ThreadPool.Names.GENERIC, new FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC, new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new FinalizeRecoveryRequestHandler());
public long recoveryId() {
return recoveryId;
}
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null) {
onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed");
public ShardId shardId() {
return shardId;
}
public IndexShard indexShard() {
ensureRefCount();
return indexShard;
}
public DiscoveryNode sourceNode() {
return this.sourceNode;
}
public RecoveryState state() {
return indexShard.recoveryState();
}
public CancellableThreads CancellableThreads() {
return cancellableThreads;
}
/** return the last time this RecoveryStatus was used (based on System.nanoTime() */
public long lastAccessTime() {
return lastAccessTime;
}
/** sets the lasAccessTime flag to now */
public void setLastAccessTime() {
lastAccessTime = System.nanoTime();
}
public Store store() {
ensureRefCount();
return store;
}
public RecoveryState.Stage stage() {
return state().getStage();
}
public Store.LegacyChecksums legacyChecksums() {
return legacyChecksums;
}
/** renames all temporary files to their true name, potentially overriding existing files */
public void renameAllTempFiles() throws IOException {
ensureRefCount();
store.renameTempFilesSafe(tempFileNames);
}
/**
* cancel the recovery. calling this method will clean temporary files and release the store
* unless this object is in use (in which case it will be cleaned once all ongoing users call
* {@link #decRef()}
* <p>
* if {@link #CancellableThreads()} was used, the threads will be interrupted.
*/
public void cancel(String reason) {
if (finished.compareAndSet(false, true)) {
try {
logger.debug("recovery canceled (reason: [{}])", reason);
cancellableThreads.cancel(reason);
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}
/**
* cancel all ongoing recoveries for the given shard, if their status match a predicate
* fail the recovery and call listener
*
* @param reason reason for cancellation
* @param shardId shardId for which to cancel recoveries
* @param shouldCancel a predicate to check if a recovery should be cancelled or not. Null means cancel without an extra check.
* note that the recovery state can change after this check, but before it is being cancelled via other
* already issued outstanding references.
* @return true if a recovery was cancelled
* @param e exception that encapsulating the failure
* @param sendShardFailure indicates whether to notify the master of the shard failure
*/
public boolean cancelRecoveriesForShard(ShardId shardId, String reason, @Nullable Predicate<RecoveryStatus> shouldCancel) {
return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason, shouldCancel);
}
public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final RecoveryListener listener) {
// create a new recovery status, and process...
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
protected void retryRecovery(final RecoveryStatus recoveryStatus, final Throwable reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {
logger.trace("will retry recovery with id [{}] in [{}]", reason, recoveryStatus.recoveryId(), retryAfter);
retryRecovery(recoveryStatus, retryAfter, currentRequest);
}
protected void retryRecovery(final RecoveryStatus recoveryStatus, final String reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {
logger.trace("will retry recovery with id [{}] in [{}] (reason [{}])", recoveryStatus.recoveryId(), retryAfter, reason);
retryRecovery(recoveryStatus, retryAfter, currentRequest);
}
private void retryRecovery(final RecoveryStatus recoveryStatus, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {
try {
recoveryStatus.resetRecovery();
} catch (Throwable e) {
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(currentRequest, e), true);
}
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryStatus.recoveryId()));
}
private void doRecovery(final RecoveryStatus recoveryStatus) {
assert recoveryStatus.sourceNode() != null : "can't do a recovery without a source node";
logger.trace("collecting local files for {}", recoveryStatus);
Store.MetadataSnapshot metadataSnapshot = null;
try {
metadataSnapshot = recoveryStatus.store().getMetadataOrEmpty();
} catch (IOException e) {
logger.warn("error while listing local files, recover as if there are none", e);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
} catch (Exception e) {
// this will be logged as warning later on...
logger.trace("unexpected error while listing local files, failing recovery", e);
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(),
new RecoveryFailedException(recoveryStatus.state(), "failed to list local files", e), true);
return;
}
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),
metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId());
final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>();
try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().getIndex().getName(), request.shardId().id(), request.sourceNode());
recoveryStatus.indexShard().prepareForIndexRecovery();
recoveryStatus.CancellableThreads().execute(new CancellableThreads.Interruptable() {
@Override
public void run() throws InterruptedException {
responseHolder.set(transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet());
}
});
final RecoveryResponse recoveryResponse = responseHolder.get();
assert recoveryResponse != null;
final TimeValue recoveryTime = new TimeValue(recoveryStatus.state().getTimer().time());
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryStatus.recoveryId());
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().getIndex().getName()).append(']').append('[').append(request.shardId().id()).append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
.append("\n");
logger.trace(sb.toString());
} else {
logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), recoveryStatus.sourceNode(), recoveryTime);
}
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace("recovery cancelled", e);
} catch (Throwable e) {
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().getIndex().getName(), request.shardId().id());
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof CancellableThreads.ExecutionCancelledException) {
// this can also come from the source wrapped in a RemoteTransportException
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source has canceled the recovery", cause), false);
return;
}
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// do it twice, in case we have double transport exception
cause = ExceptionsHelper.unwrapCause(cause);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// here, we would add checks against exception that need to be retried (and not removeAndClean in this case)
if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException || cause instanceof ShardNotFoundException) {
// if the target is not ready yet, retry
retryRecovery(recoveryStatus, "remote shard not ready", recoverySettings.retryDelayStateSync(), request);
return;
}
if (cause instanceof DelayRecoveryException) {
retryRecovery(recoveryStatus, cause, recoverySettings.retryDelayStateSync(), request);
return;
}
if (cause instanceof ConnectTransportException) {
logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", recoveryStatus.shardId(), recoverySettings.retryDelayNetwork(), cause.getMessage());
retryRecovery(recoveryStatus, cause.getMessage(), recoverySettings.retryDelayNetwork(), request);
return;
}
if (cause instanceof IndexShardClosedException) {
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source shard is closed", cause), false);
return;
}
if (cause instanceof AlreadyClosedException) {
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source shard is closed", cause), false);
return;
}
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, e), true);
}
}
public interface RecoveryListener {
void onRecoveryDone(RecoveryState state);
void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure);
}
class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps());
recoveryStatus.indexShard().skipTranslogRecovery();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class FinalizeRecoveryRequestHandler implements TransportRequestHandler<RecoveryFinalizeRecoveryRequest> {
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
recoveryStatus.indexShard().finalizeRecovery();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
@Override
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final RecoveryStatus recoveryStatus = statusRef.status();
final RecoveryState.Translog translog = recoveryStatus.state().getTranslog();
translog.totalOperations(request.totalTranslogOps());
assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
try {
recoveryStatus.indexShard().performBatchRecovery(request.operations());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (TranslogRecoveryPerformer.BatchOperationException exception) {
MapperException mapperException = (MapperException) ExceptionsHelper.unwrap(exception, MapperException.class);
if (mapperException == null) {
throw exception;
}
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes. we want to wait until these mappings are processed.
logger.trace("delaying recovery due to missing mapping changes (rolling back stats for [{}] ops)", exception, exception.completedOperations());
translog.decrementRecoveredOperations(exception.completedOperations());
// we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
// canceled)
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
try {
messageReceived(request, channel);
} catch (Exception e) {
onFailure(e);
}
}
protected void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
logger.warn("failed to send error back to recovery source", e1);
}
}
@Override
public void onClusterServiceClose() {
onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates"));
}
@Override
public void onTimeout(TimeValue timeout) {
// note that we do not use a timeout (see comment above)
onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout + "])"));
}
});
}
}
}
}
class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
final RecoveryState.Index index = recoveryStatus.state().getIndex();
for (int i = 0; i < request.phase1ExistingFileNames.size(); i++) {
index.addFileDetail(request.phase1ExistingFileNames.get(i), request.phase1ExistingFileSizes.get(i), true);
}
for (int i = 0; i < request.phase1FileNames.size(); i++) {
index.addFileDetail(request.phase1FileNames.get(i), request.phase1FileSizes.get(i), false);
}
recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps);
recoveryStatus.state().getTranslog().totalOperationsOnStart(request.totalTranslogOps);
// recoveryBytesCount / recoveryFileCount will be set as we go...
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}
class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanFilesRequest> {
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps());
// first, we go and move files that were created with the recovery id suffix to
// the actual names, it's ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
recoveryStatus.indexShard().deleteShardState(); // we have to delete it first since even if we fail to rename the shard might be invalid
recoveryStatus.renameAllTempFiles();
final Store store = recoveryStatus.store();
// now write checksums
recoveryStatus.legacyChecksums().write(store);
Store.MetadataSnapshot sourceMetaData = request.sourceMetaSnapshot();
try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not been checksummed and they are
// broken. We have to clean up this shard entirely, remove all files and bubble it up to the
// source shard, since this index might be broken there as well. The source can handle this and check
// its content on disk if possible.
try {
try {
store.removeCorruptionMarker();
} finally {
Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
}
} catch (Throwable e) {
logger.debug("Failed to clean lucene index", e);
ex.addSuppressed(e);
}
RecoveryFailedException rfe = new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
recoveryStatus.fail(rfe, true);
throw rfe;
} catch (Exception ex) {
RecoveryFailedException rfe = new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
recoveryStatus.fail(rfe, true);
throw rfe;
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}
class FileChunkTransportRequestHandler implements TransportRequestHandler<RecoveryFileChunkRequest> {
// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();
@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
final Store store = recoveryStatus.store();
recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps());
final RecoveryState.Index indexState = recoveryStatus.state().getIndex();
if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
}
IndexOutput indexOutput;
if (request.position() == 0) {
indexOutput = recoveryStatus.openAndPutIndexOutput(request.name(), request.metadata(), store);
} else {
indexOutput = recoveryStatus.getOpenIndexOutput(request.name());
}
BytesReference content = request.content();
if (!content.hasArray()) {
content = content.toBytesArray();
}
RateLimiter rl = recoverySettings.rateLimiter();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
long throttleTimeInNanos = rl.pause(bytes);
indexState.addTargetThrottling(throttleTimeInNanos);
recoveryStatus.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}
indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
indexState.addRecoveredBytesToFile(request.name(), content.length());
if (indexOutput.getFilePointer() >= request.length() || request.lastChunk()) {
try {
Store.verify(indexOutput);
} finally {
// we are done
indexOutput.close();
}
// write the checksum
recoveryStatus.legacyChecksums().add(request.metadata());
final String temporaryFileName = recoveryStatus.getTempNameForFile(request.name());
assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName);
store.directory().sync(Collections.singleton(temporaryFileName));
IndexOutput remove = recoveryStatus.removeOpenIndexOutputs(request.name());
assert remove == null || remove == indexOutput; // remove may be null if we already finished
}
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class RecoveryRunner extends AbstractRunnable {
final long recoveryId;
RecoveryRunner(long recoveryId) {
this.recoveryId = recoveryId;
}
@Override
public void onFailure(Throwable t) {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatus(recoveryId)) {
if (statusRef != null) {
logger.error("unexpected error during recovery [{}], failing shard", t, recoveryId);
onGoingRecoveries.failRecovery(recoveryId,
new RecoveryFailedException(statusRef.status().state(), "unexpected error", t),
true // be safe
);
} else {
logger.debug("unexpected error during recovery, but recovery id [{}] is finished", t, recoveryId);
}
}
}
@Override
public void doRun() {
RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatus(recoveryId);
if (statusRef == null) {
logger.trace("not running recovery with id [{}] - can't find it (probably finished)", recoveryId);
return;
}
try {
doRecovery(statusRef.status());
} finally {
statusRef.close();
}
}
/**
* fail the recovery and call listener
*
* @param e exception that encapsulates the failure
* @param sendShardFailure indicates whether to notify the master of the shard failure
*/
public void fail(RecoveryFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
listener.onRecoveryFailure(state(), e, sendShardFailure);
} finally {
try {
cancellableThreads.cancel("failed recovery [" + ExceptionsHelper.stackTrace(e) + "]");
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}
}
/** mark the current recovery as done */
public void markAsDone() {
if (finished.compareAndSet(false, true)) {
assert tempFileNames.isEmpty() : "not all temporary files are renamed";
try {
// this might still throw an exception ie. if the shard is CLOSED due to some other event.
// it's safer to decrement the reference in a try finally here.
indexShard.postRecovery("peer recovery done");
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
listener.onRecoveryDone(state());
}
}
/** Get a temporary name for the provided file name. */
public String getTempNameForFile(String origFile) {
return tempFilePrefix + origFile;
}
public IndexOutput getOpenIndexOutput(String key) {
ensureRefCount();
return openIndexOutputs.get(key);
}
/** remove an {@link org.apache.lucene.store.IndexOutput} for a given file. It is the caller's responsibility to close it */
public IndexOutput removeOpenIndexOutputs(String name) {
ensureRefCount();
return openIndexOutputs.remove(name);
}
/**
* Creates an {@link org.apache.lucene.store.IndexOutput} for the given file name. Note that the
IndexOutput actually points at a temporary file.
* <p>
* Note: You can use {@link #getOpenIndexOutput(String)} with the same filename to retrieve the same IndexOutput
* at a later stage
*/
public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData metaData, Store store) throws IOException {
ensureRefCount();
String tempFileName = getTempNameForFile(fileName);
if (tempFileNames.containsKey(tempFileName)) {
throw new IllegalStateException("output for file [" + fileName + "] has already been created");
}
// add first, before it's created
tempFileNames.put(tempFileName, fileName);
IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT);
openIndexOutputs.put(fileName, indexOutput);
return indexOutput;
}
public void resetRecovery() throws IOException {
cleanOpenFiles();
indexShard().performRecoveryRestart();
}
@Override
protected void closeInternal() {
try {
cleanOpenFiles();
} finally {
// free store. increment happens in constructor
store.decRef();
indexShard.recoveryStats().decCurrentAsTarget();
}
}
protected void cleanOpenFiles() {
// clean open index outputs
Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
logger.trace("closing IndexOutput file [{}]", entry.getValue());
try {
entry.getValue().close();
} catch (Throwable t) {
logger.debug("error while closing recovery output [{}]", t, entry.getValue());
}
iterator.remove();
}
// trash temporary files
for (String file : tempFileNames.keySet()) {
logger.trace("cleaning temporary file [{}]", file);
store.deleteQuiet(file);
}
legacyChecksums.clear();
}
@Override
public String toString() {
return shardId + " [" + recoveryId + "]";
}
private void ensureRefCount() {
if (refCount() <= 0) {
throw new ElasticsearchException("RecoveryStatus is used but its refcount is 0. Probably a mismatch between incRef/decRef " +
"calls");
}
}
/** Implementation of {@link RecoveryTargetHandler} */
@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().skipTranslogRecovery();
}
@Override
public void finalizeRecovery() {
indexShard().finalizeRecovery();
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws TranslogRecoveryPerformer
.BatchOperationException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
indexShard().performBatchRecovery(operations);
}
@Override
public void receiveFileInfo(List<String> phase1FileNames,
List<Long> phase1FileSizes,
List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes,
int totalTranslogOps) {
final RecoveryState.Index index = state().getIndex();
for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
}
for (int i = 0; i < phase1FileNames.size(); i++) {
index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false);
}
state().getTranslog().totalOperations(totalTranslogOps);
state().getTranslog().totalOperationsOnStart(totalTranslogOps);
}
@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
// first, we go and move files that were created with the recovery id suffix to
// the actual names, it's ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
indexShard().deleteShardState(); // we have to delete it first since even if we fail to rename the shard
// might be invalid
renameAllTempFiles();
final Store store = store();
// now write checksums
legacyChecksums().write(store);
try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not been checksummed and they are
// broken. We have to clean up this shard entirely, remove all files and bubble it up to the
// source shard, since this index might be broken there as well. The source can handle this and check
// its content on disk if possible.
try {
try {
store.removeCorruptionMarker();
} finally {
Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
}
} catch (Throwable e) {
logger.debug("Failed to clean lucene index", e);
ex.addSuppressed(e);
}
RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
fail(rfe, true);
throw rfe;
} catch (Exception ex) {
RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
fail(rfe, true);
throw rfe;
}
}
@Override
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) throws IOException {
final Store store = store();
final String name = fileMetaData.name();
state().getTranslog().totalOperations(totalTranslogOps);
final RecoveryState.Index indexState = state().getIndex();
IndexOutput indexOutput;
if (position == 0) {
indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
} else {
indexOutput = getOpenIndexOutput(name);
}
if (content.hasArray() == false) {
content = content.toBytesArray();
}
indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
indexState.addRecoveredBytesToFile(name, content.length());
if (indexOutput.getFilePointer() >= fileMetaData.length() || lastChunk) {
try {
Store.verify(indexOutput);
} finally {
// we are done
indexOutput.close();
}
// write the checksum
legacyChecksums().add(fileMetaData);
final String temporaryFileName = getTempNameForFile(name);
assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName);
store.directory().sync(Collections.singleton(temporaryFileName));
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove may be null if we already finished
}
}
}


@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.List;
public interface RecoveryTargetHandler {
/**
* Prepares the target to receive translog operations, after all files have been copied
*
* @param totalTranslogOps total translog operations expected to be sent
*/
void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
/**
* The finalize request clears unreferenced translog files, refreshes the engine now that
* new segments are available, and enables garbage collection of
* tombstone files. The shard is also moved to the POST_RECOVERY phase during this time
**/
void finalizeRecovery();
/**
* Index a set of translog operations on the target
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
*/
void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps);
/**
* Notifies the target of the files it is going to receive
*/
void receiveFileInfo(List<String> phase1FileNames,
List<Long> phase1FileSizes,
List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes,
int totalTranslogOps);
/**
* After all source files have been sent over, this command is sent to the target so it can clean any local
* files that are not part of the source store
* @param totalTranslogOps an updated number of translog operations that will be replayed later on
* @param sourceMetaData meta data of the source store
*/
void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException;
/** writes a partial file chunk to the target store */
void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps) throws IOException;
}
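Because the source-side handler now programs against this interface instead of against transport channels, a shard-recovery unit test can substitute an in-process implementation for the transport-backed one. The following minimal sketch shows what such a test double could look like; the class name and the empty method bodies are illustrative assumptions, not code from this commit.

package org.elasticsearch.indices.recovery;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.List;

// Hypothetical no-op test double (name and behavior are assumptions for illustration only).
public class NoOpRecoveryTargetHandler implements RecoveryTargetHandler {

    @Override
    public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
        // a real test double might record totalTranslogOps for later assertions
    }

    @Override
    public void finalizeRecovery() {
    }

    @Override
    public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
    }

    @Override
    public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes,
                                List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes,
                                int totalTranslogOps) {
    }

    @Override
    public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
    }

    @Override
    public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
                               boolean lastChunk, int totalTranslogOps) throws IOException {
        // a real test double might buffer the chunk in memory instead of writing it to a store
    }
}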


@ -0,0 +1,470 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
* The recovery target service handles peer recoveries on the node that a shard is being recovered to.
* <p>
* Note, it can be safely assumed that there will only be a single recovery per shard (index+id) and
* not several of them (since we don't allocate several shard replicas to the same node).
*/
public class RecoveryTargetService extends AbstractComponent implements IndexEventListener {
public static class Actions {
public static final String FILES_INFO = "internal:index/shard/recovery/filesInfo";
public static final String FILE_CHUNK = "internal:index/shard/recovery/file_chunk";
public static final String CLEAN_FILES = "internal:index/shard/recovery/clean_files";
public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final RecoverySettings recoverySettings;
private final ClusterService clusterService;
private final RecoveriesCollection onGoingRecoveries;
@Inject
public RecoveryTargetService(Settings settings, ThreadPool threadPool, TransportService transportService, RecoverySettings
recoverySettings,
ClusterService clusterService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new
FilesInfoRequestHandler());
transportService.registerRequestHandler(Actions.FILE_CHUNK, RecoveryFileChunkRequest::new, ThreadPool.Names.GENERIC, new
FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool
.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
FinalizeRecoveryRequestHandler());
}
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null) {
onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed");
}
}
/**
* cancel all ongoing recoveries for the given shard, if their status match a predicate
*
* @param reason reason for cancellation
* @param shardId shardId for which to cancel recoveries
* @param shouldCancel a predicate to check if a recovery should be cancelled or not. Null means cancel without an extra check.
* note that the recovery state can change after this check, but before it is being cancelled via other
* already issued outstanding references.
* @return true if a recovery was cancelled
*/
public boolean cancelRecoveriesForShard(ShardId shardId, String reason, @Nullable Predicate<RecoveryTarget> shouldCancel) {
return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason, shouldCancel);
}
public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final
RecoveryListener listener) {
// create a new recovery status, and process...
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
protected void retryRecovery(final RecoveryTarget recoveryTarget, final Throwable reason, TimeValue retryAfter, final
StartRecoveryRequest currentRequest) {
logger.trace("will retry recovery with id [{}] in [{}]", reason, recoveryTarget.recoveryId(), retryAfter);
retryRecovery(recoveryTarget, retryAfter, currentRequest);
}
protected void retryRecovery(final RecoveryTarget recoveryTarget, final String reason, TimeValue retryAfter, final
StartRecoveryRequest currentRequest) {
logger.trace("will retry recovery with id [{}] in [{}] (reason [{}])", recoveryTarget.recoveryId(), retryAfter, reason);
retryRecovery(recoveryTarget, retryAfter, currentRequest);
}
private void retryRecovery(final RecoveryTarget recoveryTarget, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {
try {
recoveryTarget.resetRecovery();
} catch (Throwable e) {
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(currentRequest, e), true);
}
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryTarget.recoveryId()));
}
private void doRecovery(final RecoveryTarget recoveryTarget) {
assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node";
logger.trace("collecting local files for {}", recoveryTarget);
Store.MetadataSnapshot metadataSnapshot = null;
try {
metadataSnapshot = recoveryTarget.store().getMetadataOrEmpty();
} catch (IOException e) {
logger.warn("error while listing local files, recover as if there are none", e);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
} catch (Exception e) {
// this will be logged as warning later on...
logger.trace("unexpected error while listing local files, failing recovery", e);
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(),
new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true);
return;
}
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(),
clusterService.localNode(),
metadataSnapshot, recoveryTarget.state().getType(), recoveryTarget.recoveryId());
final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>();
try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().getIndex().getName(), request.shardId().id(), request
.sourceNode());
recoveryTarget.indexShard().prepareForIndexRecovery();
recoveryTarget.CancellableThreads().execute(() -> responseHolder.set(
transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request,
new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet()));
final RecoveryResponse recoveryResponse = responseHolder.get();
assert recoveryResponse != null;
final TimeValue recoveryTime = new TimeValue(recoveryTarget.state().getTimer().time());
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryTarget.recoveryId());
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().getIndex().getName()).append(']').append('[').append(request.shardId().id())
.append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with " +
"total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append
(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with " +
"total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log " +
"operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
.append("\n");
logger.trace(sb.toString());
} else {
logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), recoveryTarget.sourceNode(), recoveryTime);
}
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace("recovery cancelled", e);
} catch (Throwable e) {
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().getIndex().getName(), request.shardId().id());
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof CancellableThreads.ExecutionCancelledException) {
// this can also come from the source wrapped in a RemoteTransportException
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(request, "source has canceled the" +
" recovery", cause), false);
return;
}
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// do it twice, in case we have double transport exception
cause = ExceptionsHelper.unwrapCause(cause);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// here, we would add checks against exception that need to be retried (and not removeAndClean in this case)
if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException || cause instanceof
ShardNotFoundException) {
// if the target is not ready yet, retry
retryRecovery(recoveryTarget, "remote shard not ready", recoverySettings.retryDelayStateSync(), request);
return;
}
if (cause instanceof DelayRecoveryException) {
retryRecovery(recoveryTarget, cause, recoverySettings.retryDelayStateSync(), request);
return;
}
if (cause instanceof ConnectTransportException) {
logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", recoveryTarget.shardId(), recoverySettings
.retryDelayNetwork(), cause.getMessage());
retryRecovery(recoveryTarget, cause.getMessage(), recoverySettings.retryDelayNetwork(), request);
return;
}
if (cause instanceof IndexShardClosedException) {
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(request, "source shard is " +
"closed", cause), false);
return;
}
if (cause instanceof AlreadyClosedException) {
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(request, "source shard is " +
"closed", cause), false);
return;
}
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(request, e), true);
}
}
public interface RecoveryListener {
void onRecoveryDone(RecoveryState state);
void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure);
}
class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.status().prepareForTranslogOperations(request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class FinalizeRecoveryRequestHandler implements TransportRequestHandler<RecoveryFinalizeRecoveryRequest> {
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.status().finalizeRecovery();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
@Override
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws IOException {
try (RecoveriesCollection.RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final RecoveryTarget recoveryTarget = recoveryRef.status();
try {
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (TranslogRecoveryPerformer.BatchOperationException exception) {
MapperException mapperException = (MapperException) ExceptionsHelper.unwrap(exception, MapperException.class);
if (mapperException == null) {
throw exception;
}
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes. we want to wait until these mappings are processed.
logger.trace("delaying recovery due to missing mapping changes (rolling back stats for [{}] ops)", exception, exception
.completedOperations());
// we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
// canceled)
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
try {
messageReceived(request, channel);
} catch (Exception e) {
onFailure(e);
}
}
protected void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
logger.warn("failed to send error back to recovery source", e1);
}
}
@Override
public void onClusterServiceClose() {
onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates"));
}
@Override
public void onTimeout(TimeValue timeout) {
// note that we do not use a timeout (see comment above)
onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout +
"])"));
}
});
}
}
}
}
class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.status().receiveFileInfo(request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames,
request.phase1ExistingFileSizes, request.totalTranslogOps);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}
class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanFilesRequest> {
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.status().cleanFiles(request.totalTranslogOps(), request.sourceMetaSnapshot());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}
class FileChunkTransportRequestHandler implements TransportRequestHandler<RecoveryFileChunkRequest> {
// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();
@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
final RecoveryTarget status = recoveryRef.status();
final RecoveryState.Index indexState = status.state().getIndex();
if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
}
RateLimiter rateLimiter = recoverySettings.rateLimiter();
if (rateLimiter != null) {
long bytes = bytesSinceLastPause.addAndGet(request.content().length());
if (bytes > rateLimiter.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
long throttleTimeInNanos = rateLimiter.pause(bytes);
indexState.addTargetThrottling(throttleTimeInNanos);
status.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}
status.writeFileChunk(request.metadata(), request.position(), request.content(),
request.lastChunk(), request.totalTranslogOps()
);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class RecoveryRunner extends AbstractRunnable {
final long recoveryId;
RecoveryRunner(long recoveryId) {
this.recoveryId = recoveryId;
}
@Override
public void onFailure(Throwable t) {
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
if (recoveryRef != null) {
logger.error("unexpected error during recovery [{}], failing shard", t, recoveryId);
onGoingRecoveries.failRecovery(recoveryId,
new RecoveryFailedException(recoveryRef.status().state(), "unexpected error", t),
true // be safe
);
} else {
logger.debug("unexpected error during recovery, but recovery id [{}] is finished", t, recoveryId);
}
}
}
@Override
public void doRun() {
RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId);
if (recoveryRef == null) {
logger.trace("not running recovery with id [{}] - can't find it (probably finished)", recoveryId);
return;
}
try {
doRecovery(recoveryRef.status());
} finally {
recoveryRef.close();
}
}
}
}
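A call site on the target node could look roughly like the sketch below. It assumes recoveryTargetService, indexShard, sourceNode and logger are already in scope, and uses the REPLICA recovery type purely for illustration; none of it is taken verbatim from this commit.

// Hypothetical usage sketch: start a peer recovery and react to its outcome.
recoveryTargetService.startRecovery(indexShard, RecoveryState.Type.REPLICA, sourceNode,
    new RecoveryTargetService.RecoveryListener() {
        @Override
        public void onRecoveryDone(RecoveryState state) {
            logger.info("recovery of {} done, took [{}]", indexShard.shardId(), new TimeValue(state.getTimer().time()));
        }

        @Override
        public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
            logger.warn("recovery of {} failed", e, indexShard.shardId());
        }
    });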


@ -0,0 +1,154 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private final TransportService transportService;
private final long recoveryId;
private final ShardId shardId;
private final DiscoveryNode targetNode;
private final RecoverySettings recoverySettings;
private final TransportRequestOptions translogOpsRequestOptions;
private final TransportRequestOptions fileChunkRequestOptions;
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final Consumer<Long> onSourceThrottle;
public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService, DiscoveryNode targetNode,
RecoverySettings recoverySettings, Consumer<Long> onSourceThrottle) {
this.transportService = transportService;
this.recoveryId = recoveryId;
this.shardId = shardId;
this.targetNode = targetNode;
this.recoverySettings = recoverySettings;
this.onSourceThrottle = onSourceThrottle;
this.translogOpsRequestOptions = TransportRequestOptions.builder()
.withCompress(true)
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionLongTimeout())
.build();
this.fileChunkRequestOptions = TransportRequestOptions.builder()
.withCompress(false) // lucene files are already compressed and therefore compressing this won't really help much so
// we are saving the cpu for other things
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
.build();
}
@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
transportService.submitRequest(targetNode, RecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void finalizeRecovery() {
transportService.submitRequest(targetNode, RecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
recoveryId, shardId, operations, totalTranslogOps);
transportService.submitRequest(targetNode, RecoveryTargetService.Actions.TRANSLOG_OPS, translogOperationsRequest,
translogOpsRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(recoveryId, shardId,
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
transportService.submitRequest(targetNode, RecoveryTargetService.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
transportService.submitRequest(targetNode, RecoveryTargetService.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean
lastChunk, int totalTranslogOps) throws IOException {
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = recoverySettings.rateLimiter();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
try {
throttleTimeInNanos = rl.pause(bytes);
onSourceThrottle.accept(throttleTimeInNanos);
} catch (IOException e) {
throw new ElasticsearchException("failed to pause recovery", e);
}
} else {
throttleTimeInNanos = 0;
}
} else {
throttleTimeInNanos = 0;
}
transportService.submitRequest(targetNode, RecoveryTargetService.Actions.FILE_CHUNK,
new RecoveryFileChunkRequest(recoveryId, shardId, fileMetaData, position, content, lastChunk,
totalTranslogOps,
/* we send totalOperations with every request since we collect stats on the target and that way we can
* see how many translog ops we accumulate while copying files across the network. A future optimization
* would be to restart the file copy again (with new deltas) if too many translog ops pile up.
*/
throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
}
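On the source side the transport piping is now hidden behind the same RecoveryTargetHandler interface. The sketch below shows roughly how a source handler might wire up and drive a remote target; request, transportService, recoverySettings and shard are assumed to be in scope, and the throttle callback mirrors the accounting the file-chunk handler performs on the target. It is an illustration, not code from this commit.

// Hypothetical wiring sketch: the source-side recovery code talks to the remote target
// exclusively through the RecoveryTargetHandler interface.
RecoveryTargetHandler recoveryTarget = new RemoteRecoveryTargetHandler(
    request.recoveryId(), request.shardId(), transportService, request.targetNode(), recoverySettings,
    throttleTimeInNanos -> shard.recoveryStats().addThrottleTime(throttleTimeInNanos));

// the handler can then be driven like any other implementation, e.g. during phase 1:
recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
    phase1ExistingFileSizes, totalTranslogOps);
recoveryTarget.prepareForTranslogOperations(totalTranslogOps);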


@ -22,7 +22,6 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -35,15 +34,16 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
private final IndexShard shard;
private final StartRecoveryRequest request;
public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ESLogger logger) {
super(shard, request, recoverySettings, transportService, logger);
public SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request, ESLogger
logger) {
super(shard, recoveryTarget, request, -1, logger);
this.shard = shard;
this.request = request;
}
@Override
public RecoveryResponse recoverToTarget() {
boolean engineClosed = false;
public RecoveryResponse recoverToTarget() throws IOException {
boolean engineClosed = false;
try {
logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
if (isPrimaryRelocation()) {
@ -83,5 +83,4 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
shard.shardId(), request.targetNode());
return 0;
}
}


@ -18,10 +18,12 @@
*/
package org.elasticsearch.common.util;
import org.elasticsearch.common.util.CancellableThreads.IOInterruptable;
import org.elasticsearch.common.util.CancellableThreads.Interruptable;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CancellableThreadsTests extends ESTestCase {
@ -31,6 +33,13 @@ public class CancellableThreadsTests extends ESTestCase {
}
}
public static class IOCustomException extends IOException {
public IOCustomException(String msg) {
super(msg);
}
}
private class TestPlan {
public final int id;
public final boolean busySpin;
@ -38,6 +47,8 @@ public class CancellableThreadsTests extends ESTestCase {
public final boolean exitBeforeCancel;
public final boolean exceptAfterCancel;
public final boolean presetInterrupt;
public final boolean ioOp;
private final boolean ioException;
private TestPlan(int id) {
this.id = id;
@ -46,9 +57,77 @@ public class CancellableThreadsTests extends ESTestCase {
this.exitBeforeCancel = randomBoolean();
this.exceptAfterCancel = randomBoolean();
this.presetInterrupt = randomBoolean();
this.ioOp = randomBoolean();
this.ioException = ioOp && randomBoolean();
}
}
static class TestRunnable implements Interruptable {
final TestPlan plan;
final CountDownLatch readyForCancel;
TestRunnable(TestPlan plan, CountDownLatch readyForCancel) {
this.plan = plan;
this.readyForCancel = readyForCancel;
}
@Override
public void run() throws InterruptedException {
assertFalse("interrupt thread should have been clear", Thread.currentThread().isInterrupted());
if (plan.exceptBeforeCancel) {
throw new CustomException("thread [" + plan.id + "] pre-cancel exception");
} else if (plan.exitBeforeCancel) {
return;
}
readyForCancel.countDown();
try {
if (plan.busySpin) {
while (!Thread.currentThread().isInterrupted()) {
}
} else {
Thread.sleep(50000);
}
} finally {
if (plan.exceptAfterCancel) {
throw new CustomException("thread [" + plan.id + "] post-cancel exception");
}
}
}
}
static class TestIORunnable implements IOInterruptable {
final TestPlan plan;
final CountDownLatch readyForCancel;
TestIORunnable(TestPlan plan, CountDownLatch readyForCancel) {
this.plan = plan;
this.readyForCancel = readyForCancel;
}
@Override
public void run() throws IOException, InterruptedException {
assertFalse("interrupt thread should have been clear", Thread.currentThread().isInterrupted());
if (plan.exceptBeforeCancel) {
throw new IOCustomException("thread [" + plan.id + "] pre-cancel exception");
} else if (plan.exitBeforeCancel) {
return;
}
readyForCancel.countDown();
try {
if (plan.busySpin) {
while (!Thread.currentThread().isInterrupted()) {
}
} else {
Thread.sleep(50000);
}
} finally {
if (plan.exceptAfterCancel) {
throw new IOCustomException("thread [" + plan.id + "] post-cancel exception");
}
}
}
}
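The two runnables above exercise the same cancellation machinery through the two entry points, which differ only in how checked exceptions surface. A small usage sketch (the CancellableThreadsUsageSketch class is hypothetical; it assumes only the CancellableThreads execute/executeIO/cancel methods these tests exercise):

import java.io.IOException;

import org.elasticsearch.common.util.CancellableThreads;

public class CancellableThreadsUsageSketch {
    public static void main(String[] args) throws IOException {
        CancellableThreads cancellableThreads = new CancellableThreads();

        // execute() takes an Interruptable: it may block or throw InterruptedException,
        // but it cannot throw a checked IOException.
        cancellableThreads.execute(() -> Thread.sleep(10));

        // executeIO() takes an IOInterruptable: checked IOExceptions thrown inside
        // propagate to the caller (unless the operation was cancelled first).
        boolean simulateFailure = false; // flip to see the IOException propagate
        cancellableThreads.executeIO(() -> {
            if (simulateFailure) {
                throw new IOException("simulated I/O failure");
            }
        });

        // cancel() interrupts any thread currently running inside execute()/executeIO().
        cancellableThreads.cancel("done with this sketch");
    }
}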
public void testCancellableThreads() throws InterruptedException {
Thread[] threads = new Thread[randomIntBetween(3, 10)];
@ -60,47 +139,28 @@ public class CancellableThreadsTests extends ESTestCase {
for (int i = 0; i < threads.length; i++) {
final TestPlan plan = new TestPlan(i);
plans[i] = plan;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
if (plan.presetInterrupt) {
Thread.currentThread().interrupt();
threads[i] = new Thread(() -> {
try {
if (plan.presetInterrupt) {
Thread.currentThread().interrupt();
}
if (plan.ioOp) {
if (plan.ioException) {
cancellableThreads.executeIO(new TestIORunnable(plan, readyForCancel));
} else {
cancellableThreads.executeIO(new TestRunnable(plan, readyForCancel));
}
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
assertFalse("interrupt thread should have been clear", Thread.currentThread().isInterrupted());
if (plan.exceptBeforeCancel) {
throw new CustomException("thread [" + plan.id + "] pre-cancel exception");
} else if (plan.exitBeforeCancel) {
return;
}
readyForCancel.countDown();
try {
if (plan.busySpin) {
while (!Thread.currentThread().isInterrupted()) {
}
} else {
Thread.sleep(50000);
}
} finally {
if (plan.exceptAfterCancel) {
throw new CustomException("thread [" + plan.id + "] post-cancel exception");
}
}
}
});
} catch (Throwable t) {
throwables[plan.id] = t;
} else {
cancellableThreads.execute(new TestRunnable(plan, readyForCancel));
}
if (plan.exceptBeforeCancel || plan.exitBeforeCancel) {
// we have to mark we're ready now (actually done).
readyForCancel.countDown();
}
interrupted[plan.id] = Thread.currentThread().isInterrupted();
} catch (Throwable t) {
throwables[plan.id] = t;
}
if (plan.exceptBeforeCancel || plan.exitBeforeCancel) {
// we have to mark we're ready now (actually done).
readyForCancel.countDown();
}
interrupted[plan.id] = Thread.currentThread().isInterrupted();
});
threads[i].setDaemon(true);
threads[i].start();
@ -114,8 +174,9 @@ public class CancellableThreadsTests extends ESTestCase {
}
for (int i = 0; i < threads.length; i++) {
TestPlan plan = plans[i];
final Class<?> exceptionClass = plan.ioException ? IOCustomException.class : CustomException.class;
if (plan.exceptBeforeCancel) {
assertThat(throwables[i], Matchers.instanceOf(CustomException.class));
assertThat(throwables[i], Matchers.instanceOf(exceptionClass));
} else if (plan.exitBeforeCancel) {
assertNull(throwables[i]);
} else {
@ -124,7 +185,7 @@ public class CancellableThreadsTests extends ESTestCase {
if (plan.exceptAfterCancel) {
assertThat(throwables[i].getSuppressed(),
Matchers.arrayContaining(
Matchers.instanceOf(CustomException.class)
Matchers.instanceOf(exceptionClass)
));
} else {
assertThat(throwables[i].getSuppressed(), Matchers.emptyArray());

View File

@ -41,7 +41,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShadowIndexShard;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
@ -485,7 +485,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
public void sendRequest(DiscoveryNode node, long requestId, String action,
TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
if (keepFailing.get() && action.equals(RecoveryTarget.Actions.TRANSLOG_OPS)) {
if (keepFailing.get() && action.equals(RecoveryTargetService.Actions.TRANSLOG_OPS)) {
logger.info("--> failing translog ops");
throw new ElasticsearchException("failing on purpose");
}

View File

@ -50,13 +50,13 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
@ -343,7 +343,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (corrupt.get() && action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
if (corrupt.get() && action.equals(RecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
byte[] array = req.content().array();
int i = randomIntBetween(0, req.content().length() - 1);
@ -415,7 +415,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
if (action.equals(RecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
if (truncate && req.length() > 1) {
BytesArray array = new BytesArray(req.content().array(), req.content().arrayOffset(), (int) req.length() - 1);

View File

@ -38,7 +38,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
@ -568,12 +567,12 @@ public class IndexRecoveryIT extends ESIntegTestCase {
String[] recoveryActions = new String[]{
RecoverySource.Actions.START_RECOVERY,
RecoveryTarget.Actions.FILES_INFO,
RecoveryTarget.Actions.FILE_CHUNK,
RecoveryTarget.Actions.CLEAN_FILES,
RecoveryTargetService.Actions.FILES_INFO,
RecoveryTargetService.Actions.FILE_CHUNK,
RecoveryTargetService.Actions.CLEAN_FILES,
//RecoveryTarget.Actions.TRANSLOG_OPS, <-- may not be sent if already flushed
RecoveryTarget.Actions.PREPARE_TRANSLOG,
RecoveryTarget.Actions.FINALIZE
RecoveryTargetService.Actions.PREPARE_TRANSLOG,
RecoveryTargetService.Actions.FINALIZE
};
final String recoveryActionToBlock = randomFrom(recoveryActions);
final boolean dropRequests = randomBoolean();

View File

@ -39,7 +39,6 @@ import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
@ -71,7 +70,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
null, RecoveryState.Type.STORE, randomLong());
Store store = newStore(createTempDir());
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger);
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(),
logger);
Directory dir = store.directory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
int numDocs = randomIntBetween(10, 100);
@ -122,7 +122,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger) {
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(), logger) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
@ -185,7 +185,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger) {
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(), logger) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());

View File

@ -41,7 +41,7 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase {
IndexShard indexShard = service.getShardOrNull(0);
DiscoveryNode node = new DiscoveryNode("foo", new LocalTransportAddress("bar"), Version.CURRENT);
RecoveryStatus status = new RecoveryStatus(indexShard, node, new RecoveryTarget.RecoveryListener() {
RecoveryTarget status = new RecoveryTarget(indexShard, node, new RecoveryTargetService.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
}

View File

@ -52,7 +52,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class RecoveryStateTests extends ESTestCase {
public class RecoveryTargetTests extends ESTestCase {
abstract class Streamer<T extends Streamable> extends Thread {
private T lastRead;
final private AtomicBoolean shouldStop;
@ -329,8 +329,10 @@ public class RecoveryStateTests extends ESTestCase {
assertThat((double) index.recoveredFilesPercent(), equalTo(100.0));
assertThat((double) index.recoveredBytesPercent(), equalTo(100.0));
} else {
assertThat((double) index.recoveredFilesPercent(), closeTo(100.0 * index.recoveredFileCount() / index.totalRecoverFiles(), 0.1));
assertThat((double) index.recoveredBytesPercent(), closeTo(100.0 * index.recoveredBytes() / index.totalRecoverBytes(), 0.1));
assertThat((double) index.recoveredFilesPercent(),
closeTo(100.0 * index.recoveredFileCount() / index.totalRecoverFiles(), 0.1));
assertThat((double) index.recoveredBytesPercent(),
closeTo(100.0 * index.recoveredBytes() / index.totalRecoverBytes(), 0.1));
}
}
@ -346,7 +348,8 @@ public class RecoveryStateTests extends ESTestCase {
stages[i] = stages[j];
stages[j] = t;
try {
RecoveryState state = new RecoveryState(new ShardId("bla", "_na_", 0), randomBoolean(), randomFrom(Type.values()), discoveryNode, discoveryNode);
RecoveryState state = new RecoveryState(
new ShardId("bla", "_na_", 0), randomBoolean(), randomFrom(Type.values()), discoveryNode, discoveryNode);
for (Stage stage : stages) {
state.setStage(stage);
}
@ -360,7 +363,8 @@ public class RecoveryStateTests extends ESTestCase {
i = randomIntBetween(1, stages.length - 1);
ArrayList<Stage> list = new ArrayList<>(Arrays.asList(Arrays.copyOfRange(stages, 0, i)));
list.addAll(Arrays.asList(stages));
RecoveryState state = new RecoveryState(new ShardId("bla", "_na_", 0), randomBoolean(), randomFrom(Type.values()), discoveryNode, discoveryNode);
RecoveryState state = new RecoveryState(new ShardId("bla", "_na_", 0), randomBoolean(), randomFrom(Type.values()), discoveryNode,
discoveryNode);
for (Stage stage : list) {
state.setStage(stage);
}
@ -532,7 +536,7 @@ public class RecoveryStateTests extends ESTestCase {
if (f.equals(anotherFile)) {
assertEquals(f.hashCode(), anotherFile.hashCode());
} else if (f.hashCode() != anotherFile.hashCode()) {
assertFalse(f.equals(anotherFile));
assertFalse(f.equals(anotherFile));
}
}
}

View File

@ -30,8 +30,8 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveriesCollection;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@ -45,7 +45,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
public class RecoveriesCollectionTests extends ESSingleNodeTestCase {
final static RecoveryTarget.RecoveryListener listener = new RecoveryTarget.RecoveryListener() {
final static RecoveryTargetService.RecoveryListener listener = new RecoveryTargetService.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
@ -61,12 +61,12 @@ public class RecoveriesCollectionTests extends ESSingleNodeTestCase {
createIndex();
final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
final long recoveryId = startRecovery(collection);
try (RecoveriesCollection.StatusRef status = collection.getStatus(recoveryId)) {
try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) {
final long lastSeenTime = status.status().lastAccessTime();
assertBusy(new Runnable() {
@Override
public void run() {
try (RecoveriesCollection.StatusRef currentStatus = collection.getStatus(recoveryId)) {
try (RecoveriesCollection.RecoveryRef currentStatus = collection.getRecovery(recoveryId)) {
assertThat("access time failed to update", lastSeenTime, lessThan(currentStatus.status().lastAccessTime()));
}
}
@ -81,7 +81,7 @@ public class RecoveriesCollectionTests extends ESSingleNodeTestCase {
final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
final AtomicBoolean failed = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final long recoveryId = startRecovery(collection, new RecoveryTarget.RecoveryListener() {
final long recoveryId = startRecovery(collection, new RecoveryTargetService.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
latch.countDown();
@ -107,8 +107,8 @@ public class RecoveriesCollectionTests extends ESSingleNodeTestCase {
final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
final long recoveryId = startRecovery(collection);
final long recoveryId2 = startRecovery(collection);
try (RecoveriesCollection.StatusRef statusRef = collection.getStatus(recoveryId)) {
ShardId shardId = statusRef.status().shardId();
try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) {
ShardId shardId = recoveryRef.status().shardId();
assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test"));
assertThat("all recoveries should be cancelled", collection.size(), equalTo(0));
} finally {
@ -124,19 +124,19 @@ public class RecoveriesCollectionTests extends ESSingleNodeTestCase {
final long recoveryId2 = startRecovery(collection);
final ArrayList<AutoCloseable> toClose = new ArrayList<>();
try {
RecoveriesCollection.StatusRef statusRef = collection.getStatus(recoveryId);
toClose.add(statusRef);
ShardId shardId = statusRef.status().shardId();
RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId);
toClose.add(recoveryRef);
ShardId shardId = recoveryRef.status().shardId();
assertFalse("should not have cancelled recoveries", collection.cancelRecoveriesForShard(shardId, "test", status -> false));
final Predicate<RecoveryStatus> shouldCancel = status -> status.recoveryId() == recoveryId;
final Predicate<RecoveryTarget> shouldCancel = status -> status.recoveryId() == recoveryId;
assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test", shouldCancel));
assertThat("we should still have on recovery", collection.size(), equalTo(1));
statusRef = collection.getStatus(recoveryId);
toClose.add(statusRef);
assertNull("recovery should have been deleted", statusRef);
statusRef = collection.getStatus(recoveryId2);
toClose.add(statusRef);
assertNotNull("recovery should NOT have been deleted", statusRef);
recoveryRef = collection.getRecovery(recoveryId);
toClose.add(recoveryRef);
assertNull("recovery should have been deleted", recoveryRef);
recoveryRef = collection.getRecovery(recoveryId2);
toClose.add(recoveryRef);
assertNotNull("recovery should NOT have been deleted", recoveryRef);
} finally {
// TODO: do we want a lucene IOUtils version of this?
@ -163,7 +163,7 @@ public class RecoveriesCollectionTests extends ESSingleNodeTestCase {
return startRecovery(collection, listener, TimeValue.timeValueMinutes(60));
}
long startRecovery(RecoveriesCollection collection, RecoveryTarget.RecoveryListener listener, TimeValue timeValue) {
long startRecovery(RecoveriesCollection collection, RecoveryTargetService.RecoveryListener listener, TimeValue timeValue) {
IndicesService indexServices = getInstanceFromNode(IndicesService.class);
IndexShard indexShard = indexServices.indexServiceSafe("test").getShardOrNull(0);
final DiscoveryNode sourceNode = new DiscoveryNode("id", DummyTransportAddress.INSTANCE, Version.CURRENT);

View File

@ -44,7 +44,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
@ -440,7 +440,7 @@ public class RelocationIT extends ESIntegTestCase {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
if (action.equals(RecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request;
if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
// corrupting the segments_N files in order to make sure future recovery re-send files

View File

@ -32,7 +32,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.recovery.IndexRecoveryIT;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
@ -121,7 +121,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
if (action.equals(RecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
logger.debug("file chunk [" + req.toString() + "] lastChunk: " + req.lastChunk());
if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) {