From 346fb9ed835e8d230ed2718b3f27f7364d26ddad Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 6 Aug 2012 17:09:00 +0200 Subject: [PATCH] create a unique recovery id when recovering from a peer shard instead of using shard id this allows to handle better cases where we need to cancel an existing recovery --- .../cluster/IndicesClusterStateService.java | 44 +-- .../recovery/RecoveryCleanFilesRequest.java | 10 +- .../recovery/RecoveryFileChunkRequest.java | 10 +- .../recovery/RecoveryFilesInfoRequest.java | 16 +- .../RecoveryFinalizeRecoveryRequest.java | 11 +- ...ryPrepareForTranslogOperationsRequest.java | 11 +- .../indices/recovery/RecoverySource.java | 14 +- .../indices/recovery/RecoveryStatus.java | 12 + .../indices/recovery/RecoveryTarget.java | 259 +++++++++--------- .../RecoveryTranslogOperationsRequest.java | 10 +- .../recovery/StartRecoveryRequest.java | 12 + 11 files changed, 240 insertions(+), 169 deletions(-) diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9a9b78430ce..3dc172d584a 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -57,6 +57,7 @@ import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryFailedException; +import org.elasticsearch.indices.recovery.RecoveryStatus; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.recovery.StartRecoveryRequest; import org.elasticsearch.threadpool.ThreadPool; @@ -495,27 +496,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent snapshotFiles; @@ -40,11 +41,16 @@ class RecoveryCleanFilesRequest implements Streamable { RecoveryCleanFilesRequest() { } - RecoveryCleanFilesRequest(ShardId shardId, Set snapshotFiles) { + RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Set snapshotFiles) { + this.recoveryId = recoveryId; this.shardId = shardId; this.snapshotFiles = snapshotFiles; } + public long recoveryId() { + return this.recoveryId; + } + public ShardId shardId() { return shardId; } @@ -55,6 +61,7 @@ class RecoveryCleanFilesRequest implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { + recoveryId = in.readLong(); shardId = ShardId.readShardId(in); int size = in.readVInt(); snapshotFiles = Sets.newHashSetWithExpectedSize(size); @@ -65,6 +72,7 @@ class RecoveryCleanFilesRequest implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { + out.writeLong(recoveryId); shardId.writeTo(out); out.writeVInt(snapshotFiles.size()); for (String snapshotFile : snapshotFiles) { diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java index 554a4f896e8..4acc735f154 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java @@ -34,6 +34,7 @@ import java.io.IOException; */ class RecoveryFileChunkRequest implements Streamable { + private long recoveryId; private ShardId shardId; private String name; private long position; @@ -44,7 +45,8 @@ class 
RecoveryFileChunkRequest implements Streamable { RecoveryFileChunkRequest() { } - RecoveryFileChunkRequest(ShardId shardId, String name, long position, long length, String checksum, BytesArray content) { + RecoveryFileChunkRequest(long recoveryId, ShardId shardId, String name, long position, long length, String checksum, BytesArray content) { + this.recoveryId = recoveryId; this.shardId = shardId; this.name = name; this.position = position; @@ -53,6 +55,10 @@ class RecoveryFileChunkRequest implements Streamable { this.content = content; } + public long recoveryId() { + return this.recoveryId; + } + public ShardId shardId() { return shardId; } @@ -86,6 +92,7 @@ class RecoveryFileChunkRequest implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { + recoveryId = in.readLong(); shardId = ShardId.readShardId(in); name = in.readUTF(); position = in.readVLong(); @@ -98,6 +105,7 @@ class RecoveryFileChunkRequest implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { + out.writeLong(recoveryId); shardId.writeTo(out); out.writeUTF(name); out.writeVLong(position); diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java index 0432baf0b41..faac88de09f 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java @@ -33,7 +33,8 @@ import java.util.List; */ class RecoveryFilesInfoRequest implements Streamable { - ShardId shardId; + private long recoveryId; + private ShardId shardId; List phase1FileNames; List phase1FileSizes; @@ -45,7 +46,8 @@ class RecoveryFilesInfoRequest implements Streamable { RecoveryFilesInfoRequest() { } - RecoveryFilesInfoRequest(ShardId shardId, List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, List phase1ExistingFileSizes, long phase1TotalSize, long phase1ExistingTotalSize) { + RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, List phase1ExistingFileSizes, long phase1TotalSize, long phase1ExistingTotalSize) { + this.recoveryId = recoveryId; this.shardId = shardId; this.phase1FileNames = phase1FileNames; this.phase1FileSizes = phase1FileSizes; @@ -55,8 +57,17 @@ class RecoveryFilesInfoRequest implements Streamable { this.phase1ExistingTotalSize = phase1ExistingTotalSize; } + public long recoveryId() { + return this.recoveryId; + } + + public ShardId shardId() { + return shardId; + } + @Override public void readFrom(StreamInput in) throws IOException { + recoveryId = in.readLong(); shardId = ShardId.readShardId(in); int size = in.readVInt(); phase1FileNames = new ArrayList(size); @@ -88,6 +99,7 @@ class RecoveryFilesInfoRequest implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { + out.writeLong(recoveryId); shardId.writeTo(out); out.writeVInt(phase1FileNames.size()); diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryFinalizeRecoveryRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryFinalizeRecoveryRequest.java index b0c5e1b89bc..ae6485dd7bc 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryFinalizeRecoveryRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryFinalizeRecoveryRequest.java @@ -31,26 +31,35 @@ import java.io.IOException; */ class 
RecoveryFinalizeRecoveryRequest implements Streamable { + private long recoveryId; + private ShardId shardId; RecoveryFinalizeRecoveryRequest() { } - RecoveryFinalizeRecoveryRequest(ShardId shardId) { + RecoveryFinalizeRecoveryRequest(long recoveryId, ShardId shardId) { + this.recoveryId = recoveryId; this.shardId = shardId; } + public long recoveryId() { + return this.recoveryId; + } + public ShardId shardId() { return shardId; } @Override public void readFrom(StreamInput in) throws IOException { + recoveryId = in.readLong(); shardId = ShardId.readShardId(in); } @Override public void writeTo(StreamOutput out) throws IOException { + out.writeLong(recoveryId); shardId.writeTo(out); } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java index 5ce347accc5..e7fa0b9b3b8 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java @@ -31,26 +31,35 @@ import java.io.IOException; */ class RecoveryPrepareForTranslogOperationsRequest implements Streamable { + private long recoveryId; + private ShardId shardId; RecoveryPrepareForTranslogOperationsRequest() { } - RecoveryPrepareForTranslogOperationsRequest(ShardId shardId) { + RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId) { + this.recoveryId = recoveryId; this.shardId = shardId; } + public long recoveryId() { + return this.recoveryId; + } + public ShardId shardId() { return shardId; } @Override public void readFrom(StreamInput in) throws IOException { + recoveryId = in.readLong(); shardId = ShardId.readShardId(in); } @Override public void writeTo(StreamOutput out) throws IOException { + out.writeLong(recoveryId); shardId.writeTo(out); } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index cb6b4e9168e..afdd3c27dc8 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -120,7 +120,7 @@ public class RecoverySource extends AbstractComponent { logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); - RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes, + RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(), response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE_SAME).txGet(); @@ -156,7 +156,7 @@ public class RecoverySource extends AbstractComponent { indexInput.readBytes(buf, 0, toRead, false); BytesArray content = new BytesArray(buf, 0, toRead); - 
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), content), + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), name, position, len, md.checksum(), content), TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); readCount += toRead; } @@ -185,7 +185,7 @@ public class RecoverySource extends AbstractComponent { // now, set the clean files request Set snapshotFiles = Sets.newHashSet(snapshot.getFiles()); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); stopWatch.stop(); logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); @@ -202,7 +202,7 @@ public class RecoverySource extends AbstractComponent { } logger.trace("[{}][{}] recovery [phase2] to {}: start", request.shardId().index().name(), request.shardId().id(), request.targetNode()); StopWatch stopWatch = new StopWatch().start(); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); stopWatch.stop(); response.startTime = stopWatch.totalTime().millis(); logger.trace("[{}][{}] recovery [phase2] to {}: start took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); @@ -224,7 +224,7 @@ public class RecoverySource extends AbstractComponent { logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode()); StopWatch stopWatch = new StopWatch().start(); int totalOperations = sendSnapshot(snapshot); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); if (request.markAsRelocated()) { // TODO what happens if the recovery process fails afterwards, we need to mark this back to started try { @@ -261,7 +261,7 @@ public class RecoverySource extends AbstractComponent { recoverySettings.rateLimiter().pause(size); } - RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); + RecoveryTranslogOperationsRequest translogOperationsRequest = new 
RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); ops = 0; size = 0; @@ -270,7 +270,7 @@ public class RecoverySource extends AbstractComponent { } // send the leftover if (!operations.isEmpty()) { - RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); + RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); } return totalOperations; diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java index b23ea70a348..29148c8ba77 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java @@ -21,6 +21,8 @@ package org.elasticsearch.indices.recovery; import org.apache.lucene.store.IndexOutput; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.InternalIndexShard; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -39,6 +41,16 @@ public class RecoveryStatus { DONE } + final ShardId shardId; + final long recoveryId; + final InternalIndexShard indexShard; + + public RecoveryStatus(long recoveryId, InternalIndexShard indexShard) { + this.recoveryId = recoveryId; + this.indexShard = indexShard; + this.shardId = indexShard.shardId(); + } + volatile Thread recoveryThread; volatile boolean canceled; volatile boolean sentCanceledToSource; diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index fe67364a95c..0caf6ec54da 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -33,9 +33,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.engine.RecoveryEngineException; -import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.*; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; @@ -51,7 +51,6 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentMap; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -80,7 +79,7 @@ public class RecoveryTarget extends AbstractComponent { private final RecoverySettings recoverySettings; - private 
final ConcurrentMap onGoingRecoveries = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMapLong onGoingRecoveries = ConcurrentCollections.newConcurrentMapLong(); @Inject public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService, @@ -101,13 +100,15 @@ public class RecoveryTarget extends AbstractComponent { indicesLifecycle.addListener(new IndicesLifecycle.Listener() { @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) { - removeAndCleanOnGoingRecovery(shardId); + if (indexShard != null) { + removeAndCleanOnGoingRecovery(findRecoveryByShard(indexShard)); + } } }); } public RecoveryStatus peerRecoveryStatus(ShardId shardId) { - RecoveryStatus peerRecoveryStatus = onGoingRecoveries.get(shardId); + RecoveryStatus peerRecoveryStatus = findRecoveryByShardId(shardId); if (peerRecoveryStatus == null) { return null; } @@ -118,8 +119,8 @@ public class RecoveryTarget extends AbstractComponent { return peerRecoveryStatus; } - public void cancelRecovery(ShardId shardId) { - RecoveryStatus recoveryStatus = onGoingRecoveries.get(shardId); + public void cancelRecovery(IndexShard indexShard) { + RecoveryStatus recoveryStatus = findRecoveryByShard(indexShard); // it might be if the recovery source got canceled first if (recoveryStatus == null) { return; @@ -143,76 +144,71 @@ public class RecoveryTarget extends AbstractComponent { break; } } - removeAndCleanOnGoingRecovery(shardId); + removeAndCleanOnGoingRecovery(recoveryStatus); } - public void startRecovery(final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) { - if (request.sourceNode() == null) { - listener.onIgnoreRecovery(false, "No node to recover from, retry on next cluster state update"); - return; - } - IndexService indexService = indicesService.indexService(request.shardId().index().name()); - if (indexService == null) { - removeAndCleanOnGoingRecovery(request.shardId()); - listener.onIgnoreRecovery(false, "index missing locally, stop recovery"); - return; - } - final InternalIndexShard shard = (InternalIndexShard) indexService.shard(request.shardId().id()); - if (shard == null) { - removeAndCleanOnGoingRecovery(request.shardId()); - listener.onIgnoreRecovery(false, "shard missing locally, stop recovery"); - return; - } - if (!fromRetry) { - try { - shard.recovering("from " + request.sourceNode()); - } catch (IllegalIndexShardStateException e) { - // that's fine, since we might be called concurrently, just ignore this, we are already recovering - listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage()); - return; - } - } - if (shard.state() == IndexShardState.CLOSED) { - removeAndCleanOnGoingRecovery(request.shardId()); - listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); + public void startRecovery(final StartRecoveryRequest request, final InternalIndexShard indexShard, final RecoveryListener listener) { + try { + indexShard.recovering("from " + request.sourceNode()); + } catch (IllegalIndexShardStateException e) { + // that's fine, since we might be called concurrently, just ignore this, we are already recovering + listener.onIgnoreRecovery(false, "already in recovering process, " + e.getMessage()); return; } threadPool.generic().execute(new Runnable() { @Override public void run() { - doRecovery(shard, request, fromRetry, listener); + // create a new recovery status, and process... 
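// (annotation, not part of the original patch: the status created below is registered under request.recoveryId() rather than the ShardId, so a retried or concurrent recovery of the same shard gets its own entry and a stale attempt can be canceled without disturbing the new one)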
+ RecoveryStatus recoveryStatus = new RecoveryStatus(request.recoveryId(), indexShard); + onGoingRecoveries.put(recoveryStatus.recoveryId, recoveryStatus); + doRecovery(request, recoveryStatus, listener); } }); } - private void doRecovery(final InternalIndexShard shard, final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) { + public void retryRecovery(final StartRecoveryRequest request, final RecoveryStatus status, final RecoveryListener listener) { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + doRecovery(request, status, listener); + } + }); + } + + private void doRecovery(final StartRecoveryRequest request, final RecoveryStatus recoveryStatus, final RecoveryListener listener) { + if (request.sourceNode() == null) { + listener.onIgnoreRecovery(false, "No node to recover from, retry on next cluster state update"); + return; + } + final InternalIndexShard shard = recoveryStatus.indexShard; + if (shard == null) { + listener.onIgnoreRecovery(false, "shard missing locally, stop recovery"); + return; + } if (shard.state() == IndexShardState.CLOSED) { - removeAndCleanOnGoingRecovery(request.shardId()); listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); return; } - - RecoveryStatus recovery; - if (fromRetry) { - recovery = onGoingRecoveries.get(request.shardId()); - } else { - recovery = new RecoveryStatus(); - onGoingRecoveries.put(request.shardId(), recovery); + if (recoveryStatus.canceled) { + // don't remove it, the cancellation code will remove it... + listener.onIgnoreRecovery(false, "canceled recovery"); + return; } - recovery.recoveryThread = Thread.currentThread(); + + recoveryStatus.recoveryThread = Thread.currentThread(); try { logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode()); StopWatch stopWatch = new StopWatch().start(); - RecoveryResponse recoveryStatus = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler() { + RecoveryResponse recoveryResponse = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler() { @Override public RecoveryResponse newInstance() { return new RecoveryResponse(); } }).txGet(); if (shard.state() == IndexShardState.CLOSED) { - removeAndCleanOnGoingRecovery(shard.shardId()); + removeAndCleanOnGoingRecovery(recoveryStatus); listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); return; } @@ -221,29 +217,29 @@ public class RecoveryTarget extends AbstractComponent { StringBuilder sb = new StringBuilder(); sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] "); sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n"); - sb.append(" phase1: recovered_files [").append(recoveryStatus.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1TotalSize)).append("]") - .append(", took [").append(timeValueMillis(recoveryStatus.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryStatus.phase1ThrottlingWaitTime)).append(']') + sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with total_size of [").append(new 
ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]") + .append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']') .append("\n"); - sb.append(" : reusing_files [").append(recoveryStatus.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1ExistingTotalSize)).append("]\n"); - sb.append(" phase2: start took [").append(timeValueMillis(recoveryStatus.startTime)).append("]\n"); - sb.append(" : recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations") - .append(", took [").append(timeValueMillis(recoveryStatus.phase2Time)).append("]") + sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n"); + sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n"); + sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log operations") + .append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]") .append("\n"); - sb.append(" phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations") - .append(", took [").append(timeValueMillis(recoveryStatus.phase3Time)).append("]"); + sb.append(" phase3: recovered [").append(recoveryResponse.phase3Operations).append("]").append(" transaction log operations") + .append(", took [").append(timeValueMillis(recoveryResponse.phase3Time)).append("]"); logger.debug(sb.toString()); } - removeAndCleanOnGoingRecovery(request.shardId()); + removeAndCleanOnGoingRecovery(recoveryStatus); listener.onRecoveryDone(); } catch (Exception e) { // logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id()); - if (recovery.canceled) { + if (recoveryStatus.canceled) { // don't remove it, the cancellation code will remove it... listener.onIgnoreRecovery(false, "canceled recovery"); return; } if (shard.state() == IndexShardState.CLOSED) { - removeAndCleanOnGoingRecovery(request.shardId()); + removeAndCleanOnGoingRecovery(recoveryStatus); listener.onIgnoreRecovery(false, "local shard closed, stop recovery"); return; } @@ -263,7 +259,7 @@ public class RecoveryTarget extends AbstractComponent { if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) { // if the target is not ready yet, retry - listener.onRetryRecovery(TimeValue.timeValueMillis(500)); + listener.onRetryRecovery(TimeValue.timeValueMillis(500), recoveryStatus); return; } @@ -272,7 +268,7 @@ public class RecoveryTarget extends AbstractComponent { // in general, no need to clean the shard on ignored recovery, since we want to try and reuse it later // it will get deleted in the IndicesStore if all are allocated and no shard exists on this node... 
- removeAndCleanOnGoingRecovery(request.shardId()); + removeAndCleanOnGoingRecovery(recoveryStatus); if (cause instanceof ConnectTransportException) { listener.onIgnoreRecovery(true, "source node disconnected (" + request.sourceNode() + ")"); @@ -297,34 +293,57 @@ public class RecoveryTarget extends AbstractComponent { public static interface RecoveryListener { void onRecoveryDone(); - void onRetryRecovery(TimeValue retryAfter); + void onRetryRecovery(TimeValue retryAfter, RecoveryStatus status); void onIgnoreRecovery(boolean removeShard, String reason); void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure); } + @Nullable + private RecoveryStatus findRecoveryByShardId(ShardId shardId) { + for (RecoveryStatus recoveryStatus : onGoingRecoveries.values()) { + if (recoveryStatus.shardId.equals(shardId)) { + return recoveryStatus; + } + } + return null; + } - private void removeAndCleanOnGoingRecovery(ShardId shardId) { + @Nullable + private RecoveryStatus findRecoveryByShard(IndexShard indexShard) { + for (RecoveryStatus recoveryStatus : onGoingRecoveries.values()) { + if (recoveryStatus.indexShard == indexShard) { + return recoveryStatus; + } + } + return null; + } + + private void removeAndCleanOnGoingRecovery(@Nullable RecoveryStatus status) { + if (status == null) { + return; + } // clean it from the on going recoveries since it is being closed - RecoveryStatus peerRecoveryStatus = onGoingRecoveries.remove(shardId); - if (peerRecoveryStatus != null) { - // just mark it as canceled as well, just in case there are in flight requests - // coming from the recovery target - peerRecoveryStatus.canceled = true; - // clean open index outputs - for (Map.Entry entry : peerRecoveryStatus.openIndexOutputs.entrySet()) { - synchronized (entry.getValue()) { - try { - entry.getValue().close(); - } catch (Exception e) { - // ignore - } + status = onGoingRecoveries.remove(status.recoveryId); + if (status == null) { + return; + } + // just mark it as canceled as well, just in case there are in flight requests + // coming from the recovery target + status.canceled = true; + // clean open index outputs + for (Map.Entry entry : status.openIndexOutputs.entrySet()) { + synchronized (entry.getValue()) { + try { + entry.getValue().close(); + } catch (Exception e) { + // ignore } } - peerRecoveryStatus.openIndexOutputs = null; - peerRecoveryStatus.checksums = null; } + status.openIndexOutputs = null; + status.checksums = null; } class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler { @@ -341,20 +360,19 @@ public class RecoveryTarget extends AbstractComponent { @Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { - InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); - - RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId()); + RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId()); if (onGoingRecovery == null) { // shard is getting closed on us - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } if (onGoingRecovery.canceled) { onGoingRecovery.sentCanceledToSource = true; - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } + onGoingRecovery.stage = RecoveryStatus.Stage.TRANSLOG; - shard.performRecoveryPrepareForTranslog(); + 
onGoingRecovery.indexShard.performRecoveryPrepareForTranslog(); channel.sendResponse(VoidStreamable.INSTANCE); } } @@ -373,18 +391,18 @@ public class RecoveryTarget extends AbstractComponent { @Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception { - InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); - RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId()); + RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId()); if (onGoingRecovery == null) { // shard is getting closed on us - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } if (onGoingRecovery.canceled) { onGoingRecovery.sentCanceledToSource = true; - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } + onGoingRecovery.stage = RecoveryStatus.Stage.FINALIZE; - shard.performRecoveryFinalization(false, onGoingRecovery); + onGoingRecovery.indexShard.performRecoveryFinalization(false, onGoingRecovery); onGoingRecovery.time = System.currentTimeMillis() - onGoingRecovery.startTime; onGoingRecovery.stage = RecoveryStatus.Stage.DONE; channel.sendResponse(VoidStreamable.INSTANCE); @@ -406,7 +424,7 @@ public class RecoveryTarget extends AbstractComponent { @Override public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception { - RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.shardId()); + RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId()); if (onGoingRecovery == null) { // shard is getting closed on us throw new IndexShardClosedException(request.shardId()); @@ -425,17 +443,6 @@ public class RecoveryTarget extends AbstractComponent { shard.performRecoveryOperation(operation); onGoingRecovery.currentTranslogOperations++; } - - onGoingRecovery = onGoingRecoveries.get(request.shardId()); - if (onGoingRecovery == null) { - // shard is getting closed on us - throw new IndexShardClosedException(request.shardId()); - } - if (onGoingRecovery.canceled) { - onGoingRecovery.sentCanceledToSource = true; - throw new IndexShardClosedException(request.shardId()); - } - channel.sendResponse(VoidStreamable.INSTANCE); } } @@ -454,16 +461,16 @@ public class RecoveryTarget extends AbstractComponent { @Override public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception { - InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId.index().name()).shardSafe(request.shardId.id()); - RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId()); + RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId()); if (onGoingRecovery == null) { // shard is getting closed on us - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } if (onGoingRecovery.canceled) { onGoingRecovery.sentCanceledToSource = true; - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } + onGoingRecovery.phase1FileNames = request.phase1FileNames; onGoingRecovery.phase1FileSizes = request.phase1FileSizes; onGoingRecovery.phase1ExistingFileNames = request.phase1ExistingFileNames; @@ -489,23 +496,23 @@ public class RecoveryTarget extends AbstractComponent { 
@Override public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception { - InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); - RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId()); + RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId()); if (onGoingRecovery == null) { // shard is getting closed on us - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } if (onGoingRecovery.canceled) { onGoingRecovery.sentCanceledToSource = true; - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } + Store store = onGoingRecovery.indexShard.store(); // first, we go and move files that were created with the recovery id suffix to // the actual names, its ok if we have a corrupted index here, since we have replicas // to recover from in case of a full cluster shutdown just when this code executes... String prefix = "recovery." + onGoingRecovery.startTime + "."; Set filesToRename = Sets.newHashSet(); - for (String existingFile : shard.store().directory().listAll()) { + for (String existingFile : store.directory().listAll()) { if (existingFile.startsWith(prefix)) { filesToRename.add(existingFile.substring(prefix.length(), existingFile.length())); } @@ -514,12 +521,12 @@ public class RecoveryTarget extends AbstractComponent { if (!filesToRename.isEmpty()) { // first, go and delete the existing ones for (String fileToRename : filesToRename) { - shard.store().directory().deleteFile(fileToRename); + store.directory().deleteFile(fileToRename); } for (String fileToRename : filesToRename) { // now, rename the files... 
try { - shard.store().renameFile(prefix + fileToRename, fileToRename); + store.renameFile(prefix + fileToRename, fileToRename); } catch (Exception e) { failureToRename = e; break; @@ -530,13 +537,13 @@ public class RecoveryTarget extends AbstractComponent { throw failureToRename; } // now write checksums - shard.store().writeChecksums(onGoingRecovery.checksums); + store.writeChecksums(onGoingRecovery.checksums); - for (String existingFile : shard.store().directory().listAll()) { + for (String existingFile : store.directory().listAll()) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum) if (!request.snapshotFiles().contains(existingFile) && !Store.isChecksum(existingFile)) { try { - shard.store().directory().deleteFile(existingFile); + store.directory().deleteFile(existingFile); } catch (Exception e) { // ignore, we don't really care, will get deleted later on } @@ -561,16 +568,18 @@ public class RecoveryTarget extends AbstractComponent { @Override public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception { - InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); - RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId()); + RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId()); if (onGoingRecovery == null) { // shard is getting closed on us - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } if (onGoingRecovery.canceled) { onGoingRecovery.sentCanceledToSource = true; - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } + + Store store = onGoingRecovery.indexShard.store(); + IndexOutput indexOutput; if (request.position() == 0) { // first request @@ -592,11 +601,11 @@ public class RecoveryTarget extends AbstractComponent { // case where the index is half moved String name = request.name(); - if (shard.store().directory().fileExists(name)) { + if (store.directory().fileExists(name)) { name = "recovery." + onGoingRecovery.startTime + "." 
+ name; } - indexOutput = shard.store().createOutputRaw(name); + indexOutput = store.createOutputRaw(name); onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput); } else { @@ -604,7 +613,7 @@ public class RecoveryTarget extends AbstractComponent { } if (indexOutput == null) { // shard is getting closed on us - throw new IndexShardClosedException(shard.shardId()); + throw new IndexShardClosedException(request.shardId()); } synchronized (indexOutput) { try { @@ -624,7 +633,7 @@ public class RecoveryTarget extends AbstractComponent { if (request.checksum() != null) { onGoingRecovery.checksums.put(request.name(), request.checksum()); } - shard.store().directory().sync(Collections.singleton(request.name())); + store.directory().sync(Collections.singleton(request.name())); onGoingRecovery.openIndexOutputs.remove(request.name()); } } catch (IOException e) { diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index 392eba0a447..c27d8446b61 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -35,17 +35,23 @@ import java.util.List; */ class RecoveryTranslogOperationsRequest implements Streamable { + private long recoveryId; private ShardId shardId; private List operations; RecoveryTranslogOperationsRequest() { } - RecoveryTranslogOperationsRequest(ShardId shardId, List operations) { + RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List operations) { + this.recoveryId = recoveryId; this.shardId = shardId; this.operations = operations; } + public long recoveryId() { + return this.recoveryId; + } + public ShardId shardId() { return shardId; } @@ -56,6 +62,7 @@ class RecoveryTranslogOperationsRequest implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { + recoveryId = in.readLong(); shardId = ShardId.readShardId(in); int size = in.readVInt(); operations = Lists.newArrayListWithExpectedSize(size); @@ -66,6 +73,7 @@ class RecoveryTranslogOperationsRequest implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { + out.writeLong(recoveryId); shardId.writeTo(out); out.writeVInt(operations.size()); for (Translog.Operation operation : operations) { diff --git a/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java index 80970a1fcf4..65a712475b4 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java @@ -29,12 +29,17 @@ import org.elasticsearch.index.store.StoreFileMetaData; import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; /** * */ public class StartRecoveryRequest implements Streamable { + private static final AtomicLong recoveryIdGenerator = new AtomicLong(); + + private long recoveryId; + private ShardId shardId; private DiscoveryNode sourceNode; @@ -58,6 +63,7 @@ public class StartRecoveryRequest implements Streamable { * @param existingFiles */ public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Map existingFiles) { + this.recoveryId = recoveryIdGenerator.incrementAndGet(); this.shardId = shardId; 
this.sourceNode = sourceNode; this.targetNode = targetNode; @@ -65,6 +71,10 @@ public class StartRecoveryRequest implements Streamable { this.existingFiles = existingFiles; } + public long recoveryId() { + return this.recoveryId; + } + public ShardId shardId() { return shardId; } @@ -87,6 +97,7 @@ public class StartRecoveryRequest implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { + recoveryId = in.readLong(); shardId = ShardId.readShardId(in); sourceNode = DiscoveryNode.readNode(in); targetNode = DiscoveryNode.readNode(in); @@ -101,6 +112,7 @@ public class StartRecoveryRequest implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { + out.writeLong(recoveryId); shardId.writeTo(out); sourceNode.writeTo(out); targetNode.writeTo(out);
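
To make the mechanism of this patch concrete: a fresh recovery id is generated per StartRecoveryRequest from a static AtomicLong, the target registers each RecoveryStatus in a map keyed by that id instead of by ShardId, transport handlers resolve the status via request.recoveryId(), and cancellation finds the status by scanning for its shard. The following is a minimal, self-contained sketch of that pattern only, assuming modern Java; all class, field, and method names (RecoveryRegistrySketch, OngoingRecovery, ShardKey, start, resolve, cancelForShard) are illustrative stand-ins rather than Elasticsearch APIs, and a plain ConcurrentHashMap stands in for the internal ConcurrentMapLong used by RecoveryTarget.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class RecoveryRegistrySketch {

    // Source side: every start-recovery request gets a fresh id
    // (the patch uses a static AtomicLong in StartRecoveryRequest for this).
    static final AtomicLong RECOVERY_ID_GENERATOR = new AtomicLong();

    // Stand-in for ShardId.
    record ShardKey(String index, int shard) {}

    // Stand-in for RecoveryStatus: remembers its own id and shard.
    static final class OngoingRecovery {
        final long recoveryId;
        final ShardKey shardId;
        volatile boolean canceled;

        OngoingRecovery(long recoveryId, ShardKey shardId) {
            this.recoveryId = recoveryId;
            this.shardId = shardId;
        }
    }

    // Target side: recoveries are keyed by recovery id, not by shard id, so an old
    // attempt being canceled and a new retry for the same shard never share a key.
    private final Map<Long, OngoingRecovery> onGoing = new ConcurrentHashMap<>();

    long start(ShardKey shardId) {
        long id = RECOVERY_ID_GENERATOR.incrementAndGet();
        onGoing.put(id, new OngoingRecovery(id, shardId));
        return id;
    }

    // Transport handlers (file chunks, translog ops, finalize, ...) resolve the
    // recovery by the id carried on the request, mirroring
    // onGoingRecoveries.get(request.recoveryId()) in RecoveryTarget.
    OngoingRecovery resolve(long recoveryId) {
        OngoingRecovery status = onGoing.get(recoveryId);
        if (status == null || status.canceled) {
            throw new IllegalStateException("recovery [" + recoveryId + "] is closed");
        }
        return status;
    }

    // Cancellation still works per shard: scan the values (like
    // findRecoveryByShardId in the patch), mark canceled, then remove by id.
    void cancelForShard(ShardKey shardId) {
        for (OngoingRecovery status : onGoing.values()) {
            if (status.shardId.equals(shardId)) {
                status.canceled = true;
                onGoing.remove(status.recoveryId);
            }
        }
    }

    public static void main(String[] args) {
        RecoveryRegistrySketch registry = new RecoveryRegistrySketch();
        ShardKey shard = new ShardKey("test", 0);
        long first = registry.start(shard);
        registry.cancelForShard(shard);       // the old attempt is dropped by its own id
        long second = registry.start(shard);  // the retry gets a distinct id
        System.out.println("first=" + first + " canceled, second=" + second
                + " active=" + (registry.resolve(second) != null));
    }
}

Keying the registry by a monotonically increasing id rather than by ShardId is what lets the target cancel or retry a shard's recovery without a stale in-flight request from the previous attempt being mistaken for the new one, which is the behavior the commit message describes.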