Throttling of recovery (both gateway recovery and peer node recovery), closes #176.

This commit is contained in:
kimchy 2010-05-16 23:37:56 +03:00
parent 2301b0b13d
commit 216dda3f9c
16 changed files with 448 additions and 191 deletions
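
At a high level, the commit adds a node-wide RecoveryThrottler that every recovery path must acquire a slot from before it starts copying data, and must release when it is done. A minimal sketch of that contract, written against the classes added in this commit (the helper class itself is illustrative, not part of the change):

import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;

// Illustrative helper: the acquire/spin/release contract followed by both
// gateway recovery and peer recovery in this commit.
public class ThrottledRecoverySketch {

    private final RecoveryThrottler recoveryThrottler;

    public ThrottledRecoverySketch(RecoveryThrottler recoveryThrottler) {
        this.recoveryThrottler = recoveryThrottler;
    }

    public void recover(ShardId shardId, Runnable doRecovery) throws InterruptedException {
        // spin until a recovery slot is free, sleeping the configured interval between attempts
        while (!recoveryThrottler.tryRecovery(shardId, "sketch")) {
            Thread.sleep(recoveryThrottler.throttleInterval().millis());
        }
        try {
            doRecovery.run();
        } finally {
            // always release the slot, even if recovery fails
            recoveryThrottler.recoveryDone(shardId, "sketch");
        }
    }
}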

View File

@ -44,6 +44,8 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest {
private int waitForRelocatingShards = -1;
private int waitForActiveShards = -1;
ClusterHealthRequest() {
}
@ -95,6 +97,15 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest {
return this;
}
public int waitForActiveShards() {
return waitForActiveShards;
}
public ClusterHealthRequest waitForActiveShards(int waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}
@Override public ActionRequestValidationException validate() {
return null;
}
@ -115,6 +126,7 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest {
waitForStatus = ClusterHealthStatus.fromValue(in.readByte());
}
waitForRelocatingShards = in.readInt();
waitForActiveShards = in.readInt();
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -135,5 +147,6 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest {
out.writeByte(waitForStatus.value());
}
out.writeInt(waitForRelocatingShards);
out.writeInt(waitForActiveShards);
}
}
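
The new wait_for_active_shards condition is exposed on the cluster health request. A hedged client-side usage sketch, modeled on the test at the end of this commit (the import paths for Client and ClusterHealthResponse, and the static clusterHealth(...) helper, are assumptions, not confirmed by the diff):

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;

import static org.elasticsearch.client.Requests.*;

// Illustrative only: wait until at least 6 shards are active before proceeding.
public class WaitForActiveShardsExample {

    public static void waitForSixActiveShards(Client client) {
        ClusterHealthResponse health = client.admin().cluster()
                .health(clusterHealth("test")
                        .waitForYellowStatus()
                        .waitForRelocatingShards(0)
                        .waitForActiveShards(6))
                .actionGet();
        if (health.timedOut()) {
            throw new IllegalStateException("timed out waiting for 6 active shards");
        }
    }
}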

View File

@ -65,13 +65,16 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
}
@Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request) throws ElasticSearchException {
int waitFor = 2;
int waitFor = 3;
if (request.waitForStatus() == null) {
waitFor--;
}
if (request.waitForRelocatingShards() == -1) {
waitFor--;
}
if (request.waitForActiveShards() == -1) {
waitFor--;
}
if (waitFor == 0) {
// no need to wait for anything
return clusterHealth(request);
@ -86,6 +89,9 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
if (request.waitForRelocatingShards() != -1 && response.relocatingShards() <= request.waitForRelocatingShards()) {
waitForCounter++;
}
if (request.waitForActiveShards() != -1 && response.activeShards() >= request.waitForActiveShards()) {
waitForCounter++;
}
if (waitForCounter == waitFor) {
return response;
}

View File

@ -235,11 +235,13 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
private long version;
private int numberOfFiles;
private SizeValue totalSize;
private TimeValue throttlingWaitTime;
public Index(long version, int numberOfFiles, SizeValue totalSize) {
public Index(long version, int numberOfFiles, SizeValue totalSize, TimeValue throttlingWaitTime) {
this.version = version;
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
this.throttlingWaitTime = throttlingWaitTime;
}
public long version() {
@ -253,6 +255,10 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
public SizeValue totalSize() {
return totalSize;
}
public TimeValue throttlingWaitTime() {
return throttlingWaitTime;
}
}
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.StopWatch;
import org.elasticsearch.util.TimeValue;
@ -58,6 +59,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private final Store store;
private final RecoveryThrottler recoveryThrottler;
private volatile long lastIndexVersion;
@ -73,12 +76,13 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
@Inject public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings,
ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway,
Store store) {
Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.indexShard = (InternalIndexShard) indexShard;
this.shardGateway = shardGateway;
this.store = store;
this.recoveryThrottler = recoveryThrottler;
this.snapshotOnClose = componentSettings.getAsBoolean("snapshot_on_close", true);
this.snapshotInterval = componentSettings.getAsTime("snapshot_interval", TimeValue.timeValueSeconds(10));
@ -99,6 +103,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
if (!indexShard.routingEntry().primary()) {
throw new ElasticSearchIllegalStateException("Trying to recover when the shard is in backup state");
}
// clear the store, we are going to recover into it
try {
store.deleteContent();
@ -106,29 +111,49 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
logger.debug("Failed to delete store before recovery from gateway", e);
}
indexShard.recovering();
logger.debug("Starting recovery from {}", shardGateway);
StopWatch stopWatch = new StopWatch().start();
IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();
lastIndexVersion = recoveryStatus.index().version();
lastTranslogId = recoveryStatus.translog().translogId();
lastTranslogSize = recoveryStatus.translog().numberOfOperations();
StopWatch throttlingWaitTime = new StopWatch().start();
// we know we are on a thread, we can spin till we can engage in recovery
while (!recoveryThrottler.tryRecovery(shardId, "gateway")) {
try {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
} catch (InterruptedException e) {
if (indexShard.ignoreRecoveryAttempt()) {
throw new IgnoreGatewayRecoveryException(shardId, "Interrupted while waiting for recovery, but we should ignore ...");
}
// we got interrupted, mark it as failed
throw new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recover", e);
}
}
throttlingWaitTime.stop();
// start the shard if the gateway has not started it already
if (indexShard.state() != IndexShardState.STARTED) {
indexShard.start();
try {
logger.debug("Starting recovery from {}", shardGateway);
StopWatch stopWatch = new StopWatch().start();
IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();
lastIndexVersion = recoveryStatus.index().version();
lastTranslogId = recoveryStatus.translog().translogId();
lastTranslogSize = recoveryStatus.translog().numberOfOperations();
// start the shard if the gateway has not started it already
if (indexShard.state() != IndexShardState.STARTED) {
indexShard.start();
}
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" Index : number_of_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
sb.append(" Translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]");
logger.debug(sb.toString());
}
// refresh the shard
indexShard.refresh(new Engine.Refresh(false));
scheduleSnapshotIfNeeded();
} finally {
recoveryThrottler.recoveryDone(shardId, "gateway");
}
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("]\n");
sb.append(" Index : number_of_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("]\n");
sb.append(" Translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]");
logger.debug(sb.toString());
}
// refresh the shard
indexShard.refresh(new Engine.Refresh(false));
scheduleSnapshotIfNeeded();
} else {
throw new IgnoreGatewayRecoveryException(shardId, "Already recovered");
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
@ -48,6 +49,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.index.translog.TranslogStreams.*;
@ -64,6 +66,8 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
private final ThreadPool threadPool;
private final RecoveryThrottler recoveryThrottler;
private final Store store;
private final File location;
@ -72,11 +76,14 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
private final File locationTranslog;
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, FsIndexGateway fsIndexGateway, IndexShard indexShard, Store store) {
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, FsIndexGateway fsIndexGateway,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.indexShard = (InternalIndexShard) indexShard;
this.store = store;
this.recoveryThrottler = recoveryThrottler;
this.location = new File(fsIndexGateway.indexGatewayHome(), Integer.toString(shardId.id()));
this.locationIndex = new File(location, "index");
this.locationTranslog = new File(location, "translog");
@ -157,10 +164,11 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
}
threadPool.execute(new Runnable() {
@Override public void run() {
File copyTo = new File(locationIndex, fileName);
try {
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, new File(locationIndex, fileName));
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, copyTo);
} catch (Exception e) {
lastException.set(e);
lastException.set(new IndexShardGatewaySnapshotFailedException(shardId, "Failed to copy to [" + copyTo + "], from dir [" + snapshotIndexCommit.getDirectory() + "] and file [" + fileName + "]", e));
} finally {
latch.countDown();
}
@ -200,7 +208,9 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
translogTime = System.currentTimeMillis() - time;
} catch (Exception e) {
try {
translogRaf.close();
if (translogRaf != null) {
translogRaf.close();
}
} catch (IOException e1) {
// ignore
}
@ -303,15 +313,22 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
File[] files = locationIndex.listFiles();
final CountDownLatch latch = new CountDownLatch(files.length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
final AtomicLong throttlingWaitTime = new AtomicLong();
for (final File file : files) {
threadPool.execute(new Runnable() {
@Override public void run() {
try {
long throttlingStartTime = System.currentTimeMillis();
while (!recoveryThrottler.tryStream(shardId, file.getName())) {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
}
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
copyToDirectory(file, store.directory(), file.getName());
} catch (Exception e) {
logger.debug("Failed to read [" + file + "] into [" + store + "]", e);
lastException.set(e);
} finally {
recoveryThrottler.streamDone(shardId, file.getName());
latch.countDown();
}
}
@ -339,7 +356,7 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
}
return new RecoveryStatus.Index(version, files.length, new SizeValue(totalSize, SizeUnit.BYTES));
return new RecoveryStatus.Index(version, files.length, new SizeValue(totalSize, SizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get()));
}
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
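
Both the filesystem and the cloud gateways apply the same per-file pattern when restoring the index: wait for a stream slot, copy the file, release the slot in a finally block, and add the time spent waiting to a shared counter that ends up in RecoveryStatus.Index as throttlingWaitTime. A minimal sketch of that pattern (the helper class and method names are illustrative):

import java.util.concurrent.atomic.AtomicLong;

import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;

// Illustrative helper mirroring the per-file throttling used in the gateways.
public class ThrottledStreamSketch {

    public static void copyWithThrottling(RecoveryThrottler throttler, ShardId shardId,
                                          String fileName, Runnable copyFile,
                                          AtomicLong throttlingWaitTime) throws InterruptedException {
        long start = System.currentTimeMillis();
        // wait for a free stream slot
        while (!throttler.tryStream(shardId, fileName)) {
            Thread.sleep(throttler.throttleInterval().millis());
        }
        throttlingWaitTime.addAndGet(System.currentTimeMillis() - start);
        try {
            copyFile.run();                       // copyToDirectory(...) in the gateway code
        } finally {
            throttler.streamDone(shardId, fileName);
        }
    }
}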

View File

@ -28,6 +28,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.settings.Settings;
@ -46,7 +47,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
// in the none case, we simply start the shard
indexShard.start();
return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new SizeValue(0, SizeUnit.BYTES)), new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES)));
return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new SizeValue(0, SizeUnit.BYTES), TimeValue.timeValueMillis(0)), new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES)));
}
@Override public SnapshotStatus snapshot(Snapshot snapshot) {

View File

@ -52,7 +52,10 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard>, Clos
IndexShard createShard(int sShardId) throws ElasticSearchException;
void deleteShard(int shardId) throws ElasticSearchException;
/**
* Cleans the shard locally; does not touch the gateway.
*/
void cleanShard(int shardId) throws ElasticSearchException;
int numberOfShards();

View File

@ -171,7 +171,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
@Override public synchronized void close(boolean delete) {
try {
for (int shardId : shardIds()) {
deleteShard(shardId, delete);
deleteShard(shardId, delete, delete);
}
} finally {
indicesLifecycle.removeListener(cleanCacheOnIndicesLifecycleListener);
@ -234,11 +234,11 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexShard;
}
@Override public synchronized void deleteShard(int shardId) throws ElasticSearchException {
deleteShard(shardId, true);
@Override public synchronized void cleanShard(int shardId) throws ElasticSearchException {
deleteShard(shardId, true, false);
}
private synchronized void deleteShard(int shardId, boolean delete) throws ElasticSearchException {
private synchronized void deleteShard(int shardId, boolean delete, boolean deleteGateway) throws ElasticSearchException {
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
Injector shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) {
@ -268,7 +268,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
RecoveryAction recoveryAction = shardInjector.getInstance(RecoveryAction.class);
if (recoveryAction != null) recoveryAction.close();
shardInjector.getInstance(IndexShardGatewayService.class).close(delete);
shardInjector.getInstance(IndexShardGatewayService.class).close(deleteGateway);
indexShard.close();

View File

@ -35,12 +35,12 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.memory.MemorySnapshot;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.StopWatch;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.CloseableComponent;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.io.stream.StreamInput;
@ -56,9 +56,11 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.*;
import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.concurrent.ConcurrentCollections.*;
/**
@ -76,6 +78,8 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
private final Store store;
private final RecoveryThrottler recoveryThrottler;
private final ConcurrentMap<String, IndexOutput> openIndexOutputs = newConcurrentMap();
private final String startTransportAction;
@ -94,12 +98,14 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
private final CopyOnWriteArrayList<Future> sendFileChunksRecoveryFutures = new CopyOnWriteArrayList<Future>();
@Inject public RecoveryAction(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, TransportService transportService, IndexShard indexShard, Store store) {
@Inject public RecoveryAction(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, TransportService transportService,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.transportService = transportService;
this.indexShard = (InternalIndexShard) indexShard;
this.store = store;
this.recoveryThrottler = recoveryThrottler;
startTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/start";
transportService.registerHandler(startTransportAction, new StartRecoveryTransportRequestHandler());
@ -154,30 +160,63 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
} catch (IndexShardClosedException e) {
throw new IgnoreRecoveryException("Can't recover a closed shard.", e);
}
// we know we are on a thread, we can spin till we can engage in recovery
StopWatch throttlingWaitTime = new StopWatch().start();
while (!recoveryThrottler.tryRecovery(shardId, "peer recovery target")) {
try {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
} catch (InterruptedException e) {
if (indexShard.ignoreRecoveryAttempt()) {
throw new IgnoreRecoveryException("Interrupted while waiting for recovery, but we should ignore ...");
}
// we got interrupted, mark it as failed
throw new RecoveryFailedException(shardId, node, targetNode, e);
}
}
throttlingWaitTime.stop();
logger.debug("Starting recovery from {}", targetNode);
StopWatch stopWatch = new StopWatch().start();
try {
if (closed) {
throw new IgnoreRecoveryException("Recovery closed");
}
RecoveryStatus recoveryStatus = transportService.submitRequest(targetNode, startTransportAction, new StartRecoveryRequest(node, markAsRelocated), new FutureTransportResponseHandler<RecoveryStatus>() {
@Override public RecoveryStatus newInstance() {
return new RecoveryStatus();
StopWatch stopWatch = null;
RecoveryStatus recoveryStatus = null;
boolean retry = true;
while (retry) {
stopWatch = new StopWatch().start();
recoveryStatus = transportService.submitRequest(targetNode, startTransportAction, new StartRecoveryRequest(node, markAsRelocated), new FutureTransportResponseHandler<RecoveryStatus>() {
@Override public RecoveryStatus newInstance() {
return new RecoveryStatus();
}
}).txGet();
retry = recoveryStatus.retry;
if (retry) {
try {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
} catch (InterruptedException e) {
if (indexShard.ignoreRecoveryAttempt()) {
throw new IgnoreRecoveryException("Interrupted while waiting for remote recovery, but we should ignore ...");
}
// we got interrupted, mark it as failed
throw new RecoveryFailedException(shardId, node, targetNode, e);
}
}
}).txGet();
}
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Recovery completed from ").append(targetNode).append(", took[").append(stopWatch.totalTime()).append("]\n");
sb.append("Recovery completed from ").append(targetNode).append(", took[").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" Phase1: recovered [").append(recoveryStatus.phase1FileNames.size()).append("]")
.append(" files with total size of [").append(new SizeValue(recoveryStatus.phase1TotalSize)).append("]")
.append(", took [").append(new TimeValue(recoveryStatus.phase1Time, MILLISECONDS)).append("]")
.append(", took [").append(timeValueMillis(recoveryStatus.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryStatus.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" Phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(new TimeValue(recoveryStatus.phase2Time, MILLISECONDS)).append("]")
.append(", took [").append(timeValueMillis(recoveryStatus.phase2Time)).append("]")
.append("\n");
sb.append(" Phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(new TimeValue(recoveryStatus.phase3Time, MILLISECONDS)).append("]");
.append(", took [").append(timeValueMillis(recoveryStatus.phase3Time)).append("]");
logger.debug(sb.toString());
}
} catch (RemoteTransportException e) {
@ -203,6 +242,8 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
throw new IgnoreRecoveryException("Recovery closed", e);
}
throw new RecoveryFailedException(shardId, node, targetNode, e);
} finally {
recoveryThrottler.recoveryDone(shardId, "peer recovery target");
}
} finally {
sendStartRecoveryThread = null;
@ -254,154 +295,175 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
@Override public void messageReceived(final StartRecoveryRequest startRecoveryRequest, final TransportChannel channel) throws Exception {
logger.trace("Starting recovery to {}, markAsRelocated {}", startRecoveryRequest.node, startRecoveryRequest.markAsRelocated);
final DiscoveryNode node = startRecoveryRequest.node;
cleanOpenIndex();
final RecoveryStatus recoveryStatus = new RecoveryStatus();
indexShard.recover(new Engine.RecoveryHandler() {
@Override public void phase1(SnapshotIndexCommit snapshot) throws ElasticSearchException {
long totalSize = 0;
try {
StopWatch stopWatch = new StopWatch().start();
if (!recoveryThrottler.tryRecovery(shardId, "peer recovery source")) {
RecoveryStatus retry = new RecoveryStatus();
retry.retry = true;
channel.sendResponse(retry);
return;
}
try {
logger.trace("Starting recovery to {}, markAsRelocated {}", startRecoveryRequest.node, startRecoveryRequest.markAsRelocated);
final DiscoveryNode node = startRecoveryRequest.node;
cleanOpenIndex();
final RecoveryStatus recoveryStatus = new RecoveryStatus();
indexShard.recover(new Engine.RecoveryHandler() {
@Override public void phase1(SnapshotIndexCommit snapshot) throws ElasticSearchException {
long totalSize = 0;
try {
StopWatch stopWatch = new StopWatch().start();
for (String name : snapshot.getFiles()) {
IndexInput indexInput = store.directory().openInput(name);
recoveryStatus.phase1FileNames.add(name);
recoveryStatus.phase1FileSizes.add(indexInput.length());
totalSize += indexInput.length();
indexInput.close();
}
recoveryStatus.phase1TotalSize = totalSize;
logger.trace("Recovery [phase1] to {}: recovering [{}] files with total size of [{}]", node, snapshot.getFiles().length, new SizeValue(totalSize));
final CountDownLatch latch = new CountDownLatch(snapshot.getFiles().length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : snapshot.getFiles()) {
sendFileChunksRecoveryFutures.add(threadPool.submit(new Runnable() {
@Override public void run() {
IndexInput indexInput = null;
try {
final int BUFFER_SIZE = (int) fileChunkSize.bytes();
byte[] buf = new byte[BUFFER_SIZE];
indexInput = store.directory().openInput(name);
long len = indexInput.length();
long readCount = 0;
while (readCount < len) {
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(node, fileChunkTransportAction, new FileChunk(name, position, len, buf, toRead), VoidTransportResponseHandler.INSTANCE).txGet(30, SECONDS);
readCount += toRead;
}
indexInput.close();
} catch (Exception e) {
lastException.set(e);
} finally {
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
latch.countDown();
}
}
}));
}
latch.await();
if (lastException.get() != null) {
throw lastException.get();
}
stopWatch.stop();
logger.trace("Recovery [phase1] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase1Time = stopWatch.totalTime().millis();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted while recovering files");
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(shardId, snapshot.getFiles().length, new SizeValue(totalSize), e);
} finally {
sendFileChunksRecoveryFutures.clear();
}
}
@Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
sendSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("Recovery [phase2] to {}: sending [{}] transaction log operations", node, snapshot.size());
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot, false);
stopWatch.stop();
logger.trace("Recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase2Time = stopWatch.totalTime().millis();
recoveryStatus.phase2Operations = snapshot.size();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted in phase 2 files");
} finally {
sendSnapshotRecoveryThread = null;
}
}
@Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
sendSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("Recovery [phase3] to {}: sending [{}] transaction log operations", node, snapshot.size());
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot, true);
if (startRecoveryRequest.markAsRelocated) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
indexShard.relocated();
} catch (IllegalIndexShardStateException e) {
// we can ignore this exception since, on the other node, when it moved to phase3
// it will also send shard started, which might cause the index shard we work against
// to be closed by the time we get to the relocated method
for (String name : snapshot.getFiles()) {
IndexInput indexInput = store.directory().openInput(name);
recoveryStatus.phase1FileNames.add(name);
recoveryStatus.phase1FileSizes.add(indexInput.length());
totalSize += indexInput.length();
indexInput.close();
}
}
stopWatch.stop();
logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase3Time = stopWatch.totalTime().millis();
recoveryStatus.phase3Operations = snapshot.size();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted in phase 2 files");
} finally {
sendSnapshotRecoveryThread = null;
}
}
recoveryStatus.phase1TotalSize = totalSize;
private void sendSnapshot(Translog.Snapshot snapshot, boolean phase3) throws ElasticSearchException {
MemorySnapshot memorySnapshot;
if (snapshot instanceof MemorySnapshot) {
memorySnapshot = (MemorySnapshot) snapshot;
} else {
memorySnapshot = new MemorySnapshot(snapshot);
final AtomicLong throttlingWaitTime = new AtomicLong();
logger.trace("Recovery [phase1] to {}: recovering [{}] files with total size of [{}]", node, snapshot.getFiles().length, new SizeValue(totalSize));
final CountDownLatch latch = new CountDownLatch(snapshot.getFiles().length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : snapshot.getFiles()) {
sendFileChunksRecoveryFutures.add(threadPool.submit(new Runnable() {
@Override public void run() {
IndexInput indexInput = null;
try {
long throttlingStartTime = System.currentTimeMillis();
while (!recoveryThrottler.tryStream(shardId, name)) {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
}
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
final int BUFFER_SIZE = (int) fileChunkSize.bytes();
byte[] buf = new byte[BUFFER_SIZE];
indexInput = store.directory().openInput(name);
long len = indexInput.length();
long readCount = 0;
while (readCount < len) {
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(node, fileChunkTransportAction, new FileChunk(name, position, len, buf, toRead), VoidTransportResponseHandler.INSTANCE).txGet(120, SECONDS);
readCount += toRead;
}
indexInput.close();
} catch (Exception e) {
lastException.set(e);
} finally {
recoveryThrottler.streamDone(shardId, name);
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
latch.countDown();
}
}
}));
}
latch.await();
if (lastException.get() != null) {
throw lastException.get();
}
stopWatch.stop();
logger.trace("Recovery [phase1] to {}: took [{}], throttling_wait [{}]", node, stopWatch.totalTime(), timeValueMillis(throttlingWaitTime.get()));
recoveryStatus.phase1Time = stopWatch.totalTime().millis();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted while recovering files");
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(shardId, snapshot.getFiles().length, new SizeValue(totalSize), e);
} finally {
sendFileChunksRecoveryFutures.clear();
}
}
transportService.submitRequest(node, snapshotTransportAction, new SnapshotWrapper(memorySnapshot, phase3), VoidTransportResponseHandler.INSTANCE).txGet();
}
});
channel.sendResponse(recoveryStatus);
@Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
sendSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("Recovery [phase2] to {}: sending [{}] transaction log operations", node, snapshot.size());
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot, false);
stopWatch.stop();
logger.trace("Recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase2Time = stopWatch.totalTime().millis();
recoveryStatus.phase2Operations = snapshot.size();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted in phase 2 files");
} finally {
sendSnapshotRecoveryThread = null;
}
}
@Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
sendSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("Recovery [phase3] to {}: sending [{}] transaction log operations", node, snapshot.size());
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot, true);
if (startRecoveryRequest.markAsRelocated) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
indexShard.relocated();
} catch (IllegalIndexShardStateException e) {
// we can ignore this exception since, on the other node, when it moved to phase3
// it will also send shard started, which might cause the index shard we work against
// to be closed by the time we get to the relocated method
}
}
stopWatch.stop();
logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase3Time = stopWatch.totalTime().millis();
recoveryStatus.phase3Operations = snapshot.size();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted in phase 2 files");
} finally {
sendSnapshotRecoveryThread = null;
}
}
private void sendSnapshot(Translog.Snapshot snapshot, boolean phase3) throws ElasticSearchException {
MemorySnapshot memorySnapshot;
if (snapshot instanceof MemorySnapshot) {
memorySnapshot = (MemorySnapshot) snapshot;
} else {
memorySnapshot = new MemorySnapshot(snapshot);
}
transportService.submitRequest(node, snapshotTransportAction, new SnapshotWrapper(memorySnapshot, phase3), VoidTransportResponseHandler.INSTANCE).txGet();
}
});
channel.sendResponse(recoveryStatus);
} finally {
recoveryThrottler.recoveryDone(shardId, "peer recovery source");
}
}
}
private static class RecoveryStatus implements Streamable {
boolean retry = false;
List<String> phase1FileNames = new ArrayList<String>();
List<Long> phase1FileSizes = new ArrayList<Long>();
long phase1TotalSize;
long phase1Time;
long phase1ThrottlingWaitTime;
int phase2Operations;
long phase2Time;
@ -413,6 +475,7 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
@Override public void readFrom(StreamInput in) throws IOException {
retry = in.readBoolean();
int size = in.readVInt();
phase1FileNames = new ArrayList<String>(size);
for (int i = 0; i < size; i++) {
@ -425,6 +488,7 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
phase1TotalSize = in.readVLong();
phase1Time = in.readVLong();
phase1ThrottlingWaitTime = in.readVLong();
phase2Operations = in.readVInt();
phase2Time = in.readVLong();
phase3Operations = in.readVInt();
@ -432,6 +496,7 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(retry);
out.writeVInt(phase1FileNames.size());
for (String name : phase1FileNames) {
out.writeUTF(name);
@ -442,6 +507,7 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
out.writeVLong(phase1TotalSize);
out.writeVLong(phase1Time);
out.writeVLong(phase1ThrottlingWaitTime);
out.writeVInt(phase2Operations);
out.writeVLong(phase2Time);
out.writeVInt(phase3Operations);
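
On the peer recovery path the source node no longer blocks when it is out of recovery slots; it answers the start request immediately with retry = true, and the target sleeps for the throttle interval and resends the request until the source accepts. A compact, self-contained model of that handshake (class and method names are illustrative; RecoveryStatus here stands in for the private Streamable above):

import java.util.concurrent.Callable;

// Compact model of the retry handshake added to RecoveryAction in this commit.
class PeerRecoveryRetrySketch {

    static class RecoveryStatus {
        boolean retry = false;   // set by the source when it has no free recovery slot
    }

    // Source side: instead of blocking the transport thread, answer immediately
    // with retry=true when recoveryThrottler.tryRecovery(...) fails.
    static RecoveryStatus handleStartRequest(boolean slotAcquired, Callable<RecoveryStatus> doRecovery) throws Exception {
        if (!slotAcquired) {
            RecoveryStatus busy = new RecoveryStatus();
            busy.retry = true;
            return busy;
        }
        try {
            return doRecovery.call();
        } finally {
            // recoveryThrottler.recoveryDone(shardId, "peer recovery source") in the real code
        }
    }

    // Target side: re-send the start request until the source accepts,
    // sleeping the throttle interval between attempts.
    static RecoveryStatus recoverFromSource(Callable<RecoveryStatus> startRequest, long throttleIntervalMillis) throws Exception {
        RecoveryStatus status;
        do {
            status = startRequest.call();
            if (status.retry) {
                Thread.sleep(throttleIntervalMillis);
            }
        } while (status.retry);
        return status;
    }
}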

View File

@ -19,8 +19,9 @@
package org.elasticsearch.indices;
import org.elasticsearch.util.inject.AbstractModule;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.util.inject.AbstractModule;
import org.elasticsearch.util.settings.Settings;
/**
@ -37,6 +38,7 @@ public class IndicesModule extends AbstractModule {
@Override protected void configure() {
bind(IndicesLifecycle.class).to(InternalIndicesLifecycle.class).asEagerSingleton();
bind(IndicesService.class).to(InternalIndicesService.class).asEagerSingleton();
bind(RecoveryThrottler.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();
bind(IndicesMemoryCleaner.class).asEagerSingleton();
}

View File

@ -153,7 +153,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (logger.isDebugEnabled()) {
logger.debug("Index [{}]: Deleting shard [{}]", index, existingShardId);
}
indexService.deleteShard(existingShardId);
indexService.cleanShard(existingShardId);
}
}
}
@ -290,7 +290,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} catch (Exception e) {
logger.warn("Failed to create shard for index [" + indexService.index().name() + "] and shard id [" + shardRouting.id() + "]", e);
try {
indexService.deleteShard(shardId);
indexService.cleanShard(shardId);
} catch (IndexShardMissingException e1) {
// ignore
} catch (Exception e1) {
@ -361,7 +361,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.warn("Failed to start shard for index [" + indexService.index().name() + "] and shard id [" + shardRouting.id() + "]", e);
if (indexService.hasShard(shardId)) {
try {
indexService.deleteShard(shardId);
indexService.cleanShard(shardId);
} catch (Exception e1) {
logger.warn("Failed to delete shard after failed startup for index [" + indexService.index().name() + "] and shard id [" + shardRouting.id() + "]", e1);
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery.throttler;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.settings.Settings;
/**
* @author kimchy (shay.banon)
*/
public class RecoveryThrottler extends AbstractComponent {
private final Object concurrentRecoveryMutex = new Object();
private final int concurrentRecoveries;
private final TimeValue throttleInterval;
private volatile int onGoingRecoveries = 0;
private final int concurrentStreams;
private volatile int onGoingStreams = 0;
private final Object concurrentStreamsMutex = new Object();
@Inject public RecoveryThrottler(Settings settings) {
super(settings);
concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", Runtime.getRuntime().availableProcessors());
concurrentStreams = componentSettings.getAsInt("concurrent_streams", Runtime.getRuntime().availableProcessors());
throttleInterval = componentSettings.getAsTime("interval", TimeValue.timeValueMillis(100));
logger.debug("concurrent_recoveries [{}], concurrent_streams [{}] interval [{}]", concurrentRecoveries, concurrentStreams, throttleInterval);
}
public boolean tryRecovery(ShardId shardId, String reason) {
synchronized (concurrentRecoveryMutex) {
if (onGoingRecoveries + 1 > concurrentRecoveries) {
return false;
}
onGoingRecoveries++;
logger.trace("Recovery allowed for [{}], on going [{}], allowed [{}], reason [{}]", shardId, onGoingRecoveries, concurrentRecoveries, reason);
return true;
}
}
public void recoveryDone(ShardId shardId, String reason) {
synchronized (concurrentRecoveryMutex) {
--onGoingRecoveries;
logger.trace("Recovery done for [{}], on going [{}], allowed [{}], reason [{}]", shardId, onGoingRecoveries, concurrentRecoveries, reason);
}
}
public int onGoingRecoveries() {
return onGoingRecoveries;
}
public boolean tryStream(ShardId shardId, String streamName) {
synchronized (concurrentStreamsMutex) {
if (onGoingStreams + 1 > concurrentStreams) {
return false;
}
onGoingStreams++;
logger.trace("Stream [{}] allowed for [{}], on going [{}], allowed [{}]", streamName, shardId, onGoingStreams, concurrentStreams);
return true;
}
}
public void streamDone(ShardId shardId, String streamName) {
synchronized (concurrentStreamsMutex) {
--onGoingStreams;
logger.trace("Stream [{}] done for [{}], on going [{}], allowed [{}]", streamName, shardId, onGoingStreams, concurrentStreams);
}
}
public int onGoingStreams() {
return onGoingStreams;
}
public TimeValue throttleInterval() {
return throttleInterval;
}
}
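
The throttler reads its limits from component settings: concurrent_recoveries and concurrent_streams both default to the number of available processors, and interval (the sleep between acquisition attempts) defaults to 100ms; the exact settings prefix is derived from the component's package and is not shown in the diff. It also exposes its current counters, which makes the throttling state easy to surface, for example in a periodic log line (illustrative only):

import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;

// Illustrative: summarize the throttler's state using the accessors added in this commit.
public class RecoveryThrottlerStatusSketch {

    public static String status(RecoveryThrottler throttler) {
        return "recoveries in flight [" + throttler.onGoingRecoveries() + "]"
                + ", streams in flight [" + throttler.onGoingStreams() + "]"
                + ", retry interval [" + throttler.throttleInterval() + "]";
    }
}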

View File

@ -56,6 +56,7 @@ public class RestClusterHealthAction extends BaseRestHandler {
clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase()));
}
clusterHealthRequest.waitForRelocatingShards(request.paramAsInt("wait_for_relocating_shards", clusterHealthRequest.waitForRelocatingShards()));
clusterHealthRequest.waitForActiveShards(request.paramAsInt("wait_for_active_shards", clusterHealthRequest.waitForActiveShards()));
String sLevel = request.param("level");
if (sLevel != null) {
if ("cluster".equals("sLevel")) {

View File

@ -66,9 +66,8 @@ public class FetchPhase implements SearchPhase {
FieldSelector fieldSelector = buildFieldSelectors(context);
InternalSearchHit[] hits = new InternalSearchHit[context.docIdsToLoadSize()];
int index = 0;
for (int docIdIdx = context.docIdsToLoadFrom(); docIdIdx < context.docIdsToLoadSize(); docIdIdx++) {
int docId = context.docIdsToLoad()[docIdIdx];
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
Document doc = loadDocument(context, fieldSelector, docId);
Uid uid = extractUid(context, doc);

View File

@ -78,10 +78,12 @@ public class TransportSearchFailuresTests extends AbstractNodesTests {
Thread.sleep(500);
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth("test").waitForYellowStatus().waitForRelocatingShards(0)).actionGet();
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth("test")
.waitForYellowStatus().waitForRelocatingShards(0).waitForActiveShards(6)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(clusterHealth.activeShards(), equalTo(6));
refreshResponse = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
assertThat(refreshResponse.totalShards(), equalTo(9));

View File

@ -36,6 +36,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
@ -61,6 +62,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.index.translog.TranslogStreams.*;
@ -74,6 +76,8 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
private final ThreadPool threadPool;
private final RecoveryThrottler recoveryThrottler;
private final Store store;
private final Location shardLocation;
@ -91,10 +95,11 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
private volatile int currentTranslogPartToWrite = 1;
@Inject public CloudIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard, ThreadPool threadPool,
Store store, CloudIndexGateway cloudIndexGateway, CloudBlobStoreService blobStoreService) {
Store store, RecoveryThrottler recoveryThrottler, CloudIndexGateway cloudIndexGateway, CloudBlobStoreService blobStoreService) {
super(shardId, indexSettings);
this.indexShard = (InternalIndexShard) indexShard;
this.threadPool = threadPool;
this.recoveryThrottler = recoveryThrottler;
this.store = store;
this.blobStoreContext = blobStoreService.context();
@ -345,15 +350,22 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
final CountDownLatch latch = new CountDownLatch(filesMetaDatas.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
final AtomicLong throttlingWaitTime = new AtomicLong();
for (final Map.Entry<String, StorageMetadata> entry : filesMetaDatas.entrySet()) {
threadPool.execute(new Runnable() {
@Override public void run() {
try {
long throttlingStartTime = System.currentTimeMillis();
while (!recoveryThrottler.tryStream(shardId, entry.getKey())) {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
}
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
copyToDirectory(entry.getValue(), allMetaDatas);
} catch (Exception e) {
logger.debug("Failed to read [" + entry.getKey() + "] into [" + store + "]", e);
lastException.set(e);
} finally {
recoveryThrottler.streamDone(shardId, entry.getKey());
latch.countDown();
}
}
@ -379,7 +391,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
}
return new RecoveryStatus.Index(version, filesMetaDatas.size(), new SizeValue(totalSize, SizeUnit.BYTES));
return new RecoveryStatus.Index(version, filesMetaDatas.size(), new SizeValue(totalSize, SizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get()));
}
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {