diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index 1bd4622e615..60ec296135e 100644 --- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -58,8 +58,6 @@ public class IndexDynamicSettingsModule extends AbstractModule { indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION); indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION); indexDynamicSettings.addDynamicSetting(FsTranslog.INDEX_TRANSLOG_FS_TYPE); - indexDynamicSettings.addDynamicSetting(FsTranslog.INDEX_TRANSLOG_FS_BUFFER_SIZE, Validator.BYTES_SIZE); - indexDynamicSettings.addDynamicSetting(FsTranslog.INDEX_TRANSLOG_FS_TRANSIENT_BUFFER_SIZE, Validator.BYTES_SIZE); indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, Validator.NON_NEGATIVE_INTEGER); indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS); indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_READ_ONLY); diff --git a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index 81631a453ea..6a220fc96d8 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.CloseableIndexComponent; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShardComponent; @@ -41,8 +42,12 @@ import java.io.InputStream; */ public interface Translog extends IndexShardComponent, CloseableIndexComponent { + static ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb"); + public static final String TRANSLOG_ID_KEY = "translog_id"; + void updateBuffer(ByteSizeValue bufferSize); + void closeWithDelete(); /** diff --git a/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java index 05485b50039..89b80e27295 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java @@ -176,4 +176,20 @@ public class BufferingFsTranslogFile implements FsTranslogFile { rwl.writeLock().unlock(); } } + + @Override + public void updateBufferSize(int bufferSize) { + rwl.writeLock().lock(); + try { + if (this.buffer.length == bufferSize) { + return; + } + flushBuffer(); + this.buffer = new byte[bufferSize]; + } catch (IOException e) { + throw new TranslogException(shardId, "failed to flush", e); + } finally { + rwl.writeLock().unlock(); + } + } } diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 8e80e1d9d69..f7780be9d11 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -48,24 +48,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class FsTranslog extends AbstractIndexShardComponent implements Translog { public static final String INDEX_TRANSLOG_FS_TYPE = "index.translog.fs.type"; - public static final String INDEX_TRANSLOG_FS_BUFFER_SIZE = "index.translog.fs.buffer_size"; - public static final String INDEX_TRANSLOG_FS_TRANSIENT_BUFFER_SIZE = "index.translog.fs.transient_buffer_size"; class ApplySettings implements IndexSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { - int bufferSize = (int) settings.getAsBytesSize(INDEX_TRANSLOG_FS_BUFFER_SIZE, new ByteSizeValue(FsTranslog.this.bufferSize)).bytes(); - if (bufferSize != FsTranslog.this.bufferSize) { - logger.info("updating buffer_size from [{}] to [{}]", new ByteSizeValue(FsTranslog.this.bufferSize), new ByteSizeValue(bufferSize)); - FsTranslog.this.bufferSize = bufferSize; - } - - int transientBufferSize = (int) settings.getAsBytesSize(INDEX_TRANSLOG_FS_TRANSIENT_BUFFER_SIZE, new ByteSizeValue(FsTranslog.this.transientBufferSize)).bytes(); - if (transientBufferSize != FsTranslog.this.transientBufferSize) { - logger.info("updating transient_buffer_size from [{}] to [{}]", new ByteSizeValue(FsTranslog.this.transientBufferSize), new ByteSizeValue(transientBufferSize)); - FsTranslog.this.transientBufferSize = transientBufferSize; - } - FsTranslogFile.Type type = FsTranslogFile.Type.fromString(settings.get(INDEX_TRANSLOG_FS_TYPE, FsTranslog.this.type.name())); if (type != FsTranslog.this.type) { logger.info("updating type from [{}] to [{}]", FsTranslog.this.type, type); @@ -86,8 +72,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog private boolean syncOnEachOperation = false; - private int bufferSize; - private int transientBufferSize; + private volatile int bufferSize; + private volatile int transientBufferSize; private final ApplySettings applySettings = new ApplySettings(); @@ -103,7 +89,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name())); - this.bufferSize = (int) componentSettings.getAsBytesSize("buffer_size", ByteSizeValue.parseBytesSizeValue("64k")).bytes(); + this.bufferSize = (int) componentSettings.getAsBytesSize("buffer_size", ByteSizeValue.parseBytesSizeValue("64k")).bytes(); // Not really interesting, updated by IndexingMemoryController... this.transientBufferSize = (int) componentSettings.getAsBytesSize("transient_buffer_size", ByteSizeValue.parseBytesSizeValue("8k")).bytes(); indexSettingsService.addListener(applySettings); @@ -128,6 +114,24 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog close(false); } + @Override + public void updateBuffer(ByteSizeValue bufferSize) { + this.bufferSize = bufferSize.bytesAsInt(); + rwl.writeLock().lock(); + try { + FsTranslogFile current1 = this.current; + if (current1 != null) { + current1.updateBufferSize(this.bufferSize); + } + current1 = this.trans; + if (current1 != null) { + current1.updateBufferSize(this.bufferSize); + } + } finally { + rwl.writeLock().unlock(); + } + } + private void close(boolean delete) { if (indexSettingsService != null) { indexSettingsService.removeListener(applySettings); diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java index eb7efd2e2a6..76f840735b9 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java @@ -71,6 +71,8 @@ public interface FsTranslogFile { void reuse(FsTranslogFile other) throws TranslogException; + void updateBufferSize(int bufferSize) throws TranslogException; + void sync(); boolean syncNeeded(); diff --git a/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java index 9b2ca7a7fc8..f1c9f6b4bae 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java @@ -116,4 +116,9 @@ public class SimpleFsTranslogFile implements FsTranslogFile { public void reuse(FsTranslogFile other) { // nothing to do there } + + @Override + public void updateBufferSize(int bufferSize) throws TranslogException { + // nothing to do here... + } } diff --git a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index b83225294b8..2f7a3575cab 100644 --- a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -44,6 +44,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -51,17 +52,19 @@ import java.util.concurrent.ScheduledFuture; public class IndexingMemoryController extends AbstractLifecycleComponent { private final ThreadPool threadPool; - private final IndicesService indicesService; - private final ByteSizeValue indexingBuffer; - private final ByteSizeValue minShardIndexBufferSize; private final ByteSizeValue maxShardIndexBufferSize; + private final ByteSizeValue translogBuffer; + private final ByteSizeValue minShardTranslogBufferSize; + private final ByteSizeValue maxShardTranslogBufferSize; + private final TimeValue inactiveTime; private final TimeValue interval; + private final AtomicBoolean shardsCreatedOrDeleted = new AtomicBoolean(); private final Listener listener = new Listener(); @@ -94,12 +97,32 @@ public class IndexingMemoryController extends AbstractLifecycleComponent maxTranslogBuffer.bytes()) { + translogBuffer = maxTranslogBuffer; + } + } else { + translogBuffer = ByteSizeValue.parseBytesSizeValue(translogBufferSetting, null); + } + this.translogBuffer = translogBuffer; + this.minShardTranslogBufferSize = componentSettings.getAsBytesSize("min_shard_translog_buffer_size", new ByteSizeValue(2, ByteSizeUnit.KB)); + this.maxShardTranslogBufferSize = componentSettings.getAsBytesSize("max_shard_translog_buffer_size", new ByteSizeValue(64, ByteSizeUnit.KB)); + this.inactiveTime = componentSettings.getAsTime("shard_inactive_time", TimeValue.timeValueMinutes(30)); // we need to have this relatively small to move a shard from inactive to active fast (enough) this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(30)); @@ -176,14 +199,16 @@ public class IndexingMemoryController extends AbstractLifecycleComponent maxShardIndexBufferSize.bytes()) { shardIndexingBufferSize = maxShardIndexBufferSize; } - logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to [{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize); + + ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / shardsCount); + if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) { + shardTranslogBufferSize = minShardTranslogBufferSize; + } + if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) { + shardTranslogBufferSize = maxShardTranslogBufferSize; + } + + logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize, shardTranslogBufferSize); for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); if (status == null || !status.inactiveIndexing) { try { ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize); + ((InternalIndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize); } catch (EngineClosedException e) { // ignore continue; @@ -245,10 +277,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent