Recovery: refactor RecoveryTarget state management

This commit rewrites the state controls in the RecoveryTarget family classes to make it easier to guarantee that:
- recovery resources are only cleared once there are no ongoing requests
- recovery is automatically canceled when the target shard is closed/removed
- canceled recoveries do not leave temporary files behind.

Highlights of the change:
1) All temporary files are cleared upon failure/cancellation (see #7315)
2) All newly created files are always temporary
3) Local files are no longer listed on the cluster state update thread (which could throw unwanted exceptions)
4) Recoveries are canceled by a listener on IndicesLifecycle.beforeIndexShardClosed, so we no longer need to cancel them explicitly.
5) Simplifies RecoveryListener to only notify when a recovery is done or has failed. Subtleties like ignore and retry were removed (they are dealt with internally)
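
For context, a minimal caller-side sketch of the simplified contract in point 5. The startRecovery and RecoveryListener signatures are the ones added by this commit (see the RecoveryTarget diff below); the wrapper class and the callback bodies are hypothetical:

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;

// hypothetical caller, for illustration only (not part of this commit)
class RecoveryCallerSketch {
    void recoverReplica(RecoveryTarget recoveryTarget, InternalIndexShard indexShard, DiscoveryNode sourceNode) {
        recoveryTarget.startRecovery(indexShard, RecoveryState.Type.REPLICA, sourceNode,
                new RecoveryTarget.RecoveryListener() {
                    @Override
                    public void onRecoveryDone(RecoveryState state) {
                        // recovery finished; retries and benign "ignore" cases no longer reach the caller
                    }

                    @Override
                    public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
                        // terminal failure; the caller decides whether to remove the shard / notify the master
                    }
                });
    }
}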

Closes #8092, Closes #7315
Boaz Leskes 2014-10-06 17:28:14 +02:00
parent 1557c34f2c
commit 24bc8d331e
9 changed files with 861 additions and 515 deletions


@@ -40,7 +40,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -144,19 +143,15 @@ public class TransportRecoveryAction extends
InternalIndexService indexService = (InternalIndexService) indicesService.indexServiceSafe(request.shardId().getIndex());
InternalIndexShard indexShard = (InternalIndexShard) indexService.shardSafe(request.shardId().id());
ShardRouting shardRouting = indexShard.routingEntry();
ShardRecoveryResponse shardRecoveryResponse = new ShardRecoveryResponse(request.shardId());
RecoveryState state;
RecoveryStatus recoveryStatus = indexShard.recoveryStatus();
RecoveryState state = indexShard.recoveryState();
if (recoveryStatus == null) {
recoveryStatus = recoveryTarget.recoveryStatus(indexShard);
if (state == null) {
state = recoveryTarget.recoveryState(indexShard);
}
if (recoveryStatus != null) {
state = recoveryStatus.recoveryState();
} else {
if (state == null) {
IndexShardGatewayService gatewayService =
indexService.shardInjector(request.shardId().id()).getInstance(IndexShardGatewayService.class);
state = gatewayService.recoveryState();
@@ -183,7 +178,8 @@ public class TransportRecoveryAction extends
static class ShardRecoveryRequest extends BroadcastShardOperationRequest {
ShardRecoveryRequest() { }
ShardRecoveryRequest() {
}
ShardRecoveryRequest(ShardId shardId, RecoveryRequest request) {
super(shardId, request);


@@ -62,6 +62,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
this.shardGateway = shardGateway;
this.snapshotService = snapshotService;
this.recoveryState = new RecoveryState(shardId);
this.recoveryState.setType(RecoveryState.Type.GATEWAY);
this.clusterService = clusterService;
}


@@ -89,7 +89,7 @@ import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
@@ -146,7 +146,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private volatile ScheduledFuture mergeScheduleFuture;
private volatile ShardRouting shardRouting;
private RecoveryStatus recoveryStatus;
@Nullable
private RecoveryState recoveryState;
private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();
@@ -733,15 +734,15 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
/**
* The peer recovery status if this shard recovered from a peer shard.
* The peer recovery state if this shard recovered from a peer shard, null otherwise.
*/
public RecoveryStatus recoveryStatus() {
return this.recoveryStatus;
public RecoveryState recoveryState() {
return this.recoveryState;
}
public void performRecoveryFinalization(boolean withFlush, RecoveryStatus recoveryStatus) throws ElasticsearchException {
public void performRecoveryFinalization(boolean withFlush, RecoveryState recoveryState) throws ElasticsearchException {
performRecoveryFinalization(withFlush);
this.recoveryStatus = recoveryStatus;
this.recoveryState = recoveryState;
}
public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchException {


@@ -61,9 +61,10 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.*;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.HashMap;
@@ -559,19 +560,18 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
boolean shardHasBeenRemoved = false;
if (currentRoutingEntry.initializing() && shardRouting.initializing() && !currentRoutingEntry.equals(shardRouting)) {
logger.debug("[{}][{}] removing shard (different instance of it allocated on this node, current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
// cancel recovery just in case we are in recovery (its fine if we are not in recovery, it will be a noop).
recoveryTarget.cancelRecovery(indexShard);
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
shardHasBeenRemoved = true;
} else if (isPeerRecovery(shardRouting)) {
// check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
RecoveryStatus recoveryStatus = recoveryTarget.recoveryStatus(indexShard);
if (recoveryStatus != null && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
RecoveryState recoveryState = recoveryTarget.recoveryState(indexShard);
if (recoveryState != null && recoveryState.getStage() != RecoveryState.Stage.DONE) {
// we have an ongoing recovery, find the source based on current routing and compare them
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
if (!recoveryStatus.sourceNode().equals(sourceNode)) {
if (!recoveryState.getSourceNode().equals(sourceNode)) {
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
recoveryTarget.cancelRecovery(indexShard);
// closing the shard will also cancel any ongoing recovery.
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
shardHasBeenRemoved = true;
}
@@ -728,17 +728,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// the edge case where its mark as relocated, and we might need to roll it back...
// For replicas: we are recovering a backup from a primary
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
final Store store = indexShard.store();
final StartRecoveryRequest request;
store.incRef();
try {
store.failIfCorrupted();
request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
false, store.getMetadata().asMap(), type, recoveryIdGenerator.incrementAndGet());
} finally {
store.decRef();
}
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
indexShard.engine().failEngine("corrupted preexisting index", e);
@@ -808,68 +798,41 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private class PeerRecoveryListener implements RecoveryTarget.RecoveryListener {
private final StartRecoveryRequest request;
private final ShardRouting shardRouting;
private final IndexService indexService;
private final IndexMetaData indexMetaData;
private PeerRecoveryListener(StartRecoveryRequest request, ShardRouting shardRouting, IndexService indexService, IndexMetaData indexMetaData) {
this.request = request;
private PeerRecoveryListener(ShardRouting shardRouting, IndexService indexService, IndexMetaData indexMetaData) {
this.shardRouting = shardRouting;
this.indexService = indexService;
this.indexMetaData = indexMetaData;
}
@Override
public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery (replica) from node [" + request.sourceNode() + "]");
public void onRecoveryDone(RecoveryState state) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]");
}
@Override
public void onRetryRecovery(TimeValue retryAfter, RecoveryStatus recoveryStatus) {
recoveryTarget.retryRecovery(request, retryAfter, recoveryStatus, PeerRecoveryListener.this);
}
@Override
public void onIgnoreRecovery(boolean removeShard, String reason) {
if (!removeShard) {
return;
}
synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] removing shard on ignored recovery, reason [{}]", shardRouting.index(), shardRouting.shardId().id(), reason);
}
try {
indexService.removeShard(shardRouting.shardId().id(), "ignore recovery: " + reason);
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after ignore recovery", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
}
}
@Override
public void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure) {
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, sendShardFailure, e);
}
}
private void handleRecoveryFailure(IndexService indexService, IndexMetaData indexMetaData, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
logger.warn("[{}][{}] failed to start shard", failure, indexService.index().name(), shardRouting.shardId().id());
synchronized (mutex) {
if (indexService.hasShard(shardRouting.shardId().id())) {
try {
logger.debug("[{}][{}] removing shard on failed recovery [{}]", shardRouting.index(), shardRouting.shardId().id(), failure.getMessage());
indexService.removeShard(shardRouting.shardId().id(), "recovery failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
} catch (IndexShardMissingException e) {
// the node got closed on us, ignore it
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to delete shard after failed startup", e1, indexService.index().name(), shardRouting.shardId().id());
logger.warn("[{}][{}] failed to delete shard after recovery failure", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
if (sendShardFailure) {
logger.warn("[{}][{}] sending failed shard after recovery failure", failure, indexService.index().name(), shardRouting.shardId().id());
try {
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, indexMetaData.getUUID(), "Failed to start shard, message [" + detailedMessage(failure) + "]");


@@ -0,0 +1,184 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class holds a collection of all ongoing recoveries on the current node (i.e., the node is the target node
* of those recoveries). The class is used to guarantee concurrent semantics such that once a recovery is done/cancelled/failed
* no other thread will be able to find it. Lastly, the {@link StatusRef} inner class verifies that recovery temporary files
* and the store will only be cleared once ongoing usage is finished.
*/
public class RecoveriesCollection {
/** This is the single source of truth for ongoing recoveries. If it's not here, it was canceled or done */
private final ConcurrentMap<Long, RecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
final private ESLogger logger;
public RecoveriesCollection(ESLogger logger) {
this.logger = logger;
}
/**
* Starts a new recovery for the given shard, source node and state
*
* @return the id of the new recovery.
*/
public long startRecovery(InternalIndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state, RecoveryTarget.RecoveryListener listener) {
RecoveryStatus status = new RecoveryStatus(indexShard, sourceNode, state, listener);
RecoveryStatus existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status);
assert existingStatus == null : "found two RecoveryStatus instances with the same id";
logger.trace("{} started recovery from {}, id [{}]", indexShard.shardId(), sourceNode, status.recoveryId());
return status.recoveryId();
}
/**
* gets the {@link RecoveryStatus} for a given id. The RecoveryStatus returned has its ref count already incremented
* to make sure it's safe to use. However, you must call {@link RecoveryStatus#decRef()} when you are done with it, typically
* by using this method in a try-with-resources clause.
* <p/>
* Returns null if recovery is not found
*/
public StatusRef getStatus(long id) {
RecoveryStatus status = onGoingRecoveries.get(id);
if (status != null && status.tryIncRef()) {
return new StatusRef(status);
}
return null;
}
/** Similar to {@link #getStatus(long)} but throws an exception if no recovery is found */
public StatusRef getStatusSafe(long id, ShardId shardId) {
StatusRef statusRef = getStatus(id);
if (statusRef == null) {
throw new IndexShardClosedException(shardId);
}
assert statusRef.status().shardId().equals(shardId);
return statusRef;
}
/** cancel the recovery with the given id (if found) and remove it from the recovery collection */
public void cancelRecovery(long id, String reason) {
RecoveryStatus removed = onGoingRecoveries.remove(id);
if (removed != null) {
logger.trace("{} canceled recovery from {}, id [{}] (reason [{}])",
removed.shardId(), removed.sourceNode(), removed.recoveryId(), reason);
removed.cancel(reason);
}
}
/**
* fail the recovery with the given id (if found) and remove it from the recovery collection
*
* @param id id of the recovery to fail
* @param e exception with reason for the failure
* @param sendShardFailure true if a shard failed message should be sent to the master
*/
public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFailure) {
RecoveryStatus removed = onGoingRecoveries.remove(id);
if (removed != null) {
logger.trace("{} failing recovery from {}, id [{}]. Send shard failure: [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId(), sendShardFailure);
removed.fail(e, sendShardFailure);
}
}
/** mark the recovery with the given id as done (if found) */
public void markRecoveryAsDone(long id) {
RecoveryStatus removed = onGoingRecoveries.remove(id);
if (removed != null) {
logger.trace("{} marking recovery from {} as done, id [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId());
removed.markAsDone();
}
}
/**
* Try to find an ongoing recovery for a given shard. returns null if not found.
*/
@Nullable
public StatusRef findRecoveryByShard(IndexShard indexShard) {
for (RecoveryStatus recoveryStatus : onGoingRecoveries.values()) {
if (recoveryStatus.indexShard() == indexShard) {
if (recoveryStatus.tryIncRef()) {
return new StatusRef(recoveryStatus);
} else {
return null;
}
}
}
return null;
}
/** cancel all ongoing recoveries for the given shard, typically because the shard is closed */
public void cancelRecoveriesForShard(ShardId shardId, String reason) {
for (RecoveryStatus status : onGoingRecoveries.values()) {
if (status.shardId().equals(shardId)) {
cancelRecovery(status.recoveryId(), reason);
}
}
}
/**
* a reference to {@link RecoveryStatus}, which implements {@link AutoCloseable}. closing the reference
* causes {@link RecoveryStatus#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link RecoveriesCollection.StatusRef#close()} is called.
*/
public static class StatusRef implements AutoCloseable {
private final RecoveryStatus status;
private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Important: {@link org.elasticsearch.indices.recovery.RecoveryStatus#tryIncRef()} should
* be *successfully* called on the status before it is wrapped in a StatusRef
*/
public StatusRef(RecoveryStatus status) {
this.status = status;
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
status.decRef();
}
}
public RecoveryStatus status() {
return status;
}
}
}
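
For illustration only (not part of this commit's diff), the intended usage pattern of this class, mirroring the transport handlers changed later in this commit: getStatusSafe returns a ref-counted StatusRef, and the try-with-resources block guarantees decRef() when the request finishes, so the store and temporary files cannot be cleaned away under an in-flight request. Here onGoingRecoveries stands for the RecoveriesCollection instance held by RecoveryTarget and request for the incoming transport request:

try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
    final RecoveryStatus recoveryStatus = statusRef.status();
    // the status (and therefore its store and temp files) stays alive for the whole block
    recoveryStatus.indexShard().performRecoveryPrepareForTranslog();
    recoveryStatus.stage(RecoveryState.Stage.TRANSLOG);
} // close() -> decRef(); if the recovery was cancelled or failed meanwhile, resources are freed on the last decRef()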


@@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.shard.ShardId;
/**
@@ -29,10 +30,22 @@ import org.elasticsearch.index.shard.ShardId;
public class RecoveryFailedException extends ElasticsearchException {
public RecoveryFailedException(StartRecoveryRequest request, Throwable cause) {
this(request.shardId(), request.sourceNode(), request.targetNode(), cause);
this(request, null, cause);
}
public RecoveryFailedException(StartRecoveryRequest request, @Nullable String extraInfo, Throwable cause) {
this(request.shardId(), request.sourceNode(), request.targetNode(), extraInfo, cause);
}
public RecoveryFailedException(RecoveryState state, @Nullable String extraInfo, Throwable cause) {
this(state.getShardId(), state.getSourceNode(), state.getTargetNode(), extraInfo, cause);
}
public RecoveryFailedException(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, Throwable cause) {
super(shardId + ": Recovery failed from " + sourceNode + " into " + targetNode, cause);
this(shardId, sourceNode, targetNode, null, cause);
}
public RecoveryFailedException(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, @Nullable String extraInfo, Throwable cause) {
super(shardId + ": Recovery failed from " + sourceNode + " into " + targetNode + (extraInfo == null ? "" : " (" + extraInfo + ")"), cause);
}
}


@@ -19,106 +19,310 @@
package org.elasticsearch.indices.recovery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
public class RecoveryStatus {
final ShardId shardId;
final long recoveryId;
final InternalIndexShard indexShard;
final RecoveryState recoveryState;
final DiscoveryNode sourceNode;
private final ESLogger logger;
public RecoveryStatus(long recoveryId, InternalIndexShard indexShard, DiscoveryNode sourceNode) {
this.recoveryId = recoveryId;
private final static AtomicLong idGenerator = new AtomicLong();
private final String RECOVERY_PREFIX = "recovery.";
private final ShardId shardId;
private final long recoveryId;
private final InternalIndexShard indexShard;
private final RecoveryState state;
private final DiscoveryNode sourceNode;
private final String tempFilePrefix;
private final Store store;
private final RecoveryTarget.RecoveryListener listener;
private AtomicReference<Thread> waitingRecoveryThread = new AtomicReference<>();
private final AtomicBoolean finished = new AtomicBoolean();
// we start with 1 which will be decremented on cancel/close/failure
private final AtomicInteger refCount = new AtomicInteger(1);
private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
private final Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums();
public RecoveryStatus(InternalIndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state, RecoveryTarget.RecoveryListener listener) {
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;
this.logger = Loggers.getLogger(getClass(), indexShard.indexSettings(), indexShard.shardId());
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
this.recoveryState = new RecoveryState(shardId);
recoveryState.getTimer().startTime(System.currentTimeMillis());
this.state = state;
this.state.getTimer().startTime(System.currentTimeMillis());
this.tempFilePrefix = RECOVERY_PREFIX + this.state.getTimer().startTime() + ".";
this.store = indexShard.store();
// make sure the store is not released until we are done.
store.incRef();
}
volatile Thread recoveryThread;
private volatile boolean canceled;
volatile boolean sentCanceledToSource;
private final Set<String> tempFileNames = ConcurrentCollections.newConcurrentSet();
private volatile ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
public final Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums();
public long recoveryId() {
return recoveryId;
}
public ShardId shardId() {
return shardId;
}
public InternalIndexShard indexShard() {
ensureNotFinished();
return indexShard;
}
public DiscoveryNode sourceNode() {
return this.sourceNode;
}
public RecoveryState recoveryState() {
return recoveryState;
public RecoveryState state() {
return state;
}
public Store store() {
ensureNotFinished();
return store;
}
/** set a thread that should be interrupted if the recovery is canceled */
public void setWaitingRecoveryThread(Thread thread) {
waitingRecoveryThread.set(thread);
}
/**
* clear the thread set by {@link #setWaitingRecoveryThread(Thread)}, making sure we
* do not override another thread.
*/
public void clearWaitingRecoveryThread(Thread threadToClear) {
waitingRecoveryThread.compareAndSet(threadToClear, null);
}
public void stage(RecoveryState.Stage stage) {
recoveryState.setStage(stage);
state.setStage(stage);
}
public RecoveryState.Stage stage() {
return recoveryState.getStage();
return state.getStage();
}
public boolean isCanceled() {
return canceled;
public Store.LegacyChecksums legacyChecksums() {
return legacyChecksums;
}
public synchronized void cancel() {
canceled = true;
/** renames all temporary files to their true name, potentially overwriting existing files */
public void renameAllTempFiles() throws IOException {
ensureNotFinished();
Iterator<String> tempFileIterator = tempFileNames.iterator();
final Directory directory = store.directory();
while (tempFileIterator.hasNext()) {
String tempFile = tempFileIterator.next();
String origFile = originalNameForTempFile(tempFile);
// first, go and delete the existing ones
try {
directory.deleteFile(origFile);
} catch (NoSuchFileException e) {
} catch (Throwable ex) {
logger.debug("failed to delete file [{}]", ex, origFile);
}
// now, rename the files... and fail if it won't work
store.renameFile(tempFile, origFile);
// upon success, remove the temp file
tempFileIterator.remove();
}
}
/** cancel the recovery. calling this method will clean temporary files and release the store
* unless this object is in use (in which case it will be cleaned once all ongoing users call
* {@link #decRef()}).
*
* if {@link #setWaitingRecoveryThread(Thread)} was used, the thread will be interrupted.
*/
public void cancel(String reason) {
if (finished.compareAndSet(false, true)) {
logger.debug("recovery canceled (reason: [{}])", reason);
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
final Thread thread = waitingRecoveryThread.get();
if (thread != null) {
thread.interrupt();
}
}
}
/**
* fail the recovery and call listener
*
* @param e exception encapsulating the failure
* @param sendShardFailure indicates whether to notify the master of the shard failure
**/
public void fail(RecoveryFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
listener.onRecoveryFailure(state, e, sendShardFailure);
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
}
}
/** mark the current recovery as done */
public void markAsDone() {
if (finished.compareAndSet(false, true)) {
assert tempFileNames.isEmpty() : "not all temporary files are renamed";
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
listener.onRecoveryDone(state);
}
}
private String getTempNameForFile(String origFile) {
return tempFilePrefix + origFile;
}
/** return true if the given file is a temporary file name issued by this recovery */
private boolean isTempFile(String filename) {
return tempFileNames.contains(filename);
}
public IndexOutput getOpenIndexOutput(String key) {
final ConcurrentMap<String, IndexOutput> outputs = openIndexOutputs;
if (canceled || outputs == null) {
return null;
}
return outputs.get(key);
ensureNotFinished();
return openIndexOutputs.get(key);
}
public synchronized Set<Entry<String, IndexOutput>> cancelAndClearOpenIndexInputs() {
cancel();
final ConcurrentMap<String, IndexOutput> outputs = openIndexOutputs;
openIndexOutputs = null;
if (outputs == null) {
return null;
/** returns the original file name for a temporary file name issued by this recovery */
private String originalNameForTempFile(String tempFile) {
if (!isTempFile(tempFile)) {
throw new ElasticsearchException("[" + tempFile + "] is not a temporary file made by this recovery");
}
Set<Entry<String, IndexOutput>> entrySet = outputs.entrySet();
return entrySet;
return tempFile.substring(tempFilePrefix.length());
}
/** remove an {@link org.apache.lucene.store.IndexOutput} for a given file. It is the caller's responsibility to close it */
public IndexOutput removeOpenIndexOutputs(String name) {
final ConcurrentMap<String, IndexOutput> outputs = openIndexOutputs;
if (outputs == null) {
return null;
}
return outputs.remove(name);
ensureNotFinished();
return openIndexOutputs.remove(name);
}
public synchronized IndexOutput openAndPutIndexOutput(String key, String fileName, StoreFileMetaData metaData, Store store) throws IOException {
if (isCanceled()) {
return null;
}
final ConcurrentMap<String, IndexOutput> outputs = openIndexOutputs;
IndexOutput indexOutput = store.createVerifyingOutput(fileName, IOContext.DEFAULT, metaData);
outputs.put(key, indexOutput);
/**
* Creates an {@link org.apache.lucene.store.IndexOutput} for the given file name. Note that the
* IndexOutput actually points at a temporary file.
* <p/>
* Note: You can use {@link #getOpenIndexOutput(String)} with the same filename to retrieve the same IndexOutput
* at a later stage
*/
public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData metaData, Store store) throws IOException {
ensureNotFinished();
String tempFileName = getTempNameForFile(fileName);
// add first, before it's created
tempFileNames.add(tempFileName);
IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, IOContext.DEFAULT, metaData);
openIndexOutputs.put(fileName, indexOutput);
return indexOutput;
}
/**
* Tries to increment the refCount of this RecoveryStatus instance. This method will return <tt>true</tt> iff the refCount was
* incremented successfully, otherwise <tt>false</tt>. Be sure to always call a corresponding {@link #decRef}, in a finally clause.
*
* @see #decRef()
*/
public final boolean tryIncRef() {
do {
int i = refCount.get();
if (i > 0) {
if (refCount.compareAndSet(i, i + 1)) {
return true;
}
} else {
return false;
}
} while (true);
}
/**
* Decreases the refCount of this RecoveryStatus instance. If the refCount drops to 0, the recovery process this status represents
* is seen as done and resources and temporary files are deleted.
*
* @see #tryIncRef
*/
public final void decRef() {
int i = refCount.decrementAndGet();
assert i >= 0;
if (i == 0) {
closeInternal();
}
}
private void closeInternal() {
try {
// clean open index outputs
Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
IOUtils.closeWhileHandlingException(entry.getValue());
iterator.remove();
}
// trash temporary files
for (String file : tempFileNames) {
logger.trace("cleaning temporary file [{}]", file);
store.deleteQuiet(file);
}
legacyChecksums.clear();
} finally {
// free store. increment happens in constructor
store.decRef();
}
}
@Override
public String toString() {
return shardId + " [" + recoveryId + "]";
}
private void ensureNotFinished() {
if (finished.get()) {
throw new ElasticsearchException("RecoveryStatus is used after it was finished. Probably a mismatch between incRef/decRef calls");
}
}
}
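
Also for illustration (not part of the commit): the temporary-file lifecycle behind points 1 and 2 of the commit message. Every IndexOutput opened through RecoveryStatus targets a "recovery.<startTime>.<name>" file; renameAllTempFiles() promotes the files to their real names at clean-files time, and any temp file still tracked when the ref count drops to zero is deleted in closeInternal(). fileName, metaData and chunk are hypothetical placeholders:

// a file chunk arrives: write it into a temporary file tracked by the status
IndexOutput out = recoveryStatus.openAndPutIndexOutput(fileName, metaData, recoveryStatus.store());
out.writeBytes(chunk, chunk.length);
// ... after the last chunk of the last file:
recoveryStatus.renameAllTempFiles();   // recovery.<startTime>.<name> -> <name>
recoveryStatus.markAsDone();           // releases the initial ref; on cancel/failure, closeInternal() trashes leftover temp files instead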


@@ -19,12 +19,13 @@
package org.elasticsearch.indices.recovery;
import com.google.common.collect.Sets;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesReference;
@@ -33,26 +34,25 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@@ -77,20 +77,20 @@ public class RecoveryTarget extends AbstractComponent {
private final TransportService transportService;
private final IndicesService indicesService;
private final RecoverySettings recoverySettings;
private final ClusterService clusterService;
private final ConcurrentMapLong<RecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMapLong();
private final RecoveriesCollection onGoingRecoveries;
@Inject
public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
IndicesLifecycle indicesLifecycle, RecoverySettings recoverySettings) {
public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService,
IndicesLifecycle indicesLifecycle, RecoverySettings recoverySettings, ClusterService clusterService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new RecoveriesCollection(logger);
transportService.registerHandler(Actions.FILES_INFO, new FilesInfoRequestHandler());
transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler());
@@ -103,261 +103,154 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
if (indexShard != null) {
removeAndCleanOnGoingRecovery(findRecoveryByShard(indexShard));
onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed");
}
}
});
}
public RecoveryStatus recoveryStatus(IndexShard indexShard) {
RecoveryStatus recoveryStatus = findRecoveryByShard(indexShard);
if (recoveryStatus == null) {
return null;
public RecoveryState recoveryState(IndexShard indexShard) {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.findRecoveryByShard(indexShard)) {
if (statusRef == null) {
return null;
}
final RecoveryStatus recoveryStatus = statusRef.status();
if (recoveryStatus.state().getTimer().startTime() > 0 && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
recoveryStatus.state().getTimer().time(System.currentTimeMillis() - recoveryStatus.state().getTimer().startTime());
}
return recoveryStatus.state();
} catch (Exception e) {
// shouldn't really happen, but have to be here due to auto close
throw new ElasticsearchException("error while getting recovery state", e);
}
if (recoveryStatus.recoveryState().getTimer().startTime() > 0 && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
recoveryStatus.recoveryState().getTimer().time(System.currentTimeMillis() - recoveryStatus.recoveryState().getTimer().startTime());
}
return recoveryStatus;
}
public void cancelRecovery(IndexShard indexShard) {
RecoveryStatus recoveryStatus = findRecoveryByShard(indexShard);
// it might be if the recovery source got canceled first
if (recoveryStatus == null) {
return;
}
if (recoveryStatus.sentCanceledToSource) {
return;
}
recoveryStatus.cancel();
public void startRecovery(final InternalIndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final RecoveryListener listener) {
try {
if (recoveryStatus.recoveryThread != null) {
recoveryStatus.recoveryThread.interrupt();
}
// give it a grace period of actually getting the sent ack part
final long sleepTime = 100;
final long maxSleepTime = 10000;
long rounds = Math.round(maxSleepTime / sleepTime);
while (!recoveryStatus.sentCanceledToSource &&
transportService.nodeConnected(recoveryStatus.sourceNode) &&
rounds > 0) {
rounds--;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break; // interrupted - step out!
}
}
} finally {
removeAndCleanOnGoingRecovery(recoveryStatus);
}
}
public void startRecovery(final StartRecoveryRequest request, final InternalIndexShard indexShard, final RecoveryListener listener) {
try {
indexShard.recovering("from " + request.sourceNode());
indexShard.recovering("from " + sourceNode);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage());
logger.debug("{} ignore recovery. already in recovering process, {}", indexShard.shardId(), e.getMessage());
return;
}
// create a new recovery status, and process...
final RecoveryStatus recoveryStatus = new RecoveryStatus(request.recoveryId(), indexShard, request.sourceNode());
recoveryStatus.recoveryState.setType(request.recoveryType());
recoveryStatus.recoveryState.setSourceNode(request.sourceNode());
recoveryStatus.recoveryState.setTargetNode(request.targetNode());
recoveryStatus.recoveryState.setPrimary(indexShard.routingEntry().primary());
onGoingRecoveries.put(recoveryStatus.recoveryId, recoveryStatus);
RecoveryState recoveryState = new RecoveryState(indexShard.shardId());
recoveryState.setType(recoveryType);
recoveryState.setSourceNode(sourceNode);
recoveryState.setTargetNode(clusterService.localNode());
recoveryState.setPrimary(indexShard.routingEntry().primary());
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, recoveryState, listener);
threadPool.generic().execute(new RecoveryRunner(recoveryId));
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
doRecovery(request, recoveryStatus, listener);
}
});
}
public void retryRecovery(final StartRecoveryRequest request, TimeValue retryAfter, final RecoveryStatus status, final RecoveryListener listener) {
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
doRecovery(request, status, listener);
}
});
protected void retryRecovery(final long recoveryId, TimeValue retryAfter) {
logger.trace("will retry recovery with id [{}] in [{}]", recoveryId, retryAfter);
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId));
}
private void doRecovery(final StartRecoveryRequest request, final RecoveryStatus recoveryStatus, final RecoveryListener listener) {
assert request.sourceNode() != null : "can't do a recovery without a source node";
final InternalIndexShard shard = recoveryStatus.indexShard;
if (shard == null) {
listener.onIgnoreRecovery(false, "shard missing locally, stop recovery");
return;
}
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
if (recoveryStatus.isCanceled()) {
// don't remove it, the cancellation code will remove it...
listener.onIgnoreRecovery(false, "canceled recovery");
private void doRecovery(final RecoveryStatus recoveryStatus) {
assert recoveryStatus.sourceNode() != null : "can't do a recovery without a source node";
logger.trace("collecting local files for {}", recoveryStatus);
final Map<String, StoreFileMetaData> existingFiles;
try {
existingFiles = recoveryStatus.store().getMetadata().asMap();
} catch (Exception e) {
logger.debug("error while listing local files, recovery as if there are none", e);
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(),
new RecoveryFailedException(recoveryStatus.state(), "failed to list local files", e), true);
return;
}
StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),
false, existingFiles, recoveryStatus.state().getType(), recoveryStatus.recoveryId());
recoveryStatus.recoveryThread = Thread.currentThread();
if (shard.store().tryIncRef()) {
try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());
try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());
StopWatch stopWatch = new StopWatch().start();
RecoveryResponse recoveryResponse = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet();
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
stopWatch.stop();
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
.append("\n");
sb.append(" phase3: recovered [").append(recoveryResponse.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase3Time)).append("]");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("{} recovery completed from [{}], took [{}]", request.shardId(), request.sourceNode(), stopWatch.totalTime());
}
removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onRecoveryDone();
} catch (Throwable e) {
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
}
if (recoveryStatus.isCanceled()) {
// don't remove it, the cancellation code will remove it...
listener.onIgnoreRecovery(false, "canceled recovery");
return;
}
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// do it twice, in case we have double transport exception
cause = ExceptionsHelper.unwrapCause(cause);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
StopWatch stopWatch = new StopWatch().start();
recoveryStatus.setWaitingRecoveryThread(Thread.currentThread());
// here, we would add checks against exception that need to be retried (and not removeAndClean in this case)
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
// if the target is not ready yet, retry
listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
return;
RecoveryResponse recoveryResponse = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
if (cause instanceof DelayRecoveryException) {
listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
return;
}
// here, we check against ignore recovery options
// in general, no need to clean the shard on ignored recovery, since we want to try and reuse it later
// it will get deleted in the IndicesStore if all are allocated and no shard exists on this node...
removeAndCleanOnGoingRecovery(recoveryStatus);
if (cause instanceof ConnectTransportException) {
listener.onIgnoreRecovery(true, "source node disconnected (" + request.sourceNode() + ")");
return;
}
if (cause instanceof IndexShardClosedException) {
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
return;
}
if (cause instanceof AlreadyClosedException) {
listener.onIgnoreRecovery(true, "source shard is closed (" + request.sourceNode() + ")");
return;
}
logger.warn("[{}][{}] recovery from [{}] failed", e, request.shardId().index().name(), request.shardId().id(), request.sourceNode());
listener.onRecoveryFailure(new RecoveryFailedException(request, e), true);
} finally {
shard.store().decRef();
}).txGet();
recoveryStatus.clearWaitingRecoveryThread(Thread.currentThread());
stopWatch.stop();
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
.append("\n");
sb.append(" phase3: recovered [").append(recoveryResponse.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase3Time)).append("]");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("{} recovery completed from [{}], took [{}]", request.shardId(), request.sourceNode(), stopWatch.totalTime());
}
} else {
listener.onIgnoreRecovery(false, "local store closed, stop recovery");
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryStatus.recoveryId());
} catch (Throwable e) {
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// do it twice, in case we have double transport exception
cause = ExceptionsHelper.unwrapCause(cause);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
// here, we would add checks against exceptions that need to be retried (and not removeAndClean in this case)
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
// if the target is not ready yet, retry
retryRecovery(recoveryStatus.recoveryId(), TimeValue.timeValueMillis(500));
return;
}
if (cause instanceof DelayRecoveryException) {
retryRecovery(recoveryStatus.recoveryId(), TimeValue.timeValueMillis(500));
return;
}
if (cause instanceof ConnectTransportException) {
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source node disconnected", cause), false);
return;
}
if (cause instanceof IndexShardClosedException) {
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source shard is closed", cause), false);
return;
}
if (cause instanceof AlreadyClosedException) {
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source shard is closed", cause), false);
return;
}
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, e), true);
}
}
public static interface RecoveryListener {
void onRecoveryDone();
void onRecoveryDone(RecoveryState state);
void onRetryRecovery(TimeValue retryAfter, RecoveryStatus status);
void onIgnoreRecovery(boolean removeShard, String reason);
void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure);
}
@Nullable
private RecoveryStatus findRecoveryByShard(IndexShard indexShard) {
for (RecoveryStatus recoveryStatus : onGoingRecoveries.values()) {
if (recoveryStatus.indexShard == indexShard) {
return recoveryStatus;
}
}
return null;
}
private void removeAndCleanOnGoingRecovery(@Nullable RecoveryStatus status) {
if (status == null) {
return;
}
// clean it from the on going recoveries since it is being closed
status = onGoingRecoveries.remove(status.recoveryId);
if (status == null) {
return;
}
// just mark it as canceled as well, just in case there are in flight requests
// coming from the recovery target
status.cancel();
// clean open index outputs
Set<Entry<String, IndexOutput>> entrySet = status.cancelAndClearOpenIndexInputs();
Iterator<Entry<String, IndexOutput>> iterator = entrySet.iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
synchronized (entry.getValue()) {
IOUtils.closeWhileHandlingException(entry.getValue());
}
iterator.remove();
}
status.legacyChecksums.clear();
void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure);
}
class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
@@ -374,12 +267,12 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
validateRecoveryStatus(onGoingRecovery, request.shardId());
onGoingRecovery.indexShard.performRecoveryPrepareForTranslog();
onGoingRecovery.stage(RecoveryState.Stage.TRANSLOG);
onGoingRecovery.recoveryState.getStart().checkIndexTime(onGoingRecovery.indexShard.checkIndexTook());
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
recoveryStatus.indexShard().performRecoveryPrepareForTranslog();
recoveryStatus.stage(RecoveryState.Stage.TRANSLOG);
recoveryStatus.state().getStart().checkIndexTime(recoveryStatus.indexShard().checkIndexTook());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@@ -398,13 +291,12 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
validateRecoveryStatus(onGoingRecovery, request.shardId());
onGoingRecovery.stage(RecoveryState.Stage.FINALIZE);
onGoingRecovery.indexShard.performRecoveryFinalization(false, onGoingRecovery);
onGoingRecovery.recoveryState().getTimer().time(System.currentTimeMillis() - onGoingRecovery.recoveryState().getTimer().startTime());
onGoingRecovery.stage(RecoveryState.Stage.DONE);
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
recoveryStatus.indexShard().performRecoveryFinalization(false, recoveryStatus.state());
recoveryStatus.state().getTimer().time(System.currentTimeMillis() - recoveryStatus.state().getTimer().startTime());
recoveryStatus.stage(RecoveryState.Stage.DONE);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@@ -424,16 +316,15 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
validateRecoveryStatus(onGoingRecovery, request.shardId());
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
for (Translog.Operation operation : request.operations()) {
validateRecoveryStatus(onGoingRecovery, request.shardId());
shard.performRecoveryOperation(operation);
onGoingRecovery.recoveryState.getTranslog().incrementTranslogOperations();
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
for (Translog.Operation operation : request.operations()) {
recoveryStatus.indexShard().performRecoveryOperation(operation);
recoveryStatus.state().getTranslog().incrementTranslogOperations();
}
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@@ -451,18 +342,19 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
validateRecoveryStatus(onGoingRecovery, request.shardId());
final RecoveryState.Index index = onGoingRecovery.recoveryState().getIndex();
index.addFileDetails(request.phase1FileNames, request.phase1FileSizes);
index.addReusedFileDetails(request.phase1ExistingFileNames, request.phase1ExistingFileSizes);
index.totalByteCount(request.phase1TotalSize);
index.totalFileCount(request.phase1FileNames.size() + request.phase1ExistingFileNames.size());
index.reusedByteCount(request.phase1ExistingTotalSize);
index.reusedFileCount(request.phase1ExistingFileNames.size());
// recoveryBytesCount / recoveryFileCount will be set as we go...
onGoingRecovery.stage(RecoveryState.Stage.INDEX);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
final RecoveryState.Index index = recoveryStatus.state().getIndex();
index.addFileDetails(request.phase1FileNames, request.phase1FileSizes);
index.addReusedFileDetails(request.phase1ExistingFileNames, request.phase1ExistingFileSizes);
index.totalByteCount(request.phase1TotalSize);
index.totalFileCount(request.phase1FileNames.size() + request.phase1ExistingFileNames.size());
index.reusedByteCount(request.phase1ExistingTotalSize);
index.reusedFileCount(request.phase1ExistingFileNames.size());
// recoveryBytesCount / recoveryFileCount will be set as we go...
recoveryStatus.stage(RecoveryState.Stage.INDEX);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}
@ -480,40 +372,15 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
validateRecoveryStatus(onGoingRecovery, request.shardId());
final Store store = onGoingRecovery.indexShard.store();
store.incRef();
try {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
// first, we go and move files that were created with the recovery id suffix to
// the actual names, it's ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
String prefix = "recovery." + onGoingRecovery.recoveryState().getTimer().startTime() + ".";
Set<String> filesToRename = Sets.newHashSet();
for (String existingFile : store.directory().listAll()) {
if (existingFile.startsWith(prefix)) {
filesToRename.add(existingFile.substring(prefix.length(), existingFile.length()));
}
}
Exception failureToRename = null;
if (!filesToRename.isEmpty()) {
// first, go and delete the existing ones
final Directory directory = store.directory();
for (String file : filesToRename) {
try {
directory.deleteFile(file);
} catch (Throwable ex) {
logger.debug("failed to delete file [{}]", ex, file);
}
}
for (String fileToRename : filesToRename) {
// now, rename the files... and fail if it won't work
store.renameFile(prefix + fileToRename, fileToRename);
}
}
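// all chunks are now written under temporary names; this single call moves them to their final names, replacing the manual delete/rename loop above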
recoveryStatus.renameAllTempFiles();
final Store store = recoveryStatus.store();
// now write checksums
onGoingRecovery.legacyChecksums.write(store);
recoveryStatus.legacyChecksums().write(store);
for (String existingFile : store.directory().listAll()) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
@ -526,8 +393,6 @@ public class RecoveryTarget extends AbstractComponent {
}
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} finally {
store.decRef();
}
}
}
@ -546,103 +411,85 @@ public class RecoveryTarget extends AbstractComponent {
@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
validateRecoveryStatus(onGoingRecovery, request.shardId());
final Store store = onGoingRecovery.indexShard.store();
store.incRef();
try {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
final Store store = recoveryStatus.store();
IndexOutput indexOutput;
if (request.position() == 0) {
// first request
onGoingRecovery.legacyChecksums.remove(request.name());
indexOutput = onGoingRecovery.removeOpenIndexOutputs(request.name());
IOUtils.closeWhileHandlingException(indexOutput);
// we create an output with no checksum, this is because the pure binary data of the file is not
// the checksum (because of seek). We will create the checksum file once copying is done
// also, we check if the file already exists, if it does, we create a file name based
// on the current recovery "id" and later we make the switch, the reason for that is that
// we only want to overwrite the index files once we copied all over, and not create a
// case where the index is half moved
String fileName = request.name();
if (store.directory().fileExists(fileName)) {
fileName = "recovery." + onGoingRecovery.recoveryState().getTimer().startTime() + "." + fileName;
}
indexOutput = onGoingRecovery.openAndPutIndexOutput(request.name(), fileName, request.metadata(), store);
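// the temporary ("recovery." prefixed) file name is now derived inside RecoveryStatus, so the caller no longer probes the directory for an existing file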
indexOutput = recoveryStatus.openAndPutIndexOutput(request.name(), request.metadata(), store);
} else {
indexOutput = onGoingRecovery.getOpenIndexOutput(request.name());
indexOutput = recoveryStatus.getOpenIndexOutput(request.name());
}
if (indexOutput == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
if (recoverySettings.rateLimiter() != null) {
recoverySettings.rateLimiter().pause(request.content().length());
}
boolean success = false;
synchronized (indexOutput) {
BytesReference content = request.content();
if (!content.hasArray()) {
content = content.toBytesArray();
}
indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
recoveryStatus.state().getIndex().addRecoveredByteCount(content.length());
RecoveryState.File file = recoveryStatus.state().getIndex().file(request.name());
if (file != null) {
file.updateRecovered(request.length());
}
if (indexOutput.getFilePointer() >= request.length() || request.lastChunk()) {
try {
if (recoverySettings.rateLimiter() != null) {
recoverySettings.rateLimiter().pause(request.content().length());
}
BytesReference content = request.content();
if (!content.hasArray()) {
content = content.toBytesArray();
}
indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
onGoingRecovery.recoveryState.getIndex().addRecoveredByteCount(content.length());
RecoveryState.File file = onGoingRecovery.recoveryState.getIndex().file(request.name());
if (file != null) {
file.updateRecovered(request.length());
}
if (indexOutput.getFilePointer() >= request.length() || request.lastChunk()) {
Store.verify(indexOutput);
// we are done
indexOutput.close();
// write the checksum
onGoingRecovery.legacyChecksums.add(request.metadata());
store.directory().sync(Collections.singleton(request.name()));
IndexOutput remove = onGoingRecovery.removeOpenIndexOutputs(request.name());
onGoingRecovery.recoveryState.getIndex().addRecoveredFileCount(1);
assert remove == null || remove == indexOutput; // remove may be null if we got canceled
}
success = true;
Store.verify(indexOutput);
} finally {
if (!success || onGoingRecovery.isCanceled()) {
try {
IndexOutput remove = onGoingRecovery.removeOpenIndexOutputs(request.name());
assert remove == null || remove == indexOutput;
IOUtils.closeWhileHandlingException(indexOutput);
} finally {
// trash the file - unsuccessful
store.deleteQuiet(request.name(), "recovery." + onGoingRecovery.recoveryState().getTimer().startTime() + "." + request.name());
}
}
// we are done
indexOutput.close();
}
// write the checksum
recoveryStatus.legacyChecksums().add(request.metadata());
store.directory().sync(Collections.singleton(request.name()));
IndexOutput remove = recoveryStatus.removeOpenIndexOutputs(request.name());
recoveryStatus.state().getIndex().addRecoveredFileCount(1);
assert remove == null || remove == indexOutput; // remove may be null if we got finished
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class RecoveryRunner extends AbstractRunnable {
final long recoveryId;
RecoveryRunner(long recoveryId) {
this.recoveryId = recoveryId;
}
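// invoked on unexpected errors (e.g. rejected execution); if the recovery is still registered it is failed, and the 'true' flag also reports the shard as failed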
@Override
public void onFailure(Throwable t) {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatus(recoveryId)) {
if (statusRef != null) {
logger.error("unexpected error during recovery [{}], failing shard", t, recoveryId);
onGoingRecoveries.failRecovery(recoveryId,
new RecoveryFailedException(statusRef.status().state(), "unexpected error", t),
true // be safe
);
} else {
logger.debug("unexpected error during recovery, but recovery id [{}] is finished", t, recoveryId);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@Override
public void doRun() {
RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatus(recoveryId);
if (statusRef == null) {
logger.trace("not running recovery with id [{}] - can't find it (probably finished)", recoveryId);
return;
}
try {
doRecovery(statusRef.status());
} finally {
store.decRef();
// make sure we never interrupt the thread after we have released it back to the pool
statusRef.status().clearWaitingRecoveryThread(Thread.currentThread());
statusRef.close();
}
}
}
private void validateRecoveryStatus(RecoveryStatus onGoingRecovery, ShardId shardId) {
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shardId);
}
if (onGoingRecovery.indexShard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(onGoingRecovery);
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shardId);
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shardId);
}
}
}

View File

@ -23,7 +23,9 @@ import com.carrotsearch.hppc.IntOpenHashSet;
import com.carrotsearch.hppc.procedures.IntProcedure;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
@ -33,45 +35,70 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.*;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.*;
/**
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@TestLogging("indices.recovery:TRACE")
public class RelocationTests extends ElasticsearchIntegrationTest {
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()).build();
}
@Test
public void testSimpleRelocationNoIndexing() {
@ -417,4 +444,114 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
assertTrue(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty());
}
@Test
@Slow
@TestLogging("indices.recovery:TRACE")
public void testCancellationCleansTempFiles() throws Exception {
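// creates a single-shard index, corrupts the file chunks sent for replica recovery a few times, then disables allocation and verifies no "recovery.*" temp files are left on any node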
final String indexName = "test";
final String p_node = internalCluster().startNode();
client().admin().indices().prepareCreate(indexName)
.setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get();
internalCluster().startNodesAsync(2).get();
List<IndexRequestBuilder> requests = new ArrayList<>();
int numDocs = scaledRandomIntBetween(25, 250);
for (int i = 0; i < numDocs; i++) {
requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
}
indexRandom(true, requests);
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").setWaitForGreenStatus().get().isTimedOut());
flush();
int allowedFailures = randomIntBetween(3, 10);
logger.info("--> blocking recoveries from primary (allowed failures: [{}])", allowedFailures);
CountDownLatch corruptionCount = new CountDownLatch(allowedFailures);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, p_node);
MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, p_node);
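// wrap the primary's outgoing transport so that segments_N file chunks are corrupted, forcing the replica recovery to fail and be retried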
for (DiscoveryNode node : clusterService.state().nodes()) {
if (!node.equals(clusterService.localNode())) {
mockTransportService.addDelegate(node, new RecoveryCorruption(mockTransportService.original(), corruptionCount));
}
}
client().admin().indices().prepareUpdateSettings(indexName).setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)).get();
corruptionCount.await();
logger.info("--> stopping replica assignment");
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "none")));
logger.info("--> wait for all replica shards to be removed, on all nodes");
assertBusy(new Runnable() {
@Override
public void run() {
for (String node : internalCluster().getNodeNames()) {
if (node.equals(p_node)) {
continue;
}
ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState();
assertThat(node + " indicates assigned replicas",
state.getRoutingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1));
}
}
});
logger.info("--> verifying no temporary recoveries are left");
for (String node : internalCluster().getNodeNames()) {
NodeEnvironment nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, node);
for (final File shardLoc : nodeEnvironment.shardLocations(new ShardId(indexName, 0))) {
assertBusy(new Runnable() {
@Override
public void run() {
try {
Files.walkFileTree(shardLoc.toPath(), new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
assertThat("found a temporary recovery file: " + file, file.getFileName().toString(), not(startsWith("recovery.")));
return FileVisitResult.CONTINUE;
}
});
} catch (IOException e) {
throw new ElasticsearchException("failed to walk tree", e);
}
}
});
}
}
}
class RecoveryCorruption extends MockTransportService.DelegateTransport {
private final CountDownLatch corruptionCount;
public RecoveryCorruption(Transport transport, CountDownLatch corruptionCount) {
super(transport);
this.corruptionCount = corruptionCount;
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
// if (action.equals(RecoveryTarget.Actions.PREPARE_TRANSLOG)) {
// logger.debug("dropped [{}] to {}", action, node);
//} else
if (action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request;
if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
// corrupting the segments_N files in order to make sure future recoveries re-send the files
logger.debug("corrupting [{}] to {}. file name: [{}]", action, node, chunkRequest.name());
byte[] array = chunkRequest.content().array();
array[0] = (byte) ~array[0]; // flip one byte in the content
corruptionCount.countDown();
}
transport.sendRequest(node, requestId, action, request, options);
} else {
transport.sendRequest(node, requestId, action, request, options);
}
}
}
}