refactor how throttling is done: instead of throttling after a shard is allocated to a node and then waiting until it is allowed to recover, do it at the allocation level, and don't allocate a shard to a node that already has N recoveries running on it

kimchy 2010-08-22 02:47:34 +03:00
parent 7592862646
commit aa28b93610
32 changed files with 483 additions and 534 deletions
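The per-node cap is exposed as a "concurrent_recoveries" setting on the new ThrottlingNodeAllocation decider introduced below (default: number of processors + 1). As a minimal sketch of how the updated tests later in this diff configure it, assuming the ShardsAllocation constructor those tests use:

import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.settings.ImmutableSettings;

public class ThrottlingExample {
    public static void main(String[] args) {
        // Allow at most 10 concurrent recoveries per node before the
        // ThrottlingNodeAllocation decider answers Decision.THROTTLE.
        ShardsAllocation strategy = new ShardsAllocation(
                ImmutableSettings.settingsBuilder()
                        .put("cluster.routing.allocation.concurrent_recoveries", 10)
                        .build());
    }
}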

View File

@ -30,11 +30,10 @@ public class GatewayRecoveryStatus {
public enum Stage {
INIT((byte) 0),
THROTTLE((byte) 1),
INDEX((byte) 2),
TRANSLOG((byte) 3),
FINALIZE((byte) 4),
DONE((byte) 5);
INDEX((byte) 1),
TRANSLOG((byte) 2),
FINALIZE((byte) 3),
DONE((byte) 4);
private final byte value;
@ -50,14 +49,12 @@ public class GatewayRecoveryStatus {
if (value == 0) {
return INIT;
} else if (value == 1) {
return THROTTLE;
} else if (value == 2) {
return INDEX;
} else if (value == 3) {
} else if (value == 2) {
return TRANSLOG;
} else if (value == 4) {
} else if (value == 3) {
return FINALIZE;
} else if (value == 5) {
} else if (value == 4) {
return DONE;
}
throw new ElasticSearchIllegalArgumentException("No stage found for [" + value + ']');
@ -70,10 +67,6 @@ public class GatewayRecoveryStatus {
final long time;
final long throttlingTime;
final long indexThrottlingTime;
final long indexSize;
final long reusedIndexSize;
@ -82,13 +75,11 @@ public class GatewayRecoveryStatus {
final long recoveredTranslogOperations;
public GatewayRecoveryStatus(Stage stage, long startTime, long time, long throttlingTime, long indexThrottlingTime, long indexSize, long reusedIndexSize,
public GatewayRecoveryStatus(Stage stage, long startTime, long time, long indexSize, long reusedIndexSize,
long recoveredIndexSize, long recoveredTranslogOperations) {
this.stage = stage;
this.startTime = startTime;
this.time = time;
this.throttlingTime = throttlingTime;
this.indexThrottlingTime = indexThrottlingTime;
this.indexSize = indexSize;
this.reusedIndexSize = reusedIndexSize;
this.recoveredIndexSize = recoveredIndexSize;
@ -115,22 +106,6 @@ public class GatewayRecoveryStatus {
return time();
}
public TimeValue throttlingTime() {
return TimeValue.timeValueMillis(throttlingTime);
}
public TimeValue getThrottlingTime() {
return throttlingTime();
}
public TimeValue indexThrottlingTime() {
return TimeValue.timeValueMillis(indexThrottlingTime);
}
public TimeValue getIndexThrottlingTime() {
return indexThrottlingTime();
}
public ByteSizeValue indexSize() {
return new ByteSizeValue(indexSize);
}

View File

@ -30,11 +30,10 @@ public class PeerRecoveryStatus {
public enum Stage {
INIT((byte) 0),
THROTTLE((byte) 1),
INDEX((byte) 2),
TRANSLOG((byte) 3),
FINALIZE((byte) 4),
DONE((byte) 5);
INDEX((byte) 1),
TRANSLOG((byte) 2),
FINALIZE((byte) 3),
DONE((byte) 4);
private final byte value;
@ -50,14 +49,12 @@ public class PeerRecoveryStatus {
if (value == 0) {
return INIT;
} else if (value == 1) {
return THROTTLE;
} else if (value == 2) {
return INDEX;
} else if (value == 3) {
} else if (value == 2) {
return TRANSLOG;
} else if (value == 4) {
} else if (value == 3) {
return FINALIZE;
} else if (value == 5) {
} else if (value == 4) {
return DONE;
}
throw new ElasticSearchIllegalArgumentException("No stage found for [" + value + ']');
@ -70,8 +67,6 @@ public class PeerRecoveryStatus {
final long time;
final long throttlingTime;
final long indexSize;
final long reusedIndexSize;
@ -80,12 +75,11 @@ public class PeerRecoveryStatus {
final long recoveredTranslogOperations;
public PeerRecoveryStatus(Stage stage, long startTime, long time, long throttlingTime, long indexSize, long reusedIndexSize,
public PeerRecoveryStatus(Stage stage, long startTime, long time, long indexSize, long reusedIndexSize,
long recoveredIndexSize, long recoveredTranslogOperations) {
this.stage = stage;
this.startTime = startTime;
this.time = time;
this.throttlingTime = throttlingTime;
this.indexSize = indexSize;
this.reusedIndexSize = reusedIndexSize;
this.recoveredIndexSize = recoveredIndexSize;
@ -112,14 +106,6 @@ public class PeerRecoveryStatus {
return time();
}
public TimeValue throttlingTime() {
return TimeValue.timeValueMillis(throttlingTime);
}
public TimeValue getThrottlingTime() {
return throttlingTime();
}
public ByteSizeValue indexSize() {
return new ByteSizeValue(indexSize);
}

View File

@ -227,7 +227,6 @@ public class ShardStatus extends BroadcastShardOperationResponse {
out.writeByte(peerRecoveryStatus.stage.value());
out.writeVLong(peerRecoveryStatus.startTime);
out.writeVLong(peerRecoveryStatus.time);
out.writeVLong(peerRecoveryStatus.throttlingTime);
out.writeVLong(peerRecoveryStatus.indexSize);
out.writeVLong(peerRecoveryStatus.reusedIndexSize);
out.writeVLong(peerRecoveryStatus.recoveredIndexSize);
@ -241,8 +240,6 @@ public class ShardStatus extends BroadcastShardOperationResponse {
out.writeByte(gatewayRecoveryStatus.stage.value());
out.writeVLong(gatewayRecoveryStatus.startTime);
out.writeVLong(gatewayRecoveryStatus.time);
out.writeVLong(gatewayRecoveryStatus.throttlingTime);
out.writeVLong(gatewayRecoveryStatus.indexThrottlingTime);
out.writeVLong(gatewayRecoveryStatus.indexSize);
out.writeVLong(gatewayRecoveryStatus.reusedIndexSize);
out.writeVLong(gatewayRecoveryStatus.recoveredIndexSize);
@ -278,12 +275,12 @@ public class ShardStatus extends BroadcastShardOperationResponse {
}
if (in.readBoolean()) {
peerRecoveryStatus = new PeerRecoveryStatus(PeerRecoveryStatus.Stage.fromValue(in.readByte()),
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}
if (in.readBoolean()) {
gatewayRecoveryStatus = new GatewayRecoveryStatus(GatewayRecoveryStatus.Stage.fromValue(in.readByte()),
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}
if (in.readBoolean()) {

View File

@ -177,9 +177,6 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
case TRANSLOG:
stage = PeerRecoveryStatus.Stage.TRANSLOG;
break;
case THROTTLE:
stage = PeerRecoveryStatus.Stage.THROTTLE;
break;
case FINALIZE:
stage = PeerRecoveryStatus.Stage.FINALIZE;
break;
@ -190,7 +187,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
stage = PeerRecoveryStatus.Stage.INIT;
}
shardStatus.peerRecoveryStatus = new PeerRecoveryStatus(stage, peerRecoveryStatus.startTime(), peerRecoveryStatus.time(),
peerRecoveryStatus.retryTime(), peerRecoveryStatus.phase1TotalSize(), peerRecoveryStatus.phase1ExistingTotalSize(),
peerRecoveryStatus.phase1TotalSize(), peerRecoveryStatus.phase1ExistingTotalSize(),
peerRecoveryStatus.currentFilesSize(), peerRecoveryStatus.currentTranslogOperations());
}
@ -208,17 +205,14 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
case TRANSLOG:
stage = GatewayRecoveryStatus.Stage.TRANSLOG;
break;
case THROTTLE:
stage = GatewayRecoveryStatus.Stage.THROTTLE;
break;
case DONE:
stage = GatewayRecoveryStatus.Stage.DONE;
break;
default:
stage = GatewayRecoveryStatus.Stage.INIT;
}
shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, gatewayRecoveryStatus.startTime(), gatewayRecoveryStatus.time(), gatewayRecoveryStatus.retryTime(),
gatewayRecoveryStatus.index().retryTime(), gatewayRecoveryStatus.index().totalSize(), gatewayRecoveryStatus.index().existingTotalSize(), gatewayRecoveryStatus.index().currentFilesSize(), gatewayRecoveryStatus.translog().currentTranslogOperations());
shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, gatewayRecoveryStatus.startTime(), gatewayRecoveryStatus.time(),
gatewayRecoveryStatus.index().totalSize(), gatewayRecoveryStatus.index().existingTotalSize(), gatewayRecoveryStatus.index().currentFilesSize(), gatewayRecoveryStatus.translog().currentTranslogOperations());
}
SnapshotStatus snapshotStatus = gatewayService.snapshotStatus();

View File

@ -43,6 +43,7 @@ public class NodeAllocations extends AbstractComponent implements NodeAllocation
this(settings, ImmutableSet.<NodeAllocation>builder()
.add(new SameShardNodeAllocation(settings))
.add(new ReplicaAfterPrimaryActiveNodeAllocation(settings))
.add(new ThrottlingNodeAllocation(settings))
.build()
);
}

View File

@ -34,6 +34,7 @@ public class ShardAllocationModule extends AbstractModule {
Multibinder<NodeAllocation> decidersBinder = Multibinder.newSetBinder(binder(), NodeAllocation.class);
decidersBinder.addBinding().to(SameShardNodeAllocation.class);
decidersBinder.addBinding().to(ReplicaAfterPrimaryActiveNodeAllocation.class);
decidersBinder.addBinding().to(ThrottlingNodeAllocation.class);
bind(NodeAllocations.class).asEagerSingleton();
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
/**
* @author kimchy (shay.banon)
*/
public class ThrottlingNodeAllocation extends AbstractComponent implements NodeAllocation {
private final int concurrentRecoveries;
@Inject public ThrottlingNodeAllocation(Settings settings) {
super(settings);
this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", Runtime.getRuntime().availableProcessors() + 1);
}
@Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
if (shardRouting.primary()) {
boolean primaryUnassigned = false;
for (MutableShardRouting shard : routingNodes.unassigned()) {
if (shard.shardId().equals(shardRouting.shardId())) {
primaryUnassigned = true;
}
}
if (primaryUnassigned) {
// primary is unassigned, means we are going to do recovery from gateway
// count *just the primary* currently doing recovery on the node and check against concurrent_recoveries
int primariesInRecovery = 0;
for (MutableShardRouting shard : node) {
if (shard.state() == ShardRoutingState.INITIALIZING && shard.primary()) {
primariesInRecovery++;
}
}
if (primariesInRecovery >= concurrentRecoveries) {
return Decision.THROTTLE;
} else {
return Decision.YES;
}
}
}
// either primary or replica doing recovery (from peer shard)
// count the number of recoveries on the node, its for both target (INITIALIZING) and source (RELOCATING)
int currentRecoveries = 0;
for (MutableShardRouting shard : node) {
if (shard.state() == ShardRoutingState.INITIALIZING || shard.state() == ShardRoutingState.RELOCATING) {
currentRecoveries++;
}
}
if (currentRecoveries >= concurrentRecoveries) {
return Decision.THROTTLE;
} else {
return Decision.YES;
}
}
}
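For orientation only: a hedged sketch of how the composite NodeAllocations (wired up in NodeAllocations and ShardAllocationModule earlier in this commit) might consult each registered decider, including the throttler above. The combination rule shown here, NO vetoes, THROTTLE defers, otherwise YES, is an assumption for illustration, not code from this commit:

public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
    Decision ret = Decision.YES;
    for (NodeAllocation allocation : allocations) {
        Decision decision = allocation.canAllocate(shardRouting, node, routingNodes);
        if (decision == Decision.NO) {
            return Decision.NO;          // hard veto: never place the shard here
        } else if (decision == Decision.THROTTLE) {
            ret = Decision.THROTTLE;     // defer: e.g. too many recoveries on this node
        }
    }
    return ret;
}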

View File

@ -283,6 +283,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
// this engine always acts as if waitForOperations=true
if (refreshMutex.compareAndSet(false, true)) {
IndexWriter currentWriter = indexWriter;
if (currentWriter == null) {
throw new EngineClosedException(shardId);
}
try {
if (dirty) {
dirty = false;
@ -298,7 +301,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
} catch (AlreadyClosedException e) {
// an index writer got replaced on us, ignore
} catch (Exception e) {
if (currentWriter != indexWriter) {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
} else if (currentWriter != indexWriter) {
// an index writer got replaced on us, ignore
} else {
throw new RefreshFailedEngineException(shardId, e);
@ -313,12 +318,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
@Override public void flush(Flush flush) throws EngineException {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
}
// check outside the lock as well so we can check without blocking on the write lock
if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
rwl.writeLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
}
if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
@ -361,6 +372,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (optimizeMutex.compareAndSet(false, true)) {
rwl.readLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
}
int maxNumberOfSegments = optimize.maxNumSegments();
if (maxNumberOfSegments == -1) {
// not set, optimize down to half the configured number of segments

View File

@ -34,7 +34,6 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
@ -56,8 +55,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private final Store store;
private final RecoveryThrottler recoveryThrottler;
private volatile long lastIndexVersion;
@ -75,13 +72,12 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
@Inject public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings,
ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway,
Store store, RecoveryThrottler recoveryThrottler) {
Store store) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.indexShard = (InternalIndexShard) indexShard;
this.shardGateway = shardGateway;
this.store = store;
this.recoveryThrottler = recoveryThrottler;
this.snapshotOnClose = componentSettings.getAsBoolean("snapshot_on_close", true);
this.snapshotInterval = componentSettings.getAsTime("snapshot_interval", TimeValue.timeValueSeconds(10));
@ -146,27 +142,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
recoveryStatus = new RecoveryStatus();
recoveryStatus.updateStage(RecoveryStatus.Stage.INIT);
// we know we are on a thread, we can spin till we can engage in recovery
while (!recoveryThrottler.tryGatewayRecovery(shardId, "gateway")) {
if (indexShard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery("ignoring recovery while waiting on retry, closed");
return;
}
recoveryStatus.updateStage(RecoveryStatus.Stage.THROTTLE);
try {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
recoveryStatus.retryTime(System.currentTimeMillis() - recoveryStatus.startTime());
} catch (InterruptedException e) {
recoveryStatus = null;
if (indexShard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery("Interrupted while waiting for recovery, but we should ignore since closed");
} else {
listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recovery", e));
}
return;
}
}
try {
logger.debug("starting recovery from {} ...", shardGateway);
shardGateway.recover(recoveryStatus);
@ -188,8 +163,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(timeValueMillis(recoveryStatus.time())).append("], retry_time [").append(TimeValue.timeValueMillis(recoveryStatus.retryTime())).append("]\n");
sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.index().time())).append("], throttling_wait [").append(TimeValue.timeValueMillis(recoveryStatus.index().retryTime())).append("]\n");
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(timeValueMillis(recoveryStatus.time())).append("]\n");
sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.index().time())).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().existingTotalSize())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.translog().time())).append("]");
logger.debug(sb.toString());
@ -214,8 +189,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
listener.onIgnoreRecovery("shard closed");
} catch (Exception e) {
listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "failed recovery", e));
} finally {
recoveryThrottler.recoveryGatewayDone(shardId, "gateway");
}
}
});

View File

@ -28,7 +28,6 @@ public class RecoveryStatus {
public static enum Stage {
INIT,
THROTTLE,
INDEX,
TRANSLOG,
DONE
@ -38,8 +37,6 @@ public class RecoveryStatus {
private long startTime = System.currentTimeMillis();
private long retryTime = 0;
private long time;
private Index index = new Index();
@ -63,14 +60,6 @@ public class RecoveryStatus {
this.startTime = startTime;
}
public long retryTime() {
return this.retryTime;
}
public void retryTime(long retryTime) {
this.retryTime = retryTime;
}
public long time() {
return this.time;
}
@ -126,7 +115,6 @@ public class RecoveryStatus {
private long totalSize = 0;
private int numberOfExistingFiles = 0;
private long existingTotalSize = 0;
private AtomicLong retryTime = new AtomicLong();
private AtomicLong currentFilesSize = new AtomicLong();
public long startTime() {
@ -172,14 +160,6 @@ public class RecoveryStatus {
return this.existingTotalSize;
}
public void addRetryTime(long delta) {
retryTime.addAndGet(delta);
}
public long retryTime() {
return this.retryTime.get();
}
public void updateVersion(long version) {
this.version = version;
}

View File

@ -51,7 +51,6 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import javax.annotation.Nullable;
@ -81,8 +80,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
protected final Store store;
protected final RecoveryThrottler recoveryThrottler;
protected final ByteSizeValue chunkSize;
protected final BlobStore blobStore;
@ -104,13 +101,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
private volatile SnapshotStatus currentSnapshotStatus;
protected BlobStoreIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
IndexShard indexShard, Store store) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.indexShard = (InternalIndexShard) indexShard;
this.store = store;
this.recoveryThrottler = recoveryThrottler;
BlobStoreIndexGateway blobStoreIndexGateway = (BlobStoreIndexGateway) indexGateway;
@ -571,23 +567,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
final CountDownLatch latch = new CountDownLatch(filesToRecover.size());
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
for (final BlobMetaData fileToRecover : filesToRecover) {
if (recoveryThrottler.tryStream(shardId, fileToRecover.name())) {
// we managed to get a recovery going
recoverFile(fileToRecover, indicesBlobs, latch, failures);
} else {
// lets reschedule to do it next time
threadPool.schedule(new Runnable() {
@Override public void run() {
recoveryStatus.index().addRetryTime(recoveryThrottler.throttleInterval().millis());
if (recoveryThrottler.tryStream(shardId, fileToRecover.name())) {
// we managed to get a recovery going
recoverFile(fileToRecover, indicesBlobs, latch, failures);
} else {
threadPool.schedule(this, recoveryThrottler.throttleInterval());
}
}
}, recoveryThrottler.throttleInterval());
}
recoverFile(fileToRecover, indicesBlobs, latch, failures);
}
try {
@ -632,7 +612,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
try {
indexOutput = store.directory().createOutput(fileToRecover.name());
} catch (IOException e) {
recoveryThrottler.streamDone(shardId, fileToRecover.name());
failures.add(e);
latch.countDown();
return;
@ -645,7 +624,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
if (!blobs.containsKey(firstFileToRecover)) {
// no file, what to do, what to do?
recoveryThrottler.streamDone(shardId, fileToRecover.name());
logger.warn("no file [{}] to recover, even though it has md5, ignoring it", fileToRecover.name());
latch.countDown();
return;
@ -681,12 +659,10 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
logger.warn("file [{}] has different md5, actual read content [{}], store [{}]", fileToRecover.name(), md5, fileToRecover.md5());
}
recoveryThrottler.streamDone(shardId, fileToRecover.name());
latch.countDown();
}
@Override public void onFailure(Throwable t) {
recoveryThrottler.streamDone(shardId, fileToRecover.name());
failures.add(t);
latch.countDown();
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
/**
@ -36,8 +35,8 @@ import org.elasticsearch.threadpool.ThreadPool;
public class FsIndexShardGateway extends BlobStoreIndexShardGateway {
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway fsIndexGateway,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings, threadPool, fsIndexGateway, indexShard, store, recoveryThrottler);
IndexShard indexShard, Store store) {
super(shardId, indexSettings, threadPool, fsIndexGateway, indexShard, store);
}
@Override public String type() {

View File

@ -32,7 +32,6 @@ import java.util.List;
*/
class RecoveryResponse implements Streamable {
boolean retry = false;
List<String> phase1FileNames = Lists.newArrayList();
List<Long> phase1FileSizes = Lists.newArrayList();
List<String> phase1ExistingFileNames = Lists.newArrayList();
@ -52,7 +51,6 @@ class RecoveryResponse implements Streamable {
}
@Override public void readFrom(StreamInput in) throws IOException {
retry = in.readBoolean();
int size = in.readVInt();
phase1FileNames = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
@ -86,7 +84,6 @@ class RecoveryResponse implements Streamable {
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(retry);
out.writeVInt(phase1FileNames.size());
for (String name : phase1FileNames) {
out.writeUTF(name);

View File

@ -38,7 +38,6 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -46,11 +45,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* The source recovery accepts recovery requests from other peer shards and start the recovery process from this
* source shard to the target shard.
@ -69,8 +65,6 @@ public class RecoverySource extends AbstractComponent {
private final IndicesService indicesService;
private final RecoveryThrottler recoveryThrottler;
private final ByteSizeValue fileChunkSize;
@ -78,13 +72,11 @@ public class RecoverySource extends AbstractComponent {
private final int translogBatchSize;
@Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
RecoveryThrottler recoveryThrottler) {
@Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.indicesService = indicesService;
this.recoveryThrottler = recoveryThrottler;
this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB));
this.translogBatchSize = componentSettings.getAsInt("translog_batch_size", 100);
@ -94,196 +86,175 @@ public class RecoverySource extends AbstractComponent {
}
private RecoveryResponse recover(final StartRecoveryRequest request) {
if (!recoveryThrottler.tryPeerRecovery(request.shardId(), "peer recovery source")) {
RecoveryResponse retry = new RecoveryResponse();
retry.retry = true;
return retry;
}
final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
try {
logger.trace("starting recovery to {}, mark_as_relocated {}", request.targetNode(), request.markAsRelocated());
final RecoveryResponse response = new RecoveryResponse();
shard.recover(new Engine.RecoveryHandler() {
@Override public void phase1(final SnapshotIndexCommit snapshot) throws ElasticSearchException {
long totalSize = 0;
long existingTotalSize = 0;
try {
StopWatch stopWatch = new StopWatch().start();
logger.trace("starting recovery to {}, mark_as_relocated {}", request.targetNode(), request.markAsRelocated());
final RecoveryResponse response = new RecoveryResponse();
shard.recover(new Engine.RecoveryHandler() {
@Override public void phase1(final SnapshotIndexCommit snapshot) throws ElasticSearchException {
long totalSize = 0;
long existingTotalSize = 0;
try {
StopWatch stopWatch = new StopWatch().start();
for (String name : snapshot.getFiles()) {
StoreFileMetaData md = shard.store().metaDataWithMd5(name);
boolean useExisting = false;
for (String name : snapshot.getFiles()) {
StoreFileMetaData md = shard.store().metaDataWithMd5(name);
boolean useExisting = false;
if (request.existingFiles.containsKey(name)) {
if (md.md5().equals(request.existingFiles.get(name).md5())) {
response.phase1ExistingFileNames.add(name);
response.phase1ExistingFileSizes.add(md.sizeInBytes());
existingTotalSize += md.sizeInBytes();
useExisting = true;
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.md5());
}
}
}
if (!useExisting) {
if (request.existingFiles.containsKey(name)) {
if (md.md5().equals(request.existingFiles.get(name).md5())) {
response.phase1ExistingFileNames.add(name);
response.phase1ExistingFileSizes.add(md.sizeInBytes());
existingTotalSize += md.sizeInBytes();
useExisting = true;
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.md5());
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles.get(name).md5(), md.md5());
} else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name);
}
response.phase1FileNames.add(name);
response.phase1FileSizes.add(md.sizeInBytes());
totalSize += md.sizeInBytes();
}
}
response.phase1TotalSize = totalSize;
response.phase1ExistingTotalSize = existingTotalSize;
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes,
response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet();
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : response.phase1FileNames) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
IndexInput indexInput = null;
try {
final int BUFFER_SIZE = (int) fileChunkSize.bytes();
byte[] buf = new byte[BUFFER_SIZE];
indexInput = snapshot.getDirectory().openInput(name);
long len = indexInput.length();
long readCount = 0;
while (readCount < len) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(shard.shardId());
}
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, buf, toRead),
TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
readCount += toRead;
}
indexInput.close();
} catch (Exception e) {
lastException.set(e);
} finally {
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
latch.countDown();
}
}
if (!useExisting) {
if (request.existingFiles.containsKey(name)) {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles.get(name).md5(), md.md5());
} else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name);
}
response.phase1FileNames.add(name);
response.phase1FileSizes.add(md.sizeInBytes());
totalSize += md.sizeInBytes();
}
}
response.phase1TotalSize = totalSize;
response.phase1ExistingTotalSize = existingTotalSize;
});
}
final AtomicLong throttlingWaitTime = new AtomicLong();
latch.await();
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
if (lastException.get() != null) {
throw lastException.get();
}
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes,
response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet();
// now, set the clean files request
Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE).txGet();
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : response.phase1FileNames) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
IndexInput indexInput = null;
try {
long throttlingStartTime = System.currentTimeMillis();
while (!recoveryThrottler.tryStream(request.shardId(), name)) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(shard.shardId());
}
Thread.sleep(recoveryThrottler.throttleInterval().millis());
}
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
response.phase1Time = stopWatch.totalTime().millis();
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
}
}
final int BUFFER_SIZE = (int) fileChunkSize.bytes();
byte[] buf = new byte[BUFFER_SIZE];
indexInput = snapshot.getDirectory().openInput(name);
long len = indexInput.length();
long readCount = 0;
while (readCount < len) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(shard.shardId());
}
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, buf, toRead),
TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
readCount += toRead;
}
indexInput.close();
} catch (Exception e) {
lastException.set(e);
} finally {
recoveryThrottler.streamDone(request.shardId(), name);
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
latch.countDown();
}
}
});
}
@Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
latch.await();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet();
if (lastException.get() != null) {
throw lastException.get();
}
int totalOperations = sendSnapshot(snapshot);
// now, set the clean files request
Set<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE).txGet();
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase2] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
response.phase2Time = stopWatch.totalTime().millis();
response.phase2Operations = totalOperations;
}
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}], throttling_wait [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime(), timeValueMillis(throttlingWaitTime.get()));
response.phase1Time = stopWatch.totalTime().millis();
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
@Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
int totalOperations = sendSnapshot(snapshot);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet();
if (request.markAsRelocated()) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
shard.relocated();
} catch (IllegalIndexShardStateException e) {
// we can ignore this exception since, on the other node, when it moved to phase3
// it will also send shard started, which might cause the index shard we work against
// to move be closed by the time we get to the the relocated method
}
}
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase3] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
response.phase3Time = stopWatch.totalTime().millis();
response.phase3Operations = totalOperations;
}
@Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
int counter = 0;
int totalOperations = 0;
List<Translog.Operation> operations = Lists.newArrayList();
while (snapshot.hasNext()) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet();
int totalOperations = sendSnapshot(snapshot);
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase2] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
response.phase2Time = stopWatch.totalTime().millis();
response.phase2Operations = totalOperations;
}
@Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
StopWatch stopWatch = new StopWatch().start();
int totalOperations = sendSnapshot(snapshot);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet();
if (request.markAsRelocated()) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
shard.relocated();
} catch (IllegalIndexShardStateException e) {
// we can ignore this exception since, on the other node, when it moved to phase3
// it will also send shard started, which might cause the index shard we work against
// to move be closed by the time we get to the the relocated method
}
}
stopWatch.stop();
logger.trace("[{}][{}] recovery [phase3] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
response.phase3Time = stopWatch.totalTime().millis();
response.phase3Operations = totalOperations;
}
private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
int counter = 0;
int totalOperations = 0;
List<Translog.Operation> operations = Lists.newArrayList();
while (snapshot.hasNext()) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
operations.add(snapshot.next());
totalOperations++;
if (++counter == translogBatchSize) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
counter = 0;
operations = Lists.newArrayList();
}
}
// send the leftover
if (!operations.isEmpty()) {
operations.add(snapshot.next());
totalOperations++;
if (++counter == translogBatchSize) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
counter = 0;
operations = Lists.newArrayList();
}
return totalOperations;
}
});
return response;
} finally {
recoveryThrottler.recoveryPeerDone(request.shardId(), "peer recovery source");
}
// send the leftover
if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
}
return totalOperations;
}
});
return response;
}
class StartRecoveryTransportRequestHandler extends BaseTransportRequestHandler<StartRecoveryRequest> {

View File

@ -33,7 +33,6 @@ public class RecoveryStatus {
public static enum Stage {
INIT,
THROTTLE,
INDEX,
TRANSLOG,
FINALIZE,
@ -44,7 +43,6 @@ public class RecoveryStatus {
final long startTime = System.currentTimeMillis();
long time;
volatile long retryTime = 0;
List<String> phase1FileNames;
List<Long> phase1FileSizes;
List<String> phase1ExistingFileNames;
@ -64,10 +62,6 @@ public class RecoveryStatus {
return this.time;
}
public long retryTime() {
return retryTime;
}
public long phase1TotalSize() {
return phase1TotalSize;
}

View File

@ -40,7 +40,6 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -76,17 +75,14 @@ public class RecoveryTarget extends AbstractComponent {
private final IndicesService indicesService;
private final RecoveryThrottler recoveryThrottler;
private final ConcurrentMap<ShardId, RecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
@Inject public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
IndicesLifecycle indicesLifecycle, RecoveryThrottler recoveryThrottler) {
IndicesLifecycle indicesLifecycle) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.indicesService = indicesService;
this.recoveryThrottler = recoveryThrottler;
transportService.registerHandler(Actions.FILES_INFO, new FilesInfoRequestHandler());
transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler());
@ -167,13 +163,6 @@ public class RecoveryTarget extends AbstractComponent {
onGoingRecoveries.put(request.shardId(), recovery);
}
if (!recoveryThrottler.tryPeerRecovery(shard.shardId(), "peer recovery target")) {
recovery.stage = RecoveryStatus.Stage.THROTTLE;
recovery.retryTime = System.currentTimeMillis() - recovery.startTime;
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}
try {
logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode());
@ -183,15 +172,8 @@ public class RecoveryTarget extends AbstractComponent {
return new RecoveryResponse();
}
}).txGet();
if (recoveryStatus.retry) {
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
return;
}
logger.trace("[{}][{}] retrying recovery in [{}], source shard is busy", request.shardId().index().name(), request.shardId().id(), recoveryThrottler.throttleInterval());
recovery.stage = RecoveryStatus.Stage.THROTTLE;
recovery.retryTime = System.currentTimeMillis() - recovery.startTime;
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
if (shard.state() == IndexShardState.CLOSED) {
listener.onIgnoreRecovery(false, "shard closed, stop recovery");
return;
}
stopWatch.stop();
@ -231,9 +213,7 @@ public class RecoveryTarget extends AbstractComponent {
}
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
recovery.stage = RecoveryStatus.Stage.THROTTLE;
recovery.retryTime = System.currentTimeMillis() - recovery.startTime;
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
listener.onRetryRecovery(TimeValue.timeValueMillis(500));
return;
}
@ -251,8 +231,6 @@ public class RecoveryTarget extends AbstractComponent {
}
listener.onRecoveryFailure(new RecoveryFailedException(request, e), true);
} finally {
recoveryThrottler.recoveryPeerDone(shard.shardId(), "peer recovery target");
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
@ -80,6 +81,8 @@ public class TranslogService extends AbstractIndexShardComponent {
logger.trace("flushing translog, operations [{}], breached [{}]", currentSize, flushThreshold);
try {
indexShard.flush(new Engine.Flush());
} catch (EngineClosedException e) {
// we are being closed, ignore
} catch (FlushNotAllowedEngineException e) {
// ignore this exception, we are not allowed to perform flush
} catch (Exception e) {

View File

@ -27,7 +27,6 @@ import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
import org.elasticsearch.indices.memory.IndicesMemoryCleaner;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
/**
@ -46,7 +45,6 @@ public class IndicesModule extends AbstractModule {
bind(IndicesService.class).to(InternalIndicesService.class).asEagerSingleton();
bind(RecoveryThrottler.class).asEagerSingleton();
bind(RecoveryTarget.class).asEagerSingleton();
bind(RecoverySource.class).asEagerSingleton();

View File

@ -1,147 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery.throttler;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
/**
* Recovery Throttler allows to throttle recoveries (both gateway and peer).
*
* @author kimchy (shay.banon)
*/
public class RecoveryThrottler extends AbstractComponent {
private final Object concurrentRecoveryMutex = new Object();
private final int concurrentRecoveries;
private final TimeValue throttleInterval;
private volatile int onGoingGatewayRecoveries = 0;
private volatile int onGoingPeerRecoveries = 0;
private final int concurrentStreams;
private volatile int onGoingStreams = 0;
private final Object concurrentStreamsMutex = new Object();
@Inject public RecoveryThrottler(Settings settings) {
super(settings);
int defaultConcurrentRecoveries = Runtime.getRuntime().availableProcessors() + 1;
// tap it at 10 (is it a good number?)
if (defaultConcurrentRecoveries > 10) {
defaultConcurrentRecoveries = 10;
} else if (defaultConcurrentRecoveries < 3) {
defaultConcurrentRecoveries = 3;
}
concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", defaultConcurrentRecoveries);
concurrentStreams = componentSettings.getAsInt("concurrent_streams", defaultConcurrentRecoveries * 2);
throttleInterval = componentSettings.getAsTime("interval", TimeValue.timeValueMillis(100));
logger.debug("concurrent_recoveries [{}], concurrent_streams [{}] interval [{}]", concurrentRecoveries, concurrentStreams, throttleInterval);
}
/**
* Try and check if gateway recovery is allowed. Only takes the on going gateway recoveries into account. Ignore
* on going peer recoveries so peer recovery will not block a much more important gateway recovery.
*/
public boolean tryGatewayRecovery(ShardId shardId, String reason) {
synchronized (concurrentRecoveryMutex) {
if ((onGoingGatewayRecoveries + 1) > concurrentRecoveries) {
return false;
}
onGoingGatewayRecoveries++;
logger.trace("Recovery (gateway) allowed for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason);
return true;
}
}
/**
* Mark gateway recvoery as done.
*/
public void recoveryGatewayDone(ShardId shardId, String reason) {
synchronized (concurrentRecoveryMutex) {
--onGoingGatewayRecoveries;
logger.trace("Recovery (gateway) done for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason);
}
}
/**
* Try and check if peer recovery is allowed. Takes into account both on going gateway recovery and peer recovery.
*/
public boolean tryPeerRecovery(ShardId shardId, String reason) {
synchronized (concurrentRecoveryMutex) {
if ((onGoingGatewayRecoveries + onGoingPeerRecoveries + 1) > concurrentRecoveries) {
return false;
}
onGoingPeerRecoveries++;
logger.trace("Recovery (peer) allowed for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason);
return true;
}
}
/**
* Mark peer recovery as done.
*/
public void recoveryPeerDone(ShardId shardId, String reason) {
synchronized (concurrentRecoveryMutex) {
--onGoingPeerRecoveries;
logger.trace("Recovery (peer) done for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason);
}
}
public int onGoingRecoveries() {
return onGoingGatewayRecoveries + onGoingPeerRecoveries;
}
public boolean tryStream(ShardId shardId, String streamName) {
synchronized (concurrentStreamsMutex) {
if (onGoingStreams + 1 > concurrentStreams) {
return false;
}
onGoingStreams++;
logger.trace("Stream [{}] allowed for [{}], on going [{}], allowed [{}]", streamName, shardId, onGoingStreams, concurrentStreams);
return true;
}
}
public void streamDone(ShardId shardId, String streamName) {
synchronized (concurrentStreamsMutex) {
--onGoingStreams;
logger.trace("Stream [{}] done for [{}], on going [{}], allowed [{}]", streamName, shardId, onGoingStreams, concurrentStreams);
}
}
public int onGoingStreams() {
return onGoingStreams;
}
public TimeValue throttleInterval() {
return throttleInterval;
}
}

View File

@ -145,8 +145,6 @@ public class RestIndicesStatusAction extends BaseRestHandler {
builder.field("start_time_in_millis", peerRecoveryStatus.startTime());
builder.field("time", peerRecoveryStatus.time());
builder.field("time_in_millis", peerRecoveryStatus.time().millis());
builder.field("throttling_time", peerRecoveryStatus.throttlingTime());
builder.field("throttling_time_in_millis", peerRecoveryStatus.throttlingTime().millis());
builder.startObject("index");
builder.field("progress", peerRecoveryStatus.indexRecoveryProgress());
@ -174,8 +172,6 @@ public class RestIndicesStatusAction extends BaseRestHandler {
builder.field("start_time_in_millis", gatewayRecoveryStatus.startTime());
builder.field("time", gatewayRecoveryStatus.time());
builder.field("time_in_millis", gatewayRecoveryStatus.time().millis());
builder.field("throttling_time", gatewayRecoveryStatus.throttlingTime());
builder.field("throttling_time_in_millis", gatewayRecoveryStatus.throttlingTime().millis());
builder.startObject("index");
builder.field("progress", gatewayRecoveryStatus.indexRecoveryProgress());
@ -187,8 +183,6 @@ public class RestIndicesStatusAction extends BaseRestHandler {
builder.field("expected_recovered_size_in_bytes", gatewayRecoveryStatus.expectedRecoveredIndexSize().bytes());
builder.field("recovered_size", gatewayRecoveryStatus.recoveredIndexSize());
builder.field("recovered_size_in_bytes", gatewayRecoveryStatus.recoveredIndexSize().bytes());
builder.field("throttling_time", gatewayRecoveryStatus.indexThrottlingTime());
builder.field("throttling_time_in_millis", gatewayRecoveryStatus.indexThrottlingTime().millis());
builder.endObject();
builder.startObject("translog");

View File

@ -36,6 +36,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@ -47,7 +48,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests {
private final ESLogger logger = Loggers.getLogger(ElectReplicaAsPrimaryDuringRelocationTests.class);
@Test public void testElectReplicaAsPrimaryDuringRelocation() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@@ -38,6 +38,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@@ -50,7 +51,7 @@ public class FailedShardsRoutingTests {
private final ESLogger logger = Loggers.getLogger(FailedShardsRoutingTests.class);
@Test public void testFailures() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");
@@ -163,7 +164,7 @@ public class FailedShardsRoutingTests {
}
@Test public void test10ShardsWith1ReplicaFailure() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@@ -35,6 +35,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@@ -46,7 +47,7 @@ public class PrimaryElectionRoutingTests {
private final ESLogger logger = Loggers.getLogger(PrimaryElectionRoutingTests.class);
@Test public void testBackupElectionToPrimaryWhenPrimaryCanBeAllocatedToAnotherNode() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@@ -36,6 +36,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@@ -47,8 +48,7 @@ public class RebalanceAfterActiveTests {
private final ESLogger logger = Loggers.getLogger(RebalanceAfterActiveTests.class);
@Test public void testRebalanceOnlyAfterAllShardsAreActive() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@@ -35,6 +35,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@@ -46,8 +47,7 @@ public class ReplicaAllocatedAfterPrimaryTests {
private final ESLogger logger = Loggers.getLogger(ReplicaAllocatedAfterPrimaryTests.class);
@Test public void testBackupIsAllocatedAfterPrimary() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@@ -43,6 +43,7 @@ import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.collect.Lists.*;
import static org.elasticsearch.common.collect.Sets.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@@ -54,7 +55,7 @@ public class SingleShardNoReplicasRoutingTests {
private final ESLogger logger = Loggers.getLogger(SingleShardNoReplicasRoutingTests.class);
@Test public void testSingleIndexStartedShard() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");
@@ -154,7 +155,7 @@ public class SingleShardNoReplicasRoutingTests {
}
@Test public void testSingleIndexShardFailed() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");
@@ -203,7 +204,7 @@ public class SingleShardNoReplicasRoutingTests {
}
@Test public void testMultiIndexEvenDistribution() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
final int numberOfIndices = 50;
logger.info("Building initial routing table with " + numberOfIndices + " indices");
@@ -311,7 +312,7 @@ public class SingleShardNoReplicasRoutingTests {
}
@Test public void testMultiIndexUnevenNodes() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
final int numberOfIndices = 10;
logger.info("Building initial routing table with " + numberOfIndices + " indices");

View File

@@ -35,6 +35,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@@ -46,7 +47,7 @@ public class SingleShardOneReplicaRoutingTests {
private final ESLogger logger = Loggers.getLogger(SingleShardOneReplicaRoutingTests.class);
@Test public void testSingleIndexFirstStartPrimaryThenBackups() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@@ -35,6 +35,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@@ -46,7 +47,7 @@ public class TenShardsOneReplicaRoutingTests {
private final ESLogger logger = Loggers.getLogger(TenShardsOneReplicaRoutingTests.class);
@Test public void testSingleIndexFirstStartPrimaryThenBackups() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@@ -0,0 +1,174 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class ThrottlingAllocationTests {
private final ESLogger logger = Loggers.getLogger(ThrottlingAllocationTests.class);
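// With 10 primaries (plus 10 replicas that cannot be assigned to the same
// node) and concurrent_recoveries set to 3, allocation on the single node
// proceeds in rounds of at most 3 initializing shards: 3 + 3 + 3 + 1.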
@Test public void testPrimaryRecoveryThrottling() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 3).build());
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(10).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("start one node, do reroute, only 3 should initialize");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(17));
logger.info("start initializing, another 3 should initialize");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(3));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(14));
logger.info("start initializing, another 3 should initialize");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(6));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(11));
logger.info("start initializing, another 1 should initialize");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(9));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(10));
logger.info("start initializing, all primaries should be started");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(0));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(10));
}
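// Same limit of 3 concurrent recoveries, now with 5 primaries and 5 replicas:
// primaries start on node1 in rounds of 3 + 2, then replicas start on node2
// in rounds of 3 + 2 once it joins.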
@Test public void testReplicaAndPrimaryRecoveryThrottling() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 3).build());
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("start one node, do reroute, only 3 should initialize");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(7));
logger.info("start initializing, another 2 should initialize");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(3));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(2));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(5));
logger.info("start initializing, all primaries should be started");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(0));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(5));
logger.info("start another node, replicas should start being allocated");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(2));
logger.info("start initializing replicas");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(8));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(2));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
logger.info("start initializing replicas, all should be started");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(0));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
}
private DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
}
}
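The check these tests exercise is not itself visible in this diff; assuming it matches the observed behavior, it amounts to refusing to allocate a shard to a node that already has the configured number of recovering shards. A minimal sketch under that assumption (names are hypothetical; the real logic lives inside ShardsAllocation):

// Illustrative sketch only: refuse to place a shard on a node that already
// has concurrentRecoveries shards in the INITIALIZING (recovering) state.
public class ConcurrentRecoveriesCheck {
    private final int concurrentRecoveries;

    public ConcurrentRecoveriesCheck(int concurrentRecoveries) {
        this.concurrentRecoveries = concurrentRecoveries;
    }

    public boolean canAllocate(int initializingShardsOnNode) {
        return initializingShardsOnNode < concurrentRecoveries;
    }
}

With the test setting of 3, new ConcurrentRecoveriesCheck(3).canAllocate(3) returns false, which is why each reroute above leaves at most three shards initializing per node.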

View File

@@ -17,6 +17,7 @@ import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.RoutingTable.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@@ -28,7 +29,7 @@ public class UpdateNumberOfReplicasTests {
private final ESLogger logger = Loggers.getLogger(UpdateNumberOfReplicasTests.class);
@Test public void testUpdateNumberOfReplicas() {
ShardsAllocation strategy = new ShardsAllocation();
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@@ -27,7 +27,6 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
/**
@@ -36,8 +35,8 @@ import org.elasticsearch.threadpool.ThreadPool;
public class S3IndexShardGateway extends BlobStoreIndexShardGateway {
@Inject public S3IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings, threadPool, indexGateway, indexShard, store, recoveryThrottler);
IndexShard indexShard, Store store) {
super(shardId, indexSettings, threadPool, indexGateway, indexShard, store);
}
@Override public String type() {

View File

@@ -27,7 +27,6 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
/**
@@ -36,8 +35,8 @@ import org.elasticsearch.threadpool.ThreadPool;
public class HdfsIndexShardGateway extends BlobStoreIndexShardGateway {
@Inject public HdfsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway hdfsIndexGateway,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings, threadPool, hdfsIndexGateway, indexShard, store, recoveryThrottler);
IndexShard indexShard, Store store) {
super(shardId, indexSettings, threadPool, hdfsIndexGateway, indexShard, store);
}
@Override public String type() {