From fefcfb5b5c2fcc5d3b14bed903b1d1c46c3d1a74 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 18 Jul 2010 22:54:21 +0300 Subject: [PATCH] refactor recovery to be handled on the node level (and not per shard), with better retry mechanism when doing peer shard recovery --- .../index/engine/robin/RobinEngine.java | 57 +- .../gateway/IndexShardGatewayService.java | 131 +-- .../index/service/InternalIndexService.java | 4 - .../index/shard/IndexShardModule.java | 3 - .../RecoverFilesRecoveryException.java | 2 +- .../index/shard/recovery/RecoveryAction.java | 837 ------------------ .../recovery/RecoveryCleanFilesRequest.java | 72 ++ .../recovery/RecoveryFailedException.java | 10 +- .../recovery/RecoveryFileChunkRequest.java | 101 +++ .../RecoveryFinalizeRecoveryRequest.java | 54 ++ ...ryPrepareForTranslogOperationsRequest.java | 54 ++ .../shard/recovery/RecoveryResponse.java | 117 +++ .../index/shard/recovery/RecoverySource.java | 294 ++++++ .../index/shard/recovery/RecoveryTarget.java | 342 +++++++ .../RecoveryTranslogOperationsRequest.java | 73 ++ .../shard/recovery/StartRecoveryRequest.java | 111 +++ .../elasticsearch/index/store/fs/FsStore.java | 27 +- .../elasticsearch/indices/IndicesModule.java | 7 + .../cluster/IndicesClusterStateService.java | 170 ++-- 19 files changed, 1468 insertions(+), 998 deletions(-) delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryCleanFilesRequest.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFileChunkRequest.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFinalizeRecoveryRequest.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryPrepareForTranslogOperationsRequest.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryResponse.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTranslogOperationsRequest.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 9154d42a721..c68a32524b3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -133,34 +133,39 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } @Override public void start() throws EngineException { - if (indexWriter != null) { - throw new EngineAlreadyStartedException(shardId); - } - if (logger.isDebugEnabled()) { - logger.debug("Starting engine with ram_buffer_size[" + indexingBufferSize + "], refresh_interval[" + refreshInterval + "]"); - } + rwl.writeLock().lock(); try { - this.indexWriter = createWriter(); - } catch (IOException e) { - throw new EngineCreationFailureException(shardId, "Failed 
to create engine", e); - } - - try { - translog.newTranslog(IndexReader.getCurrentVersion(store.directory())); - this.nrtResource = buildNrtResource(indexWriter); - } catch (IOException e) { + if (indexWriter != null) { + throw new EngineAlreadyStartedException(shardId); + } + if (logger.isDebugEnabled()) { + logger.debug("Starting engine with ram_buffer_size[" + indexingBufferSize + "], refresh_interval[" + refreshInterval + "]"); + } try { - indexWriter.rollback(); - } catch (IOException e1) { - // ignore - } finally { + this.indexWriter = createWriter(); + } catch (IOException e) { + throw new EngineCreationFailureException(shardId, "Failed to create engine", e); + } + + try { + translog.newTranslog(IndexReader.getCurrentVersion(store.directory())); + this.nrtResource = buildNrtResource(indexWriter); + } catch (IOException e) { try { - indexWriter.close(); + indexWriter.rollback(); } catch (IOException e1) { // ignore + } finally { + try { + indexWriter.close(); + } catch (IOException e1) { + // ignore + } } + throw new EngineCreationFailureException(shardId, "Failed to open reader on writer", e); } - throw new EngineCreationFailureException(shardId, "Failed to open reader on writer", e); + } finally { + rwl.writeLock().unlock(); } } @@ -461,10 +466,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } closed = true; rwl.writeLock().lock(); - if (nrtResource != null) { - this.nrtResource.forceClose(); - } try { + if (nrtResource != null) { + this.nrtResource.forceClose(); + } // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed if (indexWriter != null) { try { @@ -486,8 +491,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, try { // release locks when started if (IndexWriter.isLocked(store.directory())) { - logger.trace("Shard is locked, releasing lock"); - store.directory().clearLock(IndexWriter.WRITE_LOCK_NAME); + logger.warn("shard is locked, releasing lock"); + IndexWriter.unlock(store.directory()); } boolean create = !IndexReader.indexExists(store.directory()); indexWriter = new IndexWriter(store.directory(), diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 30e341ad061..fd2a76aca51 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.gateway; -import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.CloseableIndexComponent; import org.elasticsearch.common.inject.Inject; @@ -94,79 +93,97 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem scheduleSnapshotIfNeeded(); } + public static interface RecoveryListener { + void onRecoveryDone(); + + void onIgnoreRecovery(String reason); + + void onRecoveryFailed(IndexShardGatewayRecoveryException e); + } + /** * Recovers the state of the shard from the gateway. 
*/ - public synchronized void recover() throws IndexShardGatewayRecoveryException, IgnoreGatewayRecoveryException { + public void recover(final RecoveryListener listener) throws IndexShardGatewayRecoveryException, IgnoreGatewayRecoveryException { if (!recovered.compareAndSet(false, true)) { - throw new IgnoreGatewayRecoveryException(shardId, "already recovered"); + listener.onIgnoreRecovery("already recovered"); + return; } if (indexShard.state() == IndexShardState.CLOSED) { // got closed on us, just ignore this recovery - throw new IgnoreGatewayRecoveryException(shardId, "shard closed"); + listener.onIgnoreRecovery("shard closed"); + return; } if (!indexShard.routingEntry().primary()) { - throw new ElasticSearchIllegalStateException("Trying to recover when the shard is in backup state"); + listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Trying to recover when the shard is in backup state", null)); + return; } - indexShard.recovering(); + threadPool.execute(new Runnable() { + @Override public void run() { - StopWatch throttlingWaitTime = new StopWatch().start(); - // we know we are on a thread, we can spin till we can engage in recovery - while (!recoveryThrottler.tryRecovery(shardId, "gateway")) { - try { - Thread.sleep(recoveryThrottler.throttleInterval().millis()); - } catch (InterruptedException e) { - if (indexShard.ignoreRecoveryAttempt()) { - throw new IgnoreGatewayRecoveryException(shardId, "Interrupted while waiting for recovery, but we should ignore ..."); + indexShard.recovering(); + + StopWatch throttlingWaitTime = new StopWatch().start(); + // we know we are on a thread, we can spin till we can engage in recovery + while (!recoveryThrottler.tryRecovery(shardId, "gateway")) { + try { + Thread.sleep(recoveryThrottler.throttleInterval().millis()); + } catch (InterruptedException e) { + if (indexShard.ignoreRecoveryAttempt()) { + listener.onIgnoreRecovery("Interrupted while waiting for recovery, but we should ignore ..."); + return; + } + listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recovery", e)); + } } - // we got interrupted, mark it as failed - throw new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recovery", e); - } - } - throttlingWaitTime.stop(); + throttlingWaitTime.stop(); - try { - logger.debug("starting recovery from {}", shardGateway); - StopWatch stopWatch = new StopWatch().start(); - IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover(); + try { + logger.debug("starting recovery from {}", shardGateway); + StopWatch stopWatch = new StopWatch().start(); + IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover(); - lastIndexVersion = recoveryStatus.index().version(); - lastTranslogId = -1; - lastTranslogPosition = 0; - lastTranslogLength = 0; + lastIndexVersion = recoveryStatus.index().version(); + lastTranslogId = -1; + lastTranslogPosition = 0; + lastTranslogLength = 0; - // start the shard if the gateway has not started it already - if (indexShard.state() != IndexShardState.STARTED) { - indexShard.start(); + // start the shard if the gateway has not started it already + if (indexShard.state() != IndexShardState.STARTED) { + indexShard.start(); + } + stopWatch.stop(); + if (logger.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait 
[").append(throttlingWaitTime.totalTime()).append("]\n"); + sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], took [").append(recoveryStatus.index().took()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n"); + sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n"); + sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("], took [").append(recoveryStatus.translog().took()).append("]"); + logger.debug(sb.toString()); + } + // refresh the shard + indexShard.refresh(new Engine.Refresh(false)); + listener.onRecoveryDone(); + scheduleSnapshotIfNeeded(); + } catch (IndexShardGatewayRecoveryException e) { + if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) { + // got closed on us, just ignore this recovery + listener.onIgnoreRecovery("shard closed"); + return; + } + listener.onRecoveryFailed(e); + } catch (IndexShardClosedException e) { + listener.onIgnoreRecovery("shard closed"); + } catch (IndexShardNotStartedException e) { + listener.onIgnoreRecovery("shard closed"); + } catch (Exception e) { + listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "failed recovery", e)); + } finally { + recoveryThrottler.recoveryDone(shardId, "gateway"); + } } - stopWatch.stop(); - if (logger.isDebugEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n"); - sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], took [").append(recoveryStatus.index().took()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n"); - sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n"); - sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("], took [").append(recoveryStatus.translog().took()).append("]"); - logger.debug(sb.toString()); - } - // refresh the shard - indexShard.refresh(new Engine.Refresh(false)); - scheduleSnapshotIfNeeded(); - } catch (IndexShardGatewayRecoveryException e) { - if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) { - // got closed on us, just ignore this recovery - throw new IgnoreGatewayRecoveryException(shardId, "shard closed"); - } - throw e; - } catch (IndexShardClosedException e) { - // got closed on us, just ignore this recovery - throw new IgnoreGatewayRecoveryException(shardId, "shard closed"); - } catch (IndexShardNotStartedException e) { - // got closed on us, just ignore this recovery - throw new IgnoreGatewayRecoveryException(shardId, "shard closed"); - } finally { - recoveryThrottler.recoveryDone(shardId, "gateway"); - } + }); } /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java 
b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 08921e2330a..066023de096 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -51,7 +51,6 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.IndexShardManagement; import org.elasticsearch.index.shard.IndexShardModule; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.recovery.RecoveryAction; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.IndexStore; @@ -295,9 +294,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde shardInjector.getInstance(IndexShardManagement.class).close(); } - RecoveryAction recoveryAction = shardInjector.getInstance(RecoveryAction.class); - if (recoveryAction != null) recoveryAction.close(); - // this logic is tricky, we want to close the engine so we rollback the changes done to it // and close the shard so no operations are allowed to it if (indexShard != null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java index 19b1dac2641..92ec02edfe5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.shard; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.index.shard.recovery.RecoveryAction; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; @@ -39,7 +38,5 @@ public class IndexShardModule extends AbstractModule { bind(ShardId.class).toInstance(shardId); bind(IndexShard.class).to(InternalIndexShard.class).asEagerSingleton(); bind(IndexShardManagement.class).asEagerSingleton(); - - bind(RecoveryAction.class).asEagerSingleton(); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverFilesRecoveryException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverFilesRecoveryException.java index 514daa72e35..8df96ac8623 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverFilesRecoveryException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverFilesRecoveryException.java @@ -24,7 +24,7 @@ import org.elasticsearch.index.shard.IndexShardException; import org.elasticsearch.index.shard.ShardId; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class RecoverFilesRecoveryException extends IndexShardException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java deleted file mode 100644 index efa660f9979..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java +++ /dev/null @@ -1,837 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.shard.recovery; - -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.ElasticSearchInterruptedException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.StopWatch; -import org.elasticsearch.common.collect.Lists; -import org.elasticsearch.common.collect.Maps; -import org.elasticsearch.common.collect.Sets; -import org.elasticsearch.common.component.CloseableComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.io.stream.VoidStreamable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.RecoveryEngineException; -import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.*; -import org.elasticsearch.index.shard.service.IndexShard; -import org.elasticsearch.index.shard.service.InternalIndexShard; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.store.StoreFileMetaData; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.index.translog.TranslogStreams; -import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import static java.util.concurrent.TimeUnit.*; -import static org.elasticsearch.common.unit.TimeValue.*; -import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*; - -/** - * @author kimchy (shay.banon) - */ -public class RecoveryAction extends AbstractIndexShardComponent implements CloseableComponent { - - private final ByteSizeValue fileChunkSize; - - private final ThreadPool threadPool; - - private final TransportService transportService; - - private final InternalIndexShard indexShard; - - private final Store store; - - private final RecoveryThrottler 
recoveryThrottler;
-
-    private final ConcurrentMap<String, IndexOutput> openIndexOutputs = newConcurrentMap();
-
-    private final String startTransportAction;
-
-    private final String fileChunkTransportAction;
-
-    private final String cleanFilesTransportAction;
-
-    private final String prepareForTranslogOperationsTransportAction;
-
-    private final String translogOperationsTransportAction;
-
-    private final String finalizeRecoveryTransportAction;
-
-    private volatile boolean closed = false;
-
-    private volatile Thread sendStartRecoveryThread;
-
-    private volatile Thread receiveSnapshotRecoveryThread;
-
-    private volatile Thread sendSnapshotRecoveryThread;
-
-    private final CopyOnWriteArrayList<Future> sendFileChunksRecoveryFutures = new CopyOnWriteArrayList<Future>();
-
-    @Inject public RecoveryAction(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, TransportService transportService,
-                                  IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
-        super(shardId, indexSettings);
-        this.threadPool = threadPool;
-        this.transportService = transportService;
-        this.indexShard = (InternalIndexShard) indexShard;
-        this.store = store;
-        this.recoveryThrottler = recoveryThrottler;
-
-        startTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/start";
-        transportService.registerHandler(startTransportAction, new StartRecoveryTransportRequestHandler());
-
-        fileChunkTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/fileChunk";
-        transportService.registerHandler(fileChunkTransportAction, new FileChunkTransportRequestHandler());
-
-        cleanFilesTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/cleanFiles";
-        transportService.registerHandler(cleanFilesTransportAction, new CleanFilesRequestHandler());
-
-        prepareForTranslogOperationsTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/prepareForTranslog";
-        transportService.registerHandler(prepareForTranslogOperationsTransportAction, new PrepareForTranslogOperationsRequestHandler());
-
-        translogOperationsTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/translogOperations";
-        transportService.registerHandler(translogOperationsTransportAction, new TranslogOperationsRequestHandler());
-
-        finalizeRecoveryTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/finalizeRecovery";
-        transportService.registerHandler(finalizeRecoveryTransportAction, new FinalizeRecoveryRequestHandler());
-
-        this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB));
-        logger.trace("recovery action registered, using file_chunk_size[{}]", fileChunkSize);
-    }
-
-    public void close() {
-        closed = true;
-        transportService.removeHandler(startTransportAction);
-        transportService.removeHandler(fileChunkTransportAction);
-        transportService.removeHandler(cleanFilesTransportAction);
-        transportService.removeHandler(prepareForTranslogOperationsTransportAction);
-        transportService.removeHandler(translogOperationsTransportAction);
-        transportService.removeHandler(finalizeRecoveryTransportAction);
-
-        cleanOpenIndex();
-
-        // interrupt the startRecovery thread if its performing recovery
-        if (sendStartRecoveryThread != null) {
-            sendStartRecoveryThread.interrupt();
-        }
-        if (receiveSnapshotRecoveryThread != null) {
-            receiveSnapshotRecoveryThread.interrupt();
-        }
-        if (sendSnapshotRecoveryThread != null) {
-            sendSnapshotRecoveryThread.interrupt();
-        }
-        for (Future future :
sendFileChunksRecoveryFutures) { - future.cancel(true); - } - } - - public synchronized void startRecovery(DiscoveryNode node, DiscoveryNode targetNode, boolean markAsRelocated) throws ElasticSearchException { - if (targetNode == null) { - throw new IgnoreRecoveryException("No node to recovery from, retry next time..."); - } - sendStartRecoveryThread = Thread.currentThread(); - try { - // mark the shard as recovering - IndexShardState preRecoveringState; - try { - preRecoveringState = indexShard.recovering(); - } catch (IndexShardRecoveringException e) { - // that's fine, since we might be called concurrently, just ignore this, we are already recovering - throw new IgnoreRecoveryException("Already in recovering process", e); - } catch (IndexShardStartedException e) { - // that's fine, since we might be called concurrently, just ignore this, we are already started - throw new IgnoreRecoveryException("Already in recovering process", e); - } catch (IndexShardRelocatedException e) { - // that's fine, since we might be called concurrently, just ignore this, we are already relocated - throw new IgnoreRecoveryException("Already in recovering process", e); - } catch (IndexShardClosedException e) { - throw new IgnoreRecoveryException("Can't recover a closed shard.", e); - } - - // we know we are on a thread, we can spin till we can engage in recovery - StopWatch throttlingWaitTime = new StopWatch().start(); - while (!recoveryThrottler.tryRecovery(shardId, "peer recovery target")) { - try { - Thread.sleep(recoveryThrottler.throttleInterval().millis()); - } catch (InterruptedException e) { - if (indexShard.ignoreRecoveryAttempt()) { - throw new IgnoreRecoveryException("Interrupted while waiting for recovery, but we should ignore ..."); - } - // we got interrupted, mark it as failed - throw new RecoveryFailedException(shardId, node, targetNode, e); - } - } - throttlingWaitTime.stop(); - - try { - if (closed) { - throw new IgnoreRecoveryException("Recovery closed"); - } - - logger.debug("starting recovery from {}", targetNode); - // build a list of the current files located locally, maybe we don't need to recover them... 
-                StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(node, markAsRelocated, store.listWithMd5());
-
-                StopWatch stopWatch = null;
-                RecoveryStatus recoveryStatus = null;
-                boolean retry = true;
-                while (retry) {
-                    stopWatch = new StopWatch().start();
-                    recoveryStatus = transportService.submitRequest(targetNode, startTransportAction, startRecoveryRequest, new FutureTransportResponseHandler<RecoveryStatus>() {
-                        @Override public RecoveryStatus newInstance() {
-                            return new RecoveryStatus();
-                        }
-                    }).txGet();
-                    retry = recoveryStatus.retry;
-                    if (retry) {
-                        try {
-                            Thread.sleep(recoveryThrottler.throttleInterval().millis());
-                        } catch (InterruptedException e) {
-                            if (indexShard.ignoreRecoveryAttempt()) {
-                                throw new IgnoreRecoveryException("Interrupted while waiting for remote recovery, but we should ignore ...");
-                            }
-                            // we got interrupted, mark it as failed
-                            throw new RecoveryFailedException(shardId, node, targetNode, e);
-                        }
-                    }
-                }
-                stopWatch.stop();
-                if (logger.isDebugEnabled()) {
-                    StringBuilder sb = new StringBuilder();
-                    sb.append("recovery completed from ").append(targetNode).append(", took[").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
-                    sb.append(" phase1: recovered_files [").append(recoveryStatus.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1TotalSize)).append("]")
-                            .append(", took [").append(timeValueMillis(recoveryStatus.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryStatus.phase1ThrottlingWaitTime)).append(']')
-                            .append("\n");
-                    sb.append(" : reusing_files [").append(recoveryStatus.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1ExistingTotalSize)).append("]\n");
-                    sb.append(" phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
-                            .append(", took [").append(timeValueMillis(recoveryStatus.phase2Time)).append("]")
-                            .append("\n");
-                    sb.append(" phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations")
-                            .append(", took [").append(timeValueMillis(recoveryStatus.phase3Time)).append("]");
-                    logger.debug(sb.toString());
-                }
-            } catch (RemoteTransportException e) {
-                if (closed) {
-                    throw new IgnoreRecoveryException("Recovery closed", e);
-                }
-                logger.trace("recovery from [{}] failed", e, targetNode);
-                Throwable cause = ExceptionsHelper.unwrapCause(e);
-                if (cause instanceof ActionNotFoundTransportException || cause instanceof IndexShardNotStartedException) {
-                    // the remote shard has not yet registered the action or not started yet, we need to ignore this recovery attempt, and restore the state previous to recovering
-                    indexShard.restoreRecoveryState(preRecoveringState);
-                    throw new IgnoreRecoveryException("Ignoring recovery attempt, remote shard not started", e);
-                } else if (cause instanceof RecoveryEngineException) {
-                    // it might be wrapped
-                    if (cause.getCause() instanceof IgnoreRecoveryException) {
-                        throw (IgnoreRecoveryException) cause.getCause();
-                    }
-                } else if (cause instanceof IgnoreRecoveryException) {
-                    throw (IgnoreRecoveryException) cause;
-                } else if ((cause instanceof NodeNotConnectedException) || (cause instanceof NodeDisconnectedException)) {
-                    throw new IgnoreRecoveryException("Ignore recovery attempt, remote node not connected", e);
-                }
-                throw new RecoveryFailedException(shardId, node, targetNode, e);
-            } catch (Exception e) {
-                if (closed) {
-                    throw new IgnoreRecoveryException("Recovery closed", e);
-                }
-                throw new RecoveryFailedException(shardId, node, targetNode, e);
-            } finally {
-                recoveryThrottler.recoveryDone(shardId, "peer recovery target");
-            }
-        } finally {
-            sendStartRecoveryThread = null;
-        }
-    }
-
-    private void cleanOpenIndex() {
-        for (IndexOutput indexOutput : openIndexOutputs.values()) {
-            try {
-                synchronized (indexOutput) {
-                    indexOutput.close();
-                }
-            } catch (Exception e) {
-                // ignore
-            }
-        }
-        openIndexOutputs.clear();
-    }
-
-    static class StartRecoveryRequest implements Streamable {
-
-        DiscoveryNode node;
-
-        boolean markAsRelocated;
-
-        // name -> (md5, size)
-        Map<String, StoreFileMetaData> existingFiles;
-
-        private StartRecoveryRequest() {
-        }
-
-        private StartRecoveryRequest(DiscoveryNode node, boolean markAsRelocated, Map<String, StoreFileMetaData> existingFiles) {
-            this.node = node;
-            this.markAsRelocated = markAsRelocated;
-            this.existingFiles = existingFiles;
-        }
-
-        @Override public void readFrom(StreamInput in) throws IOException {
-            node = DiscoveryNode.readNode(in);
-            markAsRelocated = in.readBoolean();
-            int size = in.readVInt();
-            existingFiles = Maps.newHashMapWithExpectedSize(size);
-            for (int i = 0; i < size; i++) {
-                StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in);
-                existingFiles.put(md.name(), md);
-            }
-        }
-
-        @Override public void writeTo(StreamOutput out) throws IOException {
-            node.writeTo(out);
-            out.writeBoolean(markAsRelocated);
-            out.writeVInt(existingFiles.size());
-            for (StoreFileMetaData md : existingFiles.values()) {
-                md.writeTo(out);
-            }
-        }
-    }
-
-    private class StartRecoveryTransportRequestHandler extends BaseTransportRequestHandler<StartRecoveryRequest> {
-
-        @Override public StartRecoveryRequest newInstance() {
-            return new StartRecoveryRequest();
-        }
-
-        @Override public void messageReceived(final StartRecoveryRequest startRecoveryRequest, final TransportChannel channel) throws Exception {
-            if (!recoveryThrottler.tryRecovery(shardId, "peer recovery source")) {
-                RecoveryStatus retry = new RecoveryStatus();
-                retry.retry = true;
-                channel.sendResponse(retry);
-                return;
-            }
-            try {
-                logger.trace("starting recovery to {}, mark_as_relocated {}", startRecoveryRequest.node, startRecoveryRequest.markAsRelocated);
-                final DiscoveryNode node = startRecoveryRequest.node;
-                cleanOpenIndex();
-                final RecoveryStatus recoveryStatus = new RecoveryStatus();
-                indexShard.recover(new Engine.RecoveryHandler() {
-                    @Override public void phase1(final SnapshotIndexCommit snapshot) throws ElasticSearchException {
-                        long totalSize = 0;
-                        long existingTotalSize = 0;
-                        try {
-                            StopWatch stopWatch = new StopWatch().start();
-
-                            for (String name : snapshot.getFiles()) {
-                                StoreFileMetaData md = store.metaDataWithMd5(name);
-                                boolean useExisting = false;
-                                if (startRecoveryRequest.existingFiles.containsKey(name)) {
-                                    if (md.md5().equals(startRecoveryRequest.existingFiles.get(name).md5())) {
-                                        recoveryStatus.phase1ExistingFileNames.add(name);
-                                        recoveryStatus.phase1ExistingFileSizes.add(md.sizeInBytes());
-                                        existingTotalSize += md.sizeInBytes();
-                                        useExisting = true;
-                                        if (logger.isTraceEnabled()) {
-                                            logger.trace("recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", node, name, md.md5());
-                                        }
-                                    }
-                                }
-                                if (!useExisting) {
-                                    if (startRecoveryRequest.existingFiles.containsKey(name)) {
-                                        logger.trace("recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", node, name, startRecoveryRequest.existingFiles.get(name).md5(), md.md5());
-                                    } else {
-                                        logger.trace("recovery [phase1] to {}: recovering [{}], does not exists in remote", node, name);
-                                    }
-                                    recoveryStatus.phase1FileNames.add(name);
-                                    recoveryStatus.phase1FileSizes.add(md.sizeInBytes());
-                                    totalSize += md.sizeInBytes();
-                                }
-                            }
-                            recoveryStatus.phase1TotalSize = totalSize;
-                            recoveryStatus.phase1ExistingTotalSize = existingTotalSize;
-
-                            final AtomicLong throttlingWaitTime = new AtomicLong();
-
-                            logger.trace("recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", node, recoveryStatus.phase1FileNames.size(), new ByteSizeValue(totalSize), recoveryStatus.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
-
-                            final CountDownLatch latch = new CountDownLatch(recoveryStatus.phase1FileNames.size());
-                            final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
-                            for (final String name : recoveryStatus.phase1FileNames) {
-                                sendFileChunksRecoveryFutures.add(threadPool.submit(new Runnable() {
-                                    @Override public void run() {
-                                        IndexInput indexInput = null;
-                                        try {
-                                            long throttlingStartTime = System.currentTimeMillis();
-                                            while (!recoveryThrottler.tryStream(shardId, name)) {
-                                                Thread.sleep(recoveryThrottler.throttleInterval().millis());
-                                            }
-                                            throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
-
-                                            final int BUFFER_SIZE = (int) fileChunkSize.bytes();
-                                            byte[] buf = new byte[BUFFER_SIZE];
-                                            indexInput = snapshot.getDirectory().openInput(name);
-                                            long len = indexInput.length();
-                                            long readCount = 0;
-                                            while (readCount < len) {
-                                                int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
-                                                long position = indexInput.getFilePointer();
-                                                indexInput.readBytes(buf, 0, toRead, false);
-                                                transportService.submitRequest(node, fileChunkTransportAction, new FileChunk(name, position, len, buf, toRead), VoidTransportResponseHandler.INSTANCE).txGet(120, SECONDS);
-                                                readCount += toRead;
-                                            }
-                                            indexInput.close();
-                                        } catch (Exception e) {
-                                            lastException.set(e);
-                                        } finally {
-                                            recoveryThrottler.streamDone(shardId, name);
-                                            if (indexInput != null) {
-                                                try {
-                                                    indexInput.close();
-                                                } catch (IOException e) {
-                                                    // ignore
-                                                }
-                                            }
-                                            latch.countDown();
-                                        }
-                                    }
-                                }));
-                            }
-
-                            latch.await();
-
-                            if (lastException.get() != null) {
-                                throw lastException.get();
-                            }
-
-                            // now, set the clean files request
-                            CleanFilesRequest cleanFilesRequest = new CleanFilesRequest();
-                            cleanFilesRequest.snapshotFiles.addAll(Arrays.asList(snapshot.getFiles()));
-                            transportService.submitRequest(node, cleanFilesTransportAction, cleanFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet();
-
-                            stopWatch.stop();
-                            logger.trace("recovery [phase1] to {}: took [{}], throttling_wait [{}]", node, stopWatch.totalTime(), timeValueMillis(throttlingWaitTime.get()));
-                            recoveryStatus.phase1Time = stopWatch.totalTime().millis();
-                        } catch (ElasticSearchInterruptedException e) {
-                            // we got interrupted since we are closing, ignore the recovery
-                            throw new IgnoreRecoveryException("Interrupted while recovering files");
-                        } catch (Throwable e) {
-                            throw new RecoverFilesRecoveryException(shardId, recoveryStatus.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
-                        } finally {
-                            sendFileChunksRecoveryFutures.clear();
-                        }
-                    }
-
-                    @Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
-                        sendSnapshotRecoveryThread = Thread.currentThread();
-                        try {
-                            if (closed) {
-                                throw new IndexShardClosedException(shardId);
-                            }
-                            logger.trace("recovery [phase2] to {}: sending transaction log operations", node);
-                            StopWatch stopWatch = new StopWatch().start();
-
-                            transportService.submitRequest(node, prepareForTranslogOperationsTransportAction, VoidStreamable.INSTANCE, VoidTransportResponseHandler.INSTANCE).txGet();
-
-                            int totalOperations = sendSnapshot(snapshot);
-
-                            stopWatch.stop();
-                            logger.trace("recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime());
-                            recoveryStatus.phase2Time = stopWatch.totalTime().millis();
-                            recoveryStatus.phase2Operations = totalOperations;
-                        } catch (ElasticSearchInterruptedException e) {
-                            // we got interrupted since we are closing, ignore the recovery
-                            throw new IgnoreRecoveryException("Interrupted in phase 2 files");
-                        } finally {
-                            sendSnapshotRecoveryThread = null;
-                        }
-                    }
-
-                    @Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
-                        sendSnapshotRecoveryThread = Thread.currentThread();
-                        try {
-                            if (closed) {
-                                throw new IndexShardClosedException(shardId);
-                            }
-                            logger.trace("recovery [phase3] to {}: sending transaction log operations", node);
-                            StopWatch stopWatch = new StopWatch().start();
-                            int totalOperations = sendSnapshot(snapshot);
-                            transportService.submitRequest(node, finalizeRecoveryTransportAction, VoidStreamable.INSTANCE, VoidTransportResponseHandler.INSTANCE).txGet();
-                            if (startRecoveryRequest.markAsRelocated) {
-                                // TODO what happens if the recovery process fails afterwards, we need to mark this back to started
-                                try {
-                                    indexShard.relocated();
-                                } catch (IllegalIndexShardStateException e) {
-                                    // we can ignore this exception since, on the other node, when it moved to phase3
-                                    // it will also send shard started, which might cause the index shard we work against
-                                    // to move be closed by the time we get to the the relocated method
-                                }
-                            }
-                            stopWatch.stop();
-                            logger.trace("recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());
-                            recoveryStatus.phase3Time = stopWatch.totalTime().millis();
-                            recoveryStatus.phase3Operations = totalOperations;
-                        } catch (ElasticSearchInterruptedException e) {
-                            // we got interrupted since we are closing, ignore the recovery
-                            throw new IgnoreRecoveryException("Interrupted in phase 2 files");
-                        } finally {
-                            sendSnapshotRecoveryThread = null;
-                        }
-                    }
-
-                    private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
-                        TranslogOperationsRequest request = new TranslogOperationsRequest();
-                        int translogBatchSize = 10; // TODO make this configurable
-                        int counter = 0;
-                        int totalOperations = 0;
-                        while (snapshot.hasNext()) {
-                            request.operations.add(snapshot.next());
-                            totalOperations++;
-                            if (++counter == translogBatchSize) {
-                                transportService.submitRequest(node, translogOperationsTransportAction, request, VoidTransportResponseHandler.INSTANCE).txGet();
-                                counter = 0;
-                                request.operations.clear();
-                            }
-                        }
-                        // send the leftover
-                        if (!request.operations.isEmpty()) {
-                            transportService.submitRequest(node, translogOperationsTransportAction, request, VoidTransportResponseHandler.INSTANCE).txGet();
-                        }
-                        return totalOperations;
-                    }
-                });
-                channel.sendResponse(recoveryStatus);
-            } finally {
-                recoveryThrottler.recoveryDone(shardId, "peer recovery source");
-            }
-        }
-    }
-
-    private static class RecoveryStatus implements Streamable {
-
-        boolean retry = false;
-        List<String> phase1FileNames = Lists.newArrayList();
-        List<Long> phase1FileSizes = Lists.newArrayList();
-        List<String> phase1ExistingFileNames = Lists.newArrayList();
-        List<Long> phase1ExistingFileSizes = Lists.newArrayList();
-        long phase1TotalSize;
-        long phase1ExistingTotalSize;
-        long phase1Time;
-        long phase1ThrottlingWaitTime;
-
-        int phase2Operations;
-        long phase2Time;
-
-        int phase3Operations;
-        long phase3Time;
-
-        private RecoveryStatus() {
-        }
-
-        @Override public void readFrom(StreamInput in) throws IOException {
-            retry = in.readBoolean();
-            int size = in.readVInt();
-            phase1FileNames = Lists.newArrayListWithCapacity(size);
-            for (int i = 0; i < size; i++) {
-                phase1FileNames.add(in.readUTF());
-            }
-            size = in.readVInt();
-            phase1FileSizes = Lists.newArrayListWithCapacity(size);
-            for (int i = 0; i < size; i++) {
-                phase1FileSizes.add(in.readVLong());
-            }
-
-            size = in.readVInt();
-            phase1ExistingFileNames = Lists.newArrayListWithCapacity(size);
-            for (int i = 0; i < size; i++) {
-                phase1ExistingFileNames.add(in.readUTF());
-            }
-            size = in.readVInt();
-            phase1ExistingFileSizes = Lists.newArrayListWithCapacity(size);
-            for (int i = 0; i < size; i++) {
-                phase1ExistingFileSizes.add(in.readVLong());
-            }
-
-            phase1TotalSize = in.readVLong();
-            phase1ExistingTotalSize = in.readVLong();
-            phase1Time = in.readVLong();
-            phase1ThrottlingWaitTime = in.readVLong();
-            phase2Operations = in.readVInt();
-            phase2Time = in.readVLong();
-            phase3Operations = in.readVInt();
-            phase3Time = in.readVLong();
-        }
-
-        @Override public void writeTo(StreamOutput out) throws IOException {
-            out.writeBoolean(retry);
-            out.writeVInt(phase1FileNames.size());
-            for (String name : phase1FileNames) {
-                out.writeUTF(name);
-            }
-            out.writeVInt(phase1FileSizes.size());
-            for (long size : phase1FileSizes) {
-                out.writeVLong(size);
-            }
-
-            out.writeVInt(phase1ExistingFileNames.size());
-            for (String name : phase1ExistingFileNames) {
-                out.writeUTF(name);
-            }
-            out.writeVInt(phase1ExistingFileSizes.size());
-            for (long size : phase1ExistingFileSizes) {
-                out.writeVLong(size);
-            }
-
-            out.writeVLong(phase1TotalSize);
-            out.writeVLong(phase1ExistingTotalSize);
-            out.writeVLong(phase1Time);
-            out.writeVLong(phase1ThrottlingWaitTime);
-            out.writeVInt(phase2Operations);
-            out.writeVLong(phase2Time);
-            out.writeVInt(phase3Operations);
-            out.writeVLong(phase3Time);
-        }
-    }
-
-    static class CleanFilesRequest implements Streamable {
-
-        Set<String> snapshotFiles = Sets.newHashSet();
-
-        @Override public void readFrom(StreamInput in) throws IOException {
-            int size = in.readVInt();
-            for (int i = 0; i < size; i++) {
-                snapshotFiles.add(in.readUTF());
-            }
-        }
-
-        @Override public void writeTo(StreamOutput out) throws IOException {
-            out.writeVInt(snapshotFiles.size());
-            for (String snapshotFile : snapshotFiles) {
-                out.writeUTF(snapshotFile);
-            }
-        }
-    }
-
-    class CleanFilesRequestHandler extends BaseTransportRequestHandler<CleanFilesRequest> {
-
-        @Override public CleanFilesRequest newInstance() {
-            return new CleanFilesRequest();
-        }
-
-        @Override public void messageReceived(CleanFilesRequest request, TransportChannel channel) throws Exception {
-            receiveSnapshotRecoveryThread = Thread.currentThread();
-            try {
-                for (String existingFile : store.directory().listAll()) {
-                    if (!request.snapshotFiles.contains(existingFile)) {
-                        store.directory().deleteFile(existingFile);
-                    }
-                }
-                channel.sendResponse(VoidStreamable.INSTANCE);
-            } finally {
-                receiveSnapshotRecoveryThread = null;
-            }
-        }
-    }
-
-
-    class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler<VoidStreamable> {
-
-        @Override public VoidStreamable newInstance() {
-            return VoidStreamable.INSTANCE;
-        }
-
-        @Override public void messageReceived(VoidStreamable stream, TransportChannel channel) throws Exception {
-            receiveSnapshotRecoveryThread = Thread.currentThread();
-            try {
-                indexShard.performRecoveryPrepareForTranslog();
-                channel.sendResponse(VoidStreamable.INSTANCE);
-            } finally {
-                receiveSnapshotRecoveryThread = null;
-            }
-        }
-    }
-
-    class FinalizeRecoveryRequestHandler extends BaseTransportRequestHandler<VoidStreamable> {
-
-        @Override public VoidStreamable newInstance() {
-            return VoidStreamable.INSTANCE;
-        }
-
-        @Override public void messageReceived(VoidStreamable stream, TransportChannel channel) throws Exception {
-            receiveSnapshotRecoveryThread = Thread.currentThread();
-            try {
-                indexShard.performRecoveryFinalization(false);
-                channel.sendResponse(VoidStreamable.INSTANCE);
-            } finally {
-                receiveSnapshotRecoveryThread = null;
-            }
-        }
-    }
-
-    class TranslogOperationsRequestHandler extends BaseTransportRequestHandler<TranslogOperationsRequest> {
-
-        @Override public TranslogOperationsRequest newInstance() {
-            return new TranslogOperationsRequest();
-        }
-
-        @Override public void messageReceived(TranslogOperationsRequest snapshot, TransportChannel channel) throws Exception {
-            receiveSnapshotRecoveryThread = Thread.currentThread();
-            try {
-                if (closed) {
-                    throw new IndexShardClosedException(shardId);
-                }
-                for (Translog.Operation operation : snapshot.operations) {
-                    indexShard.performRecoveryOperation(operation);
-                }
-                channel.sendResponse(VoidStreamable.INSTANCE);
-            } finally {
-                receiveSnapshotRecoveryThread = null;
-            }
-        }
-    }
-
-    static class TranslogOperationsRequest implements Streamable {
-
-        List<Translog.Operation> operations = Lists.newArrayList();
-
-        TranslogOperationsRequest() {
-        }
-
-        @Override public void readFrom(StreamInput in) throws IOException {
-            int size = in.readVInt();
-            for (int i = 0; i < size; i++) {
-                operations.add(TranslogStreams.readTranslogOperation(in));
-            }
-        }
-
-        @Override public void writeTo(StreamOutput out) throws IOException {
-            out.writeVInt(operations.size());
-            for (Translog.Operation operation : operations) {
-                TranslogStreams.writeTranslogOperation(out, operation);
-            }
-        }
-    }
-
-    private class FileChunkTransportRequestHandler extends BaseTransportRequestHandler<FileChunk> {
-
-        @Override public FileChunk newInstance() {
-            return new FileChunk();
-        }
-
-        @Override public void messageReceived(FileChunk request, TransportChannel channel) throws Exception {
-            if (closed) {
-                throw new IndexShardClosedException(shardId);
-            }
-            IndexOutput indexOutput;
-            if (request.position == 0) {
-                // first request
-                indexOutput = openIndexOutputs.remove(request.name);
-                if (indexOutput != null) {
-                    try {
-                        indexOutput.close();
-                    } catch (IOException e) {
-                        // ignore
-                    }
-                }
-                indexOutput = store.directory().createOutput(request.name);
-                openIndexOutputs.put(request.name, indexOutput);
-            } else {
-                indexOutput = openIndexOutputs.get(request.name);
-            }
-            synchronized (indexOutput) {
-                try {
-                    indexOutput.writeBytes(request.content, request.contentLength);
-                    if (indexOutput.getFilePointer() == request.length) {
-                        // we are done
-                        indexOutput.close();
-                        openIndexOutputs.remove(request.name);
-                    }
-                } catch (IOException e) {
-                    openIndexOutputs.remove(request.name);
-                    try {
-                        indexOutput.close();
-                    } catch (IOException e1) {
-                        // ignore
-                    }
-                }
-            }
-            channel.sendResponse(VoidStreamable.INSTANCE);
-        }
-    }
-
-    private static class FileChunk implements Streamable {
-        String name;
-        long position;
-        long length;
-        byte[] content;
-        int contentLength;
-
-        private FileChunk() {
-        }
-
-        private FileChunk(String name, long position, long length, byte[] content, int contentLength) {
-            this.name = name;
-            this.position = position;
-            this.length =
length; - this.content = content; - this.contentLength = contentLength; - } - - @Override public void readFrom(StreamInput in) throws IOException { - name = in.readUTF(); - position = in.readVLong(); - length = in.readVLong(); - contentLength = in.readVInt(); - content = new byte[contentLength]; - in.readFully(content); - } - - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeUTF(name); - out.writeVLong(position); - out.writeVLong(length); - out.writeVInt(contentLength); - out.writeBytes(content, 0, contentLength); - } - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryCleanFilesRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryCleanFilesRequest.java new file mode 100644 index 00000000000..185381279a1 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryCleanFilesRequest.java @@ -0,0 +1,72 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.elasticsearch.index.shard.recovery;
+
+import org.elasticsearch.common.collect.Sets;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+class RecoveryCleanFilesRequest implements Streamable {
+
+    private ShardId shardId;
+
+    private Set<String> snapshotFiles;
+
+    RecoveryCleanFilesRequest() {
+    }
+
+    RecoveryCleanFilesRequest(ShardId shardId, Set<String> snapshotFiles) {
+        this.shardId = shardId;
+        this.snapshotFiles = snapshotFiles;
+    }
+
+    public ShardId shardId() {
+        return shardId;
+    }
+
+    public Set<String> snapshotFiles() {
+        return snapshotFiles;
+    }
+
+    @Override public void readFrom(StreamInput in) throws IOException {
+        shardId = ShardId.readShardId(in);
+        int size = in.readVInt();
+        snapshotFiles = Sets.newHashSetWithExpectedSize(size);
+        for (int i = 0; i < size; i++) {
+            snapshotFiles.add(in.readUTF());
+        }
+    }
+
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        shardId.writeTo(out);
+        out.writeVInt(snapshotFiles.size());
+        for (String snapshotFile : snapshotFiles) {
+            out.writeUTF(snapshotFile);
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFailedException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFailedException.java
index 3ce9bea28f1..f823046b518 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFailedException.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFailedException.java
@@ -24,11 +24,15 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.index.shard.ShardId;
 
 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
 */
 public class RecoveryFailedException extends ElasticSearchException {
 
-    public RecoveryFailedException(ShardId shardId, DiscoveryNode node, DiscoveryNode targetNode, Throwable cause) {
-        super(shardId + ": Recovery failed from " + targetNode + " into " + node, cause);
+    public RecoveryFailedException(StartRecoveryRequest request, Throwable cause) {
+        this(request.shardId(), request.sourceNode(), request.targetNode(), cause);
+    }
+
+    public RecoveryFailedException(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, Throwable cause) {
+        super(shardId + ": Recovery failed from " + sourceNode + " into " + targetNode, cause);
     }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFileChunkRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFileChunkRequest.java
new file mode 100644
index 00000000000..8843a694bdc
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFileChunkRequest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard.recovery; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +class RecoveryFileChunkRequest implements Streamable { + + private ShardId shardId; + private String name; + private long position; + private long length; + private byte[] content; + private int contentLength; + + RecoveryFileChunkRequest() { + } + + RecoveryFileChunkRequest(ShardId shardId, String name, long position, long length, byte[] content, int contentLength) { + this.shardId = shardId; + this.name = name; + this.position = position; + this.length = length; + this.content = content; + this.contentLength = contentLength; + } + + public ShardId shardId() { + return shardId; + } + + public String name() { + return name; + } + + public long position() { + return position; + } + + public long length() { + return length; + } + + public byte[] content() { + return content; + } + + public int contentLength() { + return contentLength; + } + + public RecoveryFileChunkRequest readFileChunk(StreamInput in) throws IOException { + RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(); + request.readFrom(in); + return request; + } + + @Override public void readFrom(StreamInput in) throws IOException { + shardId = ShardId.readShardId(in); + name = in.readUTF(); + position = in.readVLong(); + length = in.readVLong(); + contentLength = in.readVInt(); + content = new byte[contentLength]; + in.readFully(content); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + out.writeUTF(name); + out.writeVLong(position); + out.writeVLong(length); + out.writeVInt(contentLength); + out.writeBytes(content, 0, contentLength); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFinalizeRecoveryRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFinalizeRecoveryRequest.java new file mode 100644 index 00000000000..67ec3c21179 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFinalizeRecoveryRequest.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard.recovery; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +class RecoveryFinalizeRecoveryRequest implements Streamable { + + private ShardId shardId; + + RecoveryFinalizeRecoveryRequest() { + } + + RecoveryFinalizeRecoveryRequest(ShardId shardId) { + this.shardId = shardId; + } + + public ShardId shardId() { + return shardId; + } + + @Override public void readFrom(StreamInput in) throws IOException { + shardId = ShardId.readShardId(in); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryPrepareForTranslogOperationsRequest.java new file mode 100644 index 00000000000..c784ee2bc25 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryPrepareForTranslogOperationsRequest.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.elasticsearch.index.shard.recovery;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.io.IOException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+class RecoveryPrepareForTranslogOperationsRequest implements Streamable {
+
+    private ShardId shardId;
+
+    RecoveryPrepareForTranslogOperationsRequest() {
+    }
+
+    RecoveryPrepareForTranslogOperationsRequest(ShardId shardId) {
+        this.shardId = shardId;
+    }
+
+    public ShardId shardId() {
+        return shardId;
+    }
+
+    @Override public void readFrom(StreamInput in) throws IOException {
+        shardId = ShardId.readShardId(in);
+    }
+
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        shardId.writeTo(out);
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryResponse.java
new file mode 100644
index 00000000000..477b2b2a3ed
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryResponse.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard.recovery;
+
+import org.elasticsearch.common.collect.Lists;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+class RecoveryResponse implements Streamable {
+
+    boolean retry = false;
+    List<String> phase1FileNames = Lists.newArrayList();
+    List<Long> phase1FileSizes = Lists.newArrayList();
+    List<String> phase1ExistingFileNames = Lists.newArrayList();
+    List<Long> phase1ExistingFileSizes = Lists.newArrayList();
+    long phase1TotalSize;
+    long phase1ExistingTotalSize;
+    long phase1Time;
+    long phase1ThrottlingWaitTime;
+
+    int phase2Operations;
+    long phase2Time;
+
+    int phase3Operations;
+    long phase3Time;
+
+    RecoveryResponse() {
+    }
+
+    @Override public void readFrom(StreamInput in) throws IOException {
+        retry = in.readBoolean();
+        int size = in.readVInt();
+        phase1FileNames = Lists.newArrayListWithCapacity(size);
+        for (int i = 0; i < size; i++) {
+            phase1FileNames.add(in.readUTF());
+        }
+        size = in.readVInt();
+        phase1FileSizes = Lists.newArrayListWithCapacity(size);
+        for (int i = 0; i < size; i++) {
+            phase1FileSizes.add(in.readVLong());
+        }
+
+        size = in.readVInt();
+        phase1ExistingFileNames = Lists.newArrayListWithCapacity(size);
+        for (int i = 0; i < size; i++) {
+            phase1ExistingFileNames.add(in.readUTF());
+        }
+        size = in.readVInt();
+        phase1ExistingFileSizes = Lists.newArrayListWithCapacity(size);
+        for (int i = 0; i < size; i++) {
+            phase1ExistingFileSizes.add(in.readVLong());
+        }
+
+        phase1TotalSize = in.readVLong();
+        phase1ExistingTotalSize = in.readVLong();
+        phase1Time = in.readVLong();
+        phase1ThrottlingWaitTime = in.readVLong();
+        phase2Operations = in.readVInt();
+        phase2Time = in.readVLong();
+        phase3Operations = in.readVInt();
+        phase3Time = in.readVLong();
+    }
+
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        out.writeBoolean(retry);
+        out.writeVInt(phase1FileNames.size());
+        for (String name : phase1FileNames) {
+            out.writeUTF(name);
+        }
+        out.writeVInt(phase1FileSizes.size());
+        for (long size : phase1FileSizes) {
+            out.writeVLong(size);
+        }
+
+        out.writeVInt(phase1ExistingFileNames.size());
+        for (String name : phase1ExistingFileNames) {
+            out.writeUTF(name);
+        }
+        out.writeVInt(phase1ExistingFileSizes.size());
+        for (long size : phase1ExistingFileSizes) {
+            out.writeVLong(size);
+        }
+
+        out.writeVLong(phase1TotalSize);
+        out.writeVLong(phase1ExistingTotalSize);
+        out.writeVLong(phase1Time);
+        out.writeVLong(phase1ThrottlingWaitTime);
+        out.writeVInt(phase2Operations);
+        out.writeVLong(phase2Time);
+        out.writeVInt(phase3Operations);
+        out.writeVLong(phase3Time);
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java
new file mode 100644
index 00000000000..992ca3830b7
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard.recovery;
+
+import org.apache.lucene.store.IndexInput;
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.common.StopWatch;
+import org.elasticsearch.common.collect.Lists;
+import org.elasticsearch.common.collect.Sets;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.shard.IllegalIndexShardStateException;
+import org.elasticsearch.index.shard.IndexShardClosedException;
+import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.service.InternalIndexShard;
+import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.BaseTransportRequestHandler;
+import org.elasticsearch.transport.TransportChannel;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.transport.VoidTransportResponseHandler;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.common.unit.TimeValue.*;
+
+/**
+ * The recovery source accepts recovery requests from peer shards and starts the recovery process from this
+ * source shard to the target shard.
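+ *
+ * <p>Recovery runs in three phases: phase1 streams the index files the target is missing (files
+ * whose md5 already matches on the target are reused and skipped), phase2 replays the translog
+ * operations that accumulated while the files were streamed, and phase3 replays the remaining
+ * operations before the recovery is finalized on the target.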
+ *
+ * @author kimchy (shay.banon)
+ */
+public class RecoverySource extends AbstractComponent {
+
+    public static class Actions {
+        public static final String START_RECOVERY = "index/shard/recovery/startRecovery";
+    }
+
+    private final ThreadPool threadPool;
+
+    private final TransportService transportService;
+
+    private final IndicesService indicesService;
+
+    private final RecoveryThrottler recoveryThrottler;
+
+    private final ByteSizeValue fileChunkSize;
+
+    @Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
+                                  RecoveryThrottler recoveryThrottler) {
+        super(settings);
+        this.threadPool = threadPool;
+        this.transportService = transportService;
+        this.indicesService = indicesService;
+        this.recoveryThrottler = recoveryThrottler;
+
+        this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB));
+
+        transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
+    }
+
+    private RecoveryResponse recover(final StartRecoveryRequest request) {
+        if (!recoveryThrottler.tryRecovery(request.shardId(), "peer recovery source")) {
+            RecoveryResponse retry = new RecoveryResponse();
+            retry.retry = true;
+            return retry;
+        }
+        final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
+        try {
+            logger.trace("starting recovery to {}, mark_as_relocated {}", request.targetNode(), request.markAsRelocated());
+            final RecoveryResponse response = new RecoveryResponse();
+            shard.recover(new Engine.RecoveryHandler() {
+                @Override public void phase1(final SnapshotIndexCommit snapshot) throws ElasticSearchException {
+                    long totalSize = 0;
+                    long existingTotalSize = 0;
+                    try {
+                        StopWatch stopWatch = new StopWatch().start();
+
+                        for (String name : snapshot.getFiles()) {
+                            StoreFileMetaData md = shard.store().metaDataWithMd5(name);
+                            boolean useExisting = false;
+                            if (request.existingFiles.containsKey(name)) {
+                                if (md.md5().equals(request.existingFiles.get(name).md5())) {
+                                    response.phase1ExistingFileNames.add(name);
+                                    response.phase1ExistingFileSizes.add(md.sizeInBytes());
+                                    existingTotalSize += md.sizeInBytes();
+                                    useExisting = true;
+                                    if (logger.isTraceEnabled()) {
+                                        logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.md5());
+                                    }
+                                }
+                            }
+                            if (!useExisting) {
+                                if (request.existingFiles.containsKey(name)) {
+                                    logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles.get(name).md5(), md.md5());
+                                } else {
+                                    logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exist in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name);
+                                }
+                                response.phase1FileNames.add(name);
+                                response.phase1FileSizes.add(md.sizeInBytes());
+                                totalSize += md.sizeInBytes();
+                            }
+                        }
+                        response.phase1TotalSize = totalSize;
+                        response.phase1ExistingTotalSize = existingTotalSize;
+
+                        final AtomicLong throttlingWaitTime = new AtomicLong();
+
+                        logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
+
+                        final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
+                        final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
+                        for (final String name : response.phase1FileNames) {
+                            threadPool.execute(new Runnable() {
+                                @Override public void run() {
+                                    IndexInput indexInput = null;
+                                    try {
+                                        long throttlingStartTime = System.currentTimeMillis();
+                                        while (!recoveryThrottler.tryStream(request.shardId(), name)) {
+                                            if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
+                                                throw new IndexShardClosedException(shard.shardId());
+                                            }
+                                            Thread.sleep(recoveryThrottler.throttleInterval().millis());
+                                        }
+                                        throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
+
+                                        final int BUFFER_SIZE = (int) fileChunkSize.bytes();
+                                        byte[] buf = new byte[BUFFER_SIZE];
+                                        indexInput = snapshot.getDirectory().openInput(name);
+                                        long len = indexInput.length();
+                                        long readCount = 0;
+                                        while (readCount < len) {
+                                            if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
+                                                throw new IndexShardClosedException(shard.shardId());
+                                            }
+                                            int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
+                                            long position = indexInput.getFilePointer();
+                                            indexInput.readBytes(buf, 0, toRead, false);
+                                            transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, buf, toRead), VoidTransportResponseHandler.INSTANCE).txGet();
+                                            readCount += toRead;
+                                        }
+                                        indexInput.close();
+                                    } catch (Exception e) {
+                                        lastException.set(e);
+                                    } finally {
+                                        recoveryThrottler.streamDone(request.shardId(), name);
+                                        if (indexInput != null) {
+                                            try {
+                                                indexInput.close();
+                                            } catch (IOException e) {
+                                                // ignore
+                                            }
+                                        }
+                                        latch.countDown();
+                                    }
+                                }
+                            });
+                        }
+
+                        latch.await();
+
+                        if (lastException.get() != null) {
+                            throw lastException.get();
+                        }
+
+                        // now, send the clean files request
+                        Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
+                        transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE).txGet();
+
+                        stopWatch.stop();
+                        logger.trace("[{}][{}] recovery [phase1] to {}: took [{}], throttling_wait [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime(), timeValueMillis(throttlingWaitTime.get()));
+                        response.phase1Time = stopWatch.totalTime().millis();
+                    } catch (Throwable e) {
+                        throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
+                    }
+                }
+
+                @Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
+                    if (shard.state() == IndexShardState.CLOSED) {
+                        throw new IndexShardClosedException(request.shardId());
+                    }
+                    logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
+                    StopWatch stopWatch = new StopWatch().start();
+
+                    transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet();
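+
+                    // replay to the target the operations that accumulated in the translog while
+                    // phase1 was streaming the files (the target was prepared by the request above)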
+                    int totalOperations = sendSnapshot(snapshot);
+
+                    stopWatch.stop();
+                    logger.trace("[{}][{}] recovery [phase2] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
+                    response.phase2Time = stopWatch.totalTime().millis();
+                    response.phase2Operations = totalOperations;
+                }
+
+                @Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
+                    if (shard.state() == IndexShardState.CLOSED) {
+                        throw new IndexShardClosedException(request.shardId());
+                    }
+                    logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
+                    StopWatch stopWatch = new StopWatch().start();
+                    int totalOperations = sendSnapshot(snapshot);
+                    transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet();
+                    if (request.markAsRelocated()) {
+                        // TODO what happens if the recovery process fails afterwards, we need to mark this back to started
+                        try {
+                            shard.relocated();
+                        } catch (IllegalIndexShardStateException e) {
+                            // we can ignore this exception since, on the other node, when it moved to phase3
+                            // it will also send shard started, which might cause the index shard we work against
+                            // to be closed by the time we get to the relocated method
+                        }
+                    }
+                    stopWatch.stop();
+                    logger.trace("[{}][{}] recovery [phase3] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
+                    response.phase3Time = stopWatch.totalTime().millis();
+                    response.phase3Operations = totalOperations;
+                }
+
+                private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
+                    int translogBatchSize = 10; // TODO make this configurable
+                    int counter = 0;
+                    int totalOperations = 0;
+                    List<Translog.Operation> operations = Lists.newArrayList();
+                    while (snapshot.hasNext()) {
+                        if (shard.state() == IndexShardState.CLOSED) {
+                            throw new IndexShardClosedException(request.shardId());
+                        }
+                        operations.add(snapshot.next());
+                        totalOperations++;
+                        if (++counter == translogBatchSize) {
+                            RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
+                            transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, VoidTransportResponseHandler.INSTANCE).txGet();
+                            counter = 0;
+                            operations = Lists.newArrayList();
+                        }
+                    }
+                    // send the leftover operations
+                    if (!operations.isEmpty()) {
+                        RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
+                        transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, VoidTransportResponseHandler.INSTANCE).txGet();
+                    }
+                    return totalOperations;
+                }
+            });
+            return response;
+        } finally {
+            recoveryThrottler.recoveryDone(request.shardId(), "peer recovery source");
+        }
+    }
+
+    class StartRecoveryTransportRequestHandler extends BaseTransportRequestHandler<StartRecoveryRequest> {
+
+        @Override public StartRecoveryRequest newInstance() {
+            return new StartRecoveryRequest();
+        }
+
+        @Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
+            RecoveryResponse response = recover(request);
+            channel.sendResponse(response);
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java
new file mode 100644
index 00000000000..3c21cae4db0
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard.recovery;
+
+import org.apache.lucene.store.IndexOutput;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.StopWatch;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.VoidStreamable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.index.IndexShardMissingException;
+import org.elasticsearch.index.engine.RecoveryEngineException;
+import org.elasticsearch.index.shard.*;
+import org.elasticsearch.index.shard.service.IndexShard;
+import org.elasticsearch.index.shard.service.InternalIndexShard;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.IndexMissingException;
+import org.elasticsearch.indices.IndicesLifecycle;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.BaseTransportRequestHandler;
+import org.elasticsearch.transport.FutureTransportResponseHandler;
+import org.elasticsearch.transport.TransportChannel;
+import org.elasticsearch.transport.TransportService;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.elasticsearch.common.unit.TimeValue.*;
+
+/**
+ * The recovery target handles recoveries of peer shards on the node the shard is being recovered to.
+ *
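+ * It registers the transport handlers that the {@link RecoverySource} drives during recovery
+ * (file chunks, clean files, prepare/apply translog operations, finalize), and keeps the per
+ * shard state, such as the open index outputs, in {@link OnGoingRecovery}. A caller is expected
+ * to drive it roughly as follows (a sketch only; {@code recoveryTarget} and {@code request} are
+ * placeholders for a bound instance and a {@link StartRecoveryRequest}):
+ *
+ * <pre>
+ * recoveryTarget.startRecovery(request, new RecoveryTarget.RecoveryListener() {
+ *     public void onRecoveryDone() {
+ *         // e.g. notify the master that the shard has started
+ *     }
+ *
+ *     public void onRetryRecovery(TimeValue retryAfter) {
+ *         // e.g. reschedule the startRecovery call after retryAfter
+ *     }
+ *
+ *     public void onIgnoreRecovery(String reason) {
+ *     }
+ *
+ *     public void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure) {
+ *         // e.g. fail the shard
+ *     }
+ * });
+ * </pre>
+ *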
+ * <p>Note that it can be safely assumed there will only be a single recovery per shard (index + id), and
+ * not several of them, since we don't allocate several replicas of the same shard to the same node.
+ *
+ * @author kimchy (shay.banon)
+ */
+public class RecoveryTarget extends AbstractComponent {
+
+    public static class Actions {
+        public static final String FILE_CHUNK = "index/shard/recovery/fileChunk";
+        public static final String CLEAN_FILES = "index/shard/recovery/cleanFiles";
+        public static final String TRANSLOG_OPS = "index/shard/recovery/translogOps";
+        public static final String PREPARE_TRANSLOG = "index/shard/recovery/prepareTranslog";
+        public static final String FINALIZE = "index/shard/recovery/finalize";
+    }
+
+    private final ThreadPool threadPool;
+
+    private final TransportService transportService;
+
+    private final IndicesService indicesService;
+
+    private final RecoveryThrottler recoveryThrottler;
+
+    private final ConcurrentMap<ShardId, OnGoingRecovery> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
+
+    @Inject public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
+                                  IndicesLifecycle indicesLifecycle, RecoveryThrottler recoveryThrottler) {
+        super(settings);
+        this.threadPool = threadPool;
+        this.transportService = transportService;
+        this.indicesService = indicesService;
+        this.recoveryThrottler = recoveryThrottler;
+
+        transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler());
+        transportService.registerHandler(Actions.CLEAN_FILES, new CleanFilesRequestHandler());
+        transportService.registerHandler(Actions.PREPARE_TRANSLOG, new PrepareForTranslogOperationsRequestHandler());
+        transportService.registerHandler(Actions.TRANSLOG_OPS, new TranslogOperationsRequestHandler());
+        transportService.registerHandler(Actions.FINALIZE, new FinalizeRecoveryRequestHandler());
+
+        indicesLifecycle.addListener(new IndicesLifecycle.Listener() {
+            @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) {
+                removeAndCleanOnGoingRecovery(shardId);
+            }
+        });
+    }
+
+    public void startRecovery(final StartRecoveryRequest request, final RecoveryListener listener) {
+        if (request.sourceNode() == null) {
+            listener.onIgnoreRecovery("No node to recover from, retry on next cluster state update");
+            return;
+        }
+        final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
+        // mark the shard as recovering
+        IndexShardState preRecoveryState;
+        try {
+            preRecoveryState = shard.recovering();
+        } catch (IllegalIndexShardStateException e) {
+            // that's fine, since we might be called concurrently, just ignore this, we are already recovering
+            listener.onIgnoreRecovery("Already in recovering process, " + e.getMessage());
+            return;
+        }
+        final IndexShardState fPreRecoveryState = preRecoveryState;
+        threadPool.execute(new Runnable() {
+            @Override public void run() {
+                doRecovery(shard, fPreRecoveryState, request, listener);
+            }
+        });
+    }
+
+    private void doRecovery(final InternalIndexShard shard, final IndexShardState preRecoveryState, final StartRecoveryRequest request, final RecoveryListener listener) {
+        // we know we are on a thread, we can spin till we can engage in recovery
+        if (!recoveryThrottler.tryRecovery(shard.shardId(), "peer recovery target")) {
+            if (shard.state() == IndexShardState.CLOSED) {
+                listener.onIgnoreRecovery("shard closed, stop recovery");
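+                // the shard was closed while we waited for a recovery slot, so there is nothing to restore or retry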
+                return;
+            }
+            shard.restoreRecoveryState(preRecoveryState);
+            listener.onRetryRecovery(recoveryThrottler.throttleInterval());
+            return;
+        }
+
+        try {
+            logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());
+            onGoingRecoveries.put(request.shardId(), new OnGoingRecovery());
+
+            StopWatch stopWatch = new StopWatch().start();
+            RecoveryResponse recoveryStatus = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
+                @Override public RecoveryResponse newInstance() {
+                    return new RecoveryResponse();
+                }
+            }).txGet();
+            if (recoveryStatus.retry) {
+                if (shard.state() == IndexShardState.CLOSED) {
+                    listener.onIgnoreRecovery("shard closed, stop recovery");
+                    return;
+                }
+                logger.trace("[{}][{}] retrying recovery in [{}], source shard is busy", request.shardId().index().name(), request.shardId().id(), recoveryThrottler.throttleInterval());
+                removeAndCleanOnGoingRecovery(request.shardId());
+                shard.restoreRecoveryState(preRecoveryState);
+                listener.onRetryRecovery(recoveryThrottler.throttleInterval());
+                return;
+            }
+            stopWatch.stop();
+            if (logger.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] ");
+                sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n");
+                sb.append("   phase1: recovered_files [").append(recoveryStatus.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1TotalSize)).append("]")
+                        .append(", took [").append(timeValueMillis(recoveryStatus.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryStatus.phase1ThrottlingWaitTime)).append(']')
+                        .append("\n");
+                sb.append("         : reusing_files [").append(recoveryStatus.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1ExistingTotalSize)).append("]\n");
+                sb.append("   phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
+                        .append(", took [").append(timeValueMillis(recoveryStatus.phase2Time)).append("]")
+                        .append("\n");
+                sb.append("   phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations")
+                        .append(", took [").append(timeValueMillis(recoveryStatus.phase3Time)).append("]");
+                logger.debug(sb.toString());
+            }
+            removeAndCleanOnGoingRecovery(request.shardId());
+            listener.onRecoveryDone();
+        } catch (Exception e) {
+            removeAndCleanOnGoingRecovery(request.shardId());
+            if (shard.state() == IndexShardState.CLOSED) {
+                listener.onIgnoreRecovery("shard closed, stop recovery");
+                return;
+            }
+            logger.trace("[{}][{}] recovery from [{}] failed", e, request.shardId().index().name(), request.shardId().id(), request.sourceNode());
+            Throwable cause = ExceptionsHelper.unwrapCause(e);
+            if (cause instanceof RecoveryEngineException) {
+                // unwrap an exception that was thrown as part of the recovery
+                cause = cause.getCause();
+            }
+            if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
+                shard.restoreRecoveryState(preRecoveryState);
+                listener.onRetryRecovery(recoveryThrottler.throttleInterval());
+                return;
+            }
+            listener.onRecoveryFailure(new RecoveryFailedException(request, e), true);
+        } finally {
+            recoveryThrottler.recoveryDone(shard.shardId(), "peer recovery target");
+        }
+    }
+
+    public static interface RecoveryListener {
+        void onRecoveryDone();
+
+        void onRetryRecovery(TimeValue retryAfter);
+
+        void onIgnoreRecovery(String reason);
+
+        void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure);
+    }
+
+    private void removeAndCleanOnGoingRecovery(ShardId shardId) {
+        // clean it from the on going recoveries since it is being closed
+        OnGoingRecovery onGoingRecovery = onGoingRecoveries.remove(shardId);
+        if (onGoingRecovery != null) {
+            // clean open index outputs
+            for (Map.Entry<String, IndexOutput> entry : onGoingRecovery.openIndexOutputs.entrySet()) {
+                synchronized (entry.getValue()) {
+                    try {
+                        entry.getValue().close();
+                    } catch (IOException e) {
+                        // ignore
+                    }
+                }
+            }
+        }
+    }
+
+    static class OnGoingRecovery {
+        ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
+    }
+
+    class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
+
+        @Override public RecoveryPrepareForTranslogOperationsRequest newInstance() {
+            return new RecoveryPrepareForTranslogOperationsRequest();
+        }
+
+        @Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
+            InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
+            shard.performRecoveryPrepareForTranslog();
+            channel.sendResponse(VoidStreamable.INSTANCE);
+        }
+    }
+
+    class FinalizeRecoveryRequestHandler extends BaseTransportRequestHandler<RecoveryFinalizeRecoveryRequest> {
+
+        @Override public RecoveryFinalizeRecoveryRequest newInstance() {
+            return new RecoveryFinalizeRecoveryRequest();
+        }
+
+        @Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
+            InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
+            shard.performRecoveryFinalization(false);
+            channel.sendResponse(VoidStreamable.INSTANCE);
+        }
+    }
+
+    class TranslogOperationsRequestHandler extends BaseTransportRequestHandler<RecoveryTranslogOperationsRequest> {
+
+        @Override public RecoveryTranslogOperationsRequest newInstance() {
+            return new RecoveryTranslogOperationsRequest();
+        }
+
+        @Override public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
+            InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
+            for (Translog.Operation operation : request.operations()) {
+                shard.performRecoveryOperation(operation);
+            }
+            channel.sendResponse(VoidStreamable.INSTANCE);
+        }
+    }
+
+    class CleanFilesRequestHandler extends BaseTransportRequestHandler<RecoveryCleanFilesRequest> {
+
+        @Override public RecoveryCleanFilesRequest newInstance() {
+            return new RecoveryCleanFilesRequest();
+        }
+
+        @Override public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
+            InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
+            for (String existingFile : shard.store().directory().listAll()) {
+                if (!request.snapshotFiles().contains(existingFile)) {
+                    shard.store().directory().deleteFile(existingFile);
+                }
+            }
+            channel.sendResponse(VoidStreamable.INSTANCE);
+        }
+    }
+
+    class FileChunkTransportRequestHandler extends BaseTransportRequestHandler<RecoveryFileChunkRequest> {
+
+        @Override public RecoveryFileChunkRequest newInstance() {
+            return new RecoveryFileChunkRequest();
+        }
+
+        @Override public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
+            InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
+            OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
+            if (onGoingRecovery == null) {
+                // shard is getting closed on us
+                throw new IndexShardClosedException(shard.shardId());
+            }
+            IndexOutput indexOutput;
+            if (request.position() == 0) {
+                // first chunk of a file, close (and discard) any output already open under that name
+                indexOutput = onGoingRecovery.openIndexOutputs.remove(request.name());
+                if (indexOutput != null) {
+                    try {
+                        indexOutput.close();
+                    } catch (IOException e) {
+                        // ignore
+                    }
+                }
+                indexOutput = shard.store().directory().createOutput(request.name());
+                onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput);
+            } else {
+                indexOutput = onGoingRecovery.openIndexOutputs.get(request.name());
+            }
+            synchronized (indexOutput) {
+                try {
+                    indexOutput.writeBytes(request.content(), request.contentLength());
+                    if (indexOutput.getFilePointer() == request.length()) {
+                        // we are done
+                        indexOutput.close();
+                        onGoingRecovery.openIndexOutputs.remove(request.name());
+                    }
+                } catch (IOException e) {
+                    onGoingRecovery.openIndexOutputs.remove(request.name());
+                    try {
+                        indexOutput.close();
+                    } catch (IOException e1) {
+                        // ignore
+                    }
+                }
+            }
+            channel.sendResponse(VoidStreamable.INSTANCE);
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTranslogOperationsRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTranslogOperationsRequest.java
new file mode 100644
index 00000000000..cbcb1830fd6
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTranslogOperationsRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard.recovery;
+
+import org.elasticsearch.common.collect.Lists;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogStreams;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+class RecoveryTranslogOperationsRequest implements Streamable {
+
+    private ShardId shardId;
+    private List<Translog.Operation> operations;
+
+    RecoveryTranslogOperationsRequest() {
+    }
+
+    RecoveryTranslogOperationsRequest(ShardId shardId, List<Translog.Operation> operations) {
+        this.shardId = shardId;
+        this.operations = operations;
+    }
+
+    public ShardId shardId() {
+        return shardId;
+    }
+
+    public List<Translog.Operation> operations() {
+        return operations;
+    }
+
+    @Override public void readFrom(StreamInput in) throws IOException {
+        shardId = ShardId.readShardId(in);
+        int size = in.readVInt();
+        operations = Lists.newArrayListWithExpectedSize(size);
+        for (int i = 0; i < size; i++) {
+            operations.add(TranslogStreams.readTranslogOperation(in));
+        }
+    }
+
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        shardId.writeTo(out);
+        out.writeVInt(operations.size());
+        for (Translog.Operation operation : operations) {
+            TranslogStreams.writeTranslogOperation(out, operation);
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java
new file mode 100644
index 00000000000..97b8c48c90a
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard.recovery;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.store.StoreFileMetaData;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class StartRecoveryRequest implements Streamable {
+
+    private ShardId shardId;
+
+    private DiscoveryNode sourceNode;
+
+    private DiscoveryNode targetNode;
+
+    private boolean markAsRelocated;
+
+    Map<String, StoreFileMetaData> existingFiles;
+
+    StartRecoveryRequest() {
+    }
+
+    /**
+     * Start recovery request.
+     *
+     * @param shardId         The shard id of the shard to recover
+     * @param sourceNode      The node to recover from
+     * @param targetNode      The node to recover to
+     * @param markAsRelocated Whether the source shard should be marked as relocated once the recovery completes
+     * @param existingFiles   Metadata of the files the target already has locally, keyed by file name
+     */
+    public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Map<String, StoreFileMetaData> existingFiles) {
+        this.shardId = shardId;
+        this.sourceNode = sourceNode;
+        this.targetNode = targetNode;
+        this.markAsRelocated = markAsRelocated;
+        this.existingFiles = existingFiles;
+    }
+
+    public ShardId shardId() {
+        return shardId;
+    }
+
+    public DiscoveryNode sourceNode() {
+        return sourceNode;
+    }
+
+    public DiscoveryNode targetNode() {
+        return targetNode;
+    }
+
+    public boolean markAsRelocated() {
+        return markAsRelocated;
+    }
+
+    public Map<String, StoreFileMetaData> existingFiles() {
+        return existingFiles;
+    }
+
+    @Override public void readFrom(StreamInput in) throws IOException {
+        shardId = ShardId.readShardId(in);
+        sourceNode = DiscoveryNode.readNode(in);
+        targetNode = DiscoveryNode.readNode(in);
+        markAsRelocated = in.readBoolean();
+        int size = in.readVInt();
+        existingFiles = Maps.newHashMapWithExpectedSize(size);
+        for (int i = 0; i < size; i++) {
+            StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in);
+            existingFiles.put(md.name(), md);
+        }
+    }
+
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        shardId.writeTo(out);
+        sourceNode.writeTo(out);
+        targetNode.writeTo(out);
+        out.writeBoolean(markAsRelocated);
+        out.writeVInt(existingFiles.size());
+        for (StoreFileMetaData md : existingFiles.values()) {
+            md.writeTo(out);
+        }
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java
index 35ef502f014..b890590ac20 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java
@@ -32,6 +32,7 @@ import org.elasticsearch.index.store.IndexStore;
 import org.elasticsearch.index.store.memory.ByteBufferDirectory;
 import org.elasticsearch.index.store.support.AbstractStore;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentMap;
 
@@ -67,7 +68,31 @@
         String fsLock = componentSettings.get("fs_lock", "native");
         LockFactory lockFactory = new NoLockFactory();
         if (fsLock.equals("native")) {
-            lockFactory = new NativeFSLockFactory();
+            // TODO LUCENE MONITOR: this is not needed in next Lucene version
+            lockFactory = new NativeFSLockFactory() {
+                @Override public void clearLock(String lockName) throws IOException {
+                    // Note that this isn't strictly required anymore
+                    // because the existence of these files does not mean
+                    // they are locked, but, still do this in case people
+                    // really want to see the files go away:
+                    if (lockDir.exists()) {
+
+                        // Try to release the lock first - if it's held by another process, this
+                        // method should not silently fail.
+                        // NOTE: makeLock fixes the lock name by prefixing it w/ lockPrefix.
+                        // Therefore it should be called before the code block next which prefixes
+                        // the given name.
+                        makeLock(lockName).release();
+
+                        if (lockPrefix != null) {
+                            lockName = lockPrefix + "-" + lockName;
+                        }
+
+                        // As mentioned above, we don't care if the deletion of the file failed.
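+                        // (File.delete() simply returns false, rather than throwing, when the file is already gone.)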
+                        new File(lockDir, lockName).delete();
+                    }
+                }
+            };
         } else if (fsLock.equals("simple")) {
             lockFactory = new SimpleFSLockFactory();
         }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java
index efbc7c711c2..a663d402a3e 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java
@@ -21,6 +21,8 @@ package org.elasticsearch.indices;
 
 import org.elasticsearch.common.inject.AbstractModule;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.recovery.RecoverySource;
+import org.elasticsearch.index.shard.recovery.RecoveryTarget;
 import org.elasticsearch.indices.analysis.IndicesAnalysisService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
 import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
@@ -41,8 +43,13 @@ public class IndicesModule extends AbstractModule {
 
     @Override protected void configure() {
         bind(IndicesLifecycle.class).to(InternalIndicesLifecycle.class).asEagerSingleton();
+        bind(IndicesService.class).to(InternalIndicesService.class).asEagerSingleton();
+        bind(RecoveryThrottler.class).asEagerSingleton();
+        bind(RecoveryTarget.class).asEagerSingleton();
+        bind(RecoverySource.class).asEagerSingleton();
+        bind(IndicesClusterStateService.class).asEagerSingleton();
         bind(IndicesMemoryCleaner.class).asEagerSingleton();
         bind(IndexingMemoryBufferController.class).asEagerSingleton();
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 945b43d2f48..6a6947d233f 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -38,16 +38,18 @@ import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexShardAlreadyExistsException;
 import org.elasticsearch.index.IndexShardMissingException;
-import org.elasticsearch.index.gateway.IgnoreGatewayRecoveryException;
+import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
 import org.elasticsearch.index.gateway.IndexShardGatewayService;
 import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.service.IndexService;
 import org.elasticsearch.index.shard.IndexShardState;
-import org.elasticsearch.index.shard.recovery.IgnoreRecoveryException;
-import org.elasticsearch.index.shard.recovery.RecoveryAction;
+import org.elasticsearch.index.shard.recovery.RecoveryFailedException;
+import org.elasticsearch.index.shard.recovery.RecoveryTarget;
+import org.elasticsearch.index.shard.recovery.StartRecoveryRequest;
 import org.elasticsearch.index.shard.service.IndexShard;
 import org.elasticsearch.index.shard.service.InternalIndexShard;
 import org.elasticsearch.indices.IndicesService;
@@ -70,6 +72,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent