Create a unique recovery id when recovering from a peer shard instead of using the shard id

This allows better handling of cases where we need to cancel an existing recovery.
This commit is contained in:
Shay Banon 2012-08-06 17:09:00 +02:00
parent d2ac219f01
commit 346fb9ed83
11 changed files with 240 additions and 169 deletions

View File

@ -57,6 +57,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.StartRecoveryRequest; import org.elasticsearch.indices.recovery.StartRecoveryRequest;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -495,27 +496,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (indexService.hasShard(shardId)) { if (indexService.hasShard(shardId)) {
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId); InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId);
if (!shardRouting.equals(indexShard.routingEntry())) { ShardRouting currentRoutingEntry = indexShard.routingEntry();
ShardRouting currentRoutingEntry = indexShard.routingEntry(); // if the current and global routing are initializing, but are still not the same, its a different "shard" being allocated
boolean needToDeleteCurrentShard = false; // for example: a shard that recovers from one node and now needs to recover to another node,
if (currentRoutingEntry.initializing() && shardRouting.initializing()) { // or a replica allocated and then allocating a primary because the primary failed on another node
// both are initializing, see if they are different instanceof of the shard routing, so they got switched on us if (currentRoutingEntry.initializing() && shardRouting.initializing() && !currentRoutingEntry.equals(shardRouting)) {
if (currentRoutingEntry.primary() && !shardRouting.primary()) { logger.debug("[{}][{}] removing shard (different instance of it allocated on this node, current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
needToDeleteCurrentShard = true; // cancel recovery just in case we are in recovery (its fine if we are not in recovery, it will be a noop).
} recoveryTarget.cancelRecovery(indexShard);
// recovering from different nodes..., restart recovery indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
if (currentRoutingEntry.relocatingNodeId() != null && shardRouting.relocatingNodeId() != null &&
!currentRoutingEntry.relocatingNodeId().equals(shardRouting.relocatingNodeId())) {
needToDeleteCurrentShard = true;
}
}
if (needToDeleteCurrentShard) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] removing shard (different instance of it allocated on this node)", shardRouting.index(), shardRouting.id());
}
recoveryTarget.cancelRecovery(shardRouting.shardId());
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
}
} }
} }
@ -607,7 +596,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try { try {
// we are recovering a backup from a primary, so no need to mark it as relocated // we are recovering a backup from a primary, so no need to mark it as relocated
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list()); final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list());
recoveryTarget.startRecovery(request, false, new PeerRecoveryListener(request, shardRouting, indexService)); recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService));
} catch (Exception e) { } catch (Exception e) {
handleRecoveryFailure(indexService, shardRouting, true, e); handleRecoveryFailure(indexService, shardRouting, true, e);
break; break;
@ -643,7 +632,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// we don't mark this one as relocated at the end, requests in any case are routed to both when its relocating // we don't mark this one as relocated at the end, requests in any case are routed to both when its relocating
// and that way we handle the edge case where its mark as relocated, and we might need to roll it back... // and that way we handle the edge case where its mark as relocated, and we might need to roll it back...
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list()); final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list());
recoveryTarget.startRecovery(request, false, new PeerRecoveryListener(request, shardRouting, indexService)); recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService));
} catch (Exception e) { } catch (Exception e) {
handleRecoveryFailure(indexService, shardRouting, true, e); handleRecoveryFailure(indexService, shardRouting, true, e);
} }
@ -671,13 +660,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} }
@Override @Override
public void onRetryRecovery(TimeValue retryAfter) { public void onRetryRecovery(TimeValue retryAfter, RecoveryStatus recoveryStatus) {
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new Runnable() { recoveryTarget.retryRecovery(request, recoveryStatus, PeerRecoveryListener.this);
@Override
public void run() {
recoveryTarget.startRecovery(request, true, PeerRecoveryListener.this);
}
});
} }
@Override @Override

View File

