diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java
index 3af35610582..1e5b402518d 100644
--- a/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/src/main/java/org/elasticsearch/index/store/Store.java
@@ -274,7 +274,7 @@ public class Store extends AbstractIndexShardComponent {
     /**
      * The idea of the store directory is to cache file level meta data, as well as md5 of it
      */
-    protected class StoreDirectory extends Directory implements ForceSyncDirectory {
+    class StoreDirectory extends Directory implements ForceSyncDirectory {
 
         private final Directory[] delegates;
 
diff --git a/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java
new file mode 100644
index 00000000000..7809c78bf12
--- /dev/null
+++ b/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java
@@ -0,0 +1,215 @@
+package org.elasticsearch.index.translog.fs;
+
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A translog file that collects operations in an in-memory buffer and writes them to the
+ * underlying file in batches. Operations at least as large as the buffer bypass it and are
+ * written directly.
+ * <p/>
+ * The buffer is guarded by a read/write lock: anything that appends to or flushes the buffer
+ * holds the write lock, reading a still buffered operation holds the read lock.
+ */
+public class BufferingFsTranslogFile implements FsTranslogFile {
+
+    private final long id;
+    private final ShardId shardId;
+    private final RafReference raf;
+
+    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+
+    private volatile int operationCounter;
+
+    // logical end of the translog: bytes already written plus bytes still in the buffer
+    private volatile long lastPosition;
+    // number of bytes handed to the underlying file so far
+    private volatile long lastWrittenPosition;
+
+    // value of lastPosition covered by the last successful sync
+    private volatile long lastSyncPosition = 0;
+
+    private byte[] buffer;
+    private int bufferCount;
+
+    /**
+     * Creates a translog file with an empty buffer of <tt>bufferSize</tt> bytes and truncates
+     * the underlying file to zero length.
+     */
+    public BufferingFsTranslogFile(ShardId shardId, long id, RafReference raf, int bufferSize) throws IOException {
+        this.shardId = shardId;
+        this.id = id;
+        this.raf = raf;
+        this.buffer = new byte[bufferSize];
+        raf.raf().setLength(0);
+    }
+
+    public long id() {
+        return this.id;
+    }
+
+    public int estimatedNumberOfOperations() {
+        return operationCounter;
+    }
+
+    public long translogSizeInBytes() {
+        return lastWrittenPosition;
+    }
+
+    /**
+     * Appends an operation and returns its location. The operation is only buffered, unless it
+     * is at least as large as the buffer, in which case the buffer is flushed and the operation
+     * is written straight to the file.
+     */
+    @Override
+    public Translog.Location add(byte[] data, int from, int size) throws IOException {
+        rwl.writeLock().lock();
+        try {
+            operationCounter++;
+            long position = lastPosition;
+            if (size >= buffer.length) {
+                flushBuffer();
+                raf.raf().write(data, from, size);
+                lastWrittenPosition += size;
+                lastPosition += size;
+                return new Translog.Location(id, position, size);
+            }
+            if (size > buffer.length - bufferCount) {
+                // not enough room left in the buffer, empty it first
+                flushBuffer();
+            }
+            System.arraycopy(data, from, buffer, bufferCount, size);
+            bufferCount += size;
+            lastPosition += size;
+            return new Translog.Location(id, position, size);
+        } finally {
+            rwl.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Writes the buffered bytes, if any, to the file. Callers hold the write lock.
+     */
+    private void flushBuffer() throws IOException {
+        if (bufferCount > 0) {
+            raf.raf().write(buffer, 0, bufferCount);
+            lastWrittenPosition += bufferCount;
+            bufferCount = 0;
+        }
+    }
+
+    /**
+     * Reads an operation back: from the buffer when it has not been flushed yet, from the file
+     * otherwise.
+     */
+    @Override
+    public byte[] read(Translog.Location location) throws IOException {
+        rwl.readLock().lock();
+        try {
+            if (location.translogLocation >= lastWrittenPosition) {
+                // still buffered, buffer offsets are relative to the written position
+                byte[] data = new byte[location.size];
+                System.arraycopy(buffer, (int) (location.translogLocation - lastWrittenPosition), data, 0, location.size);
+                return data;
+            }
+        } finally {
+            rwl.readLock().unlock();
+        }
+        ByteBuffer buffer = ByteBuffer.allocate(location.size);
+        raf.channel().read(buffer, location.translogLocation);
+        return buffer.array();
+    }
+
+    /**
+     * Flushes the buffer and returns a snapshot of what has been written, or <tt>null</tt> when
+     * the reference count of the underlying file could not be increased.
+     */
+    @Override
+    public FsChannelSnapshot snapshot() throws TranslogException {
+        rwl.writeLock().lock();
+        try {
+            flushBuffer();
+            if (!raf.increaseRefCount()) {
+                return null;
+            }
+            return new FsChannelSnapshot(this.id, raf, lastWrittenPosition, operationCounter);
+        } catch (IOException e) {
+            throw new TranslogException(shardId, "failed to flush", e);
+        } finally {
+            rwl.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Flushes buffered operations and forces them to disk. This is best effort: failures are
+     * ignored, but the sync position only advances after a successful force, so the next call
+     * tries again.
+     */
+    @Override
+    public void sync() {
+        // compare against the logical end of the translog and not against what was already
+        // written, otherwise operations that only live in the buffer would never be synced
+        if (lastPosition == lastSyncPosition) {
+            return;
+        }
+        try {
+            long flushedPosition;
+            rwl.writeLock().lock();
+            try {
+                flushBuffer();
+                flushedPosition = lastPosition;
+            } finally {
+                rwl.writeLock().unlock();
+            }
+            raf.channel().force(false);
+            lastSyncPosition = flushedPosition;
+        } catch (Exception e) {
+            // ignore
+        }
+    }
+
+    /**
+     * Decreases the reference count of the underlying file. Buffered operations are flushed
+     * first, unless the file is closed for deletion.
+     */
+    @Override
+    public void close(boolean delete) {
+        if (!delete) {
+            rwl.writeLock().lock();
+            try {
+                flushBuffer();
+            } catch (IOException e) {
+                throw new TranslogException(shardId, "failed to close", e);
+            } finally {
+                rwl.writeLock().unlock();
+            }
+        }
+        raf.decreaseRefCount(delete);
+    }
+
+    /**
+     * Flushes this file's buffered operations and takes over the buffer of the other file.
+     * Does nothing when the other file is not a buffering translog file.
+     */
+    @Override
+    public void reuse(FsTranslogFile other) {
+        if (!(other instanceof BufferingFsTranslogFile)) {
+            return;
+        }
+        rwl.writeLock().lock();
+        try {
+            flushBuffer();
+            this.buffer = ((BufferingFsTranslogFile) other).buffer;
+        } catch (IOException e) {
+            throw new TranslogException(shardId, "failed to flush", e);
+        } finally {
+            rwl.writeLock().unlock();
+        }
+    }
+}
diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
index d31f21e9efb..23a4f6d6abd 100644
--- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
+++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
@@ -20,13 +20,16 @@
 package org.elasticsearch.index.translog.fs;
 
 import jsr166y.ThreadLocalRandom;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.CachedStreamOutput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.env.NodeEnvironment;
 import
org.elasticsearch.index.settings.IndexSettings;
+import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
@@ -44,29 +47,110 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class FsTranslog extends AbstractIndexShardComponent implements Translog {
 
+    static {
+        IndexMetaData.addDynamicSettings(
+                "index.translog.fs.type",
+                "index.translog.fs.buffer_size",
+                "index.translog.fs.transient_buffer_size"
+        );
+    }
+
+    /**
+     * Applies dynamic updates of the buffer sizes and the translog file type. The new values are
+     * used by translog files created after the update.
+     */
+    class ApplySettings implements IndexSettingsService.Listener {
+        @Override
+        public void onRefreshSettings(Settings settings) {
+            int bufferSize = (int) settings.getAsBytesSize("index.translog.fs.buffer_size", new ByteSizeValue(FsTranslog.this.bufferSize)).bytes();
+            if (bufferSize != FsTranslog.this.bufferSize) {
+                logger.info("updating buffer_size from [{}] to [{}]", new ByteSizeValue(FsTranslog.this.bufferSize), new ByteSizeValue(bufferSize));
+                FsTranslog.this.bufferSize = bufferSize;
+            }
+
+            int transientBufferSize = (int) settings.getAsBytesSize("index.translog.fs.transient_buffer_size", new ByteSizeValue(FsTranslog.this.transientBufferSize)).bytes();
+            if (transientBufferSize != FsTranslog.this.transientBufferSize) {
+                logger.info("updating transient_buffer_size from [{}] to [{}]", new ByteSizeValue(FsTranslog.this.transientBufferSize), new ByteSizeValue(transientBufferSize));
+                FsTranslog.this.transientBufferSize = transientBufferSize;
+            }
+
+            FsTranslogFile.Type type = FsTranslogFile.Type.fromString(settings.get("index.translog.fs.type", FsTranslog.this.type.name()));
+            if (type != FsTranslog.this.type) {
+                logger.info("updating type from [{}] to [{}]", FsTranslog.this.type, type);
+                FsTranslog.this.type = type;
+            }
+        }
+    }
+
+    private final IndexSettingsService indexSettingsService;
+
     private final ReadWriteLock rwl = new ReentrantReadWriteLock();
     private final File[] locations;
 
     private volatile FsTranslogFile current;
     private volatile FsTranslogFile trans;
 
+    private FsTranslogFile.Type type;
+
     private boolean syncOnEachOperation = false;
 
+    private int bufferSize;
+    private int transientBufferSize;
+
+    private final ApplySettings applySettings = new ApplySettings();
+
     @Inject
-    public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) {
+    public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, NodeEnvironment nodeEnv) {
         super(shardId, indexSettings);
+        this.indexSettingsService = indexSettingsService;
         File[] shardLocations = nodeEnv.shardLocations(shardId);
         this.locations = new File[shardLocations.length];
         for (int i = 0; i < shardLocations.length; i++) {
             locations[i] = new File(shardLocations[i], "translog");
             FileSystemUtils.mkdirs(locations[i]);
         }
+
+        this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name()));
+        this.bufferSize = (int) componentSettings.getAsBytesSize("buffer_size", ByteSizeValue.parseBytesSizeValue("64k")).bytes();
+        this.transientBufferSize = (int) componentSettings.getAsBytesSize("transient_buffer_size", ByteSizeValue.parseBytesSizeValue("8k")).bytes();
+
+        indexSettingsService.addListener(applySettings);
     }
 
     public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location) {
         super(shardId, indexSettings);
+        this.indexSettingsService = null;
         this.locations = new File[]{location};
         FileSystemUtils.mkdirs(location);
+
+        this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name()));
+        // same defaults as the injected constructor, a size of 0 would disable buffering altogether
+        this.bufferSize = (int) componentSettings.getAsBytesSize("buffer_size", ByteSizeValue.parseBytesSizeValue("64k")).bytes();
+        this.transientBufferSize = (int) componentSettings.getAsBytesSize("transient_buffer_size", ByteSizeValue.parseBytesSizeValue("8k")).bytes();
+    }
+
+    /**
+     * Removes the settings listener, when one was registered, and closes the current and the
+     * transient translog file.
+     */
+    @Override
+    public void close(boolean delete) {
+        if (indexSettingsService != null) {
+            indexSettingsService.removeListener(applySettings);
+        }
+        rwl.writeLock().lock();
+        try {
+            FsTranslogFile current1 = this.current;
+            if (current1 != null) {
+                current1.close(delete);
+            }
+            current1 = this.trans;
+            if (current1 != null) {
+                current1.close(delete);
+            }
+        } finally {
+            rwl.writeLock().unlock();
+        }
     }
 
     public File[] locations() {
@@ -149,7 +233,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
                 }
             }
             try {
-                newFile = new SimpleFsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
+                newFile = type.create(shardId, id, new RafReference(new File(location, "translog-" + id)), bufferSize);
             } catch (IOException e) {
                 throw new TranslogException(shardId, "failed to create new translog file", e);
             }
@@ -184,7 +268,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
                     location = file;
                 }
             }
