refactor recovery to be handled on the node level (and not per shard), with better retry mechanism when doing peer shard recovery

kimchy 2010-07-18 22:54:21 +03:00
parent 8ac8dd818d
commit fefcfb5b5c
19 changed files with 1468 additions and 998 deletions
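In outline, the retry mechanism works like this: the recovery target asks the source node to start recovery, and a source that cannot engage yet (its recovery throttler refuses) answers with a retry flag instead of failing the recovery. A condensed, illustrative sketch of that handshake follows; it is not code from this commit, and startRecovery/sleepIntervalMillis are stand-ins for the transport round-trip and throttle interval used by the real recovery classes below.

    // Illustrative sketch (not part of the commit) of the retry handshake: the
    // target keeps asking the source to start recovery until the source engages;
    // a throttled source answers with response.retry == true instead of failing.
    static RecoveryResponse startRecoveryWithRetry(long sleepIntervalMillis) throws InterruptedException {
        RecoveryResponse response;
        do {
            response = startRecovery(); // stand-in for the round-trip to RecoverySource.Actions.START_RECOVERY
            if (response.retry) {
                // the source's recovery throttler refused to engage; back off and ask again
                Thread.sleep(sleepIntervalMillis);
            }
        } while (response.retry);
        return response;
    }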


@@ -133,34 +133,39 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
     }

     @Override public void start() throws EngineException {
-        if (indexWriter != null) {
-            throw new EngineAlreadyStartedException(shardId);
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("Starting engine with ram_buffer_size[" + indexingBufferSize + "], refresh_interval[" + refreshInterval + "]");
-        }
-        try {
-            this.indexWriter = createWriter();
-        } catch (IOException e) {
-            throw new EngineCreationFailureException(shardId, "Failed to create engine", e);
-        }
-        try {
-            translog.newTranslog(IndexReader.getCurrentVersion(store.directory()));
-            this.nrtResource = buildNrtResource(indexWriter);
-        } catch (IOException e) {
-            try {
-                indexWriter.close();
-            } catch (IOException e1) {
-                // ignore
-            }
-            throw new EngineCreationFailureException(shardId, "Failed to open reader on writer", e);
-        }
+        rwl.writeLock().lock();
+        try {
+            if (indexWriter != null) {
+                throw new EngineAlreadyStartedException(shardId);
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Starting engine with ram_buffer_size[" + indexingBufferSize + "], refresh_interval[" + refreshInterval + "]");
+            }
+            try {
+                this.indexWriter = createWriter();
+            } catch (IOException e) {
+                throw new EngineCreationFailureException(shardId, "Failed to create engine", e);
+            }
+            try {
+                translog.newTranslog(IndexReader.getCurrentVersion(store.directory()));
+                this.nrtResource = buildNrtResource(indexWriter);
+            } catch (IOException e) {
+                try {
+                    indexWriter.rollback();
+                } catch (IOException e1) {
+                    // ignore
+                } finally {
+                    try {
+                        indexWriter.close();
+                    } catch (IOException e1) {
+                        // ignore
+                    }
+                }
+                throw new EngineCreationFailureException(shardId, "Failed to open reader on writer", e);
+            }
+        } finally {
+            rwl.writeLock().unlock();
+        }
     }
@@ -461,10 +466,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
         }
         closed = true;
         rwl.writeLock().lock();
-        if (nrtResource != null) {
-            this.nrtResource.forceClose();
-        }
         try {
+            if (nrtResource != null) {
+                this.nrtResource.forceClose();
+            }
             // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
             if (indexWriter != null) {
                 try {
@@ -486,8 +491,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
         try {
             // release locks when started
             if (IndexWriter.isLocked(store.directory())) {
-                logger.trace("Shard is locked, releasing lock");
-                store.directory().clearLock(IndexWriter.WRITE_LOCK_NAME);
+                logger.warn("shard is locked, releasing lock");
+                IndexWriter.unlock(store.directory());
             }
             boolean create = !IndexReader.indexExists(store.directory());
             indexWriter = new IndexWriter(store.directory(),


@@ -19,7 +19,6 @@

 package org.elasticsearch.index.gateway;

-import org.elasticsearch.ElasticSearchIllegalStateException;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.component.CloseableIndexComponent;
 import org.elasticsearch.common.inject.Inject;
@@ -94,79 +93,97 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
         scheduleSnapshotIfNeeded();
     }

+    public static interface RecoveryListener {
+        void onRecoveryDone();
+
+        void onIgnoreRecovery(String reason);
+
+        void onRecoveryFailed(IndexShardGatewayRecoveryException e);
+    }
+
     /**
      * Recovers the state of the shard from the gateway.
      */
-    public synchronized void recover() throws IndexShardGatewayRecoveryException, IgnoreGatewayRecoveryException {
+    public void recover(final RecoveryListener listener) throws IndexShardGatewayRecoveryException, IgnoreGatewayRecoveryException {
         if (!recovered.compareAndSet(false, true)) {
-            throw new IgnoreGatewayRecoveryException(shardId, "already recovered");
+            listener.onIgnoreRecovery("already recovered");
+            return;
         }
         if (indexShard.state() == IndexShardState.CLOSED) {
             // got closed on us, just ignore this recovery
-            throw new IgnoreGatewayRecoveryException(shardId, "shard closed");
+            listener.onIgnoreRecovery("shard closed");
+            return;
         }
         if (!indexShard.routingEntry().primary()) {
-            throw new ElasticSearchIllegalStateException("Trying to recover when the shard is in backup state");
+            listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Trying to recover when the shard is in backup state", null));
+            return;
         }
-        indexShard.recovering();
-
-        StopWatch throttlingWaitTime = new StopWatch().start();
-        // we know we are on a thread, we can spin till we can engage in recovery
-        while (!recoveryThrottler.tryRecovery(shardId, "gateway")) {
-            try {
-                Thread.sleep(recoveryThrottler.throttleInterval().millis());
-            } catch (InterruptedException e) {
-                if (indexShard.ignoreRecoveryAttempt()) {
-                    throw new IgnoreGatewayRecoveryException(shardId, "Interrupted while waiting for recovery, but we should ignore ...");
-                }
-                // we got interrupted, mark it as failed
-                throw new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recovery", e);
-            }
-        }
-        throttlingWaitTime.stop();
-
-        try {
-            logger.debug("starting recovery from {}", shardGateway);
-            StopWatch stopWatch = new StopWatch().start();
-            IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();
-
-            lastIndexVersion = recoveryStatus.index().version();
-            lastTranslogId = -1;
-            lastTranslogPosition = 0;
-            lastTranslogLength = 0;
-
-            // start the shard if the gateway has not started it already
-            if (indexShard.state() != IndexShardState.STARTED) {
-                indexShard.start();
-            }
-            stopWatch.stop();
-            if (logger.isDebugEnabled()) {
-                StringBuilder sb = new StringBuilder();
-                sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
-                sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], took [").append(recoveryStatus.index().took()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
-                sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n");
-                sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("], took [").append(recoveryStatus.translog().took()).append("]");
-                logger.debug(sb.toString());
-            }
-            // refresh the shard
-            indexShard.refresh(new Engine.Refresh(false));
-            scheduleSnapshotIfNeeded();
-        } catch (IndexShardGatewayRecoveryException e) {
-            if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) {
-                // got closed on us, just ignore this recovery
-                throw new IgnoreGatewayRecoveryException(shardId, "shard closed");
-            }
-            throw e;
-        } catch (IndexShardClosedException e) {
-            // got closed on us, just ignore this recovery
-            throw new IgnoreGatewayRecoveryException(shardId, "shard closed");
-        } catch (IndexShardNotStartedException e) {
-            // got closed on us, just ignore this recovery
-            throw new IgnoreGatewayRecoveryException(shardId, "shard closed");
-        } finally {
-            recoveryThrottler.recoveryDone(shardId, "gateway");
-        }
-    }
+        threadPool.execute(new Runnable() {
+            @Override public void run() {
+                indexShard.recovering();
+
+                StopWatch throttlingWaitTime = new StopWatch().start();
+                // we know we are on a thread, we can spin till we can engage in recovery
+                while (!recoveryThrottler.tryRecovery(shardId, "gateway")) {
+                    try {
+                        Thread.sleep(recoveryThrottler.throttleInterval().millis());
+                    } catch (InterruptedException e) {
+                        if (indexShard.ignoreRecoveryAttempt()) {
+                            listener.onIgnoreRecovery("Interrupted while waiting for recovery, but we should ignore ...");
+                            return;
+                        }
+                        listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recovery", e));
+                    }
+                }
+                throttlingWaitTime.stop();
+
+                try {
+                    logger.debug("starting recovery from {}", shardGateway);
+                    StopWatch stopWatch = new StopWatch().start();
+                    IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();
+
+                    lastIndexVersion = recoveryStatus.index().version();
+                    lastTranslogId = -1;
+                    lastTranslogPosition = 0;
+                    lastTranslogLength = 0;
+
+                    // start the shard if the gateway has not started it already
+                    if (indexShard.state() != IndexShardState.STARTED) {
+                        indexShard.start();
+                    }
+                    stopWatch.stop();
+                    if (logger.isDebugEnabled()) {
+                        StringBuilder sb = new StringBuilder();
+                        sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
+                        sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], took [").append(recoveryStatus.index().took()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
+                        sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n");
+                        sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("], took [").append(recoveryStatus.translog().took()).append("]");
+                        logger.debug(sb.toString());
+                    }
+                    // refresh the shard
+                    indexShard.refresh(new Engine.Refresh(false));
+                    listener.onRecoveryDone();
+                    scheduleSnapshotIfNeeded();
+                } catch (IndexShardGatewayRecoveryException e) {
+                    if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) {
+                        // got closed on us, just ignore this recovery
+                        listener.onIgnoreRecovery("shard closed");
+                        return;
+                    }
+                    listener.onRecoveryFailed(e);
+                } catch (IndexShardClosedException e) {
+                    listener.onIgnoreRecovery("shard closed");
+                } catch (IndexShardNotStartedException e) {
+                    listener.onIgnoreRecovery("shard closed");
+                } catch (Exception e) {
+                    listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "failed recovery", e));
+                } finally {
+                    recoveryThrottler.recoveryDone(shardId, "gateway");
+                }
+            }
+        });
+    }
/**

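For illustration, a hypothetical caller of the new listener-based recover(...) shown above could look like the following sketch. This is not code from the commit; gatewayService is a stand-in for an IndexShardGatewayService instance, and the callback bodies only suggest plausible reactions.

    // Sketch: recovery now runs asynchronously on the thread pool and reports its
    // outcome through the RecoveryListener callbacks instead of throwing.
    gatewayService.recover(new IndexShardGatewayService.RecoveryListener() {
        @Override public void onRecoveryDone() {
            // e.g. tell the master the shard has started
        }

        @Override public void onIgnoreRecovery(String reason) {
            // nothing to do: already recovered, or the shard got closed under us
        }

        @Override public void onRecoveryFailed(IndexShardGatewayRecoveryException e) {
            // e.g. fail the shard and let it be reallocated
        }
    });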

@@ -51,7 +51,6 @@ import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.IndexShardManagement;
 import org.elasticsearch.index.shard.IndexShardModule;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.recovery.RecoveryAction;
 import org.elasticsearch.index.shard.service.IndexShard;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.store.IndexStore;
@@ -295,9 +294,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
             shardInjector.getInstance(IndexShardManagement.class).close();
         }

-        RecoveryAction recoveryAction = shardInjector.getInstance(RecoveryAction.class);
-        if (recoveryAction != null) recoveryAction.close();
-
         // this logic is tricky, we want to close the engine so we rollback the changes done to it
         // and close the shard so no operations are allowed to it
         if (indexShard != null) {


@@ -20,7 +20,6 @@

 package org.elasticsearch.index.shard;

 import org.elasticsearch.common.inject.AbstractModule;
-import org.elasticsearch.index.shard.recovery.RecoveryAction;
 import org.elasticsearch.index.shard.service.IndexShard;
 import org.elasticsearch.index.shard.service.InternalIndexShard;
@@ -39,7 +38,5 @@ public class IndexShardModule extends AbstractModule {
         bind(ShardId.class).toInstance(shardId);
         bind(IndexShard.class).to(InternalIndexShard.class).asEagerSingleton();
         bind(IndexShardManagement.class).asEagerSingleton();
-        bind(RecoveryAction.class).asEagerSingleton();
     }
 }


@@ -24,7 +24,7 @@ import org.elasticsearch.index.shard.IndexShardException;
 import org.elasticsearch.index.shard.ShardId;

 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
  */
 public class RecoverFilesRecoveryException extends IndexShardException {


@@ -1,837 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.*;
import static org.elasticsearch.common.unit.TimeValue.*;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
/**
* @author kimchy (shay.banon)
*/
public class RecoveryAction extends AbstractIndexShardComponent implements CloseableComponent {
private final ByteSizeValue fileChunkSize;
private final ThreadPool threadPool;
private final TransportService transportService;
private final InternalIndexShard indexShard;
private final Store store;
private final RecoveryThrottler recoveryThrottler;
private final ConcurrentMap<String, IndexOutput> openIndexOutputs = newConcurrentMap();
private final String startTransportAction;
private final String fileChunkTransportAction;
private final String cleanFilesTransportAction;
private final String prepareForTranslogOperationsTransportAction;
private final String translogOperationsTransportAction;
private final String finalizeRecoveryTransportAction;
private volatile boolean closed = false;
private volatile Thread sendStartRecoveryThread;
private volatile Thread receiveSnapshotRecoveryThread;
private volatile Thread sendSnapshotRecoveryThread;
private final CopyOnWriteArrayList<Future> sendFileChunksRecoveryFutures = new CopyOnWriteArrayList<Future>();
@Inject public RecoveryAction(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, TransportService transportService,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.transportService = transportService;
this.indexShard = (InternalIndexShard) indexShard;
this.store = store;
this.recoveryThrottler = recoveryThrottler;
startTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/start";
transportService.registerHandler(startTransportAction, new StartRecoveryTransportRequestHandler());
fileChunkTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/fileChunk";
transportService.registerHandler(fileChunkTransportAction, new FileChunkTransportRequestHandler());
cleanFilesTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/cleanFiles";
transportService.registerHandler(cleanFilesTransportAction, new CleanFilesRequestHandler());
prepareForTranslogOperationsTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/prepareForTranslog";
transportService.registerHandler(prepareForTranslogOperationsTransportAction, new PrepareForTranslogOperationsRequestHandler());
translogOperationsTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/translogOperations";
transportService.registerHandler(translogOperationsTransportAction, new TranslogOperationsRequestHandler());
finalizeRecoveryTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/finalizeRecovery";
transportService.registerHandler(finalizeRecoveryTransportAction, new FinalizeRecoveryRequestHandler());
this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB));
logger.trace("recovery action registered, using file_chunk_size[{}]", fileChunkSize);
}
public void close() {
closed = true;
transportService.removeHandler(startTransportAction);
transportService.removeHandler(fileChunkTransportAction);
transportService.removeHandler(cleanFilesTransportAction);
transportService.removeHandler(prepareForTranslogOperationsTransportAction);
transportService.removeHandler(translogOperationsTransportAction);
transportService.removeHandler(finalizeRecoveryTransportAction);
cleanOpenIndex();
// interrupt the startRecovery thread if its performing recovery
if (sendStartRecoveryThread != null) {
sendStartRecoveryThread.interrupt();
}
if (receiveSnapshotRecoveryThread != null) {
receiveSnapshotRecoveryThread.interrupt();
}
if (sendSnapshotRecoveryThread != null) {
sendSnapshotRecoveryThread.interrupt();
}
for (Future future : sendFileChunksRecoveryFutures) {
future.cancel(true);
}
}
public synchronized void startRecovery(DiscoveryNode node, DiscoveryNode targetNode, boolean markAsRelocated) throws ElasticSearchException {
if (targetNode == null) {
throw new IgnoreRecoveryException("No node to recovery from, retry next time...");
}
sendStartRecoveryThread = Thread.currentThread();
try {
// mark the shard as recovering
IndexShardState preRecoveringState;
try {
preRecoveringState = indexShard.recovering();
} catch (IndexShardRecoveringException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
throw new IgnoreRecoveryException("Already in recovering process", e);
} catch (IndexShardStartedException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already started
throw new IgnoreRecoveryException("Already in recovering process", e);
} catch (IndexShardRelocatedException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already relocated
throw new IgnoreRecoveryException("Already in recovering process", e);
} catch (IndexShardClosedException e) {
throw new IgnoreRecoveryException("Can't recover a closed shard.", e);
}
// we know we are on a thread, we can spin till we can engage in recovery
StopWatch throttlingWaitTime = new StopWatch().start();
while (!recoveryThrottler.tryRecovery(shardId, "peer recovery target")) {
try {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
} catch (InterruptedException e) {
if (indexShard.ignoreRecoveryAttempt()) {
throw new IgnoreRecoveryException("Interrupted while waiting for recovery, but we should ignore ...");
}
// we got interrupted, mark it as failed
throw new RecoveryFailedException(shardId, node, targetNode, e);
}
}
throttlingWaitTime.stop();
try {
if (closed) {
throw new IgnoreRecoveryException("Recovery closed");
}
logger.debug("starting recovery from {}", targetNode);
// build a list of the current files located locally, maybe we don't need to recover them...
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(node, markAsRelocated, store.listWithMd5());
StopWatch stopWatch = null;
RecoveryStatus recoveryStatus = null;
boolean retry = true;
while (retry) {
stopWatch = new StopWatch().start();
recoveryStatus = transportService.submitRequest(targetNode, startTransportAction, startRecoveryRequest, new FutureTransportResponseHandler<RecoveryStatus>() {
@Override public RecoveryStatus newInstance() {
return new RecoveryStatus();
}
}).txGet();
retry = recoveryStatus.retry;
if (retry) {
try {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
} catch (InterruptedException e) {
if (indexShard.ignoreRecoveryAttempt()) {
throw new IgnoreRecoveryException("Interrupted while waiting for remote recovery, but we should ignore ...");
}
// we got interrupted, mark it as failed
throw new RecoveryFailedException(shardId, node, targetNode, e);
}
}
}
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append(targetNode).append(", took[").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryStatus.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryStatus.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryStatus.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" : reusing_files [").append(recoveryStatus.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryStatus.phase2Time)).append("]")
.append("\n");
sb.append(" phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryStatus.phase3Time)).append("]");
logger.debug(sb.toString());
}
} catch (RemoteTransportException e) {
if (closed) {
throw new IgnoreRecoveryException("Recovery closed", e);
}
logger.trace("recovery from [{}] failed", e, targetNode);
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof ActionNotFoundTransportException || cause instanceof IndexShardNotStartedException) {
// the remote shard has not yet registered the action or not started yet, we need to ignore this recovery attempt, and restore the state previous to recovering
indexShard.restoreRecoveryState(preRecoveringState);
throw new IgnoreRecoveryException("Ignoring recovery attempt, remote shard not started", e);
} else if (cause instanceof RecoveryEngineException) {
// it might be wrapped
if (cause.getCause() instanceof IgnoreRecoveryException) {
throw (IgnoreRecoveryException) cause.getCause();
}
} else if (cause instanceof IgnoreRecoveryException) {
throw (IgnoreRecoveryException) cause;
} else if ((cause instanceof NodeNotConnectedException) || (cause instanceof NodeDisconnectedException)) {
throw new IgnoreRecoveryException("Ignore recovery attempt, remote node not connected", e);
}
throw new RecoveryFailedException(shardId, node, targetNode, e);
} catch (Exception e) {
if (closed) {
throw new IgnoreRecoveryException("Recovery closed", e);
}
throw new RecoveryFailedException(shardId, node, targetNode, e);
} finally {
recoveryThrottler.recoveryDone(shardId, "peer recovery target");
}
} finally {
sendStartRecoveryThread = null;
}
}
private void cleanOpenIndex() {
for (IndexOutput indexOutput : openIndexOutputs.values()) {
try {
synchronized (indexOutput) {
indexOutput.close();
}
} catch (Exception e) {
// ignore
}
}
openIndexOutputs.clear();
}
static class StartRecoveryRequest implements Streamable {
DiscoveryNode node;
boolean markAsRelocated;
// name -> (md5, size)
Map<String, StoreFileMetaData> existingFiles;
private StartRecoveryRequest() {
}
private StartRecoveryRequest(DiscoveryNode node, boolean markAsRelocated, Map<String, StoreFileMetaData> existingFiles) {
this.node = node;
this.markAsRelocated = markAsRelocated;
this.existingFiles = existingFiles;
}
@Override public void readFrom(StreamInput in) throws IOException {
node = DiscoveryNode.readNode(in);
markAsRelocated = in.readBoolean();
int size = in.readVInt();
existingFiles = Maps.newHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in);
existingFiles.put(md.name(), md);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeBoolean(markAsRelocated);
out.writeVInt(existingFiles.size());
for (StoreFileMetaData md : existingFiles.values()) {
md.writeTo(out);
}
}
}
private class StartRecoveryTransportRequestHandler extends BaseTransportRequestHandler<StartRecoveryRequest> {
@Override public StartRecoveryRequest newInstance() {
return new StartRecoveryRequest();
}
@Override public void messageReceived(final StartRecoveryRequest startRecoveryRequest, final TransportChannel channel) throws Exception {
if (!recoveryThrottler.tryRecovery(shardId, "peer recovery source")) {
RecoveryStatus retry = new RecoveryStatus();
retry.retry = true;
channel.sendResponse(retry);
return;
}
try {
logger.trace("starting recovery to {}, mark_as_relocated {}", startRecoveryRequest.node, startRecoveryRequest.markAsRelocated);
final DiscoveryNode node = startRecoveryRequest.node;
cleanOpenIndex();
final RecoveryStatus recoveryStatus = new RecoveryStatus();
indexShard.recover(new Engine.RecoveryHandler() {
@Override public void phase1(final SnapshotIndexCommit snapshot) throws ElasticSearchException {
long totalSize = 0;
long existingTotalSize = 0;
try {
StopWatch stopWatch = new StopWatch().start();
for (String name : snapshot.getFiles()) {
StoreFileMetaData md = store.metaDataWithMd5(name);
boolean useExisting = false;
if (startRecoveryRequest.existingFiles.containsKey(name)) {
if (md.md5().equals(startRecoveryRequest.existingFiles.get(name).md5())) {
recoveryStatus.phase1ExistingFileNames.add(name);
recoveryStatus.phase1ExistingFileSizes.add(md.sizeInBytes());
existingTotalSize += md.sizeInBytes();
useExisting = true;
if (logger.isTraceEnabled()) {
logger.trace("recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", node, name, md.md5());
}
}
}
if (!useExisting) {
if (startRecoveryRequest.existingFiles.containsKey(name)) {
logger.trace("recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", node, name, startRecoveryRequest.existingFiles.get(name).md5(), md.md5());
} else {
logger.trace("recovery [phase1] to {}: recovering [{}], does not exists in remote", node, name);
}
recoveryStatus.phase1FileNames.add(name);
recoveryStatus.phase1FileSizes.add(md.sizeInBytes());
totalSize += md.sizeInBytes();
}
}
recoveryStatus.phase1TotalSize = totalSize;
recoveryStatus.phase1ExistingTotalSize = existingTotalSize;
final AtomicLong throttlingWaitTime = new AtomicLong();
logger.trace("recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", node, recoveryStatus.phase1FileNames.size(), new ByteSizeValue(totalSize), recoveryStatus.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
final CountDownLatch latch = new CountDownLatch(recoveryStatus.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : recoveryStatus.phase1FileNames) {
sendFileChunksRecoveryFutures.add(threadPool.submit(new Runnable() {
@Override public void run() {
IndexInput indexInput = null;
try {
long throttlingStartTime = System.currentTimeMillis();
while (!recoveryThrottler.tryStream(shardId, name)) {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
}
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
final int BUFFER_SIZE = (int) fileChunkSize.bytes();
byte[] buf = new byte[BUFFER_SIZE];
indexInput = snapshot.getDirectory().openInput(name);
long len = indexInput.length();
long readCount = 0;
while (readCount < len) {
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(node, fileChunkTransportAction, new FileChunk(name, position, len, buf, toRead), VoidTransportResponseHandler.INSTANCE).txGet(120, SECONDS);
readCount += toRead;
}
indexInput.close();
} catch (Exception e) {
lastException.set(e);
} finally {
recoveryThrottler.streamDone(shardId, name);
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
latch.countDown();
}
}
}));
}
latch.await();
if (lastException.get() != null) {
throw lastException.get();
}
// now, set the clean files request
CleanFilesRequest cleanFilesRequest = new CleanFilesRequest();
cleanFilesRequest.snapshotFiles.addAll(Arrays.asList(snapshot.getFiles()));
transportService.submitRequest(node, cleanFilesTransportAction, cleanFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet();
stopWatch.stop();
logger.trace("recovery [phase1] to {}: took [{}], throttling_wait [{}]", node, stopWatch.totalTime(), timeValueMillis(throttlingWaitTime.get()));
recoveryStatus.phase1Time = stopWatch.totalTime().millis();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted while recovering files");
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(shardId, recoveryStatus.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
} finally {
sendFileChunksRecoveryFutures.clear();
}
}
@Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
sendSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("recovery [phase2] to {}: sending transaction log operations", node);
StopWatch stopWatch = new StopWatch().start();
transportService.submitRequest(node, prepareForTranslogOperationsTransportAction, VoidStreamable.INSTANCE, VoidTransportResponseHandler.INSTANCE).txGet();
int totalOperations = sendSnapshot(snapshot);
stopWatch.stop();
logger.trace("recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase2Time = stopWatch.totalTime().millis();
recoveryStatus.phase2Operations = totalOperations;
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted in phase 2 files");
} finally {
sendSnapshotRecoveryThread = null;
}
}
@Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
sendSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("recovery [phase3] to {}: sending transaction log operations", node);
StopWatch stopWatch = new StopWatch().start();
int totalOperations = sendSnapshot(snapshot);
transportService.submitRequest(node, finalizeRecoveryTransportAction, VoidStreamable.INSTANCE, VoidTransportResponseHandler.INSTANCE).txGet();
if (startRecoveryRequest.markAsRelocated) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
indexShard.relocated();
} catch (IllegalIndexShardStateException e) {
// we can ignore this exception since, on the other node, when it moved to phase3
// it will also send shard started, which might cause the index shard we work against
// to be closed by the time we get to the relocated method
}
}
stopWatch.stop();
logger.trace("recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase3Time = stopWatch.totalTime().millis();
recoveryStatus.phase3Operations = totalOperations;
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted in phase 3 files");
} finally {
sendSnapshotRecoveryThread = null;
}
}
private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
TranslogOperationsRequest request = new TranslogOperationsRequest();
int translogBatchSize = 10; // TODO make this configurable
int counter = 0;
int totalOperations = 0;
while (snapshot.hasNext()) {
request.operations.add(snapshot.next());
totalOperations++;
if (++counter == translogBatchSize) {
transportService.submitRequest(node, translogOperationsTransportAction, request, VoidTransportResponseHandler.INSTANCE).txGet();
counter = 0;
request.operations.clear();
}
}
// send the leftover
if (!request.operations.isEmpty()) {
transportService.submitRequest(node, translogOperationsTransportAction, request, VoidTransportResponseHandler.INSTANCE).txGet();
}
return totalOperations;
}
});
channel.sendResponse(recoveryStatus);
} finally {
recoveryThrottler.recoveryDone(shardId, "peer recovery source");
}
}
}
private static class RecoveryStatus implements Streamable {
boolean retry = false;
List<String> phase1FileNames = Lists.newArrayList();
List<Long> phase1FileSizes = Lists.newArrayList();
List<String> phase1ExistingFileNames = Lists.newArrayList();
List<Long> phase1ExistingFileSizes = Lists.newArrayList();
long phase1TotalSize;
long phase1ExistingTotalSize;
long phase1Time;
long phase1ThrottlingWaitTime;
int phase2Operations;
long phase2Time;
int phase3Operations;
long phase3Time;
private RecoveryStatus() {
}
@Override public void readFrom(StreamInput in) throws IOException {
retry = in.readBoolean();
int size = in.readVInt();
phase1FileNames = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1FileNames.add(in.readUTF());
}
size = in.readVInt();
phase1FileSizes = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1FileSizes.add(in.readVLong());
}
size = in.readVInt();
phase1ExistingFileNames = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1ExistingFileNames.add(in.readUTF());
}
size = in.readVInt();
phase1ExistingFileSizes = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1ExistingFileSizes.add(in.readVLong());
}
phase1TotalSize = in.readVLong();
phase1ExistingTotalSize = in.readVLong();
phase1Time = in.readVLong();
phase1ThrottlingWaitTime = in.readVLong();
phase2Operations = in.readVInt();
phase2Time = in.readVLong();
phase3Operations = in.readVInt();
phase3Time = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(retry);
out.writeVInt(phase1FileNames.size());
for (String name : phase1FileNames) {
out.writeUTF(name);
}
out.writeVInt(phase1FileSizes.size());
for (long size : phase1FileSizes) {
out.writeVLong(size);
}
out.writeVInt(phase1ExistingFileNames.size());
for (String name : phase1ExistingFileNames) {
out.writeUTF(name);
}
out.writeVInt(phase1ExistingFileSizes.size());
for (long size : phase1ExistingFileSizes) {
out.writeVLong(size);
}
out.writeVLong(phase1TotalSize);
out.writeVLong(phase1ExistingTotalSize);
out.writeVLong(phase1Time);
out.writeVLong(phase1ThrottlingWaitTime);
out.writeVInt(phase2Operations);
out.writeVLong(phase2Time);
out.writeVInt(phase3Operations);
out.writeVLong(phase3Time);
}
}
static class CleanFilesRequest implements Streamable {
Set<String> snapshotFiles = Sets.newHashSet();
@Override public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
for (int i = 0; i < size; i++) {
snapshotFiles.add(in.readUTF());
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(snapshotFiles.size());
for (String snapshotFile : snapshotFiles) {
out.writeUTF(snapshotFile);
}
}
}
class CleanFilesRequestHandler extends BaseTransportRequestHandler<CleanFilesRequest> {
@Override public CleanFilesRequest newInstance() {
return new CleanFilesRequest();
}
@Override public void messageReceived(CleanFilesRequest request, TransportChannel channel) throws Exception {
receiveSnapshotRecoveryThread = Thread.currentThread();
try {
for (String existingFile : store.directory().listAll()) {
if (!request.snapshotFiles.contains(existingFile)) {
store.directory().deleteFile(existingFile);
}
}
channel.sendResponse(VoidStreamable.INSTANCE);
} finally {
receiveSnapshotRecoveryThread = null;
}
}
}
class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler<VoidStreamable> {
@Override public VoidStreamable newInstance() {
return VoidStreamable.INSTANCE;
}
@Override public void messageReceived(VoidStreamable stream, TransportChannel channel) throws Exception {
receiveSnapshotRecoveryThread = Thread.currentThread();
try {
indexShard.performRecoveryPrepareForTranslog();
channel.sendResponse(VoidStreamable.INSTANCE);
} finally {
receiveSnapshotRecoveryThread = null;
}
}
}
class FinalizeRecoveryRequestHandler extends BaseTransportRequestHandler<VoidStreamable> {
@Override public VoidStreamable newInstance() {
return VoidStreamable.INSTANCE;
}
@Override public void messageReceived(VoidStreamable stream, TransportChannel channel) throws Exception {
receiveSnapshotRecoveryThread = Thread.currentThread();
try {
indexShard.performRecoveryFinalization(false);
channel.sendResponse(VoidStreamable.INSTANCE);
} finally {
receiveSnapshotRecoveryThread = null;
}
}
}
class TranslogOperationsRequestHandler extends BaseTransportRequestHandler<TranslogOperationsRequest> {
@Override public TranslogOperationsRequest newInstance() {
return new TranslogOperationsRequest();
}
@Override public void messageReceived(TranslogOperationsRequest snapshot, TransportChannel channel) throws Exception {
receiveSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
for (Translog.Operation operation : snapshot.operations) {
indexShard.performRecoveryOperation(operation);
}
channel.sendResponse(VoidStreamable.INSTANCE);
} finally {
receiveSnapshotRecoveryThread = null;
}
}
}
static class TranslogOperationsRequest implements Streamable {
List<Translog.Operation> operations = Lists.newArrayList();
TranslogOperationsRequest() {
}
@Override public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
for (int i = 0; i < size; i++) {
operations.add(TranslogStreams.readTranslogOperation(in));
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(operations.size());
for (Translog.Operation operation : operations) {
TranslogStreams.writeTranslogOperation(out, operation);
}
}
}
private class FileChunkTransportRequestHandler extends BaseTransportRequestHandler<FileChunk> {
@Override public FileChunk newInstance() {
return new FileChunk();
}
@Override public void messageReceived(FileChunk request, TransportChannel channel) throws Exception {
if (closed) {
throw new IndexShardClosedException(shardId);
}
IndexOutput indexOutput;
if (request.position == 0) {
// first request
indexOutput = openIndexOutputs.remove(request.name);
if (indexOutput != null) {
try {
indexOutput.close();
} catch (IOException e) {
// ignore
}
}
indexOutput = store.directory().createOutput(request.name);
openIndexOutputs.put(request.name, indexOutput);
} else {
indexOutput = openIndexOutputs.get(request.name);
}
synchronized (indexOutput) {
try {
indexOutput.writeBytes(request.content, request.contentLength);
if (indexOutput.getFilePointer() == request.length) {
// we are done
indexOutput.close();
openIndexOutputs.remove(request.name);
}
} catch (IOException e) {
openIndexOutputs.remove(request.name);
try {
indexOutput.close();
} catch (IOException e1) {
// ignore
}
}
}
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
private static class FileChunk implements Streamable {
String name;
long position;
long length;
byte[] content;
int contentLength;
private FileChunk() {
}
private FileChunk(String name, long position, long length, byte[] content, int contentLength) {
this.name = name;
this.position = position;
this.length = length;
this.content = content;
this.contentLength = contentLength;
}
@Override public void readFrom(StreamInput in) throws IOException {
name = in.readUTF();
position = in.readVLong();
length = in.readVLong();
contentLength = in.readVInt();
content = new byte[contentLength];
in.readFully(content);
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(name);
out.writeVLong(position);
out.writeVLong(length);
out.writeVInt(contentLength);
out.writeBytes(content, 0, contentLength);
}
}
}
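The phase1 chunking above (carried over into RecoverySource below) cuts each file into file_chunk_size pieces identified by their starting position: position 0 marks the first request for a file, and position plus chunk size reaching the file length marks the last. A self-contained sketch of just that chunk arithmetic, for illustration only:

    import java.util.ArrayList;
    import java.util.List;

    // Simplified, illustrative sketch of the phase1 chunk loop: a file of `len`
    // bytes is cut into fixed-size chunks, each keyed by its starting position.
    class ChunkMath {
        static List<long[]> chunks(long len, int chunkSize) {
            List<long[]> result = new ArrayList<long[]>();
            long position = 0;
            while (position < len) {
                int toRead = (int) Math.min(chunkSize, len - position);
                result.add(new long[]{position, toRead});
                position += toRead;
            }
            return result;
        }

        public static void main(String[] args) {
            // a 250 KB file with the default 100 KB file_chunk_size yields chunks
            // at positions 0, 102400 and 204800 (sizes 102400, 102400, 51200)
            for (long[] c : chunks(250 * 1024, 100 * 1024)) {
                System.out.println(c[0] + " + " + c[1]);
            }
        }
    }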


@@ -0,0 +1,72 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Set;
/**
* @author kimchy (shay.banon)
*/
class RecoveryCleanFilesRequest implements Streamable {
private ShardId shardId;
private Set<String> snapshotFiles;
RecoveryCleanFilesRequest() {
}
RecoveryCleanFilesRequest(ShardId shardId, Set<String> snapshotFiles) {
this.shardId = shardId;
this.snapshotFiles = snapshotFiles;
}
public ShardId shardId() {
return shardId;
}
public Set<String> snapshotFiles() {
return snapshotFiles;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
int size = in.readVInt();
snapshotFiles = Sets.newHashSetWithExpectedSize(size);
for (int i = 0; i < size; i++) {
snapshotFiles.add(in.readUTF());
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeVInt(snapshotFiles.size());
for (String snapshotFile : snapshotFiles) {
out.writeUTF(snapshotFile);
}
}
}
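Unlike the per-shard RecoveryAction this commit deletes, the new request classes all carry a ShardId: the transport handlers are now registered once per node, so a handler must resolve the target shard itself. A sketch of that lookup, mirroring the one RecoverySource uses below (illustrative, not the commit's code):

    // Sketch: a node-level handler serves every shard on the node, so each
    // request carries its ShardId and the shard is resolved through IndicesService.
    static InternalIndexShard resolveShard(IndicesService indicesService, RecoveryCleanFilesRequest request) {
        return (InternalIndexShard) indicesService
                .indexServiceSafe(request.shardId().index().name())
                .shardSafe(request.shardId().id());
    }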


@@ -24,11 +24,15 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.index.shard.ShardId;

 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
  */
 public class RecoveryFailedException extends ElasticSearchException {

-    public RecoveryFailedException(ShardId shardId, DiscoveryNode node, DiscoveryNode targetNode, Throwable cause) {
-        super(shardId + ": Recovery failed from " + targetNode + " into " + node, cause);
+    public RecoveryFailedException(StartRecoveryRequest request, Throwable cause) {
+        this(request.shardId(), request.sourceNode(), request.targetNode(), cause);
+    }
+
+    public RecoveryFailedException(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, Throwable cause) {
+        super(shardId + ": Recovery failed from " + sourceNode + " into " + targetNode, cause);
     }
 }


@@ -0,0 +1,101 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
class RecoveryFileChunkRequest implements Streamable {
private ShardId shardId;
private String name;
private long position;
private long length;
private byte[] content;
private int contentLength;
RecoveryFileChunkRequest() {
}
RecoveryFileChunkRequest(ShardId shardId, String name, long position, long length, byte[] content, int contentLength) {
this.shardId = shardId;
this.name = name;
this.position = position;
this.length = length;
this.content = content;
this.contentLength = contentLength;
}
public ShardId shardId() {
return shardId;
}
public String name() {
return name;
}
public long position() {
return position;
}
public long length() {
return length;
}
public byte[] content() {
return content;
}
public int contentLength() {
return contentLength;
}
public RecoveryFileChunkRequest readFileChunk(StreamInput in) throws IOException {
RecoveryFileChunkRequest request = new RecoveryFileChunkRequest();
request.readFrom(in);
return request;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
name = in.readUTF();
position = in.readVLong();
length = in.readVLong();
contentLength = in.readVInt();
content = new byte[contentLength];
in.readFully(content);
}
@Override public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeUTF(name);
out.writeVLong(position);
out.writeVLong(length);
out.writeVInt(contentLength);
out.writeBytes(content, 0, contentLength);
}
}


@@ -0,0 +1,54 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
class RecoveryFinalizeRecoveryRequest implements Streamable {
private ShardId shardId;
RecoveryFinalizeRecoveryRequest() {
}
RecoveryFinalizeRecoveryRequest(ShardId shardId) {
this.shardId = shardId;
}
public ShardId shardId() {
return shardId;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
}
}


@@ -0,0 +1,54 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
class RecoveryPrepareForTranslogOperationsRequest implements Streamable {
private ShardId shardId;
RecoveryPrepareForTranslogOperationsRequest() {
}
RecoveryPrepareForTranslogOperationsRequest(ShardId shardId) {
this.shardId = shardId;
}
public ShardId shardId() {
return shardId;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
}
}


@@ -0,0 +1,117 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
class RecoveryResponse implements Streamable {
boolean retry = false;
List<String> phase1FileNames = Lists.newArrayList();
List<Long> phase1FileSizes = Lists.newArrayList();
List<String> phase1ExistingFileNames = Lists.newArrayList();
List<Long> phase1ExistingFileSizes = Lists.newArrayList();
long phase1TotalSize;
long phase1ExistingTotalSize;
long phase1Time;
long phase1ThrottlingWaitTime;
int phase2Operations;
long phase2Time;
int phase3Operations;
long phase3Time;
RecoveryResponse() {
}
@Override public void readFrom(StreamInput in) throws IOException {
retry = in.readBoolean();
int size = in.readVInt();
phase1FileNames = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1FileNames.add(in.readUTF());
}
size = in.readVInt();
phase1FileSizes = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1FileSizes.add(in.readVLong());
}
size = in.readVInt();
phase1ExistingFileNames = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1ExistingFileNames.add(in.readUTF());
}
size = in.readVInt();
phase1ExistingFileSizes = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1ExistingFileSizes.add(in.readVLong());
}
phase1TotalSize = in.readVLong();
phase1ExistingTotalSize = in.readVLong();
phase1Time = in.readVLong();
phase1ThrottlingWaitTime = in.readVLong();
phase2Operations = in.readVInt();
phase2Time = in.readVLong();
phase3Operations = in.readVInt();
phase3Time = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(retry);
out.writeVInt(phase1FileNames.size());
for (String name : phase1FileNames) {
out.writeUTF(name);
}
out.writeVInt(phase1FileSizes.size());
for (long size : phase1FileSizes) {
out.writeVLong(size);
}
out.writeVInt(phase1ExistingFileNames.size());
for (String name : phase1ExistingFileNames) {
out.writeUTF(name);
}
out.writeVInt(phase1ExistingFileSizes.size());
for (long size : phase1ExistingFileSizes) {
out.writeVLong(size);
}
out.writeVLong(phase1TotalSize);
out.writeVLong(phase1ExistingTotalSize);
out.writeVLong(phase1Time);
out.writeVLong(phase1ThrottlingWaitTime);
out.writeVInt(phase2Operations);
out.writeVLong(phase2Time);
out.writeVInt(phase3Operations);
out.writeVLong(phase3Time);
}
}
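Every list in RecoveryResponse is length-prefixed with writeVInt so readFrom can size its lists before reading the elements back. A round-trip sketch of that Streamable pattern, assuming the BytesStreamOutput/BytesStreamInput helpers (and copiedByteArray()) from this codebase's org.elasticsearch.common.io.stream package of the era:

    // Round-trip sketch for the Streamable pattern above; the stream helper
    // classes are assumed, the pattern itself is what the example illustrates.
    RecoveryResponse original = new RecoveryResponse();
    original.phase1FileNames.add("_0.cfs");
    original.phase1FileSizes.add(1024L);

    BytesStreamOutput out = new BytesStreamOutput();
    original.writeTo(out);

    RecoveryResponse copy = new RecoveryResponse();
    copy.readFrom(new BytesStreamInput(out.copiedByteArray()));
    // copy now mirrors original: one recovered file of 1024 bytes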


@@ -0,0 +1,294 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* The source recovery accepts recovery requests from other peer shards and starts the recovery process from this
* source shard to the target shard.
*
* @author kimchy (shay.banon)
*/
public class RecoverySource extends AbstractComponent {
public static class Actions {
public static final String START_RECOVERY = "index/shard/recovery/startRecovery";
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final IndicesService indicesService;
private final RecoveryThrottler recoveryThrottler;
private final ByteSizeValue fileChunkSize;
@Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
RecoveryThrottler recoveryThrottler) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.indicesService = indicesService;
this.recoveryThrottler = recoveryThrottler;
this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB));
transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
}
private RecoveryResponse recover(final StartRecoveryRequest request) {
if (!recoveryThrottler.tryRecovery(request.shardId(), "peer recovery source")) {
RecoveryResponse retry = new RecoveryResponse();
retry.retry = true;
return retry;
}
final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
try {
logger.trace("starting recovery to {}, mark_as_relocated {}", request.targetNode(), request.markAsRelocated());
final RecoveryResponse response = new RecoveryResponse();
shard.recover(new Engine.RecoveryHandler() {
@Override public void phase1(final SnapshotIndexCommit snapshot) throws ElasticSearchException {
long totalSize = 0;
long existingTotalSize = 0;
try {
StopWatch stopWatch = new StopWatch().start();
for (String name : snapshot.getFiles()) {
StoreFileMetaData md = shard.store().metaDataWithMd5(name);
boolean useExisting = false;
if (request.existingFiles.containsKey(name)) {
if (md.md5().equals(request.existingFiles.get(name).md5())) {
response.phase1ExistingFileNames.add(name);
response.phase1ExistingFileSizes.add(md.sizeInBytes());
existingTotalSize += md.sizeInBytes();
useExisting = true;
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.md5());
}
}
}
if (!useExisting) {
if (request.existingFiles.containsKey(name)) {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles.get(name).md5(), md.md5());
} else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name);
}
response.phase1FileNames.add(name);
response.phase1FileSizes.add(md.sizeInBytes());
totalSize += md.sizeInBytes();
}
}
response.phase1TotalSize = totalSize;
response.phase1ExistingTotalSize = existingTotalSize;
final AtomicLong throttlingWaitTime = new AtomicLong();
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : response.phase1FileNames) {
threadPool.execute(new Runnable() {
@Override public void run() {
IndexInput indexInput = null;
try {
long throttlingStartTime = System.currentTimeMillis();
while (!recoveryThrottler.tryStream(request.shardId(), name)) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(shard.shardId());
}
Thread.sleep(recoveryThrottler.throttleInterval().millis());
}
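// a stream slot was granted; the time spent spinning above is accumulated
// into throttlingWaitTime and reported back as phase1ThrottlingWaitTime
// in the recovery response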
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
final int BUFFER_SIZE = (int) fileChunkSize.bytes();
byte[] buf = new byte[BUFFER_SIZE];
indexInput = snapshot.getDirectory().openInput(name);
long len = indexInput.length();
long readCount = 0;
while (readCount < len) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(shard.shardId());
}
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, buf, toRead), VoidTransportResponseHandler.INSTANCE).txGet();
readCount += toRead;
}
indexInput.close();
} catch (Exception e) {
lastException.set(e);
} finally {
recoveryThrottler.streamDone(request.shardId(), name);
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
latch.countDown();
}
}
});
}
latch.await();
if (lastException.get() != null) {
throw lastException.get();
}
// now, send the clean files request
Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE).txGet();
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}], throttling_wait [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime(), timeValueMillis(throttlingWaitTime.get()));
response.phase1Time = stopWatch.totalTime().millis();
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
}
}
@Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet();
int totalOperations = sendSnapshot(snapshot);
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase2] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
response.phase2Time = stopWatch.totalTime().millis();
response.phase2Operations = totalOperations;
}
@Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
int totalOperations = sendSnapshot(snapshot);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet();
if (request.markAsRelocated()) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
shard.relocated();
} catch (IllegalIndexShardStateException e) {
// we can ignore this exception since, on the other node, when it moves to phase3
// it will also send shard started, which might cause the index shard we work against
// to be closed by the time we get to the relocated method
}
}
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase3] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
response.phase3Time = stopWatch.totalTime().millis();
response.phase3Operations = totalOperations;
}
private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
int translogBatchSize = 10; // TODO make this configurable
int counter = 0;
int totalOperations = 0;
List<Translog.Operation> operations = Lists.newArrayList();
while (snapshot.hasNext()) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
operations.add(snapshot.next());
totalOperations++;
if (++counter == translogBatchSize) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, VoidTransportResponseHandler.INSTANCE).txGet();
counter = 0;
operations = Lists.newArrayList();
}
}
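// e.g. a snapshot of 25 operations with translogBatchSize 10 produces three
// TRANSLOG_OPS requests: two full batches of ten, plus the leftover batch of five below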
// send the leftover
if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, VoidTransportResponseHandler.INSTANCE).txGet();
}
return totalOperations;
}
});
return response;
} finally {
recoveryThrottler.recoveryDone(request.shardId(), "peer recovery source");
}
}
class StartRecoveryTransportRequestHandler extends BaseTransportRequestHandler<StartRecoveryRequest> {
@Override public StartRecoveryRequest newInstance() {
return new StartRecoveryRequest();
}
@Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
RecoveryResponse response = recover(request);
channel.sendResponse(response);
}
}
}
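For orientation, the Engine.RecoveryHandler contract that shard.recover(...) drives above is small. Below is a minimal sketch of an implementation, assuming the interface carries only the three phase methods visible in this diff; it merely counts replayed translog operations, where the real source handler streams files and forwards operations over the transport:

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.translog.Translog;

public class CountingRecoveryHandler implements Engine.RecoveryHandler {

    private int operations;

    @Override public void phase1(SnapshotIndexCommit snapshot) throws ElasticSearchException {
        // a real handler copies snapshot.getFiles() to the target here
    }

    @Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
        operations += drain(snapshot);
    }

    @Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
        operations += drain(snapshot);
    }

    public int operations() {
        return operations;
    }

    private int drain(Translog.Snapshot snapshot) {
        // consume the snapshot the same way sendSnapshot does, without sending anything
        int count = 0;
        while (snapshot.hasNext()) {
            snapshot.next();
            count++;
        }
        return count;
    }
}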

View File

@ -0,0 +1,342 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* The recovery target handles recoveries of peer shards on the node a shard is recovering to.
*
* <p>Note, it can be safely assumed that there will only be a single recovery per shard (index+id) and
* not several of them (since we don't allocate several shard replicas to the same node).
*
* @author kimchy (shay.banon)
*/
public class RecoveryTarget extends AbstractComponent {
public static class Actions {
public static final String FILE_CHUNK = "index/shard/recovery/fileChunk";
public static final String CLEAN_FILES = "index/shard/recovery/cleanFiles";
public static final String TRANSLOG_OPS = "index/shard/recovery/translogOps";
public static final String PREPARE_TRANSLOG = "index/shard/recovery/prepareTranslog";
public static final String FINALIZE = "index/shard/recovery/finalize";
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final IndicesService indicesService;
private final RecoveryThrottler recoveryThrottler;
private final ConcurrentMap<ShardId, OnGoingRecovery> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
@Inject public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
IndicesLifecycle indicesLifecycle, RecoveryThrottler recoveryThrottler) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.indicesService = indicesService;
this.recoveryThrottler = recoveryThrottler;
transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler());
transportService.registerHandler(Actions.CLEAN_FILES, new CleanFilesRequestHandler());
transportService.registerHandler(Actions.PREPARE_TRANSLOG, new PrepareForTranslogOperationsRequestHandler());
transportService.registerHandler(Actions.TRANSLOG_OPS, new TranslogOperationsRequestHandler());
transportService.registerHandler(Actions.FINALIZE, new FinalizeRecoveryRequestHandler());
indicesLifecycle.addListener(new IndicesLifecycle.Listener() {
@Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) {
removeAndCleanOnGoingRecovery(shardId);
}
});
}
public void startRecovery(final StartRecoveryRequest request, final RecoveryListener listener) {
if (request.sourceNode() == null) {
listener.onIgnoreRecovery("No node to recovery from, retry on next cluster state update");
return;
}
final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
// mark the shard as recovering
IndexShardState preRecoveryState;
try {
preRecoveryState = shard.recovering();
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
listener.onIgnoreRecovery("Already in recovering process, " + e.getMessage());
return;
}
final IndexShardState fPreRecoveryState = preRecoveryState;
threadPool.execute(new Runnable() {
@Override public void run() {
doRecovery(shard, fPreRecoveryState, request, listener);
}
});
}
private void doRecovery(final InternalIndexShard shard, final IndexShardState preRecoveryState, final StartRecoveryRequest request, final RecoveryListener listener) {
// we know we are on a thread, we can spin till we can engage in recovery
if (!recoveryThrottler.tryRecovery(shard.shardId(), "peer recovery target")) {
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery("shard closed, stop recovery");
return;
}
shard.restoreRecoveryState(preRecoveryState);
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}
try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());
onGoingRecoveries.put(request.shardId(), new OnGoingRecovery());
StopWatch stopWatch = new StopWatch().start();
RecoveryResponse recoveryStatus = transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@Override public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet();
if (recoveryStatus.retry) {
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery("shard closed, stop recovery");
return;
}
logger.trace("[{}][{}] retrying recovery in [{}], source shard is busy", request.shardId().index().name(), request.shardId().id(), recoveryThrottler.throttleInterval());
removeAndCleanOnGoingRecovery(request.shardId());
shard.restoreRecoveryState(preRecoveryState);
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().index().name()).append(']').append('[').append(request.shardId().id()).append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(stopWatch.totalTime()).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryStatus.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryStatus.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryStatus.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" : reusing_files [").append(recoveryStatus.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryStatus.phase2Time)).append("]")
.append("\n");
sb.append(" phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryStatus.phase3Time)).append("]");
logger.debug(sb.toString());
}
removeAndCleanOnGoingRecovery(request.shardId());
listener.onRecoveryDone();
} catch (Exception e) {
removeAndCleanOnGoingRecovery(request.shardId());
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery("shard closed, stop recovery");
return;
}
logger.trace("[{}][{}] recovery from [{}] failed", e, request.shardId().index().name(), request.shardId().id(), request.sourceNode());
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof RecoveryEngineException) {
// unwrap an exception that was thrown as part of the recovery
cause = cause.getCause();
}
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
shard.restoreRecoveryState(preRecoveryState);
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}
listener.onRecoveryFailure(new RecoveryFailedException(request, e), true);
} finally {
recoveryThrottler.recoveryDone(shard.shardId(), "peer recovery target");
}
}
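// exactly one of the callbacks below is invoked per startRecovery call; onRetryRecovery
// asks the caller to reschedule the same StartRecoveryRequest after the given interval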
public static interface RecoveryListener {
void onRecoveryDone();
void onRetryRecovery(TimeValue retryAfter);
void onIgnoreRecovery(String reason);
void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure);
}
private void removeAndCleanOnGoingRecovery(ShardId shardId) {
// remove it from the ongoing recoveries since it is being closed
OnGoingRecovery onGoingRecovery = onGoingRecoveries.remove(shardId);
if (onGoingRecovery != null) {
// clean open index outputs
for (Map.Entry<String, IndexOutput> entry : onGoingRecovery.openIndexOutputs.entrySet()) {
synchronized (entry.getValue()) {
try {
entry.getValue().close();
} catch (IOException e) {
// ignore
}
}
}
}
}
static class OnGoingRecovery {
ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
}
class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
@Override public RecoveryPrepareForTranslogOperationsRequest newInstance() {
return new RecoveryPrepareForTranslogOperationsRequest();
}
@Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
shard.performRecoveryPrepareForTranslog();
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
class FinalizeRecoveryRequestHandler extends BaseTransportRequestHandler<RecoveryFinalizeRecoveryRequest> {
@Override public RecoveryFinalizeRecoveryRequest newInstance() {
return new RecoveryFinalizeRecoveryRequest();
}
@Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
shard.performRecoveryFinalization(false);
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
class TranslogOperationsRequestHandler extends BaseTransportRequestHandler<RecoveryTranslogOperationsRequest> {
@Override public RecoveryTranslogOperationsRequest newInstance() {
return new RecoveryTranslogOperationsRequest();
}
@Override public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
for (Translog.Operation operation : request.operations()) {
shard.performRecoveryOperation(operation);
}
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
class CleanFilesRequestHandler extends BaseTransportRequestHandler<RecoveryCleanFilesRequest> {
@Override public RecoveryCleanFilesRequest newInstance() {
return new RecoveryCleanFilesRequest();
}
@Override public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
for (String existingFile : shard.store().directory().listAll()) {
if (!request.snapshotFiles().contains(existingFile)) {
shard.store().directory().deleteFile(existingFile);
}
}
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
class FileChunkTransportRequestHandler extends BaseTransportRequestHandler<RecoveryFileChunkRequest> {
@Override public RecoveryFileChunkRequest newInstance() {
return new RecoveryFileChunkRequest();
}
@Override public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
IndexOutput indexOutput;
if (request.position() == 0) {
// first request
indexOutput = onGoingRecovery.openIndexOutputs.remove(request.name());
if (indexOutput != null) {
try {
indexOutput.close();
} catch (IOException e) {
// ignore
}
}
indexOutput = shard.store().directory().createOutput(request.name());
onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput);
} else {
indexOutput = onGoingRecovery.openIndexOutputs.get(request.name());
}
if (indexOutput == null) {
    // the ongoing recovery was cleaned up concurrently (shard closing), fail this chunk
    throw new IndexShardClosedException(shard.shardId());
}
synchronized (indexOutput) {
try {
indexOutput.writeBytes(request.content(), request.contentLength());
if (indexOutput.getFilePointer() == request.length()) {
// we are done
indexOutput.close();
onGoingRecovery.openIndexOutputs.remove(request.name());
}
} catch (IOException e) {
onGoingRecovery.openIndexOutputs.remove(request.name());
try {
indexOutput.close();
} catch (IOException e1) {
// ignore
}
}
}
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
}
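The file chunk protocol above relies on two invariants: chunks for a given file arrive in order (the source sends each FILE_CHUNK synchronously via txGet), and a file is complete exactly when the bytes written equal the announced length. A standalone sketch of that completion check, using plain java.io instead of a Lucene IndexOutput:

import java.io.ByteArrayOutputStream;

public class ChunkAssembler {

    private final long expectedLength;
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();

    public ChunkAssembler(long expectedLength) {
        this.expectedLength = expectedLength;
    }

    // returns true when the file is complete, mirroring the
    // indexOutput.getFilePointer() == request.length() check above
    public boolean append(long position, byte[] content, int contentLength) {
        if (position != out.size()) {
            throw new IllegalStateException("out of order chunk: expected [" + out.size() + "], got [" + position + "]");
        }
        out.write(content, 0, contentLength);
        return out.size() == expectedLength;
    }
}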

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import java.io.IOException;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
class RecoveryTranslogOperationsRequest implements Streamable {
private ShardId shardId;
private List<Translog.Operation> operations;
RecoveryTranslogOperationsRequest() {
}
RecoveryTranslogOperationsRequest(ShardId shardId, List<Translog.Operation> operations) {
this.shardId = shardId;
this.operations = operations;
}
public ShardId shardId() {
return shardId;
}
public List<Translog.Operation> operations() {
return operations;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
int size = in.readVInt();
operations = Lists.newArrayListWithExpectedSize(size);
for (int i = 0; i < size; i++) {
operations.add(TranslogStreams.readTranslogOperation(in));
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeVInt(operations.size());
for (Translog.Operation operation : operations) {
TranslogStreams.writeTranslogOperation(out, operation);
}
}
}
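A round-trip sketch for the Streamable implementation above. It assumes the in-memory BytesStreamOutput/BytesStreamInput helpers and the ShardId(String, int) constructor from the surrounding codebase (none of which appear in this diff), and that the sketch lives in the same package, since the request class is package-private:

import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

public class TranslogRequestRoundTrip {

    public static void main(String[] args) throws Exception {
        RecoveryTranslogOperationsRequest original = new RecoveryTranslogOperationsRequest(
                new ShardId("test", 0), Lists.<Translog.Operation>newArrayList());

        BytesStreamOutput out = new BytesStreamOutput();
        original.writeTo(out);

        RecoveryTranslogOperationsRequest copy = new RecoveryTranslogOperationsRequest();
        copy.readFrom(new BytesStreamInput(out.copiedByteArray())); // copiedByteArray() is assumed

        System.out.println(copy.shardId() + " with " + copy.operations().size() + " operations");
    }
}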

View File

@ -0,0 +1,111 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreFileMetaData;
import java.io.IOException;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class StartRecoveryRequest implements Streamable {
private ShardId shardId;
private DiscoveryNode sourceNode;
private DiscoveryNode targetNode;
private boolean markAsRelocated;
Map<String, StoreFileMetaData> existingFiles;
StartRecoveryRequest() {
}
/**
 * Start recovery request.
 *
 * @param shardId         the shard to recover
 * @param sourceNode      the node to recover from
 * @param targetNode      the node to recover to
 * @param markAsRelocated whether the source shard should be marked as relocated once recovery completes
 * @param existingFiles   metadata (including md5) of the files already on the target, used to skip copying them in phase1
 */
public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Map<String, StoreFileMetaData> existingFiles) {
this.shardId = shardId;
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.markAsRelocated = markAsRelocated;
this.existingFiles = existingFiles;
}
public ShardId shardId() {
return shardId;
}
public DiscoveryNode sourceNode() {
return sourceNode;
}
public DiscoveryNode targetNode() {
return targetNode;
}
public boolean markAsRelocated() {
return markAsRelocated;
}
public Map<String, StoreFileMetaData> existingFiles() {
return existingFiles;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
sourceNode = DiscoveryNode.readNode(in);
targetNode = DiscoveryNode.readNode(in);
markAsRelocated = in.readBoolean();
int size = in.readVInt();
existingFiles = Maps.newHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in);
existingFiles.put(md.name(), md);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
sourceNode.writeTo(out);
targetNode.writeTo(out);
out.writeBoolean(markAsRelocated);
out.writeVInt(existingFiles.size());
for (StoreFileMetaData md : existingFiles.values()) {
md.writeTo(out);
}
}
}
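The existingFiles map is what drives the phase1 reuse decision in RecoverySource above: a file is skipped only when the target already holds a file with the same name and the same md5. The decision in isolation, as a minimal sketch:

import java.util.Map;

import org.elasticsearch.index.store.StoreFileMetaData;

final class Phase1Reuse {

    // true if the target can keep its copy of the file instead of fetching it again
    static boolean canReuse(StoreFileMetaData sourceFile, Map<String, StoreFileMetaData> existingFiles) {
        StoreFileMetaData onTarget = existingFiles.get(sourceFile.name());
        return onTarget != null && sourceFile.md5().equals(onTarget.md5());
    }
}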

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.memory.ByteBufferDirectory;
import org.elasticsearch.index.store.support.AbstractStore;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
@ -67,7 +68,31 @@ public abstract class FsStore extends AbstractStore {
String fsLock = componentSettings.get("fs_lock", "native");
LockFactory lockFactory = new NoLockFactory();
if (fsLock.equals("native")) {
lockFactory = new NativeFSLockFactory();
// TODO LUCENE MONITOR: this is not needed in next Lucene version
lockFactory = new NativeFSLockFactory() {
@Override public void clearLock(String lockName) throws IOException {
// Note that this isn't strictly required anymore
// because the existence of these files does not mean
// they are locked, but, still do this in case people
// really want to see the files go away:
if (lockDir.exists()) {
// Try to release the lock first - if it's held by another process, this
// method should not silently fail.
// NOTE: makeLock fixes the lock name by prefixing it w/ lockPrefix.
// Therefore it should be called before the code block next which prefixes
// the given name.
makeLock(lockName).release();
if (lockPrefix != null) {
lockName = lockPrefix + "-" + lockName;
}
// As mentioned above, we don't care if the deletion of the file failed.
new File(lockDir, lockName).delete();
}
}
};
} else if (fsLock.equals("simple")) {
lockFactory = new SimpleFSLockFactory();
}
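The lock factory is picked from component settings, with "native" as the default and "simple" as the alternative. A hedged sketch of overriding it; the full settings key is an assumption, since the diff only shows the "fs_lock" suffix read through componentSettings:

import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

public class FsLockSettingsExample {

    public static Settings simpleFsLock() {
        return ImmutableSettings.settingsBuilder()
                .put("index.store.fs.fs_lock", "simple") // assumed full key; only "fs_lock" is shown in the diff
                .build();
    }
}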

View File

@ -21,6 +21,8 @@ package org.elasticsearch.indices;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.recovery.RecoverySource;
import org.elasticsearch.index.shard.recovery.RecoveryTarget;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
@ -41,8 +43,13 @@ public class IndicesModule extends AbstractModule {
@Override protected void configure() {
bind(IndicesLifecycle.class).to(InternalIndicesLifecycle.class).asEagerSingleton();
bind(IndicesService.class).to(InternalIndicesService.class).asEagerSingleton();
bind(RecoveryThrottler.class).asEagerSingleton();
bind(RecoveryTarget.class).asEagerSingleton();
bind(RecoverySource.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();
bind(IndicesMemoryCleaner.class).asEagerSingleton();
bind(IndexingMemoryBufferController.class).asEagerSingleton();

View File

@ -38,16 +38,18 @@ import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.gateway.IgnoreGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.recovery.IgnoreRecoveryException;
import org.elasticsearch.index.shard.recovery.RecoveryAction;
import org.elasticsearch.index.shard.recovery.RecoveryFailedException;
import org.elasticsearch.index.shard.recovery.RecoveryTarget;
import org.elasticsearch.index.shard.recovery.StartRecoveryRequest;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
@ -70,6 +72,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final ThreadPool threadPool;
private final RecoveryTarget recoveryTarget;
private final ShardStateAction shardStateAction;
private final NodeIndexCreatedAction nodeIndexCreatedAction;
@ -79,13 +83,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final NodeMappingCreatedAction nodeMappingCreatedAction;
@Inject public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ThreadPool threadPool, RecoveryTarget recoveryTarget, ShardStateAction shardStateAction,
NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingCreatedAction nodeMappingCreatedAction) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.recoveryTarget = recoveryTarget;
this.shardStateAction = shardStateAction;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
@ -308,71 +313,104 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
return;
}
threadPool.execute(new Runnable() {
@Override public void run() {
// recheck here, since the cluster event can be called
if (indexShard.ignoreRecoveryAttempt()) {
return;
}
try {
RecoveryAction recoveryAction = indexService.shardInjectorSafe(shardId).getInstance(RecoveryAction.class);
if (!shardRouting.primary()) {
// recovery from primary
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
for (ShardRouting entry : shardRoutingTable) {
if (entry.primary() && entry.started()) {
// only recover from started primary, if we can't find one, we will do it next round
DiscoveryNode node = nodes.get(entry.currentNodeId());
try {
// we are recovering a backup from a primary, so no need to mark it as relocated
recoveryAction.startRecovery(nodes.localNode(), node, false);
shardStateAction.shardStarted(shardRouting, "after recovery (backup) from node [" + node + "]");
} catch (IgnoreRecoveryException e) {
// that's fine, since we might be called concurrently, just ignore this
break;
}
break;
}
}
} else {
if (shardRouting.relocatingNodeId() == null) {
// we are the first primary, recover from the gateway
IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class);
try {
shardGatewayService.recover();
shardStateAction.shardStarted(shardRouting, "after recovery from gateway");
} catch (IgnoreGatewayRecoveryException e) {
// that's fine, we might be called concurrently, just ignore this, we already recovered
}
} else {
// relocating primaries, recovery from the relocating shard
DiscoveryNode node = nodes.get(shardRouting.relocatingNodeId());
try {
// we mark the primary we are going to recover from as relocated at the end of phase 3
// so operations will start moving to the new primary
recoveryAction.startRecovery(nodes.localNode(), node, true);
shardStateAction.shardStarted(shardRouting, "after recovery (primary) from node [" + node + "]");
} catch (IgnoreRecoveryException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
}
}
}
} catch (Exception e) {
logger.warn("[{}][{}] failed to start shard", e, indexService.index().name(), shardRouting.id());
if (indexService.hasShard(shardId)) {
try {
indexService.cleanShard(shardId);
} catch (Exception e1) {
logger.warn("[{}][{}] failed to delete shard after failed startup", e1, indexService.index().name(), shardRouting.id());
}
}
if (!shardRouting.primary()) {
// recovery from primary
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
for (ShardRouting entry : shardRoutingTable) {
if (entry.primary() && entry.started()) {
// only recover from started primary, if we can't find one, we will do it next round
final DiscoveryNode sourceNode = nodes.get(entry.currentNodeId());
try {
    // we are recovering a backup from a primary, so no need to mark it as relocated
    final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().listWithMd5());
    recoveryTarget.startRecovery(request, new PeerRecoveryListener(request, shardRouting, indexService));
} catch (Exception e) {
    handleRecoveryFailure(indexService, shardRouting, true, e);
    break;
}
break;
}
}
} else {
if (shardRouting.relocatingNodeId() == null) {
// we are the first primary, recover from the gateway
IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class);
shardGatewayService.recover(new IndexShardGatewayService.RecoveryListener() {
@Override public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, "after recovery from gateway");
}
@Override public void onIgnoreRecovery(String reason) {
}
@Override public void onRecoveryFailed(IndexShardGatewayRecoveryException e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
}
});
} else {
// relocating primaries, recovery from the relocating shard
final DiscoveryNode sourceNode = nodes.get(shardRouting.relocatingNodeId());
try {
// recover the primary from the node it is relocating from
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().listWithMd5());
recoveryTarget.startRecovery(request, new PeerRecoveryListener(request, shardRouting, indexService));
} catch (Exception e) {
handleRecoveryFailure(indexService, shardRouting, true, e);
}
}
}
}
private class PeerRecoveryListener implements RecoveryTarget.RecoveryListener {
private final StartRecoveryRequest request;
private final ShardRouting shardRouting;
private final IndexService indexService;
private PeerRecoveryListener(StartRecoveryRequest request, ShardRouting shardRouting, IndexService indexService) {
this.request = request;
this.shardRouting = shardRouting;
this.indexService = indexService;
}
@Override public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, "after recovery (backup) from node [" + request.sourceNode() + "]");
}
@Override public void onRetryRecovery(TimeValue retryAfter) {
threadPool.schedule(new Runnable() {
@Override public void run() {
recoveryTarget.startRecovery(request, PeerRecoveryListener.this);
}
}, retryAfter);
}
@Override public void onIgnoreRecovery(String reason) {
}
@Override public void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(indexService, shardRouting, sendShardFailure, e);
}
}
private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
logger.warn("[{}][{}] failed to start shard", failure, indexService.index().name(), shardRouting.shardId().id());
if (indexService.hasShard(shardRouting.shardId().id())) {
try {
indexService.cleanShard(shardRouting.shardId().id());
} catch (Exception e1) {
logger.warn("[{}][{}] failed to delete shard after failed startup", e1, indexService.index().name(), shardRouting.shardId().id());
}
}
if (sendShardFailure) {
try {
shardStateAction.shardFailed(shardRouting, "Failed to start shard, message [" + detailedMessage(failure) + "]");
} catch (Exception e1) {
logger.warn("[{}][{}] failed to mark shard as failed after a failed start", e1, indexService.index().name(), shardRouting.id());
}
}
}
}