diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java index d37670caf13..0dad125120b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java @@ -39,10 +39,10 @@ import org.elasticsearch.index.gateway.IndexShardGatewayService; import org.elasticsearch.index.gateway.SnapshotStatus; import org.elasticsearch.index.service.InternalIndexService; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.recovery.RecoveryStatus; -import org.elasticsearch.index.shard.recovery.RecoveryTarget; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.recovery.RecoveryStatus; +import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/RateLimiter.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/RateLimiter.java new file mode 100644 index 00000000000..64b2d31421d --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/RateLimiter.java @@ -0,0 +1,82 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common; + +import org.elasticsearch.ElasticSearchInterruptedException; + +/** + */ +// LUCENE MONITOR: Taken from trunk of Lucene at 06-09-11 +public class RateLimiter { + + private volatile double nsPerByte; + private volatile long lastNS; + + // TODO: we could also allow eg a sub class to dynamically + // determine the allowed rate, eg if an app wants to + // change the allowed rate over time or something + + /** + * mbPerSec is the MB/sec max IO rate + */ + public RateLimiter(double mbPerSec) { + setMaxRate(mbPerSec); + } + + public void setMaxRate(double mbPerSec) { + nsPerByte = 1000000000. / (1024 * 1024 * mbPerSec); + } + + /** + * Pauses, if necessary, to keep the instantaneous IO + * rate at or below the target. NOTE: multiple threads + * may safely use this, however the implementation is + * not perfectly thread safe but likely in practice this + * is harmless (just means in some rare cases the rate + * might exceed the target). It's best to call this + * with a biggish count, not one byte at a time. + */ + public void pause(long bytes) { + + // TODO: this is purely instantenous rate; maybe we + // should also offer decayed recent history one? + final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte)); + long curNS = System.nanoTime(); + if (lastNS < curNS) { + lastNS = curNS; + } + + // While loop because Thread.sleep doesn't alway sleep + // enough: + while (true) { + final long pauseNS = targetNS - curNS; + if (pauseNS > 0) { + try { + Thread.sleep((int) (pauseNS / 1000000), (int) (pauseNS % 1000000)); + } catch (InterruptedException ie) { + throw new ElasticSearchInterruptedException("interrupted while rate limiting", ie); + } + curNS = System.nanoTime(); + continue; + } + break; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index d0e5de29537..0ead6d7276a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -65,12 +65,12 @@ import org.elasticsearch.index.search.stats.ShardSearchService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.*; -import org.elasticsearch.index.shard.recovery.RecoveryStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.InternalIndicesLifecycle; +import org.elasticsearch.indices.recovery.RecoveryStatus; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java index 567001710b2..8f55b4395dc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -24,13 +24,14 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.recovery.RecoverySource; -import org.elasticsearch.index.shard.recovery.RecoveryTarget; import org.elasticsearch.indices.analysis.IndicesAnalysisModule; import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.memory.IndexingMemoryBufferController; import org.elasticsearch.indices.query.IndicesQueriesModule; +import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.indices.recovery.RecoverySource; +import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; /** @@ -53,6 +54,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules { bind(IndicesService.class).to(InternalIndicesService.class).asEagerSingleton(); + bind(RecoverySettings.class).asEagerSingleton(); bind(RecoveryTarget.class).asEagerSingleton(); bind(RecoverySource.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index a98f130f0c3..1ac25b53553 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -75,6 +75,7 @@ import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.index.store.IndexStoreModule; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.indices.analysis.IndicesAnalysisService; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.plugins.IndexPluginsModule; import org.elasticsearch.plugins.PluginsService; @@ -168,6 +169,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent lastException = new AtomicReference(); for (final String name : response.phase1FileNames) { - concurrentStreamPool.execute(new Runnable() { + recoverySettings.concurrentStreamPool().execute(new Runnable() { @Override public void run() { IndexInput indexInput = null; try { - final int BUFFER_SIZE = (int) fileChunkSize.bytes(); + final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes(); byte[] buf = new byte[BUFFER_SIZE]; StoreFileMetaData md = shard.store().metaData(name); indexInput = snapshot.getDirectory().openInput(name); @@ -184,7 +153,7 @@ public class RecoverySource extends AbstractComponent { long position = indexInput.getFilePointer(); indexInput.readBytes(buf, 0, toRead, false); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead), - TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); readCount += toRead; } indexInput.close(); @@ -277,9 +246,9 @@ public class RecoverySource extends AbstractComponent { ops += 1; size += operation.estimateSize(); totalOperations++; - if (ops >= translogOps || size >= translogSize.bytes()) { + if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); ops = 0; size = 0; operations.clear(); @@ -288,7 +257,7 @@ public class RecoverySource extends AbstractComponent { // send the leftover if (!operations.isEmpty()) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); } return totalOperations; } @@ -311,16 +280,5 @@ public class RecoverySource extends AbstractComponent { channel.sendResponse(response); } } - - class ApplySettings implements NodeSettingsService.Listener { - @Override public void onRefreshSettings(Settings settings) { - int concurrentStreams = settings.getAsInt("index.shard.recovery.concurrent_streams", RecoverySource.this.concurrentStreams); - if (concurrentStreams != RecoverySource.this.concurrentStreams) { - logger.info("updating [index.shard.recovery.concurrent_streams] from [{}] to [{}]", RecoverySource.this.concurrentStreams, concurrentStreams); - RecoverySource.this.concurrentStreams = concurrentStreams; - RecoverySource.this.concurrentStreamPool.setMaximumPoolSize(concurrentStreams); - } - } - } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java similarity index 98% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java index c20c2f01c3b..bc43f54ba56 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.shard.recovery; +package org.elasticsearch.indices.recovery; import org.apache.lucene.store.IndexOutput; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java similarity index 99% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 0385fe9d996..428cc7e3584 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.shard.recovery; +package org.elasticsearch.indices.recovery; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IndexOutput; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTranslogOperationsRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java similarity index 98% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTranslogOperationsRequest.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index cbcb1830fd6..7db2df0c4dd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTranslogOperationsRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.shard.recovery; +package org.elasticsearch.indices.recovery; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.io.stream.StreamInput; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java similarity index 98% rename from modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java index c413eacbcf2..bddbd0d803c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.shard.recovery; +package org.elasticsearch.indices.recovery; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.collect.Maps;