-            this.trans = new SimpleFsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
+            this.trans = type.create(shardId, id, new RafReference(new File(location, "translog-" + id)), transientBufferSize);
         } catch (IOException e) {
             throw new TranslogException(shardId, "failed to create new translog file", e);
         } finally {
@@ -205,6 +289,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
             rwl.writeLock().unlock();
         }
         old.close(true);
+        current.reuse(old);
     }
 
     @Override
@@ -311,22 +396,10 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
     @Override
     public void syncOnEachOperation(boolean syncOnEachOperation) {
         this.syncOnEachOperation = syncOnEachOperation;
-    }
-
-    @Override
-    public void close(boolean delete) {
-        rwl.writeLock().lock();
-        try {
-            FsTranslogFile current1 = this.current;
-            if (current1 != null) {
-                current1.close(delete);
-            }
-            current1 = this.trans;
-            if (current1 != null) {
-                current1.close(delete);
-            }
-        } finally {
-            rwl.writeLock().unlock();
+        if (syncOnEachOperation) {
+            type = FsTranslogFile.Type.SIMPLE;
+        } else {
+            type = FsTranslogFile.Type.BUFFERED;
         }
     }
 }
diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java
b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java
index b774eba83ae..68a70130310 100644
--- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java
+++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java
@@ -19,6 +19,8 @@
 
 package org.elasticsearch.index.translog.fs;
 
