Refactor shard recovery from anonymous class to ShardRecoveryHandler

Previously the bulk of our shard recovery code was in a 300-line
anonymous class in `RecoverySource`. This made it difficult to find and
more difficult to read.

This factors out that code into a `ShardRecoveryHandler` class, adding
javadocs for each function and phase of the recovery, as well as
comments explaining some of the more esoteric functions performed during

It's hoped that this will help more people understand Elasticsearch's
recovery procedure.

No *major* functionality has changed, only typo corrections, some minor
allocation improvements and logging clarification changes.
This commit is contained in:
Lee Hinman 2014-11-06 11:09:27 +01:00
parent 0ac9912b89
commit f0f6a2c396
2 changed files with 560 additions and 342 deletions

View File

@ -19,53 +19,21 @@
package org.elasticsearch.indices.recovery; package org.elasticsearch.indices.recovery;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import java.util.List; import org.elasticsearch.transport.TransportService;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* The source recovery accepts recovery requests from other peer shards and start the recovery process from this * The source recovery accepts recovery requests from other peer shards and start the recovery process from this
@ -133,315 +101,12 @@ public class RecoverySource extends AbstractComponent {
} }
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated()); logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
final RecoveryResponse response = new RecoveryResponse();
shard.recover(new Engine.RecoveryHandler() {
public void phase1(final SnapshotIndexCommit snapshot) throws ElasticsearchException {
long totalSize = 0;
long existingTotalSize = 0;
final Store store =;
try {
StopWatch stopWatch = new StopWatch().start();
final Store.MetadataSnapshot recoverySourceMetadata = store.getMetadata(snapshot);
for (String name : snapshot.getFiles()) {
final StoreFileMetaData md = recoverySourceMetadata.get(name);
if (md == null) {"Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap());
throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " + recoverySourceMetadata.asMap().size() + " files", name);
final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(new Store.MetadataSnapshot(request.existingFiles()));
for (StoreFileMetaData md : diff.identical) {
existingTotalSize += md.length();
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}], size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(),, md.checksum(), md.length());
totalSize += md.length();
for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) {
if (request.existingFiles().containsKey( {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(),, request.existingFiles().get(, md);
} else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(),;
totalSize += md.length();
response.phase1TotalSize = totalSize;
response.phase1ExistingTotalSize = existingTotalSize;
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, internalActionTimeout,
internalActionLongTimeout, clusterService, indicesService, mappingUpdatedAction, logger);
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(), response.phase1FileNames, response.phase1FileSizes, shard.recover(handler);
response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize); return handler.getResponse();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>();
final AtomicReference<Throwable> corruptedEngine = new AtomicReference<>();
int fileIndex = 0;
for (final String name : response.phase1FileNames) {
ThreadPoolExecutor pool;
long fileSize = response.phase1FileSizes.get(fileIndex);
if (fileSize > recoverySettings.SMALL_FILE_CUTOFF_BYTES) {
pool = recoverySettings.concurrentStreamPool();
} else {
pool = recoverySettings.concurrentSmallFileStreamPool();
pool.execute(new Runnable() {
public void run() {
final StoreFileMetaData md = recoverySourceMetadata.get(name);
try (final IndexInput indexInput =, IOContext.READONCE)) {
final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes();
final byte[] buf = new byte[BUFFER_SIZE];
boolean shouldCompressRequest = recoverySettings.compress();
if (CompressorFactory.isCompressed(indexInput)) {
shouldCompressRequest = false;
long len = indexInput.length();
long readCount = 0;
while (readCount < len) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(shard.shardId());
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
if (recoverySettings.rateLimiter() != null) {
indexInput.readBytes(buf, 0, toRead, false);
BytesArray content = new BytesArray(buf, 0, toRead);
readCount += toRead;
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, readCount == len),
TransportRequestOptions.options().withCompress(shouldCompressRequest).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
} catch (Throwable e) {
final Throwable corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
if (store.checkIntegrity(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
if (corruptedEngine.compareAndSet(null, corruptIndexException) == false) {
// if we are not the first exception, add ourselves as suppressed to the main one:
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occured on recovery but checksums are ok", null);
exceptions.add(0, exception); // last exception first
logger.warn("{} File corruption on recovery {} local checksum OK", corruptIndexException, shard.shardId(), md);
} else {
exceptions.add(0, e); // last exceptions first
} finally {
try {
} finally {
if (corruptedEngine.get() != null) {
throw corruptedEngine.get();
} else {
// now, set the clean files request
Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles), TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
response.phase1Time = stopWatch.totalTime().millis();
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
} finally {
public void phase2(Translog.Snapshot snapshot) throws ElasticsearchException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
logger.trace("{} recovery [phase2] to {}: start", request.shardId(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
response.startTime = stopWatch.totalTime().millis();
logger.trace("{} recovery [phase2] to {}: start took [{}]", request.shardId(), request.targetNode(), request.targetNode(), stopWatch.totalTime());
logger.trace("{} recovery [phase2] to {}: updating current mapping to master", request.shardId(), request.targetNode());
logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode());
stopWatch = new StopWatch().start();
int totalOperations = sendSnapshot(snapshot);
logger.trace("{} recovery [phase2] to {}: took [{}]", request.shardId(), request.targetNode(), stopWatch.totalTime());
response.phase2Time = stopWatch.totalTime().millis();
response.phase2Operations = totalOperations;
private void updateMappingOnMaster() {
// we test that the cluster state is in sync with our in memory mapping stored by the mapperService
// we have to do it under the "cluster state update" thread to make sure that one doesn't modify it
// while we're checking
final BlockingQueue<DocumentMapper> documentMappersToUpdate = ConcurrentCollections.newBlockingQueue();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("recovery_mapping_check", new ProcessedClusterStateNonMasterUpdateTask() {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public ClusterState execute(ClusterState currentState) throws Exception {
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {
MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
return currentState;
public void onFailure(String source, @Nullable Throwable t) {
logger.error("unexpected error while checking for pending mapping changes", t);
try {
} catch (InterruptedException e) {
if (documentMappersToUpdate.isEmpty()) {
final CountDownLatch updatedOnMaster = new CountDownLatch(documentMappersToUpdate.size());
MappingUpdatedAction.MappingUpdateListener listener = new MappingUpdatedAction.MappingUpdateListener() {
public void onMappingUpdate() {
public void onFailure(Throwable t) {
logger.debug("{} recovery to {}: failed to update mapping on master", request.shardId(), request.targetNode(), t);
for (DocumentMapper documentMapper : documentMappersToUpdate) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), documentMapper, indexService.indexUUID(), listener);
try {
if (!updatedOnMaster.await(internalActionTimeout.millis(), TimeUnit.MILLISECONDS)) {
logger.debug("{} recovery [phase2] to {}: waiting on pending mapping update timed out. waited [{}]", request.shardId(), request.targetNode(), internalActionTimeout);
} catch (InterruptedException e) {
logger.debug("interrupted while waiting for mapping to update on master");
public void phase3(Translog.Snapshot snapshot) throws ElasticsearchException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
int totalOperations = sendSnapshot(snapshot);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), TransportRequestOptions.options().withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
if (request.markAsRelocated()) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
shard.relocated("to " + request.targetNode());
} catch (IllegalIndexShardStateException e) {
// we can ignore this exception since, on the other node, when it moved to phase3
// it will also send shard started, which might cause the index shard we work against
// to move be closed by the time we get to the the relocated method
logger.trace("[{}][{}] recovery [phase3] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
response.phase3Time = stopWatch.totalTime().millis();
response.phase3Operations = totalOperations;
private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
int ops = 0;
long size = 0;
int totalOperations = 0;
List<Translog.Operation> operations = Lists.newArrayList();
Translog.Operation operation =;
while (operation != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
ops += 1;
size += operation.estimateSize();
if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) {
// don't throttle translog, since we lock for phase3 indexing, so we need to move it as
// fast as possible. Note, sine we index docs to replicas while the index files are recovered
// the lock can potentially be removed, in which case, it might make sense to re-enable
// throttling in this phase
// if (recoverySettings.rateLimiter() != null) {
// recoverySettings.rateLimiter().pause(size);
// }
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
ops = 0;
size = 0;
operation =;
// send the leftover
if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
return totalOperations;
return response;
} }
class StartRecoveryTransportRequestHandler extends BaseTransportRequestHandler<StartRecoveryRequest> { class StartRecoveryTransportRequestHandler extends BaseTransportRequestHandler<StartRecoveryRequest> {

View File

@ -0,0 +1,553 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.indices.recovery;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
* ShardRecoveryHandler handles the three phases of shard recovery, which is
* everything relating to copying the segment files as well as sending translog
* operations across the wire once the segments have been copied.
public class ShardRecoveryHandler implements Engine.RecoveryHandler {
private final ESLogger logger;
// Shard that is going to be recovered (the "source")
private final InternalIndexShard shard;
private final String indexName;
private final int shardId;
// Request containing source and target node information
private final StartRecoveryRequest request;
private final RecoverySettings recoverySettings;
private final TransportService transportService;
private final TimeValue internalActionTimeout;
private final TimeValue internalActionLongTimeout;
private final ClusterService clusterService;
private final IndexService indexService;
private final MappingUpdatedAction mappingUpdatedAction;
private final RecoveryResponse response;
public ShardRecoveryHandler(final InternalIndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
final TransportService transportService, final TimeValue internalActionTimeout,
final TimeValue internalActionLongTimeout, final ClusterService clusterService,
final IndicesService indicesService, final MappingUpdatedAction mappingUpdatedAction, final ESLogger logger) {
this.shard = shard;
this.request = request;
this.recoverySettings = recoverySettings;
this.logger = logger;
this.transportService = transportService;
this.internalActionTimeout = internalActionTimeout;
this.internalActionLongTimeout = internalActionLongTimeout;
this.clusterService = clusterService;
this.indexName = this.request.shardId().index().name();
this.shardId = this.request.shardId().id();
this.indexService = indicesService.indexServiceSafe(indexName);
this.mappingUpdatedAction = mappingUpdatedAction;
this.response = new RecoveryResponse();
* @return the {@link RecoveryResponse} after the recovery has completed all three phases
public RecoveryResponse getResponse() {
return this.response;
* Perform phase1 of the recovery operations. Once this {@link SnapshotIndexCommit}
* snapshot has been performed no commit operations (files being fsync'd)
* are effectively allowed on this index until all recovery phases are done
* Phase1 examines the segment files on the target node and copies over the
* segments that are missing. Only segments that have the same size and
* checksum can be reused
* {@code InternalEngine#recover} is responsible for snapshotting the index
* and releasing the snapshot once all 3 phases of recovery are complete
public void phase1(final SnapshotIndexCommit snapshot) throws ElasticsearchException {
// Total size of segment files that are recovered
long totalSize = 0;
// Total size of segment files that were able to be re-used
long existingTotalSize = 0;
final Store store =;
try {
StopWatch stopWatch = new StopWatch().start();
final Store.MetadataSnapshot recoverySourceMetadata = store.getMetadata(snapshot);
for (String name : snapshot.getFiles()) {
final StoreFileMetaData md = recoverySourceMetadata.get(name);
if (md == null) {"Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap());
throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " +
recoverySourceMetadata.asMap().size() + " files", name);
// Generate a "diff" of all the identical, different, and missing
// segment files on the target node, using the existing files on
// the source node
final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(new Store.MetadataSnapshot(request.existingFiles()));
for (StoreFileMetaData md : diff.identical) {
existingTotalSize += md.length();
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}], size [{}]",
indexName, shardId, request.targetNode(),, md.checksum(), md.length());
totalSize += md.length();
for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) {
if (request.existingFiles().containsKey( {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
indexName, shardId, request.targetNode(),, request.existingFiles().get(, md);
} else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote",
indexName, shardId, request.targetNode(),;
totalSize += md.length();
response.phase1TotalSize = totalSize;
response.phase1ExistingTotalSize = existingTotalSize;
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
indexName, shardId, request.targetNode(), response.phase1FileNames.size(),
new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
// This latch will be used to wait until all files have been transferred to the target node
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>();
final AtomicReference<Throwable> corruptedEngine = new AtomicReference<>();
int fileIndex = 0;
ThreadPoolExecutor pool;
for (final String name : response.phase1FileNames) {
long fileSize = response.phase1FileSizes.get(fileIndex);
// Files are split into two categories, files that are "small"
// (under 5mb) and other files. Small files are transferred
// using a separate thread pool dedicated to small files.
// The idea behind this is that while we are transferring an
// older, large index, a user may create a new index, but that
// index will not be able to recover until the large index
// finishes, by using two different thread pools we can allow
// tiny files (like segments for a brand new index) to be
// recovered while ongoing large segment recoveries are
// happening. It also allows these pools to be configured
// separately.
if (fileSize > recoverySettings.SMALL_FILE_CUTOFF_BYTES) {
pool = recoverySettings.concurrentStreamPool();
} else {
pool = recoverySettings.concurrentSmallFileStreamPool();
pool.execute(new Runnable() {
public void run() {
final StoreFileMetaData md = recoverySourceMetadata.get(name);
try (final IndexInput indexInput =, IOContext.READONCE)) {
final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes();
final byte[] buf = new byte[BUFFER_SIZE];
boolean shouldCompressRequest = recoverySettings.compress();
if (CompressorFactory.isCompressed(indexInput)) {
shouldCompressRequest = false;
long len = indexInput.length();
long readCount = 0;
TransportRequestOptions requestOptions = TransportRequestOptions.options()
while (readCount < len) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(shard.shardId());
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
// Pause using the rate limiter, if desired, to throttle the recovery
if (recoverySettings.rateLimiter() != null) {
indexInput.readBytes(buf, 0, toRead, false);
BytesArray content = new BytesArray(buf, 0, toRead);
readCount += toRead;
// Actually send the file chunk to the target node, waiting for it to complete
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, readCount == len),
requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
} catch (Throwable e) {
final Throwable corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
if (store.checkIntegrity(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
if (corruptedEngine.compareAndSet(null, corruptIndexException) == false) {
// if we are not the first exception, add ourselves as suppressed to the main one:
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occured on recovery but checksums are ok", null);
exceptions.add(0, exception); // last exception first
logger.warn("{} File corruption on recovery {} local checksum OK",
corruptIndexException, shard.shardId(), md);
} else {
exceptions.add(0, e); // last exceptions first
} finally {
try {
} finally {
// Signify this file has completed by decrementing the latch
// Wait for all files that need to be transferred to finish transferring
if (corruptedEngine.get() != null) {
throw corruptedEngine.get();
} else {
Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
// names to the actual file names. It also writes checksums for
// the files after they have been renamed.
// Once the files have been renamed, any other files that are not
// related to this recovery (out of date segments, for example)
// are deleted
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles),
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
response.phase1Time = stopWatch.totalTime().millis();
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
} finally {
* Perform phase2 of the recovery process
* Phase2 takes a snapshot of the current translog *without* acquiring the
* write lock (however, the translog snapshot is a point-in-time view of
* the translog). It then sends each translog operation to the target node
* so it can be replayed into the new shard.
* {@code InternalEngine#recover} is responsible for taking the snapshot
* of the translog and releasing it once all 3 phases of recovery are complete
public void phase2(Translog.Snapshot snapshot) throws ElasticsearchException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
logger.trace("{} recovery [phase2] to {}: start", request.shardId(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
// Send a request preparing the new shard's translog to receive
// operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()),
TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
response.startTime = stopWatch.totalTime().millis();
logger.trace("{} recovery [phase2] to {}: start took [{}]",
request.shardId(), request.targetNode(), request.targetNode(), stopWatch.totalTime());
logger.trace("{} recovery [phase2] to {}: updating current mapping to master", request.shardId(), request.targetNode());
// Ensure that the mappings are synced with the master node
logger.trace("{} recovery [phase2] to {}: sending transaction log operations", request.shardId(), request.targetNode());
stopWatch = new StopWatch().start();
// Send all the snapshot's translog operations to the target
int totalOperations = sendSnapshot(snapshot);
logger.trace("{} recovery [phase2] to {}: took [{}]", request.shardId(), request.targetNode(), stopWatch.totalTime());
response.phase2Time = stopWatch.totalTime().millis();
response.phase2Operations = totalOperations;
* Perform phase 3 of the recovery process
* Phase3 again takes a snapshot of the translog, however this time the
* snapshot is acquired under a write lock. The translog operations are
* sent to the target node where they are replayed.
* {@code InternalEngine#recover} is responsible for taking the snapshot
* of the translog, and after phase 3 completes the snapshots from all
* three phases are released.
public void phase3(Translog.Snapshot snapshot) throws ElasticsearchException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode());
StopWatch stopWatch = new StopWatch().start();
// Send the translog operations to the target node
int totalOperations = sendSnapshot(snapshot);
// Send the FINALIZE request to the target node. The finalize request
// clears unreferenced translog files, refreshes the engine now that
// new segments are available, and enables garbage collection of
// tombstone files. The shard is also moved to the POST_RECOVERY phase
// during this time
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()),
if (request.markAsRelocated()) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
shard.relocated("to " + request.targetNode());
} catch (IllegalIndexShardStateException e) {
// we can ignore this exception since, on the other node, when it moved to phase3
// it will also send shard started, which might cause the index shard we work against
// to move be closed by the time we get to the the relocated method
logger.trace("[{}][{}] recovery [phase3] to {}: took [{}]",
indexName, shardId, request.targetNode(), stopWatch.totalTime());
response.phase3Time = stopWatch.totalTime().millis();
response.phase3Operations = totalOperations;
* Ensures that the mapping in the cluster state is the same as the mapping
* in our mapper service. If the mapping is not in sync, sends a request
* to update it in the cluster state and blocks until it has finished
* being updated.
private void updateMappingOnMaster() {
// we test that the cluster state is in sync with our in memory mapping stored by the mapperService
// we have to do it under the "cluster state update" thread to make sure that one doesn't modify it
// while we're checking
final BlockingQueue<DocumentMapper> documentMappersToUpdate = ConcurrentCollections.newBlockingQueue();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("recovery_mapping_check", new ProcessedClusterStateNonMasterUpdateTask() {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public ClusterState execute(ClusterState currentState) throws Exception {
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {
MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
return currentState;
public void onFailure(String source, @Nullable Throwable t) {
logger.error("unexpected error while checking for pending mapping changes", t);
try {
} catch (InterruptedException e) {
if (documentMappersToUpdate.isEmpty()) {
final CountDownLatch updatedOnMaster = new CountDownLatch(documentMappersToUpdate.size());
MappingUpdatedAction.MappingUpdateListener listener = new MappingUpdatedAction.MappingUpdateListener() {
public void onMappingUpdate() {
public void onFailure(Throwable t) {
logger.debug("{} recovery to {}: failed to update mapping on master", request.shardId(), request.targetNode(), t);
for (DocumentMapper documentMapper : documentMappersToUpdate) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), documentMapper, indexService.indexUUID(), listener);
try {
if (!updatedOnMaster.await(internalActionTimeout.millis(), TimeUnit.MILLISECONDS)) {
logger.debug("[{}][{}] recovery [phase2] to {}: waiting on pending mapping update timed out. waited [{}]",
indexName, shardId, request.targetNode(), internalActionTimeout);
} catch (InterruptedException e) {
logger.debug("interrupted while waiting for mapping to update on master");
* Send the given snapshot's operations to this handler's target node.
* Operations are bulked into a single request depending on an operation
* count limit or size-in-bytes limit
* @return the total number of translog operations that were sent
private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
int ops = 0;
long size = 0;
int totalOperations = 0;
List<Translog.Operation> operations = Lists.newArrayList();
Translog.Operation operation =;
TransportRequestOptions recoveryOptions = TransportRequestOptions.options()
while (operation != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
ops += 1;
size += operation.estimateSize();
// Check if this request is past the size or bytes threshold, and
// if so, send it off
if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) {
// don't throttle translog, since we lock for phase3 indexing,
// so we need to move it as fast as possible. Note, since we
// index docs to replicas while the index files are recovered
// the lock can potentially be removed, in which case, it might
// make sense to re-enable throttling in this phase
// if (recoverySettings.rateLimiter() != null) {
// recoverySettings.rateLimiter().pause(size);
// }
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
ops = 0;
size = 0;
operation =;
// send the leftover
if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
return totalOperations;