@ -33,6 +33,7 @@ import java.util.Set;
*/ */
class RecoveryCleanFilesRequest implements Streamable { class RecoveryCleanFilesRequest implements Streamable {
private long recoveryId;
private ShardId shardId; private ShardId shardId;
private Set<String> snapshotFiles; private Set<String> snapshotFiles;
@ -40,11 +41,16 @@ class RecoveryCleanFilesRequest implements Streamable {
RecoveryCleanFilesRequest() { RecoveryCleanFilesRequest() {
} }
RecoveryCleanFilesRequest(ShardId shardId, Set<String> snapshotFiles) { RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Set<String> snapshotFiles) {
this.recoveryId = recoveryId;
this.shardId = shardId; this.shardId = shardId;
this.snapshotFiles = snapshotFiles; this.snapshotFiles = snapshotFiles;
} }
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() { public ShardId shardId() {
return shardId; return shardId;
} }
@ -55,6 +61,7 @@ class RecoveryCleanFilesRequest implements Streamable {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
recoveryId = in.readLong();
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
int size = in.readVInt(); int size = in.readVInt();
snapshotFiles = Sets.newHashSetWithExpectedSize(size); snapshotFiles = Sets.newHashSetWithExpectedSize(size);
@ -65,6 +72,7 @@ class RecoveryCleanFilesRequest implements Streamable {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(recoveryId);
shardId.writeTo(out); shardId.writeTo(out);
out.writeVInt(snapshotFiles.size()); out.writeVInt(snapshotFiles.size());
for (String snapshotFile : snapshotFiles) { for (String snapshotFile : snapshotFiles) {

View File

@ -34,6 +34,7 @@ import java.io.IOException;
*/ */
class RecoveryFileChunkRequest implements Streamable { class RecoveryFileChunkRequest implements Streamable {
private long recoveryId;
private ShardId shardId; private ShardId shardId;
private String name; private String name;
private long position; private long position;
@ -44,7 +45,8 @@ class RecoveryFileChunkRequest implements Streamable {
RecoveryFileChunkRequest() { RecoveryFileChunkRequest() {
} }
RecoveryFileChunkRequest(ShardId shardId, String name, long position, long length, String checksum, BytesArray content) { RecoveryFileChunkRequest(long recoveryId, ShardId shardId, String name, long position, long length, String checksum, BytesArray content) {
this.recoveryId = recoveryId;
this.shardId = shardId; this.shardId = shardId;
this.name = name; this.name = name;
this.position = position; this.position = position;
@ -53,6 +55,10 @@ class RecoveryFileChunkRequest implements Streamable {
this.content = content; this.content = content;
} }
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() { public ShardId shardId() {
return shardId; return shardId;
} }
@ -86,6 +92,7 @@ class RecoveryFileChunkRequest implements Streamable {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
recoveryId = in.readLong();
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
name = in.readUTF(); name = in.readUTF();
position = in.readVLong(); position = in.readVLong();
@ -98,6 +105,7 @@ class RecoveryFileChunkRequest implements Streamable {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(recoveryId);
shardId.writeTo(out); shardId.writeTo(out);
out.writeUTF(name); out.writeUTF(name);
out.writeVLong(position); out.writeVLong(position);

View File

@ -33,7 +33,8 @@ import java.util.List;
*/ */
class RecoveryFilesInfoRequest implements Streamable { class RecoveryFilesInfoRequest implements Streamable {
ShardId shardId; private long recoveryId;
private ShardId shardId;
List<String> phase1FileNames; List<String> phase1FileNames;
List<Long> phase1FileSizes; List<Long> phase1FileSizes;
@ -45,7 +46,8 @@ class RecoveryFilesInfoRequest implements Streamable {
RecoveryFilesInfoRequest() { RecoveryFilesInfoRequest() {
} }
RecoveryFilesInfoRequest(ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, long phase1TotalSize, long phase1ExistingTotalSize) { RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, long phase1TotalSize, long phase1ExistingTotalSize) {
this.recoveryId = recoveryId;
this.shardId = shardId; this.shardId = shardId;
this.phase1FileNames = phase1FileNames; this.phase1FileNames = phase1FileNames;
this.phase1FileSizes = phase1FileSizes; this.phase1FileSizes = phase1FileSizes;
@ -55,8 +57,17 @@ class RecoveryFilesInfoRequest implements Streamable {
this.phase1ExistingTotalSize = phase1ExistingTotalSize; this.phase1ExistingTotalSize = phase1ExistingTotalSize;
} }
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() {
return shardId;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
recoveryId = in.readLong();
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
int size = in.readVInt(); int size = in.readVInt();
phase1FileNames = new ArrayList<String>(size); phase1FileNames = new ArrayList<String>(size);
@ -88,6 +99,7 @@ class RecoveryFilesInfoRequest implements Streamable {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(recoveryId);
shardId.writeTo(out); shardId.writeTo(out);
out.writeVInt(phase1FileNames.size()); out.writeVInt(phase1FileNames.size());

View File

@ -31,26 +31,35 @@ import java.io.IOException;
*/ */
class RecoveryFinalizeRecoveryRequest implements Streamable { class RecoveryFinalizeRecoveryRequest implements Streamable {
private long recoveryId;
private ShardId shardId; private ShardId shardId;
RecoveryFinalizeRecoveryRequest() { RecoveryFinalizeRecoveryRequest() {
} }
RecoveryFinalizeRecoveryRequest(ShardId shardId) { RecoveryFinalizeRecoveryRequest(long recoveryId, ShardId shardId) {
this.recoveryId = recoveryId;
this.shardId = shardId; this.shardId = shardId;
} }
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() { public ShardId shardId() {
return shardId; return shardId;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
recoveryId = in.readLong();
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(recoveryId);
shardId.writeTo(out); shardId.writeTo(out);
} }
} }

View File

@ -31,26 +31,35 @@ import java.io.IOException;
*/ */
class RecoveryPrepareForTranslogOperationsRequest implements Streamable { class RecoveryPrepareForTranslogOperationsRequest implements Streamable {
private long recoveryId;
private ShardId shardId; private ShardId shardId;
RecoveryPrepareForTranslogOperationsRequest() { RecoveryPrepareForTranslogOperationsRequest() {
} }
RecoveryPrepareForTranslogOperationsRequest(ShardId shardId) { RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId) {
this.recoveryId = recoveryId;
this.shardId = shardId; this.shardId = shardId;
} }
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() { public ShardId shardId() {
return shardId; return shardId;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
recoveryId = in.readLong();
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(recoveryId);
shardId.writeTo(out); shardId.writeTo(out);
} }
} }

View File

@ -120,7 +120,7 @@ public class RecoverySource extends AbstractComponent {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes, RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(), response.phase1FileNames, response.phase1FileSizes,
response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize); response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE_SAME).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE_SAME).txGet();
@ -156,7 +156,7 @@ public class RecoverySource extends AbstractComponent {
indexInput.readBytes(buf, 0, toRead, false); indexInput.readBytes(buf, 0, toRead, false);
BytesArray content = new BytesArray(buf, 0, toRead); BytesArray content = new BytesArray(buf, 0, toRead);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), content), transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), name, position, len, md.checksum(), content),
TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
readCount += toRead; readCount += toRead;
} }
@ -185,7 +185,7 @@ public class RecoverySource extends AbstractComponent {
// now, set the clean files request // now, set the clean files request
Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles()); Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
stopWatch.stop(); stopWatch.stop();
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
@ -202,7 +202,7 @@ public class RecoverySource extends AbstractComponent {
} }
logger.trace("[{}][{}] recovery [phase2] to {}: start", request.shardId().index().name(), request.shardId().id(), request.targetNode()); logger.trace("[{}][{}] recovery [phase2] to {}: start", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start(); StopWatch stopWatch = new StopWatch().start();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
stopWatch.stop(); stopWatch.stop();
response.startTime = stopWatch.totalTime().millis(); response.startTime = stopWatch.totalTime().millis();
logger.trace("[{}][{}] recovery [phase2] to {}: start took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); logger.trace("[{}][{}] recovery [phase2] to {}: start took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
@ -224,7 +224,7 @@ public class RecoverySource extends AbstractComponent {
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode()); logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start(); StopWatch stopWatch = new StopWatch().start();
int totalOperations = sendSnapshot(snapshot); int totalOperations = sendSnapshot(snapshot);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
if (request.markAsRelocated()) { if (request.markAsRelocated()) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started // TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try { try {
@ -261,7 +261,7 @@ public class RecoverySource extends AbstractComponent {
recoverySettings.rateLimiter().pause(size); recoverySettings.rateLimiter().pause(size);
} }
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
ops = 0; ops = 0;
size = 0; size = 0;
@ -270,7 +270,7 @@ public class RecoverySource extends AbstractComponent {
} }
// send the leftover // send the leftover
if (!operations.isEmpty()) { if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
} }
return totalOperations; return totalOperations;

View File

@ -21,6 +21,8 @@ package org.elasticsearch.indices.recovery;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -39,6 +41,16 @@ public class RecoveryStatus {
DONE DONE
} }
final ShardId shardId;
final long recoveryId;
final InternalIndexShard indexShard;
public RecoveryStatus(long recoveryId, InternalIndexShard indexShard) {
this.recoveryId = recoveryId;
this.indexShard = indexShard;
this.shardId = indexShard.shardId();
}
volatile Thread recoveryThread; volatile Thread recoveryThread;
volatile boolean canceled; volatile boolean canceled;
volatile boolean sentCanceledToSource; volatile boolean sentCanceledToSource;

View File

@ -33,9 +33,9 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.*; import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard;
@ -51,7 +51,6 @@ import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@ -80,7 +79,7 @@ public class RecoveryTarget extends AbstractComponent {
private final RecoverySettings recoverySettings; private final RecoverySettings recoverySettings;
private final ConcurrentMap<ShardId, RecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMap(); private final ConcurrentMapLong<RecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMapLong();
@Inject @Inject
public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService, public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
@ -101,13 +100,15 @@ public class RecoveryTarget extends AbstractComponent {
indicesLifecycle.addListener(new IndicesLifecycle.Listener() { indicesLifecycle.addListener(new IndicesLifecycle.Listener() {
@Override @Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) { public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) {
removeAndCleanOnGoingRecovery(shardId); if (indexShard != null) {
removeAndCleanOnGoingRecovery(findRecoveryByShard(indexShard));
}
} }
}); });
} }
public RecoveryStatus peerRecoveryStatus(ShardId shardId) { public RecoveryStatus peerRecoveryStatus(ShardId shardId) {
RecoveryStatus peerRecoveryStatus = onGoingRecoveries.get(shardId); RecoveryStatus peerRecoveryStatus = findRecoveryByShardId(shardId);
if (peerRecoveryStatus == null) { if (peerRecoveryStatus == null) {
return null; return null;
} }
@ -118,8 +119,8 @@ public class RecoveryTarget extends AbstractComponent {
return peerRecoveryStatus; return peerRecoveryStatus;
} }
public void cancelRecovery(ShardId shardId) { public void cancelRecovery(IndexShard indexShard) {
RecoveryStatus recoveryStatus = onGoingRecoveries.get(shardId); RecoveryStatus recoveryStatus = findRecoveryByShard(indexShard);
// it might be if the recovery source got canceled first // it might be if the recovery source got canceled first
if (recoveryStatus == null) { if (recoveryStatus == null) {
return; return;
@ -143,76 +144,71 @@ public class RecoveryTarget extends AbstractComponent {
break; break;
} }
} }
removeAndCleanOnGoingRecovery(shardId); removeAndCleanOnGoingRecovery(recoveryStatus);
} }
public void startRecovery(final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) { public void startRecovery(final StartRecoveryRequest request, final InternalIndexShard indexShard, final RecoveryListener listener) {
if (request.sourceNode() == null) { try {
listener.onIgnoreRecovery(false, "No node to recover from, retry on next cluster state update"); indexShard.recovering("from " + request.sourceNode());
return; } catch (IllegalIndexShardStateException e) {
} // that's fine, since we might be called concurrently, just ignore this, we are already recovering
IndexService indexService = indicesService.indexService(request.shardId().index().name()); listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage());
if (indexService == null) {
removeAndCleanOnGoingRecovery(request.shardId());
listener.onIgnoreRecovery(false, "index missing locally, stop recovery");
return;
}
final InternalIndexShard shard = (InternalIndexShard) indexService.shard(request.shardId().id());
if (shard == null) {
removeAndCleanOnGoingRecovery(request.shardId());
listener.onIgnoreRecovery(false, "shard missing locally, stop recovery");
return;
}
if (!fromRetry) {
try {
shard.recovering("from " + request.sourceNode());
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage());
return;
}
}
if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(request.shardId());
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return; return;
} }
threadPool.generic().execute(new Runnable() { threadPool.generic().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
doRecovery(shard, request, fromRetry, listener); // create a new recovery status, and process...
RecoveryStatus recoveryStatus = new RecoveryStatus(request.recoveryId(), indexShard);
onGoingRecoveries.put(recoveryStatus.recoveryId, recoveryStatus);
doRecovery(request, recoveryStatus, listener);
} }
}); });
} }
private void doRecovery(final InternalIndexShard shard, final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) { public void retryRecovery(final StartRecoveryRequest request, final RecoveryStatus status, final RecoveryListener listener) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
doRecovery(request, status, listener);
}
});
}
private void doRecovery(final StartRecoveryRequest request, final RecoveryStatus recoveryStatus, final RecoveryListener listener) {
if (request.sourceNode() == null) {
listener.onIgnoreRecovery(false, "No node to recover from, retry on next cluster state update");
return;
}
final InternalIndexShard shard = recoveryStatus.indexShard;
if (shard == null) {
listener.onIgnoreRecovery(false, "shard missing locally, stop recovery");
return;
}
if (shard.state() == IndexShardState.CLOSED) { if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(request.shardId());
listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return; return;
} }
if (recoveryStatus.canceled) {
RecoveryStatus recovery; // don't remove it, the cancellation code will remove it...
if (fromRetry) { listener.onIgnoreRecovery(false, "canceled recovery");
recovery = onGoingRecoveries.get(request.shardId()); return;
} else {
recovery = new RecoveryStatus();
onGoingRecoveries.put(request.shardId(), recovery);
} }
recovery.recoveryThread = Thread.currentThread();
recoveryStatus.recoveryThread = Thread.currentThread();
try { try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode()); logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());
StopWatch stopWatch = new StopWatch().start(); StopWatch stopWatch = new StopWatch().start();
RecoveryResponse recoveryStatus = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() { RecoveryResponse recoveryResponse = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@Override @Override
public RecoveryResponse newInstance() { public RecoveryResponse newInstance() {
return new RecoveryResponse(); return new RecoveryResponse();
} }
}).txGet(); }).txGet();
if (shard.state() == IndexShardState.CLOSED) { if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(shard.shardId()); removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return; return;
} }
@ -221,29 +217,29 @@ public class RecoveryTarget extends AbstractComponent {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] "); sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n"); sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryStatus.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1TotalSize)).append("]") sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryStatus.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryStatus.phase1ThrottlingWaitTime)).append(']') .append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
.append("\n"); .append("\n");
sb.append(" : reusing_files [").append(recoveryStatus.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1ExistingTotalSize)).append("]\n"); sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: start took [").append(timeValueMillis(recoveryStatus.startTime)).append("]\n"); sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
sb.append(" : recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations") sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryStatus.phase2Time)).append("]") .append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
.append("\n"); .append("\n");
sb.append(" phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations") sb.append(" phase3: recovered [").append(recoveryResponse.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryStatus.phase3Time)).append("]"); .append(", took [").append(timeValueMillis(recoveryResponse.phase3Time)).append("]");
logger.debug(sb.toString()); logger.debug(sb.toString());
} }
removeAndCleanOnGoingRecovery(request.shardId()); removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onRecoveryDone(); listener.onRecoveryDone();
} catch (Exception e) { } catch (Exception e) {
// logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id()); // logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
if (recovery.canceled) { if (recoveryStatus.canceled) {
// don't remove it, the cancellation code will remove it... // don't remove it, the cancellation code will remove it...
listener.onIgnoreRecovery(false, "canceled recovery"); listener.onIgnoreRecovery(false, "canceled recovery");
return; return;
} }
if (shard.state() == IndexShardState.CLOSED) { if (shard.state() == IndexShardState.CLOSED) {
removeAndCleanOnGoingRecovery(request.shardId()); removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return; return;
} }
@ -263,7 +259,7 @@ public class RecoveryTarget extends AbstractComponent {
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) { if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
// if the target is not ready yet, retry // if the target is not ready yet, retry
listener.onRetryRecovery(TimeValue.timeValueMillis(500)); listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus);
return; return;
} }
@ -272,7 +268,7 @@ public class RecoveryTarget extends AbstractComponent {
// in general, no need to clean the shard on ignored recovery, since we want to try and reuse it later // in general, no need to clean the shard on ignored recovery, since we want to try and reuse it later
// it will get deleted in the IndicesStore if all are allocated and no shard exists on this node... // it will get deleted in the IndicesStore if all are allocated and no shard exists on this node...
removeAndCleanOnGoingRecovery(request.shardId()); removeAndCleanOnGoingRecovery(recoveryStatus);
if (cause instanceof ConnectTransportException) { if (cause instanceof ConnectTransportException) {
listener.onIgnoreRecovery(true, "source node disconnected (" + request.sourceNode() + ")"); listener.onIgnoreRecovery(true, "source node disconnected (" + request.sourceNode() + ")");
@ -297,34 +293,57 @@ public class RecoveryTarget extends AbstractComponent {
public static interface RecoveryListener { public static interface RecoveryListener {
void onRecoveryDone(); void onRecoveryDone();
void onRetryRecovery(TimeValue retryAfter); void onRetryRecovery(TimeValue retryAfter, RecoveryStatus status);
void onIgnoreRecovery(boolean removeShard, String reason); void onIgnoreRecovery(boolean removeShard, String reason);
void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure); void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure);
} }
/**
 * Searches the on-going recoveries for one that was registered for the given shard id.
 * <p>
 * The recoveries are scanned in the iteration order of {@code onGoingRecoveries.values()}
 * and the first entry whose {@code shardId} field equals the argument is returned.
 *
 * @param shardId the shard id to look for
 * @return the first matching recovery status, or {@code null} when no on-going recovery matches
 */
@Nullable
private RecoveryStatus findRecoveryByShardId(ShardId shardId) {
    RecoveryStatus match = null;
    for (RecoveryStatus candidate : onGoingRecoveries.values()) {
        if (candidate.shardId.equals(shardId)) {
            match = candidate;
            break;
        }
    }
    return match;
}
private void removeAndCleanOnGoingRecovery(ShardId shardId) { @Nullable
// Searches the on-going recoveries for the one attached to the given shard instance.
// The comparison is by reference identity (==), not equals(), so only a recovery that
// holds this exact IndexShard object matches. Returns null when none does.
private RecoveryStatus findRecoveryByShard(IndexShard indexShard) {
    RecoveryStatus found = null;
    for (RecoveryStatus candidate : onGoingRecoveries.values()) {
        if (candidate.indexShard != indexShard) {
            // belongs to a different shard instance, keep looking
            continue;
        }
        found = candidate;
        break;
    }
    return found;
}
private void removeAndCleanOnGoingRecovery(@Nullable RecoveryStatus status) {
if (status == null) {
return;
}
// clean it from the on going recoveries since it is being closed // clean it from the on going recoveries since it is being closed
RecoveryStatus peerRecoveryStatus = onGoingRecoveries.remove(shardId); status = onGoingRecoveries.remove(status.recoveryId);
if (peerRecoveryStatus != null) { if (status == null) {
// just mark it as canceled as well, just in case there are in flight requests return;
// coming from the recovery target }
peerRecoveryStatus.canceled = true; // just mark it as canceled as well, just in case there are in flight requests
// clean open index outputs // coming from the recovery target
for (Map.Entry<String, IndexOutput> entry : peerRecoveryStatus.openIndexOutputs.entrySet()) { status.canceled = true;
synchronized (entry.getValue()) { // clean open index outputs
try { for (Map.Entry<String, IndexOutput> entry : status.openIndexOutputs.entrySet()) {
entry.getValue().close(); synchronized (entry.getValue()) {
} catch (Exception e) { try {
// ignore entry.getValue().close();
} } catch (Exception e) {
// ignore
} }
} }
peerRecoveryStatus.openIndexOutputs = null;
peerRecoveryStatus.checksums = null;
} }
status.openIndexOutputs = null;
status.checksums = null;
} }
class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> { class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
@ -341,20 +360,19 @@ public class RecoveryTarget extends AbstractComponent {
@Override @Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) { if (onGoingRecovery == null) {
// shard is getting closed on us // shard is getting closed on us
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
if (onGoingRecovery.canceled) { if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true; onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
onGoingRecovery.stage = RecoveryStatus.Stage.TRANSLOG; onGoingRecovery.stage = RecoveryStatus.Stage.TRANSLOG;
shard.performRecoveryPrepareForTranslog(); onGoingRecovery.indexShard.performRecoveryPrepareForTranslog();
channel.sendResponse(VoidStreamable.INSTANCE); channel.sendResponse(VoidStreamable.INSTANCE);
} }
} }
@ -373,18 +391,18 @@ public class RecoveryTarget extends AbstractComponent {
@Override @Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception { public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) { if (onGoingRecovery == null) {
// shard is getting closed on us // shard is getting closed on us
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
if (onGoingRecovery.canceled) { if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true; onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
onGoingRecovery.stage = RecoveryStatus.Stage.FINALIZE; onGoingRecovery.stage = RecoveryStatus.Stage.FINALIZE;
shard.performRecoveryFinalization(false, onGoingRecovery); onGoingRecovery.indexShard.performRecoveryFinalization(false, onGoingRecovery);
onGoingRecovery.time = System.currentTimeMillis() - onGoingRecovery.startTime; onGoingRecovery.time = System.currentTimeMillis() - onGoingRecovery.startTime;
onGoingRecovery.stage = RecoveryStatus.Stage.DONE; onGoingRecovery.stage = RecoveryStatus.Stage.DONE;
channel.sendResponse(VoidStreamable.INSTANCE); channel.sendResponse(VoidStreamable.INSTANCE);
@ -406,7 +424,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override @Override
public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception { public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.shardId()); RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
if (onGoingRecovery == null) { if (onGoingRecovery == null) {
// shard is getting closed on us // shard is getting closed on us
throw new IndexShardClosedException(request.shardId()); throw new IndexShardClosedException(request.shardId());
@ -425,17 +443,6 @@ public class RecoveryTarget extends AbstractComponent {
shard.performRecoveryOperation(operation); shard.performRecoveryOperation(operation);
onGoingRecovery.currentTranslogOperations++; onGoingRecovery.currentTranslogOperations++;
} }
onGoingRecovery = onGoingRecoveries.get(request.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
channel.sendResponse(VoidStreamable.INSTANCE); channel.sendResponse(VoidStreamable.INSTANCE);
} }
} }
@ -454,16 +461,16 @@ public class RecoveryTarget extends AbstractComponent {
@Override @Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception { public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId.index().name()).shardSafe(request.shardId.id()); RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) { if (onGoingRecovery == null) {
// shard is getting closed on us // shard is getting closed on us
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
if (onGoingRecovery.canceled) { if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true; onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
onGoingRecovery.phase1FileNames = request.phase1FileNames; onGoingRecovery.phase1FileNames = request.phase1FileNames;
onGoingRecovery.phase1FileSizes = request.phase1FileSizes; onGoingRecovery.phase1FileSizes = request.phase1FileSizes;
onGoingRecovery.phase1ExistingFileNames = request.phase1ExistingFileNames; onGoingRecovery.phase1ExistingFileNames = request.phase1ExistingFileNames;
@ -489,23 +496,23 @@ public class RecoveryTarget extends AbstractComponent {
@Override @Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception { public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) { if (onGoingRecovery == null) {
// shard is getting closed on us // shard is getting closed on us
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
if (onGoingRecovery.canceled) { if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true; onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
Store store = onGoingRecovery.indexShard.store();
// first, we go and move files that were created with the recovery id suffix to // first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas // the actual names, its ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes... // to recover from in case of a full cluster shutdown just when this code executes...
String prefix = "recovery." + onGoingRecovery.startTime + "."; String prefix = "recovery." + onGoingRecovery.startTime + ".";
Set<String> filesToRename = Sets.newHashSet(); Set<String> filesToRename = Sets.newHashSet();
for (String existingFile : shard.store().directory().listAll()) { for (String existingFile : store.directory().listAll()) {
if (existingFile.startsWith(prefix)) { if (existingFile.startsWith(prefix)) {
filesToRename.add(existingFile.substring(prefix.length(), existingFile.length())); filesToRename.add(existingFile.substring(prefix.length(), existingFile.length()));
} }
@ -514,12 +521,12 @@ public class RecoveryTarget extends AbstractComponent {
if (!filesToRename.isEmpty()) { if (!filesToRename.isEmpty()) {
// first, go and delete the existing ones // first, go and delete the existing ones
for (String fileToRename : filesToRename) { for (String fileToRename : filesToRename) {
shard.store().directory().deleteFile(fileToRename); store.directory().deleteFile(fileToRename);
} }
for (String fileToRename : filesToRename) { for (String fileToRename : filesToRename) {
// now, rename the files... // now, rename the files...
try { try {
shard.store().renameFile(prefix + fileToRename, fileToRename); store.renameFile(prefix + fileToRename, fileToRename);
} catch (Exception e) { } catch (Exception e) {
failureToRename = e; failureToRename = e;
break; break;
@ -530,13 +537,13 @@ public class RecoveryTarget extends AbstractComponent {
throw failureToRename; throw failureToRename;
} }
// now write checksums // now write checksums
shard.store().writeChecksums(onGoingRecovery.checksums); store.writeChecksums(onGoingRecovery.checksums);
for (String existingFile : shard.store().directory().listAll()) { for (String existingFile : store.directory().listAll()) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum) // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
if (!request.snapshotFiles().contains(existingFile) && !Store.isChecksum(existingFile)) { if (!request.snapshotFiles().contains(existingFile) && !Store.isChecksum(existingFile)) {
try { try {
shard.store().directory().deleteFile(existingFile); store.directory().deleteFile(existingFile);
} catch (Exception e) { } catch (Exception e) {
// ignore, we don't really care, will get deleted later on // ignore, we don't really care, will get deleted later on
} }
@ -561,16 +568,18 @@ public class RecoveryTarget extends AbstractComponent {
@Override @Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception { public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) { if (onGoingRecovery == null) {
// shard is getting closed on us // shard is getting closed on us
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
if (onGoingRecovery.canceled) { if (onGoingRecovery.canceled) {
onGoingRecovery.sentCanceledToSource = true; onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
Store store = onGoingRecovery.indexShard.store();
IndexOutput indexOutput; IndexOutput indexOutput;
if (request.position() == 0) { if (request.position() == 0) {
// first request // first request
@ -592,11 +601,11 @@ public class RecoveryTarget extends AbstractComponent {
// case where the index is half moved // case where the index is half moved
String name = request.name(); String name = request.name();
if (shard.store().directory().fileExists(name)) { if (store.directory().fileExists(name)) {
name = "recovery." + onGoingRecovery.startTime + "." + name; name = "recovery." + onGoingRecovery.startTime + "." + name;
} }
indexOutput = shard.store().createOutputRaw(name); indexOutput = store.createOutputRaw(name);
onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput); onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput);
} else { } else {
@ -604,7 +613,7 @@ public class RecoveryTarget extends AbstractComponent {
} }
if (indexOutput == null) { if (indexOutput == null) {
// shard is getting closed on us // shard is getting closed on us
throw new IndexShardClosedException(shard.shardId()); throw new IndexShardClosedException(request.shardId());
} }
synchronized (indexOutput) { synchronized (indexOutput) {
try { try {
@ -624,7 +633,7 @@ public class RecoveryTarget extends AbstractComponent {
if (request.checksum() != null) { if (request.checksum() != null) {
onGoingRecovery.checksums.put(request.name(), request.checksum()); onGoingRecovery.checksums.put(request.name(), request.checksum());
} }
shard.store().directory().sync(Collections.singleton(request.name())); store.directory().sync(Collections.singleton(request.name()));
onGoingRecovery.openIndexOutputs.remove(request.name()); onGoingRecovery.openIndexOutputs.remove(request.name());
} }
} catch (IOException e) { } catch (IOException e) {

View File

@ -35,17 +35,23 @@ import java.util.List;
*/ */
class RecoveryTranslogOperationsRequest implements Streamable { class RecoveryTranslogOperationsRequest implements Streamable {
private long recoveryId;
private ShardId shardId; private ShardId shardId;
private List<Translog.Operation> operations; private List<Translog.Operation> operations;
RecoveryTranslogOperationsRequest() { RecoveryTranslogOperationsRequest() {
} }
RecoveryTranslogOperationsRequest(ShardId shardId, List<Translog.Operation> operations) { RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations) {
this.recoveryId = recoveryId;
this.shardId = shardId; this.shardId = shardId;
this.operations = operations; this.operations = operations;
} }
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() { public ShardId shardId() {
return shardId; return shardId;
} }
@ -56,6 +62,7 @@ class RecoveryTranslogOperationsRequest implements Streamable {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
recoveryId = in.readLong();
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
int size = in.readVInt(); int size = in.readVInt();
operations = Lists.newArrayListWithExpectedSize(size); operations = Lists.newArrayListWithExpectedSize(size);
@ -66,6 +73,7 @@ class RecoveryTranslogOperationsRequest implements Streamable {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(recoveryId);
shardId.writeTo(out); shardId.writeTo(out);
out.writeVInt(operations.size()); out.writeVInt(operations.size());
for (Translog.Operation operation : operations) { for (Translog.Operation operation : operations) {

View File

@ -29,12 +29,17 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* *
*/ */
public class StartRecoveryRequest implements Streamable { public class StartRecoveryRequest implements Streamable {
private static final AtomicLong recoveryIdGenerator = new AtomicLong();
private long recoveryId;
private ShardId shardId; private ShardId shardId;
private DiscoveryNode sourceNode; private DiscoveryNode sourceNode;
@ -58,6 +63,7 @@ public class StartRecoveryRequest implements Streamable {
* @param existingFiles * @param existingFiles
*/ */
public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Map<String, StoreFileMetaData> existingFiles) { public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Map<String, StoreFileMetaData> existingFiles) {
this.recoveryId = recoveryIdGenerator.incrementAndGet();
this.shardId = shardId; this.shardId = shardId;
this.sourceNode = sourceNode; this.sourceNode = sourceNode;
this.targetNode = targetNode; this.targetNode = targetNode;
@ -65,6 +71,10 @@ public class StartRecoveryRequest implements Streamable {
this.existingFiles = existingFiles; this.existingFiles = existingFiles;
} }
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() { public ShardId shardId() {
return shardId; return shardId;
} }
@ -87,6 +97,7 @@ public class StartRecoveryRequest implements Streamable {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
recoveryId = in.readLong();
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
sourceNode = DiscoveryNode.readNode(in); sourceNode = DiscoveryNode.readNode(in);
targetNode = DiscoveryNode.readNode(in); targetNode = DiscoveryNode.readNode(in);
@ -101,6 +112,7 @@ public class StartRecoveryRequest implements Streamable {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(recoveryId);
shardId.writeTo(out); shardId.writeTo(out);
sourceNode.writeTo(out); sourceNode.writeTo(out);
targetNode.writeTo(out); targetNode.writeTo(out);