+import org.elasticsearch.ElasticSearchIllegalArgumentException;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogException;
 
@@ -26,6 +28,33 @@ import java.io.IOException;
 
 public interface FsTranslogFile {
 
+    public static enum Type {
+
+        SIMPLE() {
+            @Override
+            public FsTranslogFile create(ShardId shardId, long id, RafReference raf, int bufferSize) throws IOException {
+                return new SimpleFsTranslogFile(shardId, id, raf);
+            }
+        },
+        BUFFERED() {
+            @Override
+            public FsTranslogFile create(ShardId shardId, long id, RafReference raf, int bufferSize) throws IOException {
+                return new BufferingFsTranslogFile(shardId, id, raf, bufferSize);
+            }
+        };
+
+        public abstract FsTranslogFile create(ShardId shardId, long id, RafReference raf, int bufferSize) throws IOException;
+
+        public static Type fromString(String type) throws ElasticSearchIllegalArgumentException {
+            if (SIMPLE.name().equalsIgnoreCase(type)) {
+                return SIMPLE;
+            } else if (BUFFERED.name().equalsIgnoreCase(type)) {
+                return BUFFERED;
+            }
+            throw new ElasticSearchIllegalArgumentException("No translog fs type [" + type + "]");
+        }
+    }
+
     long id();
 
     int estimatedNumberOfOperations();
@@ -36,9 +65,11 @@ public interface FsTranslogFile {
 
     byte[] read(Translog.Location location) throws IOException;
 
-    void close(boolean delete);
+    void close(boolean delete) throws TranslogException;
 
     FsChannelSnapshot snapshot() throws TranslogException;
 
+    void reuse(FsTranslogFile other) throws TranslogException;
+
     void sync();
 }
diff --git a/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java
index f9a3619f987..3a64f219d88 100644
--- a/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java
+++ b/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java
@@ -105,4 +105,9 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
             // ignore
         }
     }
