diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewayRecoveryStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewayRecoveryStatus.java index 494ce367af2..101b3bc8403 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewayRecoveryStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewayRecoveryStatus.java @@ -30,11 +30,10 @@ public class GatewayRecoveryStatus { public enum Stage { INIT((byte) 0), - THROTTLE((byte) 1), - INDEX((byte) 2), - TRANSLOG((byte) 3), - FINALIZE((byte) 4), - DONE((byte) 5); + INDEX((byte) 1), + TRANSLOG((byte) 2), + FINALIZE((byte) 3), + DONE((byte) 4); private final byte value; @@ -50,14 +49,12 @@ public class GatewayRecoveryStatus { if (value == 0) { return INIT; } else if (value == 1) { - return THROTTLE; - } else if (value == 2) { return INDEX; - } else if (value == 3) { + } else if (value == 2) { return TRANSLOG; - } else if (value == 4) { + } else if (value == 3) { return FINALIZE; - } else if (value == 5) { + } else if (value == 4) { return DONE; } throw new ElasticSearchIllegalArgumentException("No stage found for [" + value + ']'); @@ -70,10 +67,6 @@ public class GatewayRecoveryStatus { final long time; - final long throttlingTime; - - final long indexThrottlingTime; - final long indexSize; final long reusedIndexSize; @@ -82,13 +75,11 @@ public class GatewayRecoveryStatus { final long recoveredTranslogOperations; - public GatewayRecoveryStatus(Stage stage, long startTime, long time, long throttlingTime, long indexThrottlingTime, long indexSize, long reusedIndexSize, + public GatewayRecoveryStatus(Stage stage, long startTime, long time, long indexSize, long reusedIndexSize, long recoveredIndexSize, long recoveredTranslogOperations) { this.stage = stage; this.startTime = startTime; this.time = time; - this.throttlingTime = throttlingTime; - this.indexThrottlingTime = indexThrottlingTime; this.indexSize = indexSize; this.reusedIndexSize = reusedIndexSize; this.recoveredIndexSize = recoveredIndexSize; @@ -115,22 +106,6 @@ public class GatewayRecoveryStatus { return time(); } - public TimeValue throttlingTime() { - return TimeValue.timeValueMillis(throttlingTime); - } - - public TimeValue getThrottlingTime() { - return throttlingTime(); - } - - public TimeValue indexThrottlingTime() { - return TimeValue.timeValueMillis(indexThrottlingTime); - } - - public TimeValue getIndexThrottlingTime() { - return indexThrottlingTime(); - } - public ByteSizeValue indexSize() { return new ByteSizeValue(indexSize); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/PeerRecoveryStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/PeerRecoveryStatus.java index 1c622ac574f..4e361a3734e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/PeerRecoveryStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/PeerRecoveryStatus.java @@ -30,11 +30,10 @@ public class PeerRecoveryStatus { public enum Stage { INIT((byte) 0), - THROTTLE((byte) 1), - INDEX((byte) 2), - TRANSLOG((byte) 3), - FINALIZE((byte) 4), - DONE((byte) 5); + INDEX((byte) 1), + TRANSLOG((byte) 2), + FINALIZE((byte) 3), + DONE((byte) 4); private final byte value; @@ -50,14 +49,12 @@ public class PeerRecoveryStatus { if (value == 0) { return INIT; } else if (value == 1) { - return THROTTLE; - } else if (value == 2) { return INDEX; - } else if (value == 3) { + } else if (value == 2) { return TRANSLOG; - } else if (value == 4) { + } else if (value == 3) { return FINALIZE; - } else if (value == 5) { + } else if (value == 4) { return DONE; } throw new ElasticSearchIllegalArgumentException("No stage found for [" + value + ']'); @@ -70,8 +67,6 @@ public class PeerRecoveryStatus { final long time; - final long throttlingTime; - final long indexSize; final long reusedIndexSize; @@ -80,12 +75,11 @@ public class PeerRecoveryStatus { final long recoveredTranslogOperations; - public PeerRecoveryStatus(Stage stage, long startTime, long time, long throttlingTime, long indexSize, long reusedIndexSize, + public PeerRecoveryStatus(Stage stage, long startTime, long time, long indexSize, long reusedIndexSize, long recoveredIndexSize, long recoveredTranslogOperations) { this.stage = stage; this.startTime = startTime; this.time = time; - this.throttlingTime = throttlingTime; this.indexSize = indexSize; this.reusedIndexSize = reusedIndexSize; this.recoveredIndexSize = recoveredIndexSize; @@ -112,14 +106,6 @@ public class PeerRecoveryStatus { return time(); } - public TimeValue throttlingTime() { - return TimeValue.timeValueMillis(throttlingTime); - } - - public TimeValue getThrottlingTime() { - return throttlingTime(); - } - public ByteSizeValue indexSize() { return new ByteSizeValue(indexSize); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java index 3efe330fb31..05dcdd2a000 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java @@ -227,7 +227,6 @@ public class ShardStatus extends BroadcastShardOperationResponse { out.writeByte(peerRecoveryStatus.stage.value()); out.writeVLong(peerRecoveryStatus.startTime); out.writeVLong(peerRecoveryStatus.time); - out.writeVLong(peerRecoveryStatus.throttlingTime); out.writeVLong(peerRecoveryStatus.indexSize); out.writeVLong(peerRecoveryStatus.reusedIndexSize); out.writeVLong(peerRecoveryStatus.recoveredIndexSize); @@ -241,8 +240,6 @@ public class ShardStatus extends BroadcastShardOperationResponse { out.writeByte(gatewayRecoveryStatus.stage.value()); out.writeVLong(gatewayRecoveryStatus.startTime); out.writeVLong(gatewayRecoveryStatus.time); - out.writeVLong(gatewayRecoveryStatus.throttlingTime); - out.writeVLong(gatewayRecoveryStatus.indexThrottlingTime); out.writeVLong(gatewayRecoveryStatus.indexSize); out.writeVLong(gatewayRecoveryStatus.reusedIndexSize); out.writeVLong(gatewayRecoveryStatus.recoveredIndexSize); @@ -278,12 +275,12 @@ public class ShardStatus extends BroadcastShardOperationResponse { } if (in.readBoolean()) { peerRecoveryStatus = new PeerRecoveryStatus(PeerRecoveryStatus.Stage.fromValue(in.readByte()), - in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); + in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); } if (in.readBoolean()) { gatewayRecoveryStatus = new GatewayRecoveryStatus(GatewayRecoveryStatus.Stage.fromValue(in.readByte()), - in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); + in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); } if (in.readBoolean()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java index f255e84bb93..0fd4b626087 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java @@ -177,9 +177,6 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct case TRANSLOG: stage = PeerRecoveryStatus.Stage.TRANSLOG; break; - case THROTTLE: - stage = PeerRecoveryStatus.Stage.THROTTLE; - break; case FINALIZE: stage = PeerRecoveryStatus.Stage.FINALIZE; break; @@ -190,7 +187,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct stage = PeerRecoveryStatus.Stage.INIT; } shardStatus.peerRecoveryStatus = new PeerRecoveryStatus(stage, peerRecoveryStatus.startTime(), peerRecoveryStatus.time(), - peerRecoveryStatus.retryTime(), peerRecoveryStatus.phase1TotalSize(), peerRecoveryStatus.phase1ExistingTotalSize(), + peerRecoveryStatus.phase1TotalSize(), peerRecoveryStatus.phase1ExistingTotalSize(), peerRecoveryStatus.currentFilesSize(), peerRecoveryStatus.currentTranslogOperations()); } @@ -208,17 +205,14 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct case TRANSLOG: stage = GatewayRecoveryStatus.Stage.TRANSLOG; break; - case THROTTLE: - stage = GatewayRecoveryStatus.Stage.THROTTLE; - break; case DONE: stage = GatewayRecoveryStatus.Stage.DONE; break; default: stage = GatewayRecoveryStatus.Stage.INIT; } - shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, gatewayRecoveryStatus.startTime(), gatewayRecoveryStatus.time(), gatewayRecoveryStatus.retryTime(), - gatewayRecoveryStatus.index().retryTime(), gatewayRecoveryStatus.index().totalSize(), gatewayRecoveryStatus.index().existingTotalSize(), gatewayRecoveryStatus.index().currentFilesSize(), gatewayRecoveryStatus.translog().currentTranslogOperations()); + shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, gatewayRecoveryStatus.startTime(), gatewayRecoveryStatus.time(), + gatewayRecoveryStatus.index().totalSize(), gatewayRecoveryStatus.index().existingTotalSize(), gatewayRecoveryStatus.index().currentFilesSize(), gatewayRecoveryStatus.translog().currentTranslogOperations()); } SnapshotStatus snapshotStatus = gatewayService.snapshotStatus(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java index fb185f74001..f336417a5c4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java @@ -43,6 +43,7 @@ public class NodeAllocations extends AbstractComponent implements NodeAllocation this(settings, ImmutableSet.builder() .add(new SameShardNodeAllocation(settings)) .add(new ReplicaAfterPrimaryActiveNodeAllocation(settings)) + .add(new ThrottlingNodeAllocation(settings)) .build() ); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java index 8ff9101e90c..20b619d1f3f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java @@ -34,6 +34,7 @@ public class ShardAllocationModule extends AbstractModule { Multibinder decidersBinder = Multibinder.newSetBinder(binder(), NodeAllocation.class); decidersBinder.addBinding().to(SameShardNodeAllocation.class); decidersBinder.addBinding().to(ReplicaAfterPrimaryActiveNodeAllocation.class); + decidersBinder.addBinding().to(ThrottlingNodeAllocation.class); bind(NodeAllocations.class).asEagerSingleton(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java new file mode 100644 index 00000000000..cc15fb8d70c --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; + +/** + * @author kimchy (shay.banon) + */ +public class ThrottlingNodeAllocation extends AbstractComponent implements NodeAllocation { + + private final int concurrentRecoveries; + + @Inject public ThrottlingNodeAllocation(Settings settings) { + super(settings); + + this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", Runtime.getRuntime().availableProcessors() + 1); + } + + @Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) { + return false; + } + + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { + if (shardRouting.primary()) { + boolean primaryUnassigned = false; + for (MutableShardRouting shard : routingNodes.unassigned()) { + if (shard.shardId().equals(shardRouting.shardId())) { + primaryUnassigned = true; + } + } + if (primaryUnassigned) { + // primary is unassigned, means we are going to do recovery from gateway + // count *just the primary* currently doing recovery on the node and check against concurrent_recoveries + int primariesInRecovery = 0; + for (MutableShardRouting shard : node) { + if (shard.state() == ShardRoutingState.INITIALIZING && shard.primary()) { + primariesInRecovery++; + } + } + if (primariesInRecovery >= concurrentRecoveries) { + return Decision.THROTTLE; + } else { + return Decision.YES; + } + } + } + + // either primary or replica doing recovery (from peer shard) + + // count the number of recoveries on the node, its for both target (INITIALIZING) and source (RELOCATING) + int currentRecoveries = 0; + for (MutableShardRouting shard : node) { + if (shard.state() == ShardRoutingState.INITIALIZING || shard.state() == ShardRoutingState.RELOCATING) { + currentRecoveries++; + } + } + + if (currentRecoveries >= concurrentRecoveries) { + return Decision.THROTTLE; + } else { + return Decision.YES; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index d61fb97aeb0..9ee97656e48 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -283,6 +283,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, // this engine always acts as if waitForOperations=true if (refreshMutex.compareAndSet(false, true)) { IndexWriter currentWriter = indexWriter; + if (currentWriter == null) { + throw new EngineClosedException(shardId); + } try { if (dirty) { dirty = false; @@ -298,7 +301,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } catch (AlreadyClosedException e) { // an index writer got replaced on us, ignore } catch (Exception e) { - if (currentWriter != indexWriter) { + if (indexWriter == null) { + throw new EngineClosedException(shardId); + } else if (currentWriter != indexWriter) { // an index writer got replaced on us, ignore } else { throw new RefreshFailedEngineException(shardId, e); @@ -313,12 +318,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } @Override public void flush(Flush flush) throws EngineException { + if (indexWriter == null) { + throw new EngineClosedException(shardId); + } // check outside the lock as well so we can check without blocking on the write lock if (disableFlushCounter > 0) { throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed"); } rwl.writeLock().lock(); try { + if (indexWriter == null) { + throw new EngineClosedException(shardId); + } if (disableFlushCounter > 0) { throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed"); } @@ -361,6 +372,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, if (optimizeMutex.compareAndSet(false, true)) { rwl.readLock().lock(); try { + if (indexWriter == null) { + throw new EngineClosedException(shardId); + } int maxNumberOfSegments = optimize.maxNumSegments(); if (maxNumberOfSegments == -1) { // not set, optimize down to half the configured number of segments diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index ae257684134..91cdcd2bcc4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -34,7 +34,6 @@ import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.ScheduledFuture; @@ -56,8 +55,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem private final Store store; - private final RecoveryThrottler recoveryThrottler; - private volatile long lastIndexVersion; @@ -75,13 +72,12 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem @Inject public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway, - Store store, RecoveryThrottler recoveryThrottler) { + Store store) { super(shardId, indexSettings); this.threadPool = threadPool; this.indexShard = (InternalIndexShard) indexShard; this.shardGateway = shardGateway; this.store = store; - this.recoveryThrottler = recoveryThrottler; this.snapshotOnClose = componentSettings.getAsBoolean("snapshot_on_close", true); this.snapshotInterval = componentSettings.getAsTime("snapshot_interval", TimeValue.timeValueSeconds(10)); @@ -146,27 +142,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem recoveryStatus = new RecoveryStatus(); recoveryStatus.updateStage(RecoveryStatus.Stage.INIT); - // we know we are on a thread, we can spin till we can engage in recovery - while (!recoveryThrottler.tryGatewayRecovery(shardId, "gateway")) { - if (indexShard.state() == IndexShardState.CLOSED) { - listener.onIgnoreRecovery("ignoring recovery while waiting on retry, closed"); - return; - } - recoveryStatus.updateStage(RecoveryStatus.Stage.THROTTLE); - try { - Thread.sleep(recoveryThrottler.throttleInterval().millis()); - recoveryStatus.retryTime(System.currentTimeMillis() - recoveryStatus.startTime()); - } catch (InterruptedException e) { - recoveryStatus = null; - if (indexShard.state() == IndexShardState.CLOSED) { - listener.onIgnoreRecovery("Interrupted while waiting for recovery, but we should ignore since closed"); - } else { - listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recovery", e)); - } - return; - } - } - try { logger.debug("starting recovery from {} ...", shardGateway); shardGateway.recover(recoveryStatus); @@ -188,8 +163,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem if (logger.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); - sb.append("recovery completed from ").append(shardGateway).append(", took [").append(timeValueMillis(recoveryStatus.time())).append("], retry_time [").append(TimeValue.timeValueMillis(recoveryStatus.retryTime())).append("]\n"); - sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.index().time())).append("], throttling_wait [").append(TimeValue.timeValueMillis(recoveryStatus.index().retryTime())).append("]\n"); + sb.append("recovery completed from ").append(shardGateway).append(", took [").append(timeValueMillis(recoveryStatus.time())).append("]\n"); + sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.index().time())).append("]\n"); sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().existingTotalSize())).append("]\n"); sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.translog().time())).append("]"); logger.debug(sb.toString()); @@ -214,8 +189,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem listener.onIgnoreRecovery("shard closed"); } catch (Exception e) { listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "failed recovery", e)); - } finally { - recoveryThrottler.recoveryGatewayDone(shardId, "gateway"); } } }); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java index 6005e0227a6..7712e0c56e7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java @@ -28,7 +28,6 @@ public class RecoveryStatus { public static enum Stage { INIT, - THROTTLE, INDEX, TRANSLOG, DONE @@ -38,8 +37,6 @@ public class RecoveryStatus { private long startTime = System.currentTimeMillis(); - private long retryTime = 0; - private long time; private Index index = new Index(); @@ -63,14 +60,6 @@ public class RecoveryStatus { this.startTime = startTime; } - public long retryTime() { - return this.retryTime; - } - - public void retryTime(long retryTime) { - this.retryTime = retryTime; - } - public long time() { return this.time; } @@ -126,7 +115,6 @@ public class RecoveryStatus { private long totalSize = 0; private int numberOfExistingFiles = 0; private long existingTotalSize = 0; - private AtomicLong retryTime = new AtomicLong(); private AtomicLong currentFilesSize = new AtomicLong(); public long startTime() { @@ -172,14 +160,6 @@ public class RecoveryStatus { return this.existingTotalSize; } - public void addRetryTime(long delta) { - retryTime.addAndGet(delta); - } - - public long retryTime() { - return this.retryTime.get(); - } - public void updateVersion(long version) { this.version = version; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index cdef8aab7e8..ab520d856d3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -51,7 +51,6 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStreams; -import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; import javax.annotation.Nullable; @@ -81,8 +80,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo protected final Store store; - protected final RecoveryThrottler recoveryThrottler; - protected final ByteSizeValue chunkSize; protected final BlobStore blobStore; @@ -104,13 +101,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo private volatile SnapshotStatus currentSnapshotStatus; protected BlobStoreIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway, - IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) { + IndexShard indexShard, Store store) { super(shardId, indexSettings); this.threadPool = threadPool; this.indexShard = (InternalIndexShard) indexShard; this.store = store; - this.recoveryThrottler = recoveryThrottler; BlobStoreIndexGateway blobStoreIndexGateway = (BlobStoreIndexGateway) indexGateway; @@ -571,23 +567,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo final CountDownLatch latch = new CountDownLatch(filesToRecover.size()); final CopyOnWriteArrayList failures = new CopyOnWriteArrayList(); for (final BlobMetaData fileToRecover : filesToRecover) { - if (recoveryThrottler.tryStream(shardId, fileToRecover.name())) { - // we managed to get a recovery going - recoverFile(fileToRecover, indicesBlobs, latch, failures); - } else { - // lets reschedule to do it next time - threadPool.schedule(new Runnable() { - @Override public void run() { - recoveryStatus.index().addRetryTime(recoveryThrottler.throttleInterval().millis()); - if (recoveryThrottler.tryStream(shardId, fileToRecover.name())) { - // we managed to get a recovery going - recoverFile(fileToRecover, indicesBlobs, latch, failures); - } else { - threadPool.schedule(this, recoveryThrottler.throttleInterval()); - } - } - }, recoveryThrottler.throttleInterval()); - } + recoverFile(fileToRecover, indicesBlobs, latch, failures); } try { @@ -632,7 +612,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo try { indexOutput = store.directory().createOutput(fileToRecover.name()); } catch (IOException e) { - recoveryThrottler.streamDone(shardId, fileToRecover.name()); failures.add(e); latch.countDown(); return; @@ -645,7 +624,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo } if (!blobs.containsKey(firstFileToRecover)) { // no file, what to do, what to do? - recoveryThrottler.streamDone(shardId, fileToRecover.name()); logger.warn("no file [{}] to recover, even though it has md5, ignoring it", fileToRecover.name()); latch.countDown(); return; @@ -681,12 +659,10 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo logger.warn("file [{}] has different md5, actual read content [{}], store [{}]", fileToRecover.name(), md5, fileToRecover.md5()); } - recoveryThrottler.streamDone(shardId, fileToRecover.name()); latch.countDown(); } @Override public void onFailure(Throwable t) { - recoveryThrottler.streamDone(shardId, fileToRecover.name()); failures.add(t); latch.countDown(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java index 8148c115d5b..abc505806d0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java @@ -27,7 +27,6 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.store.Store; -import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; /** @@ -36,8 +35,8 @@ import org.elasticsearch.threadpool.ThreadPool; public class FsIndexShardGateway extends BlobStoreIndexShardGateway { @Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway fsIndexGateway, - IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) { - super(shardId, indexSettings, threadPool, fsIndexGateway, indexShard, store, recoveryThrottler); + IndexShard indexShard, Store store) { + super(shardId, indexSettings, threadPool, fsIndexGateway, indexShard, store); } @Override public String type() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryResponse.java index 477b2b2a3ed..cfecc675276 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryResponse.java @@ -32,7 +32,6 @@ import java.util.List; */ class RecoveryResponse implements Streamable { - boolean retry = false; List phase1FileNames = Lists.newArrayList(); List phase1FileSizes = Lists.newArrayList(); List phase1ExistingFileNames = Lists.newArrayList(); @@ -52,7 +51,6 @@ class RecoveryResponse implements Streamable { } @Override public void readFrom(StreamInput in) throws IOException { - retry = in.readBoolean(); int size = in.readVInt(); phase1FileNames = Lists.newArrayListWithCapacity(size); for (int i = 0; i < size; i++) { @@ -86,7 +84,6 @@ class RecoveryResponse implements Streamable { } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeBoolean(retry); out.writeVInt(phase1FileNames.size()); for (String name : phase1FileNames) { out.writeUTF(name); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java index 0099a4951d7..00e77230067 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java @@ -38,7 +38,6 @@ import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -46,11 +45,8 @@ import java.io.IOException; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import static org.elasticsearch.common.unit.TimeValue.*; - /** * The source recovery accepts recovery requests from other peer shards and start the recovery process from this * source shard to the target shard. @@ -69,8 +65,6 @@ public class RecoverySource extends AbstractComponent { private final IndicesService indicesService; - private final RecoveryThrottler recoveryThrottler; - private final ByteSizeValue fileChunkSize; @@ -78,13 +72,11 @@ public class RecoverySource extends AbstractComponent { private final int translogBatchSize; - @Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService, - RecoveryThrottler recoveryThrottler) { + @Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.indicesService = indicesService; - this.recoveryThrottler = recoveryThrottler; this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB)); this.translogBatchSize = componentSettings.getAsInt("translog_batch_size", 100); @@ -94,196 +86,175 @@ public class RecoverySource extends AbstractComponent { } private RecoveryResponse recover(final StartRecoveryRequest request) { - if (!recoveryThrottler.tryPeerRecovery(request.shardId(), "peer recovery source")) { - RecoveryResponse retry = new RecoveryResponse(); - retry.retry = true; - return retry; - } final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); - try { - logger.trace("starting recovery to {}, mark_as_relocated {}", request.targetNode(), request.markAsRelocated()); - final RecoveryResponse response = new RecoveryResponse(); - shard.recover(new Engine.RecoveryHandler() { - @Override public void phase1(final SnapshotIndexCommit snapshot) throws ElasticSearchException { - long totalSize = 0; - long existingTotalSize = 0; - try { - StopWatch stopWatch = new StopWatch().start(); + logger.trace("starting recovery to {}, mark_as_relocated {}", request.targetNode(), request.markAsRelocated()); + final RecoveryResponse response = new RecoveryResponse(); + shard.recover(new Engine.RecoveryHandler() { + @Override public void phase1(final SnapshotIndexCommit snapshot) throws ElasticSearchException { + long totalSize = 0; + long existingTotalSize = 0; + try { + StopWatch stopWatch = new StopWatch().start(); - for (String name : snapshot.getFiles()) { - StoreFileMetaData md = shard.store().metaDataWithMd5(name); - boolean useExisting = false; + for (String name : snapshot.getFiles()) { + StoreFileMetaData md = shard.store().metaDataWithMd5(name); + boolean useExisting = false; + if (request.existingFiles.containsKey(name)) { + if (md.md5().equals(request.existingFiles.get(name).md5())) { + response.phase1ExistingFileNames.add(name); + response.phase1ExistingFileSizes.add(md.sizeInBytes()); + existingTotalSize += md.sizeInBytes(); + useExisting = true; + if (logger.isTraceEnabled()) { + logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.md5()); + } + } + } + if (!useExisting) { if (request.existingFiles.containsKey(name)) { - if (md.md5().equals(request.existingFiles.get(name).md5())) { - response.phase1ExistingFileNames.add(name); - response.phase1ExistingFileSizes.add(md.sizeInBytes()); - existingTotalSize += md.sizeInBytes(); - useExisting = true; - if (logger.isTraceEnabled()) { - logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.md5()); + logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles.get(name).md5(), md.md5()); + } else { + logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name); + } + response.phase1FileNames.add(name); + response.phase1FileSizes.add(md.sizeInBytes()); + totalSize += md.sizeInBytes(); + } + } + response.phase1TotalSize = totalSize; + response.phase1ExistingTotalSize = existingTotalSize; + + logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); + + RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes, + response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet(); + + final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size()); + final AtomicReference lastException = new AtomicReference(); + for (final String name : response.phase1FileNames) { + threadPool.cached().execute(new Runnable() { + @Override public void run() { + IndexInput indexInput = null; + try { + final int BUFFER_SIZE = (int) fileChunkSize.bytes(); + byte[] buf = new byte[BUFFER_SIZE]; + indexInput = snapshot.getDirectory().openInput(name); + long len = indexInput.length(); + long readCount = 0; + while (readCount < len) { + if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us + throw new IndexShardClosedException(shard.shardId()); + } + int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE; + long position = indexInput.getFilePointer(); + indexInput.readBytes(buf, 0, toRead, false); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, buf, toRead), + TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet(); + readCount += toRead; } + indexInput.close(); + } catch (Exception e) { + lastException.set(e); + } finally { + if (indexInput != null) { + try { + indexInput.close(); + } catch (IOException e) { + // ignore + } + } + latch.countDown(); } } - if (!useExisting) { - if (request.existingFiles.containsKey(name)) { - logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles.get(name).md5(), md.md5()); - } else { - logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name); - } - response.phase1FileNames.add(name); - response.phase1FileSizes.add(md.sizeInBytes()); - totalSize += md.sizeInBytes(); - } - } - response.phase1TotalSize = totalSize; - response.phase1ExistingTotalSize = existingTotalSize; + }); + } - final AtomicLong throttlingWaitTime = new AtomicLong(); + latch.await(); - logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); + if (lastException.get() != null) { + throw lastException.get(); + } - RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes, - response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet(); + // now, set the clean files request + Set snapshotFiles = Sets.newHashSet(snapshot.getFiles()); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE).txGet(); - final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size()); - final AtomicReference lastException = new AtomicReference(); - for (final String name : response.phase1FileNames) { - threadPool.cached().execute(new Runnable() { - @Override public void run() { - IndexInput indexInput = null; - try { - long throttlingStartTime = System.currentTimeMillis(); - while (!recoveryThrottler.tryStream(request.shardId(), name)) { - if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us - throw new IndexShardClosedException(shard.shardId()); - } - Thread.sleep(recoveryThrottler.throttleInterval().millis()); - } - throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime); + stopWatch.stop(); + logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); + response.phase1Time = stopWatch.totalTime().millis(); + } catch (Throwable e) { + throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e); + } + } - final int BUFFER_SIZE = (int) fileChunkSize.bytes(); - byte[] buf = new byte[BUFFER_SIZE]; - indexInput = snapshot.getDirectory().openInput(name); - long len = indexInput.length(); - long readCount = 0; - while (readCount < len) { - if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us - throw new IndexShardClosedException(shard.shardId()); - } - int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE; - long position = indexInput.getFilePointer(); - indexInput.readBytes(buf, 0, toRead, false); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, buf, toRead), - TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet(); - readCount += toRead; - } - indexInput.close(); - } catch (Exception e) { - lastException.set(e); - } finally { - recoveryThrottler.streamDone(request.shardId(), name); - if (indexInput != null) { - try { - indexInput.close(); - } catch (IOException e) { - // ignore - } - } - latch.countDown(); - } - } - }); - } + @Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException { + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); + } + logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode()); + StopWatch stopWatch = new StopWatch().start(); - latch.await(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet(); - if (lastException.get() != null) { - throw lastException.get(); - } + int totalOperations = sendSnapshot(snapshot); - // now, set the clean files request - Set snapshotFiles = Sets.newHashSet(snapshot.getFiles()); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE).txGet(); + stopWatch.stop(); + logger.trace("[{}][{}] recovery [phase2] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); + response.phase2Time = stopWatch.totalTime().millis(); + response.phase2Operations = totalOperations; + } - stopWatch.stop(); - logger.trace("[{}][{}] recovery [phase1] to {}: took [{}], throttling_wait [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime(), timeValueMillis(throttlingWaitTime.get())); - response.phase1Time = stopWatch.totalTime().millis(); - } catch (Throwable e) { - throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e); + @Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException { + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); + } + logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode()); + StopWatch stopWatch = new StopWatch().start(); + int totalOperations = sendSnapshot(snapshot); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet(); + if (request.markAsRelocated()) { + // TODO what happens if the recovery process fails afterwards, we need to mark this back to started + try { + shard.relocated(); + } catch (IllegalIndexShardStateException e) { + // we can ignore this exception since, on the other node, when it moved to phase3 + // it will also send shard started, which might cause the index shard we work against + // to move be closed by the time we get to the the relocated method } } + stopWatch.stop(); + logger.trace("[{}][{}] recovery [phase3] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); + response.phase3Time = stopWatch.totalTime().millis(); + response.phase3Operations = totalOperations; + } - @Override public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException { + private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException { + int counter = 0; + int totalOperations = 0; + List operations = Lists.newArrayList(); + while (snapshot.hasNext()) { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } - logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode()); - StopWatch stopWatch = new StopWatch().start(); - - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet(); - - int totalOperations = sendSnapshot(snapshot); - - stopWatch.stop(); - logger.trace("[{}][{}] recovery [phase2] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); - response.phase2Time = stopWatch.totalTime().millis(); - response.phase2Operations = totalOperations; - } - - @Override public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException { - if (shard.state() == IndexShardState.CLOSED) { - throw new IndexShardClosedException(request.shardId()); - } - logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode()); - StopWatch stopWatch = new StopWatch().start(); - int totalOperations = sendSnapshot(snapshot); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE).txGet(); - if (request.markAsRelocated()) { - // TODO what happens if the recovery process fails afterwards, we need to mark this back to started - try { - shard.relocated(); - } catch (IllegalIndexShardStateException e) { - // we can ignore this exception since, on the other node, when it moved to phase3 - // it will also send shard started, which might cause the index shard we work against - // to move be closed by the time we get to the the relocated method - } - } - stopWatch.stop(); - logger.trace("[{}][{}] recovery [phase3] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime()); - response.phase3Time = stopWatch.totalTime().millis(); - response.phase3Operations = totalOperations; - } - - private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException { - int counter = 0; - int totalOperations = 0; - List operations = Lists.newArrayList(); - while (snapshot.hasNext()) { - if (shard.state() == IndexShardState.CLOSED) { - throw new IndexShardClosedException(request.shardId()); - } - operations.add(snapshot.next()); - totalOperations++; - if (++counter == translogBatchSize) { - RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet(); - counter = 0; - operations = Lists.newArrayList(); - } - } - // send the leftover - if (!operations.isEmpty()) { + operations.add(snapshot.next()); + totalOperations++; + if (++counter == translogBatchSize) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet(); + counter = 0; + operations = Lists.newArrayList(); } - return totalOperations; } - }); - return response; - } finally { - recoveryThrottler.recoveryPeerDone(request.shardId(), "peer recovery source"); - } + // send the leftover + if (!operations.isEmpty()) { + RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet(); + } + return totalOperations; + } + }); + return response; } class StartRecoveryTransportRequestHandler extends BaseTransportRequestHandler { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java index 4cdae0d62f9..3b87166eb16 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java @@ -33,7 +33,6 @@ public class RecoveryStatus { public static enum Stage { INIT, - THROTTLE, INDEX, TRANSLOG, FINALIZE, @@ -44,7 +43,6 @@ public class RecoveryStatus { final long startTime = System.currentTimeMillis(); long time; - volatile long retryTime = 0; List phase1FileNames; List phase1FileSizes; List phase1ExistingFileNames; @@ -64,10 +62,6 @@ public class RecoveryStatus { return this.time; } - public long retryTime() { - return retryTime; - } - public long phase1TotalSize() { return phase1TotalSize; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java index 9160a269bc0..8fb561feb8d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java @@ -40,7 +40,6 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -76,17 +75,14 @@ public class RecoveryTarget extends AbstractComponent { private final IndicesService indicesService; - private final RecoveryThrottler recoveryThrottler; - private final ConcurrentMap onGoingRecoveries = ConcurrentCollections.newConcurrentMap(); @Inject public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService, - IndicesLifecycle indicesLifecycle, RecoveryThrottler recoveryThrottler) { + IndicesLifecycle indicesLifecycle) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.indicesService = indicesService; - this.recoveryThrottler = recoveryThrottler; transportService.registerHandler(Actions.FILES_INFO, new FilesInfoRequestHandler()); transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler()); @@ -167,13 +163,6 @@ public class RecoveryTarget extends AbstractComponent { onGoingRecoveries.put(request.shardId(), recovery); } - if (!recoveryThrottler.tryPeerRecovery(shard.shardId(), "peer recovery target")) { - recovery.stage = RecoveryStatus.Stage.THROTTLE; - recovery.retryTime = System.currentTimeMillis() - recovery.startTime; - listener.onRetryRecovery(recoveryThrottler.throttleInterval()); - return; - } - try { logger.trace("[{}][{}] starting recovery from {}", request.shardId().index().name(), request.shardId().id(), request.sourceNode()); @@ -183,15 +172,8 @@ public class RecoveryTarget extends AbstractComponent { return new RecoveryResponse(); } }).txGet(); - if (recoveryStatus.retry) { - if (shard.state() == IndexShardState.CLOSED) { - listener.onIgnoreRecovery(false, "shard closed, stop recovery"); - return; - } - logger.trace("[{}][{}] retrying recovery in [{}], source shard is busy", request.shardId().index().name(), request.shardId().id(), recoveryThrottler.throttleInterval()); - recovery.stage = RecoveryStatus.Stage.THROTTLE; - recovery.retryTime = System.currentTimeMillis() - recovery.startTime; - listener.onRetryRecovery(recoveryThrottler.throttleInterval()); + if (shard.state() == IndexShardState.CLOSED) { + listener.onIgnoreRecovery(false, "shard closed, stop recovery"); return; } stopWatch.stop(); @@ -231,9 +213,7 @@ public class RecoveryTarget extends AbstractComponent { } if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) { - recovery.stage = RecoveryStatus.Stage.THROTTLE; - recovery.retryTime = System.currentTimeMillis() - recovery.startTime; - listener.onRetryRecovery(recoveryThrottler.throttleInterval()); + listener.onRetryRecovery(TimeValue.timeValueMillis(500)); return; } @@ -251,8 +231,6 @@ public class RecoveryTarget extends AbstractComponent { } listener.onRecoveryFailure(new RecoveryFailedException(request, e), true); - } finally { - recoveryThrottler.recoveryPeerDone(shard.shardId(), "peer recovery target"); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java index 9706a05615c..8767c39fb87 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; @@ -80,6 +81,8 @@ public class TranslogService extends AbstractIndexShardComponent { logger.trace("flushing translog, operations [{}], breached [{}]", currentSize, flushThreshold); try { indexShard.flush(new Engine.Flush()); + } catch (EngineClosedException e) { + // we are being closed, ignore } catch (FlushNotAllowedEngineException e) { // ignore this exception, we are not allowed to perform flush } catch (Exception e) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java index a663d402a3e..b454b54c456 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -27,7 +27,6 @@ import org.elasticsearch.indices.analysis.IndicesAnalysisService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.memory.IndexingMemoryBufferController; import org.elasticsearch.indices.memory.IndicesMemoryCleaner; -import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; /** @@ -46,7 +45,6 @@ public class IndicesModule extends AbstractModule { bind(IndicesService.class).to(InternalIndicesService.class).asEagerSingleton(); - bind(RecoveryThrottler.class).asEagerSingleton(); bind(RecoveryTarget.class).asEagerSingleton(); bind(RecoverySource.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/throttler/RecoveryThrottler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/throttler/RecoveryThrottler.java deleted file mode 100644 index fda7729a155..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/throttler/RecoveryThrottler.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.recovery.throttler; - -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.shard.ShardId; - -/** - * Recovery Throttler allows to throttle recoveries (both gateway and peer). - * - * @author kimchy (shay.banon) - */ -public class RecoveryThrottler extends AbstractComponent { - - private final Object concurrentRecoveryMutex = new Object(); - - private final int concurrentRecoveries; - - private final TimeValue throttleInterval; - - private volatile int onGoingGatewayRecoveries = 0; - - private volatile int onGoingPeerRecoveries = 0; - - private final int concurrentStreams; - - private volatile int onGoingStreams = 0; - - private final Object concurrentStreamsMutex = new Object(); - - @Inject public RecoveryThrottler(Settings settings) { - super(settings); - - int defaultConcurrentRecoveries = Runtime.getRuntime().availableProcessors() + 1; - // tap it at 10 (is it a good number?) - if (defaultConcurrentRecoveries > 10) { - defaultConcurrentRecoveries = 10; - } else if (defaultConcurrentRecoveries < 3) { - defaultConcurrentRecoveries = 3; - } - - concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", defaultConcurrentRecoveries); - concurrentStreams = componentSettings.getAsInt("concurrent_streams", defaultConcurrentRecoveries * 2); - throttleInterval = componentSettings.getAsTime("interval", TimeValue.timeValueMillis(100)); - - logger.debug("concurrent_recoveries [{}], concurrent_streams [{}] interval [{}]", concurrentRecoveries, concurrentStreams, throttleInterval); - } - - /** - * Try and check if gateway recovery is allowed. Only takes the on going gateway recoveries into account. Ignore - * on going peer recoveries so peer recovery will not block a much more important gateway recovery. - */ - public boolean tryGatewayRecovery(ShardId shardId, String reason) { - synchronized (concurrentRecoveryMutex) { - if ((onGoingGatewayRecoveries + 1) > concurrentRecoveries) { - return false; - } - onGoingGatewayRecoveries++; - logger.trace("Recovery (gateway) allowed for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason); - return true; - } - } - - /** - * Mark gateway recvoery as done. - */ - public void recoveryGatewayDone(ShardId shardId, String reason) { - synchronized (concurrentRecoveryMutex) { - --onGoingGatewayRecoveries; - logger.trace("Recovery (gateway) done for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason); - } - } - - /** - * Try and check if peer recovery is allowed. Takes into account both on going gateway recovery and peer recovery. - */ - public boolean tryPeerRecovery(ShardId shardId, String reason) { - synchronized (concurrentRecoveryMutex) { - if ((onGoingGatewayRecoveries + onGoingPeerRecoveries + 1) > concurrentRecoveries) { - return false; - } - onGoingPeerRecoveries++; - logger.trace("Recovery (peer) allowed for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason); - return true; - } - } - - /** - * Mark peer recovery as done. - */ - public void recoveryPeerDone(ShardId shardId, String reason) { - synchronized (concurrentRecoveryMutex) { - --onGoingPeerRecoveries; - logger.trace("Recovery (peer) done for [{}], on_going (gateway [{}], peer [{}]), allowed [{}], reason [{}]", shardId, onGoingGatewayRecoveries, onGoingPeerRecoveries, concurrentRecoveries, reason); - } - } - - public int onGoingRecoveries() { - return onGoingGatewayRecoveries + onGoingPeerRecoveries; - } - - public boolean tryStream(ShardId shardId, String streamName) { - synchronized (concurrentStreamsMutex) { - if (onGoingStreams + 1 > concurrentStreams) { - return false; - } - onGoingStreams++; - logger.trace("Stream [{}] allowed for [{}], on going [{}], allowed [{}]", streamName, shardId, onGoingStreams, concurrentStreams); - return true; - } - } - - public void streamDone(ShardId shardId, String streamName) { - synchronized (concurrentStreamsMutex) { - --onGoingStreams; - logger.trace("Stream [{}] done for [{}], on going [{}], allowed [{}]", streamName, shardId, onGoingStreams, concurrentStreams); - } - } - - public int onGoingStreams() { - return onGoingStreams; - } - - public TimeValue throttleInterval() { - return throttleInterval; - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java index 5ebaea24e6d..cdf933361d7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java @@ -145,8 +145,6 @@ public class RestIndicesStatusAction extends BaseRestHandler { builder.field("start_time_in_millis", peerRecoveryStatus.startTime()); builder.field("time", peerRecoveryStatus.time()); builder.field("time_in_millis", peerRecoveryStatus.time().millis()); - builder.field("throttling_time", peerRecoveryStatus.throttlingTime()); - builder.field("throttling_time_in_millis", peerRecoveryStatus.throttlingTime().millis()); builder.startObject("index"); builder.field("progress", peerRecoveryStatus.indexRecoveryProgress()); @@ -174,8 +172,6 @@ public class RestIndicesStatusAction extends BaseRestHandler { builder.field("start_time_in_millis", gatewayRecoveryStatus.startTime()); builder.field("time", gatewayRecoveryStatus.time()); builder.field("time_in_millis", gatewayRecoveryStatus.time().millis()); - builder.field("throttling_time", gatewayRecoveryStatus.throttlingTime()); - builder.field("throttling_time_in_millis", gatewayRecoveryStatus.throttlingTime().millis()); builder.startObject("index"); builder.field("progress", gatewayRecoveryStatus.indexRecoveryProgress()); @@ -187,8 +183,6 @@ public class RestIndicesStatusAction extends BaseRestHandler { builder.field("expected_recovered_size_in_bytes", gatewayRecoveryStatus.expectedRecoveredIndexSize().bytes()); builder.field("recovered_size", gatewayRecoveryStatus.recoveredIndexSize()); builder.field("recovered_size_in_bytes", gatewayRecoveryStatus.recoveredIndexSize().bytes()); - builder.field("throttling_time", gatewayRecoveryStatus.indexThrottlingTime()); - builder.field("throttling_time_in_millis", gatewayRecoveryStatus.indexThrottlingTime().millis()); builder.endObject(); builder.startObject("translog"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java index 255f1072b92..8e51df2e37a 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java @@ -36,6 +36,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.cluster.routing.RoutingBuilders.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -47,7 +48,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests { private final ESLogger logger = Loggers.getLogger(ElectReplicaAsPrimaryDuringRelocationTests.class); @Test public void testElectReplicaAsPrimaryDuringRelocation() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 93fe9a29dc2..622d793a2a8 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -38,6 +38,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.cluster.routing.RoutingBuilders.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -50,7 +51,7 @@ public class FailedShardsRoutingTests { private final ESLogger logger = Loggers.getLogger(FailedShardsRoutingTests.class); @Test public void testFailures() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); @@ -163,7 +164,7 @@ public class FailedShardsRoutingTests { } @Test public void test10ShardsWith1ReplicaFailure() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java index 1ae21bd25cc..022e5fc66d4 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java @@ -35,6 +35,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.cluster.routing.RoutingBuilders.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -46,7 +47,7 @@ public class PrimaryElectionRoutingTests { private final ESLogger logger = Loggers.getLogger(PrimaryElectionRoutingTests.class); @Test public void testBackupElectionToPrimaryWhenPrimaryCanBeAllocatedToAnotherNode() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java index e38e70e6adf..d1b26ca48b6 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java @@ -36,6 +36,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.cluster.routing.RoutingBuilders.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -47,8 +48,7 @@ public class RebalanceAfterActiveTests { private final ESLogger logger = Loggers.getLogger(RebalanceAfterActiveTests.class); @Test public void testRebalanceOnlyAfterAllShardsAreActive() { - - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java index e69fdd9adf9..968d1d79640 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java @@ -35,6 +35,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.cluster.routing.RoutingBuilders.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -46,8 +47,7 @@ public class ReplicaAllocatedAfterPrimaryTests { private final ESLogger logger = Loggers.getLogger(ReplicaAllocatedAfterPrimaryTests.class); @Test public void testBackupIsAllocatedAfterPrimary() { - - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java index bdd57c43e23..1239f034dcc 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java @@ -43,6 +43,7 @@ import static org.elasticsearch.cluster.routing.RoutingBuilders.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.collect.Lists.*; import static org.elasticsearch.common.collect.Sets.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -54,7 +55,7 @@ public class SingleShardNoReplicasRoutingTests { private final ESLogger logger = Loggers.getLogger(SingleShardNoReplicasRoutingTests.class); @Test public void testSingleIndexStartedShard() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); @@ -154,7 +155,7 @@ public class SingleShardNoReplicasRoutingTests { } @Test public void testSingleIndexShardFailed() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); @@ -203,7 +204,7 @@ public class SingleShardNoReplicasRoutingTests { } @Test public void testMultiIndexEvenDistribution() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); final int numberOfIndices = 50; logger.info("Building initial routing table with " + numberOfIndices + " indices"); @@ -311,7 +312,7 @@ public class SingleShardNoReplicasRoutingTests { } @Test public void testMultiIndexUnevenNodes() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); final int numberOfIndices = 10; logger.info("Building initial routing table with " + numberOfIndices + " indices"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java index ef44eca6822..2279a316d49 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java @@ -35,6 +35,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.cluster.routing.RoutingBuilders.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -46,7 +47,7 @@ public class SingleShardOneReplicaRoutingTests { private final ESLogger logger = Loggers.getLogger(SingleShardOneReplicaRoutingTests.class); @Test public void testSingleIndexFirstStartPrimaryThenBackups() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java index bd5c2be8cb9..410f67876bb 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java @@ -35,6 +35,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.cluster.routing.RoutingBuilders.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -46,7 +47,7 @@ public class TenShardsOneReplicaRoutingTests { private final ESLogger logger = Loggers.getLogger(TenShardsOneReplicaRoutingTests.class); @Test public void testSingleIndexFirstStartPrimaryThenBackups() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java new file mode 100644 index 00000000000..18e582ef49f --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java @@ -0,0 +1,174 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.testng.annotations.Test; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.cluster.node.DiscoveryNodes.*; +import static org.elasticsearch.cluster.routing.RoutingBuilders.*; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +public class ThrottlingAllocationTests { + + private final ESLogger logger = Loggers.getLogger(ThrottlingAllocationTests.class); + + @Test public void testPrimaryRecoveryThrottling() { + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 3).build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(10).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("start one node, do reroute, only 3 should initialize"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); + routingTable = strategy.reroute(clusterState); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(17)); + + logger.info("start initializing, another 3 should initialize"); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(3)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(14)); + + logger.info("start initializing, another 3 should initialize"); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(6)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(11)); + + logger.info("start initializing, another 1 should initialize"); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(9)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(10)); + + logger.info("start initializing, all primaries should be started"); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(0)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(10)); + } + + @Test public void testReplicaAndPrimaryRecoveryThrottling() { + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 3).build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("start one node, do reroute, only 3 should initialize"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); + routingTable = strategy.reroute(clusterState); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(7)); + + logger.info("start initializing, another 2 should initialize"); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(3)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(2)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(5)); + + logger.info("start initializing, all primaries should be started"); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(0)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(5)); + + logger.info("start another node, replicas should start being allocated"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); + routingTable = strategy.reroute(clusterState); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(2)); + + logger.info("start initializing replicas"); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(8)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(2)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0)); + + logger.info("start initializing replicas, all should be started"); + routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10)); + assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(0)); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + + private DiscoveryNode newNode(String nodeId) { + return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE); + } +} diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java index 90a81465ecf..b9fb86462e3 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java @@ -17,6 +17,7 @@ import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.cluster.routing.RoutingBuilders.*; import static org.elasticsearch.cluster.routing.RoutingTable.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -28,7 +29,7 @@ public class UpdateNumberOfReplicasTests { private final ESLogger logger = Loggers.getLogger(UpdateNumberOfReplicasTests.class); @Test public void testUpdateNumberOfReplicas() { - ShardsAllocation strategy = new ShardsAllocation(); + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexShardGateway.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexShardGateway.java index 502dbffdba9..02c71d2aa5e 100644 --- a/plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexShardGateway.java +++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/index/gateway/s3/S3IndexShardGateway.java @@ -27,7 +27,6 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.store.Store; -import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; /** @@ -36,8 +35,8 @@ import org.elasticsearch.threadpool.ThreadPool; public class S3IndexShardGateway extends BlobStoreIndexShardGateway { @Inject public S3IndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway, - IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) { - super(shardId, indexSettings, threadPool, indexGateway, indexShard, store, recoveryThrottler); + IndexShard indexShard, Store store) { + super(shardId, indexSettings, threadPool, indexGateway, indexShard, store); } @Override public String type() { diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java b/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java index b7c34d513cd..85fdbdab2ed 100644 --- a/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java +++ b/plugins/hadoop/src/main/java/org/elasticsearch/index/gateway/hdfs/HdfsIndexShardGateway.java @@ -27,7 +27,6 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.store.Store; -import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; /** @@ -36,8 +35,8 @@ import org.elasticsearch.threadpool.ThreadPool; public class HdfsIndexShardGateway extends BlobStoreIndexShardGateway { @Inject public HdfsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway hdfsIndexGateway, - IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) { - super(shardId, indexSettings, threadPool, hdfsIndexGateway, indexShard, store, recoveryThrottler); + IndexShard indexShard, Store store) { + super(shardId, indexSettings, threadPool, hdfsIndexGateway, indexShard, store); } @Override public String type() {