automatically set translog buffer size based on number of shards

similar to how we set the indexing buffer size, automatically set the translog buffer size based on the number of shards allocated on a node
This commit is contained in:
Shay Banon 2013-07-06 10:56:36 -07:00
parent 4574489c27
commit cc1173b58f
7 changed files with 95 additions and 37 deletions

View File

@ -58,8 +58,6 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION);
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION);
indexDynamicSettings.addDynamicSetting(FsTranslog.INDEX_TRANSLOG_FS_TYPE);
indexDynamicSettings.addDynamicSetting(FsTranslog.INDEX_TRANSLOG_FS_BUFFER_SIZE, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(FsTranslog.INDEX_TRANSLOG_FS_TRANSIENT_BUFFER_SIZE, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, Validator.NON_NEGATIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_READ_ONLY);

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShardComponent;
@ -41,8 +42,12 @@ import java.io.InputStream;
*/
public interface Translog extends IndexShardComponent, CloseableIndexComponent {
static ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb");
public static final String TRANSLOG_ID_KEY = "translog_id";
void updateBuffer(ByteSizeValue bufferSize);
void closeWithDelete();
/**

View File

@ -176,4 +176,20 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
rwl.writeLock().unlock();
}
}
@Override
public void updateBufferSize(int bufferSize) {
rwl.writeLock().lock();
try {
if (this.buffer.length == bufferSize) {
return;
}
flushBuffer();
this.buffer = new byte[bufferSize];
} catch (IOException e) {
throw new TranslogException(shardId, "failed to flush", e);
} finally {
rwl.writeLock().unlock();
}
}
}

View File

@ -48,24 +48,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class FsTranslog extends AbstractIndexShardComponent implements Translog {
public static final String INDEX_TRANSLOG_FS_TYPE = "index.translog.fs.type";
public static final String INDEX_TRANSLOG_FS_BUFFER_SIZE = "index.translog.fs.buffer_size";
public static final String INDEX_TRANSLOG_FS_TRANSIENT_BUFFER_SIZE = "index.translog.fs.transient_buffer_size";
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
int bufferSize = (int) settings.getAsBytesSize(INDEX_TRANSLOG_FS_BUFFER_SIZE, new ByteSizeValue(FsTranslog.this.bufferSize)).bytes();
if (bufferSize != FsTranslog.this.bufferSize) {
logger.info("updating buffer_size from [{}] to [{}]", new ByteSizeValue(FsTranslog.this.bufferSize), new ByteSizeValue(bufferSize));
FsTranslog.this.bufferSize = bufferSize;
}
int transientBufferSize = (int) settings.getAsBytesSize(INDEX_TRANSLOG_FS_TRANSIENT_BUFFER_SIZE, new ByteSizeValue(FsTranslog.this.transientBufferSize)).bytes();
if (transientBufferSize != FsTranslog.this.transientBufferSize) {
logger.info("updating transient_buffer_size from [{}] to [{}]", new ByteSizeValue(FsTranslog.this.transientBufferSize), new ByteSizeValue(transientBufferSize));
FsTranslog.this.transientBufferSize = transientBufferSize;
}
FsTranslogFile.Type type = FsTranslogFile.Type.fromString(settings.get(INDEX_TRANSLOG_FS_TYPE, FsTranslog.this.type.name()));
if (type != FsTranslog.this.type) {
logger.info("updating type from [{}] to [{}]", FsTranslog.this.type, type);
@ -86,8 +72,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
private boolean syncOnEachOperation = false;
private int bufferSize;
private int transientBufferSize;
private volatile int bufferSize;
private volatile int transientBufferSize;
private final ApplySettings applySettings = new ApplySettings();
@ -103,7 +89,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
}
this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name()));
this.bufferSize = (int) componentSettings.getAsBytesSize("buffer_size", ByteSizeValue.parseBytesSizeValue("64k")).bytes();
this.bufferSize = (int) componentSettings.getAsBytesSize("buffer_size", ByteSizeValue.parseBytesSizeValue("64k")).bytes(); // Not really interesting, updated by IndexingMemoryController...
this.transientBufferSize = (int) componentSettings.getAsBytesSize("transient_buffer_size", ByteSizeValue.parseBytesSizeValue("8k")).bytes();
indexSettingsService.addListener(applySettings);
@ -128,6 +114,24 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
close(false);
}
@Override
public void updateBuffer(ByteSizeValue bufferSize) {
this.bufferSize = bufferSize.bytesAsInt();
rwl.writeLock().lock();
try {
FsTranslogFile current1 = this.current;
if (current1 != null) {
current1.updateBufferSize(this.bufferSize);
}
current1 = this.trans;
if (current1 != null) {
current1.updateBufferSize(this.bufferSize);
}
} finally {
rwl.writeLock().unlock();
}
}
private void close(boolean delete) {
if (indexSettingsService != null) {
indexSettingsService.removeListener(applySettings);

View File

@ -71,6 +71,8 @@ public interface FsTranslogFile {
void reuse(FsTranslogFile other) throws TranslogException;
void updateBufferSize(int bufferSize) throws TranslogException;
void sync();
boolean syncNeeded();

View File

@ -116,4 +116,9 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
public void reuse(FsTranslogFile other) {
// nothing to do there
}
@Override
public void updateBufferSize(int bufferSize) throws TranslogException {
// nothing to do here...
}
}

View File

@ -44,6 +44,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@ -51,17 +52,19 @@ import java.util.concurrent.ScheduledFuture;
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
private final ThreadPool threadPool;
private final IndicesService indicesService;
private final ByteSizeValue indexingBuffer;
private final ByteSizeValue minShardIndexBufferSize;
private final ByteSizeValue maxShardIndexBufferSize;
private final ByteSizeValue translogBuffer;
private final ByteSizeValue minShardTranslogBufferSize;
private final ByteSizeValue maxShardTranslogBufferSize;
private final TimeValue inactiveTime;
private final TimeValue interval;
private final AtomicBoolean shardsCreatedOrDeleted = new AtomicBoolean();
private final Listener listener = new Listener();
@ -94,12 +97,32 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
} else {
indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, null);
}
this.indexingBuffer = indexingBuffer;
this.minShardIndexBufferSize = componentSettings.getAsBytesSize("min_shard_index_buffer_size", new ByteSizeValue(4, ByteSizeUnit.MB));
// LUCENE MONITOR: Based on this thread, currently (based on Mike), having a large buffer does not make a lot of sense: https://issues.apache.org/jira/browse/LUCENE-2324?focusedCommentId=13005155&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13005155
this.maxShardIndexBufferSize = componentSettings.getAsBytesSize("max_shard_index_buffer_size", new ByteSizeValue(512, ByteSizeUnit.MB));
ByteSizeValue translogBuffer;
String translogBufferSetting = componentSettings.get("translog_buffer_size", "1%");
if (translogBufferSetting.endsWith("%")) {
double percent = Double.parseDouble(translogBufferSetting.substring(0, translogBufferSetting.length() - 1));
translogBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().mem().heapMax().bytes()) * (percent / 100)));
ByteSizeValue minTranslogBuffer = componentSettings.getAsBytesSize("min_translog_buffer_size", new ByteSizeValue(256, ByteSizeUnit.KB));
ByteSizeValue maxTranslogBuffer = componentSettings.getAsBytesSize("max_translog_buffer_size", null);
if (translogBuffer.bytes() < minTranslogBuffer.bytes()) {
translogBuffer = minTranslogBuffer;
}
if (maxTranslogBuffer != null && translogBuffer.bytes() > maxTranslogBuffer.bytes()) {
translogBuffer = maxTranslogBuffer;
}
} else {
translogBuffer = ByteSizeValue.parseBytesSizeValue(translogBufferSetting, null);
}
this.translogBuffer = translogBuffer;
this.minShardTranslogBufferSize = componentSettings.getAsBytesSize("min_shard_translog_buffer_size", new ByteSizeValue(2, ByteSizeUnit.KB));
this.maxShardTranslogBufferSize = componentSettings.getAsBytesSize("max_shard_translog_buffer_size", new ByteSizeValue(64, ByteSizeUnit.KB));
this.inactiveTime = componentSettings.getAsTime("shard_inactive_time", TimeValue.timeValueMinutes(30));
// we need to have this relatively small to move a shard from inactive to active fast (enough)
this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(30));
@ -176,14 +199,16 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
// update inactive indexing buffer size
try {
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER);
((InternalIndexShard) indexShard).translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
} catch (EngineClosedException e) {
// ignore
} catch (FlushNotAllowedEngineException e) {
// ignore
}
}
if (activeInactiveStatusChanges) {
calcAndSetShardIndexingBuffer("shards became active/inactive (indexing wise)");
boolean shardsCreatedOrDeleted = IndexingMemoryController.this.shardsCreatedOrDeleted.compareAndSet(true, false);
if (shardsCreatedOrDeleted || activeInactiveStatusChanges) {
calcAndSetShardBuffers("active/inactive[" + activeInactiveStatusChanges + "] created/deleted[" + shardsCreatedOrDeleted + "]");
}
}
}
@ -194,43 +219,50 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
@Override
public void afterIndexShardCreated(IndexShard indexShard) {
synchronized (mutex) {
calcAndSetShardIndexingBuffer("created_shard[" + indexShard.shardId().index().name() + "][" + indexShard.shardId().id() + "]");
shardsIndicesStatus.put(indexShard.shardId(), new ShardIndexingStatus());
shardsCreatedOrDeleted.set(true);
}
}
@Override
public void afterIndexShardClosed(ShardId shardId) {
synchronized (mutex) {
calcAndSetShardIndexingBuffer("removed_shard[" + shardId.index().name() + "][" + shardId.id() + "]");
shardsIndicesStatus.remove(shardId);
shardsCreatedOrDeleted.set(true);
}
}
}
private void calcAndSetShardIndexingBuffer(String reason) {
private void calcAndSetShardBuffers(String reason) {
int shardsCount = countShards();
if (shardsCount == 0) {
return;
}
ByteSizeValue shardIndexingBufferSize = calcShardIndexingBuffer(shardsCount);
if (shardIndexingBufferSize == null) {
return;
}
ByteSizeValue shardIndexingBufferSize = new ByteSizeValue(indexingBuffer.bytes() / shardsCount);
if (shardIndexingBufferSize.bytes() < minShardIndexBufferSize.bytes()) {
shardIndexingBufferSize = minShardIndexBufferSize;
}
if (shardIndexingBufferSize.bytes() > maxShardIndexBufferSize.bytes()) {
shardIndexingBufferSize = maxShardIndexBufferSize;
}
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to [{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize);
ByteSizeValue shardTranslogBufferSize = new ByteSizeValue(translogBuffer.bytes() / shardsCount);
if (shardTranslogBufferSize.bytes() < minShardTranslogBufferSize.bytes()) {
shardTranslogBufferSize = minShardTranslogBufferSize;
}
if (shardTranslogBufferSize.bytes() > maxShardTranslogBufferSize.bytes()) {
shardTranslogBufferSize = maxShardTranslogBufferSize;
}
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize, shardTranslogBufferSize);
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId());
if (status == null || !status.inactiveIndexing) {
try {
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
((InternalIndexShard) indexShard).translog().updateBuffer(shardTranslogBufferSize);
} catch (EngineClosedException e) {
// ignore
continue;
@ -245,10 +277,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
}
}
private ByteSizeValue calcShardIndexingBuffer(int shardsCount) {
return new ByteSizeValue(indexingBuffer.bytes() / shardsCount);
}
private int countShards() {
int shardsCount = 0;
for (IndexService indexService : indicesService) {