+
+    @Override
+    public void reuse(FsTranslogFile other) {
+        // nothing to take over here (NOTE(review): this type is created without a buffer size, so presumably it keeps no buffer)
+    }
 }
diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java
index dd8ba0086fa..1aa5c91264f 100644
--- a/src/main/java/org/elasticsearch/indices/IndicesModule.java
+++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java
@@ -27,7 +27,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
 import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
-import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
+import org.elasticsearch.indices.memory.IndexingMemoryController;
 import org.elasticsearch.indices.query.IndicesQueriesModule;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoverySource;
@@ -62,7 +62,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
         bind(RecoverySource.class).asEagerSingleton();
 
         bind(IndicesClusterStateService.class).asEagerSingleton();
-        bind(IndexingMemoryBufferController.class).asEagerSingleton();
+        bind(IndexingMemoryController.class).asEagerSingleton();
         bind(IndicesNodeFilterCache.class).asEagerSingleton();
         bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
         bind(IndicesTTLService.class).asEagerSingleton();
diff --git a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryBufferController.java
b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java similarity index 88% rename from src/main/java/org/elasticsearch/indices/memory/IndexingMemoryBufferController.java rename to src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 71aec5faa82..5b3b9889a3a 100644 --- a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryBufferController.java +++ b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.memory; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -40,13 +41,14 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; +import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; /** * */ -public class IndexingMemoryBufferController extends AbstractLifecycleComponent { +public class IndexingMemoryController extends AbstractLifecycleComponent { private final ThreadPool threadPool; @@ -70,7 +72,7 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent activeToInactiveIndexingShards = Lists.newArrayList(); + List inactiveToActiveIndexingShards = Lists.newArrayList(); for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { long time = threadPool.estimatedTimeInMillis(); @@ -145,27 +149,20 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent inactiveTime.millis() && indexShard.mergeStats().current() == 0) { - try { - ((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(Engine.INACTIVE_SHARD_INDEXING_BUFFER); - } catch (EngineClosedException e) { - // ignore - continue; - } catch (FlushNotAllowedEngineException e) { - // ignore - continue; - } // 
inactive for this amount of time, mark it - status.inactive = true; + activeToInactiveIndexingShards.add(indexShard); + status.inactiveIndexing = true; activeInactiveStatusChanges = true; logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER); } } } else { - if (status.inactive) { - status.inactive = false; + if (status.inactiveIndexing) { + inactiveToActiveIndexingShards.add(indexShard); + status.inactiveIndexing = false; activeInactiveStatusChanges = true; logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id()); } @@ -175,6 +172,16 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent