Recovery Settings: Change settings (still support old settings) and allow for more dynamic settings, closes #1303.

Shay Banon 2011-09-06 11:21:32 +03:00
parent 9f427010bf
commit 8ebbd1e7b9
21 changed files with 253 additions and 79 deletions


@@ -39,10 +39,10 @@ import org.elasticsearch.index.gateway.IndexShardGatewayService;
 import org.elasticsearch.index.gateway.SnapshotStatus;
 import org.elasticsearch.index.service.InternalIndexService;
 import org.elasticsearch.index.shard.IndexShardState;
-import org.elasticsearch.index.shard.recovery.RecoveryStatus;
-import org.elasticsearch.index.shard.recovery.RecoveryTarget;
 import org.elasticsearch.index.shard.service.InternalIndexShard;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.recovery.RecoveryStatus;
+import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;


@@ -0,0 +1,82 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;

import org.elasticsearch.ElasticSearchInterruptedException;

/**
 */
// LUCENE MONITOR: Taken from trunk of Lucene at 06-09-11
public class RateLimiter {

    private volatile double nsPerByte;
    private volatile long lastNS;

    // TODO: we could also allow eg a sub class to dynamically
    // determine the allowed rate, eg if an app wants to
    // change the allowed rate over time or something

    /**
     * mbPerSec is the MB/sec max IO rate
     */
    public RateLimiter(double mbPerSec) {
        setMaxRate(mbPerSec);
    }

    public void setMaxRate(double mbPerSec) {
        nsPerByte = 1000000000. / (1024 * 1024 * mbPerSec);
    }

    /**
     * Pauses, if necessary, to keep the instantaneous IO
     * rate at or below the target. NOTE: multiple threads
     * may safely use this; the implementation is not
     * perfectly thread safe, but in practice that is likely
     * harmless (it just means the rate might occasionally
     * exceed the target). It's best to call this with a
     * biggish count, not one byte at a time.
     */
    public void pause(long bytes) {
        // TODO: this is a purely instantaneous rate; maybe we
        // should also offer one based on decayed recent history?
        final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
        long curNS = System.nanoTime();
        if (lastNS < curNS) {
            lastNS = curNS;
        }

        // While loop because Thread.sleep doesn't always sleep
        // enough:
        while (true) {
            final long pauseNS = targetNS - curNS;
            if (pauseNS > 0) {
                try {
                    Thread.sleep((int) (pauseNS / 1000000), (int) (pauseNS % 1000000));
                } catch (InterruptedException ie) {
                    throw new ElasticSearchInterruptedException("interrupted while rate limiting", ie);
                }
                curNS = System.nanoTime();
                continue;
            }
            break;
        }
    }
}
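For orientation, a minimal usage sketch follows (not part of the commit): a loop that pretends to copy data in 512 KB chunks and calls pause(...) after each one to hold the overall rate near 10 MB/sec. The class name, sizes, and the absence of real I/O are illustrative assumptions.

import org.elasticsearch.common.RateLimiter;

public class RateLimiterExample {
    public static void main(String[] args) {
        // Hypothetical illustration: throttle a simulated 8 MB transfer to ~10 MB/sec
        // by pausing after every 512 KB chunk. No real I/O is performed.
        RateLimiter limiter = new RateLimiter(10.0);      // target rate in MB/sec
        final int chunkSize = 512 * 1024;
        long remaining = 8L * 1024 * 1024;
        long start = System.nanoTime();
        while (remaining > 0) {
            int toSend = (int) Math.min(chunkSize, remaining);
            // ... a real caller would read or write toSend bytes here ...
            limiter.pause(toSend);                        // sleeps if the instantaneous rate exceeds the target
            remaining -= toSend;
        }
        double seconds = (System.nanoTime() - start) / 1e9;
        System.out.printf("sent 8 MB in %.2f s (~%.1f MB/sec)%n", seconds, 8 / seconds);
    }
}

As the javadoc above notes, pause(...) is only approximately thread safe and works best with reasonably large byte counts rather than per-byte calls.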


@@ -65,12 +65,12 @@ import org.elasticsearch.index.search.stats.ShardSearchService;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.shard.*;
-import org.elasticsearch.index.shard.recovery.RecoveryStatus;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreStats;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesLifecycle;
 import org.elasticsearch.indices.InternalIndicesLifecycle;
+import org.elasticsearch.indices.recovery.RecoveryStatus;
 import org.elasticsearch.threadpool.ThreadPool;

 import java.io.IOException;


@@ -24,13 +24,14 @@ import org.elasticsearch.common.inject.AbstractModule;
 import org.elasticsearch.common.inject.Module;
 import org.elasticsearch.common.inject.SpawnModules;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.shard.recovery.RecoverySource;
-import org.elasticsearch.index.shard.recovery.RecoveryTarget;
 import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
 import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
 import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
 import org.elasticsearch.indices.query.IndicesQueriesModule;
+import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.indices.recovery.RecoverySource;
+import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;

 /**
@@ -53,6 +54,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
         bind(IndicesService.class).to(InternalIndicesService.class).asEagerSingleton();

+        bind(RecoverySettings.class).asEagerSingleton();
         bind(RecoveryTarget.class).asEagerSingleton();
         bind(RecoverySource.class).asEagerSingleton();
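Because RecoverySettings is bound as an eager singleton, RecoverySource and RecoveryTarget (bound just above) are injected with the same instance, so a dynamic settings change is observed by both sides of a recovery. A minimal sketch of that pattern with plain Guice is shown below; Elasticsearch embeds its own copy of Guice under org.elasticsearch.common.inject, and the Config/ConsumerA/ConsumerB names here are made up for illustration.

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;

public class EagerSingletonSketch {
    static class Config { volatile int value = 1; }      // stand-in for RecoverySettings
    static class ConsumerA { final Config config; @Inject ConsumerA(Config config) { this.config = config; } }
    static class ConsumerB { final Config config; @Inject ConsumerB(Config config) { this.config = config; } }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new AbstractModule() {
            @Override protected void configure() {
                bind(Config.class).asEagerSingleton();   // one shared instance, created at injector startup
                bind(ConsumerA.class).asEagerSingleton();
                bind(ConsumerB.class).asEagerSingleton();
            }
        });
        ConsumerA a = injector.getInstance(ConsumerA.class);
        ConsumerB b = injector.getInstance(ConsumerB.class);
        a.config.value = 42;                             // a change seen through one consumer...
        System.out.println(b.config.value);              // ...is visible to the other: prints 42
    }
}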


@@ -75,6 +75,7 @@ import org.elasticsearch.index.similarity.SimilarityModule;
 import org.elasticsearch.index.store.IndexStoreModule;
 import org.elasticsearch.index.store.StoreStats;
 import org.elasticsearch.indices.analysis.IndicesAnalysisService;
+import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.store.IndicesStore;
 import org.elasticsearch.plugins.IndexPluginsModule;
 import org.elasticsearch.plugins.PluginsService;
@@ -168,6 +169,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
     }

     @Override protected void doClose() throws ElasticSearchException {
+        injector.getInstance(RecoverySettings.class).close();
         indicesStore.close();
         indicesAnalysisService.close();
     }


@@ -61,13 +61,12 @@ import org.elasticsearch.index.service.IndexService;
 import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.recovery.RecoveryFailedException;
-import org.elasticsearch.index.shard.recovery.RecoverySource;
-import org.elasticsearch.index.shard.recovery.RecoveryTarget;
-import org.elasticsearch.index.shard.recovery.StartRecoveryRequest;
 import org.elasticsearch.index.shard.service.IndexShard;
 import org.elasticsearch.index.shard.service.InternalIndexShard;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.recovery.RecoveryFailedException;
+import org.elasticsearch.indices.recovery.RecoveryTarget;
+import org.elasticsearch.indices.recovery.StartRecoveryRequest;
 import org.elasticsearch.threadpool.ThreadPool;

 import java.util.List;
@@ -88,8 +87,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
     private final ThreadPool threadPool;

-    private final RecoverySource recoverySource;
-
     private final RecoveryTarget recoveryTarget;

     private final ShardStateAction shardStateAction;
@@ -113,7 +110,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
     private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();

     @Inject public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
-                                              ThreadPool threadPool, RecoveryTarget recoveryTarget, RecoverySource recoverySource,
+                                              ThreadPool threadPool, RecoveryTarget recoveryTarget,
                                               ShardStateAction shardStateAction,
                                               NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
                                               NodeMappingCreatedAction nodeMappingCreatedAction, NodeMappingRefreshAction nodeMappingRefreshAction,
@@ -122,7 +119,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         this.indicesService = indicesService;
         this.clusterService = clusterService;
         this.threadPool = threadPool;
-        this.recoverySource = recoverySource;
         this.recoveryTarget = recoveryTarget;
         this.shardStateAction = shardStateAction;
         this.nodeIndexCreatedAction = nodeIndexCreatedAction;
@@ -141,7 +137,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
     }

     @Override protected void doClose() throws ElasticSearchException {
-        recoverySource.close();
     }

     @Override public void clusterChanged(final ClusterChangedEvent event) {


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.ElasticSearchException;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.ElasticSearchWrapperException;
 import org.elasticsearch.common.unit.ByteSizeValue;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.common.collect.Sets;
 import org.elasticsearch.common.io.stream.StreamInput;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.cluster.node.DiscoveryNode;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.common.collect.Lists;
 import org.elasticsearch.common.io.stream.StreamInput;


@@ -0,0 +1,135 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;

import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.node.settings.NodeSettingsService;

import java.util.concurrent.ThreadPoolExecutor;

/**
 */
public class RecoverySettings extends AbstractComponent {

    static {
        MetaData.addDynamicSettings("indices.recovery.file_chunk_size");
        MetaData.addDynamicSettings("indices.recovery.translog_ops");
        MetaData.addDynamicSettings("indices.recovery.translog_size");
        MetaData.addDynamicSettings("indices.recovery.compress");
        MetaData.addDynamicSettings("indices.recovery.concurrent_streams");
    }
    private volatile ByteSizeValue fileChunkSize;

    private volatile boolean compress;

    private volatile int translogOps;

    private volatile ByteSizeValue translogSize;

    private volatile int concurrentStreams;

    private final ThreadPoolExecutor concurrentStreamPool;

    @Inject public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
        super(settings);

        this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", settings.getAsBytesSize("index.shard.recovery.file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB)));
        this.translogOps = componentSettings.getAsInt("translog_ops", settings.getAsInt("index.shard.recovery.translog_ops", 1000));
        this.translogSize = componentSettings.getAsBytesSize("translog_size", settings.getAsBytesSize("index.shard.recovery.translog_size", new ByteSizeValue(100, ByteSizeUnit.KB)));
        this.compress = componentSettings.getAsBoolean("compress", true);

        this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 5));
        this.concurrentStreamPool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));

        logger.debug("using concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]",
                concurrentStreams, fileChunkSize, translogSize, translogOps, compress);

        nodeSettingsService.addListener(new ApplySettings());
    }

    public void close() {
        concurrentStreamPool.shutdown();
    }

    public ByteSizeValue fileChunkSize() {
        return fileChunkSize;
    }

    public boolean compress() {
        return compress;
    }

    public int translogOps() {
        return translogOps;
    }

    public ByteSizeValue translogSize() {
        return translogSize;
    }

    public int concurrentStreams() {
        return concurrentStreams;
    }

    public ThreadPoolExecutor concurrentStreamPool() {
        return concurrentStreamPool;
    }

    class ApplySettings implements NodeSettingsService.Listener {
        @Override public void onRefreshSettings(Settings settings) {
            ByteSizeValue fileChunkSize = settings.getAsBytesSize("indices.recovery.file_chunk_size", RecoverySettings.this.fileChunkSize);
            if (!fileChunkSize.equals(RecoverySettings.this.fileChunkSize)) {
                logger.info("updating [indices.recovery.file_chunk_size] from [{}] to [{}]", RecoverySettings.this.fileChunkSize, fileChunkSize);
                RecoverySettings.this.fileChunkSize = fileChunkSize;
            }

            int translogOps = settings.getAsInt("indices.recovery.translog_ops", RecoverySettings.this.translogOps);
            if (translogOps != RecoverySettings.this.translogOps) {
                logger.info("updating [indices.recovery.translog_ops] from [{}] to [{}]", RecoverySettings.this.translogOps, translogOps);
                RecoverySettings.this.translogOps = translogOps;
            }

            ByteSizeValue translogSize = settings.getAsBytesSize("indices.recovery.translog_size", RecoverySettings.this.translogSize);
            if (!translogSize.equals(RecoverySettings.this.translogSize)) {
                logger.info("updating [indices.recovery.translog_size] from [{}] to [{}]", RecoverySettings.this.translogSize, translogSize);
                RecoverySettings.this.translogSize = translogSize;
            }

            boolean compress = settings.getAsBoolean("indices.recovery.compress", RecoverySettings.this.compress);
            if (compress != RecoverySettings.this.compress) {
                logger.info("updating [indices.recovery.compress] from [{}] to [{}]", RecoverySettings.this.compress, compress);
                RecoverySettings.this.compress = compress;
            }

            int concurrentStreams = settings.getAsInt("indices.recovery.concurrent_streams", RecoverySettings.this.concurrentStreams);
            if (concurrentStreams != RecoverySettings.this.concurrentStreams) {
                logger.info("updating [indices.recovery.concurrent_streams] from [{}] to [{}]", RecoverySettings.this.concurrentStreams, concurrentStreams);
                RecoverySettings.this.concurrentStreams = concurrentStreams;
                RecoverySettings.this.concurrentStreamPool.setMaximumPoolSize(concurrentStreams);
            }
        }
    }
}
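As a rough illustration (not part of the commit), the constructor above reads each value from the new indices.recovery.* key first and falls back to the matching legacy index.shard.recovery.* key, so existing configurations keep working. The sketch below shows both key styles via the Settings builder; the class name and the concrete values are assumptions for illustration.

import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

public class RecoverySettingsKeysExample {
    public static void main(String[] args) {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("indices.recovery.file_chunk_size", "512kb")   // new-style key, read first
                .put("index.shard.recovery.translog_ops", "500")    // legacy key, still honored as a fallback
                .put("indices.recovery.compress", "true")
                .put("indices.recovery.concurrent_streams", "3")
                .build();
        // Fed into RecoverySettings, this would give a 512kb file chunk size, batches of 500
        // translog operations, compression enabled, and 3 concurrent recovery streams.
        System.out.println(settings.getAsMap());
    }
}

Because the same indices.recovery.* keys are registered through MetaData.addDynamicSettings and watched by the ApplySettings listener, they can also be updated at runtime without a restart, which is the "more dynamic settings" part of the commit message.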


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.ElasticSearchException;
@@ -28,11 +28,7 @@ import org.elasticsearch.common.collect.Sets;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.DynamicExecutors;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IllegalIndexShardStateException;
@@ -42,7 +38,6 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.node.settings.NodeSettingsService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.BaseTransportRequestHandler;
 import org.elasticsearch.transport.TransportChannel;
@@ -54,14 +49,11 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicReference;

 /**
  * The source recovery accepts recovery requests from other peer shards and starts the recovery process from this
  * source shard to the target shard.
- *
- * @author kimchy (shay.banon)
  */
 public class RecoverySource extends AbstractComponent {
@@ -79,42 +71,19 @@ public class RecoverySource extends AbstractComponent {

     private final IndicesService indicesService;

-    private final ByteSizeValue fileChunkSize;
-
-    private final boolean compress;
-
-    private final int translogOps;
-
-    private final ByteSizeValue translogSize;
-
-    private volatile int concurrentStreams;
-
-    private final ThreadPoolExecutor concurrentStreamPool;
+    private final RecoverySettings recoverySettings;

     @Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
-                                  NodeSettingsService nodeSettingsService) {
+                                  RecoverySettings recoverySettings) {
         super(settings);
         this.threadPool = threadPool;
         this.transportService = transportService;
         this.indicesService = indicesService;
-        this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
-        this.concurrentStreamPool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
-        this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB));
-        this.translogOps = componentSettings.getAsInt("translog_ops", 1000);
-        this.translogSize = componentSettings.getAsBytesSize("translog_size", new ByteSizeValue(100, ByteSizeUnit.KB));
-        this.compress = componentSettings.getAsBoolean("compress", true);
-
-        logger.debug("using concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]",
-                concurrentStreams, fileChunkSize, translogSize, translogOps, compress);
+        this.recoverySettings = recoverySettings;

         transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
-
-        nodeSettingsService.addListener(new ApplySettings());
-    }
-
-    public void close() {
-        concurrentStreamPool.shutdown();
     }

     private RecoveryResponse recover(final StartRecoveryRequest request) {
@@ -166,11 +135,11 @@ public class RecoverySource extends AbstractComponent {
         final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
         final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
         for (final String name : response.phase1FileNames) {
-            concurrentStreamPool.execute(new Runnable() {
+            recoverySettings.concurrentStreamPool().execute(new Runnable() {
                 @Override public void run() {
                     IndexInput indexInput = null;
                     try {
-                        final int BUFFER_SIZE = (int) fileChunkSize.bytes();
+                        final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes();
                         byte[] buf = new byte[BUFFER_SIZE];
                         StoreFileMetaData md = shard.store().metaData(name);
                         indexInput = snapshot.getDirectory().openInput(name);
@@ -184,7 +153,7 @@ public class RecoverySource extends AbstractComponent {
                             long position = indexInput.getFilePointer();
                             indexInput.readBytes(buf, 0, toRead, false);
                             transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead),
-                                    TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
+                                    TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
                             readCount += toRead;
                         }
                         indexInput.close();
@@ -277,9 +246,9 @@ public class RecoverySource extends AbstractComponent {
                 ops += 1;
                 size += operation.estimateSize();
                 totalOperations++;
-                if (ops >= translogOps || size >= translogSize.bytes()) {
+                if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) {
                     RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
-                    transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
+                    transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
                     ops = 0;
                     size = 0;
                     operations.clear();
@@ -288,7 +257,7 @@ public class RecoverySource extends AbstractComponent {
             // send the leftover
             if (!operations.isEmpty()) {
                 RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
-                transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
+                transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
             }
             return totalOperations;
         }
@@ -311,16 +280,5 @@ public class RecoverySource extends AbstractComponent {
                 channel.sendResponse(response);
             }
         }
-
-    class ApplySettings implements NodeSettingsService.Listener {
-        @Override public void onRefreshSettings(Settings settings) {
-            int concurrentStreams = settings.getAsInt("index.shard.recovery.concurrent_streams", RecoverySource.this.concurrentStreams);
-            if (concurrentStreams != RecoverySource.this.concurrentStreams) {
-                logger.info("updating [index.shard.recovery.concurrent_streams] from [{}] to [{}]", RecoverySource.this.concurrentStreams, concurrentStreams);
-                RecoverySource.this.concurrentStreams = concurrentStreams;
-                RecoverySource.this.concurrentStreamPool.setMaximumPoolSize(concurrentStreams);
-            }
-        }
-    }
 }


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.apache.lucene.store.IndexOutput;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.IndexOutput;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.common.collect.Lists;
 import org.elasticsearch.common.io.stream.StreamInput;


@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard.recovery;
+package org.elasticsearch.indices.recovery;

 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.collect.Maps;