From fcfd98e9e89231d748ae66c81791b0b08b0c6200 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 21 Dec 2015 16:44:35 +0100 Subject: [PATCH] Drop support for simple translog and hard-wire buffer to 8kb Today we have two variants of translogs for indexing. We only recommend the buffered one which also has a 20% advantage in indexing speed. This commit removes the option and defaults to the buffered case. It also hard-wires the translog buffer to 8kb instead of 64kb. We used to adjust that buffer based on if the shard is active or not, this code has also been removed and instead we just keep an 8kb buffer arround. --- .../elasticsearch/cluster/ClusterModule.java | 1 - .../elasticsearch/index/shard/IndexShard.java | 12 +- .../translog/BufferingTranslogWriter.java | 177 ----------------- .../index/translog/Translog.java | 9 +- .../index/translog/TranslogConfig.java | 51 ++--- .../index/translog/TranslogWriter.java | 181 +++++++++++------- .../memory/IndexingMemoryController.java | 59 +----- .../index/translog/BufferedTranslogTests.java | 44 ----- .../index/translog/TranslogTests.java | 18 +- .../memory/IndexingMemoryControllerTests.java | 53 ++--- .../reference/index-modules/translog.asciidoc | 23 +-- docs/reference/migration/migrate_3_0.asciidoc | 3 +- .../elasticsearch/test/ESIntegTestCase.java | 1 - 13 files changed, 165 insertions(+), 467 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java delete mode 100644 core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 6f7eb20874f..6a4831a2eee 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -140,7 +140,6 @@ public class ClusterModule extends AbstractModule { registerIndexDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*", Validator.EMPTY); registerIndexDynamicSetting(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, Validator.EMPTY); registerIndexDynamicSetting(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, Validator.EMPTY); - registerIndexDynamicSetting(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, Validator.EMPTY); registerIndexDynamicSetting(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, Validator.NON_NEGATIVE_INTEGER); registerIndexDynamicSetting(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, Validator.EMPTY); registerIndexDynamicSetting(IndexMetaData.SETTING_READ_ONLY, Validator.EMPTY); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b0d19a2c0b2..fd8490ae8e0 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1016,7 +1016,7 @@ public class IndexShard extends AbstractIndexShardComponent { * Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than * the new buffering indexing size then we do a refresh to free up the heap. */ - public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + public void updateBufferSize(ByteSizeValue shardIndexingBufferSize) { final EngineConfig config = engineConfig; final ByteSizeValue preValue = config.getIndexingBufferSize(); @@ -1054,8 +1054,6 @@ public class IndexShard extends AbstractIndexShardComponent { logger.debug(message); } } - - engine.getTranslog().updateBuffer(shardTranslogBufferSize); } /** @@ -1072,7 +1070,7 @@ public class IndexShard extends AbstractIndexShardComponent { if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) { boolean wasActive = active.getAndSet(false); if (wasActive) { - updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); + updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER); logger.debug("marking shard as inactive (inactive_time=[{}]) indexing wise", inactiveTime); indexEventListener.onShardInactive(this); } @@ -1161,12 +1159,6 @@ public class IndexShard extends AbstractIndexShardComponent { this.flushOnClose = flushOnClose; } - TranslogWriter.Type type = TranslogWriter.Type.fromString(settings.get(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, translogConfig.getType().name())); - if (type != translogConfig.getType()) { - logger.info("updating type from [{}] to [{}]", translogConfig.getType(), type); - translogConfig.setType(type); - } - final Translog.Durabilty durabilty = getFromSettings(logger, settings, translogConfig.getDurabilty()); if (durabilty != translogConfig.getDurabilty()) { logger.info("updating durability from [{}] to [{}]", translogConfig.getDurabilty(), durabilty); diff --git a/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java deleted file mode 100644 index a2eb0bff646..00000000000 --- a/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.translog; - -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.Channels; -import org.elasticsearch.common.util.concurrent.ReleasableLock; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - -/** - */ -public final class BufferingTranslogWriter extends TranslogWriter { - private byte[] buffer; - private int bufferCount; - private WrapperOutputStream bufferOs = new WrapperOutputStream(); - - /* the total offset of this file including the bytes written to the file as well as into the buffer */ - private volatile long totalOffset; - - public BufferingTranslogWriter(ShardId shardId, long generation, ChannelReference channelReference, int bufferSize) throws IOException { - super(shardId, generation, channelReference); - this.buffer = new byte[bufferSize]; - this.totalOffset = writtenOffset; - } - - @Override - public Translog.Location add(BytesReference data) throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { - ensureOpen(); - final long offset = totalOffset; - if (data.length() >= buffer.length) { - flush(); - // we use the channel to write, since on windows, writing to the RAF might not be reflected - // when reading through the channel - try { - data.writeTo(channel); - } catch (Throwable ex) { - closeWithTragicEvent(ex); - throw ex; - } - writtenOffset += data.length(); - totalOffset += data.length(); - } else { - if (data.length() > buffer.length - bufferCount) { - flush(); - } - data.writeTo(bufferOs); - totalOffset += data.length(); - } - operationCounter++; - return new Translog.Location(generation, offset, data.length()); - } - } - - protected final void flush() throws IOException { - assert writeLock.isHeldByCurrentThread(); - if (bufferCount > 0) { - ensureOpen(); - // we use the channel to write, since on windows, writing to the RAF might not be reflected - // when reading through the channel - final int bufferSize = bufferCount; - try { - Channels.writeToChannel(buffer, 0, bufferSize, channel); - } catch (Throwable ex) { - closeWithTragicEvent(ex); - throw ex; - } - writtenOffset += bufferSize; - bufferCount = 0; - } - } - - @Override - protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException { - try (ReleasableLock lock = readLock.acquire()) { - if (position >= writtenOffset) { - assert targetBuffer.hasArray() : "buffer must have array"; - final int sourcePosition = (int) (position - writtenOffset); - System.arraycopy(buffer, sourcePosition, - targetBuffer.array(), targetBuffer.position(), targetBuffer.limit()); - targetBuffer.position(targetBuffer.limit()); - return; - } - } - // we don't have to have a read lock here because we only write ahead to the file, so all writes has been complete - // for the requested location. - Channels.readFromFileChannelWithEofException(channel, position, targetBuffer); - } - - @Override - public boolean syncNeeded() { - return totalOffset != lastSyncedOffset; - } - - @Override - public synchronized void sync() throws IOException { - if (syncNeeded()) { - ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event - channelReference.incRef(); - try { - final long offsetToSync; - final int opsCounter; - try (ReleasableLock lock = writeLock.acquire()) { - flush(); - offsetToSync = totalOffset; - opsCounter = operationCounter; - } - // we can do this outside of the write lock but we have to protect from - // concurrent syncs - ensureOpen(); // just for kicks - the checkpoint happens or not either way - try { - checkpoint(offsetToSync, opsCounter, channelReference); - } catch (Throwable ex) { - closeWithTragicEvent(ex); - throw ex; - } - lastSyncedOffset = offsetToSync; - } finally { - channelReference.decRef(); - } - } - } - - - public void updateBufferSize(int bufferSize) { - try (ReleasableLock lock = writeLock.acquire()) { - ensureOpen(); - if (this.buffer.length != bufferSize) { - flush(); - this.buffer = new byte[bufferSize]; - } - } catch (IOException e) { - throw new TranslogException(shardId, "failed to flush", e); - } - } - - class WrapperOutputStream extends OutputStream { - - @Override - public void write(int b) throws IOException { - buffer[bufferCount++] = (byte) b; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - // we do safety checked when we decide to use this stream... - System.arraycopy(b, off, buffer, bufferCount, len); - bufferCount += len; - } - } - - @Override - public long sizeInBytes() { - return totalOffset; - } -} diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 17c7f753137..7da54ed8a37 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -280,13 +280,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC throw new IllegalArgumentException("can't parse id from file: " + fileName); } - public void updateBuffer(ByteSizeValue bufferSize) { - config.setBufferSize(bufferSize.bytesAsInt()); - try (ReleasableLock lock = writeLock.acquire()) { - current.updateBufferSize(config.getBufferSize()); - } - } - /** Returns {@code true} if this {@code Translog} is still open. */ public boolean isOpen() { return closed.get() == false; @@ -367,7 +360,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC TranslogWriter createWriter(long fileGeneration) throws IOException { TranslogWriter newFile; try { - newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize(), getChannelFactory()); + newFile = TranslogWriter.create(shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), getChannelFactory(), config.getBufferSize()); } catch (IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java index ca479bec080..442792f3132 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java @@ -20,13 +20,13 @@ package org.elasticsearch.index.translog; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog.TranslogGeneration; -import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; @@ -39,21 +39,19 @@ import java.nio.file.Path; public final class TranslogConfig { public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability"; - public static final String INDEX_TRANSLOG_FS_TYPE = "index.translog.fs.type"; - public static final String INDEX_TRANSLOG_BUFFER_SIZE = "index.translog.fs.buffer_size"; public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval"; + public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(8, ByteSizeUnit.KB); private final TimeValue syncInterval; private final BigArrays bigArrays; private final ThreadPool threadPool; private final boolean syncOnEachOperation; - private volatile int bufferSize; private volatile TranslogGeneration translogGeneration; private volatile Translog.Durabilty durabilty = Translog.Durabilty.REQUEST; - private volatile TranslogWriter.Type type; private final IndexSettings indexSettings; private final ShardId shardId; private final Path translogPath; + private final ByteSizeValue bufferSize; /** * Creates a new TranslogConfig instance @@ -65,14 +63,17 @@ public final class TranslogConfig { * @param threadPool a {@link ThreadPool} to schedule async sync durability */ public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, Translog.Durabilty durabilty, BigArrays bigArrays, @Nullable ThreadPool threadPool) { + this(shardId, translogPath, indexSettings, durabilty, bigArrays, threadPool, DEFAULT_BUFFER_SIZE); + } + + TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, Translog.Durabilty durabilty, BigArrays bigArrays, @Nullable ThreadPool threadPool, ByteSizeValue bufferSize) { + this.bufferSize = bufferSize; this.indexSettings = indexSettings; this.shardId = shardId; this.translogPath = translogPath; this.durabilty = durabilty; this.threadPool = threadPool; this.bigArrays = bigArrays; - this.type = TranslogWriter.Type.fromString(indexSettings.getSettings().get(INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.BUFFERED.name())); - this.bufferSize = (int) indexSettings.getSettings().getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER).bytes(); // Not really interesting, updated by IndexingMemoryController... syncInterval = indexSettings.getSettings().getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5)); if (syncInterval.millis() > 0 && threadPool != null) { @@ -84,6 +85,7 @@ public final class TranslogConfig { } } + /** * Returns a {@link ThreadPool} to schedule async durability operations */ @@ -105,20 +107,6 @@ public final class TranslogConfig { this.durabilty = durabilty; } - /** - * Returns the translog type - */ - public TranslogWriter.Type getType() { - return type; - } - - /** - * Sets the TranslogType for this Translog. The change will affect all subsequent translog files. - */ - public void setType(TranslogWriter.Type type) { - this.type = type; - } - /** * Returns true iff each low level operation shoudl be fsynced */ @@ -126,20 +114,6 @@ public final class TranslogConfig { return syncOnEachOperation; } - /** - * Retruns the current translog buffer size. - */ - public int getBufferSize() { - return bufferSize; - } - - /** - * Sets the current buffer size - for setting a live setting use {@link Translog#updateBuffer(ByteSizeValue)} - */ - public void setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - } - /** * Returns the current async fsync interval */ @@ -192,4 +166,11 @@ public final class TranslogConfig { public void setTranslogGeneration(TranslogGeneration translogGeneration) { this.translogGeneration = translogGeneration; } + + /** + * The translog buffer size. Default is 8kb + */ + public ByteSizeValue getBufferSize() { + return bufferSize; + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 975d722b085..49392088692 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -28,11 +28,14 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Channels; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.shard.ShardId; +import java.io.BufferedOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; @@ -49,30 +52,39 @@ public class TranslogWriter extends TranslogReader { public static final int VERSION_CHECKPOINTS = 2; // since 2.0 we have checkpoints? public static final int VERSION = VERSION_CHECKPOINTS; - protected final ShardId shardId; - protected final ReleasableLock readLock; - protected final ReleasableLock writeLock; + private final ShardId shardId; + private final ReleasableLock readLock; + private final ReleasableLock writeLock; /* the offset in bytes that was written when the file was last synced*/ - protected volatile long lastSyncedOffset; + private volatile long lastSyncedOffset; /* the number of translog operations written to this file */ - protected volatile int operationCounter; + private volatile int operationCounter; /* the offset in bytes written to the file */ - protected volatile long writtenOffset; + private volatile long writtenOffset; /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */ private volatile Throwable tragedy; + private final byte[] buffer; + private int bufferCount; + private WrapperOutputStream bufferOs = new WrapperOutputStream(); - public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference) throws IOException { + /* the total offset of this file including the bytes written to the file as well as into the buffer */ + private volatile long totalOffset; + + + public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference, ByteSizeValue bufferSize) throws IOException { super(generation, channelReference, channelReference.getChannel().position()); this.shardId = shardId; ReadWriteLock rwl = new ReentrantReadWriteLock(); readLock = new ReleasableLock(rwl.readLock()); writeLock = new ReleasableLock(rwl.writeLock()); this.writtenOffset = channelReference.getChannel().position(); - this.lastSyncedOffset = channelReference.getChannel().position();; + this.totalOffset = writtenOffset; + this.buffer = new byte[bufferSize.bytesAsInt()]; + this.lastSyncedOffset = channelReference.getChannel().position(); } - public static TranslogWriter create(Type type, ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback onClose, int bufferSize, ChannelFactory channelFactory) throws IOException { + public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException { final BytesRef ref = new BytesRef(translogUUID); final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT; final FileChannel channel = channelFactory.open(file); @@ -85,7 +97,7 @@ public class TranslogWriter extends TranslogReader { out.writeBytes(ref.bytes, ref.offset, ref.length); channel.force(false); writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE); - final TranslogWriter writer = type.create(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize); + final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize); return writer; } catch (Throwable throwable){ IOUtils.closeWhileHandlingException(channel); @@ -104,34 +116,7 @@ public class TranslogWriter extends TranslogReader { return tragedy; } - public enum Type { - - SIMPLE() { - @Override - public TranslogWriter create(ShardId shardId, long generation, ChannelReference channelReference, int bufferSize) throws IOException { - return new TranslogWriter(shardId, generation, channelReference); - } - }, - BUFFERED() { - @Override - public TranslogWriter create(ShardId shardId, long generation, ChannelReference channelReference, int bufferSize) throws IOException { - return new BufferingTranslogWriter(shardId, generation, channelReference, bufferSize); - } - }; - - public abstract TranslogWriter create(ShardId shardId, long generation, ChannelReference raf, int bufferSize) throws IOException; - - public static Type fromString(String type) { - if (SIMPLE.name().equalsIgnoreCase(type)) { - return SIMPLE; - } else if (BUFFERED.name().equalsIgnoreCase(type)) { - return BUFFERED; - } - throw new IllegalArgumentException("No translog fs type [" + type + "]"); - } - } - - protected final void closeWithTragicEvent(Throwable throwable) throws IOException { + private final void closeWithTragicEvent(Throwable throwable) throws IOException { try (ReleasableLock lock = writeLock.acquire()) { if (tragedy == null) { tragedy = throwable; @@ -146,38 +131,60 @@ public class TranslogWriter extends TranslogReader { * add the given bytes to the translog and return the location they were written at */ public Translog.Location add(BytesReference data) throws IOException { - final long position; try (ReleasableLock lock = writeLock.acquire()) { ensureOpen(); - position = writtenOffset; - try { - data.writeTo(channel); - } catch (Throwable e) { - closeWithTragicEvent(e); - throw e; + final long offset = totalOffset; + if (data.length() >= buffer.length) { + flush(); + // we use the channel to write, since on windows, writing to the RAF might not be reflected + // when reading through the channel + try { + data.writeTo(channel); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } + writtenOffset += data.length(); + totalOffset += data.length(); + } else { + if (data.length() > buffer.length - bufferCount) { + flush(); + } + data.writeTo(bufferOs); + totalOffset += data.length(); } - writtenOffset = writtenOffset + data.length(); - operationCounter++;; + operationCounter++; + return new Translog.Location(generation, offset, data.length()); } - return new Translog.Location(generation, position, data.length()); - } - - /** - * change the size of the internal buffer if relevant - */ - public void updateBufferSize(int bufferSize) throws TranslogException { } /** * write all buffered ops to disk and fsync file */ - public synchronized void sync() throws IOException { // synchronized to ensure only one sync happens a time - // check if we really need to sync here... + public synchronized void sync() throws IOException { if (syncNeeded()) { - try (ReleasableLock lock = writeLock.acquire()) { - ensureOpen(); - checkpoint(writtenOffset, operationCounter, channelReference); - lastSyncedOffset = writtenOffset; + ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event + channelReference.incRef(); + try { + final long offsetToSync; + final int opsCounter; + try (ReleasableLock lock = writeLock.acquire()) { + flush(); + offsetToSync = totalOffset; + opsCounter = operationCounter; + } + // we can do this outside of the write lock but we have to protect from + // concurrent syncs + ensureOpen(); // just for kicks - the checkpoint happens or not either way + try { + checkpoint(offsetToSync, opsCounter, channelReference); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } + lastSyncedOffset = offsetToSync; + } finally { + channelReference.decRef(); } } } @@ -185,9 +192,7 @@ public class TranslogWriter extends TranslogReader { /** * returns true if there are buffered ops */ - public boolean syncNeeded() { - return writtenOffset != lastSyncedOffset; // by default nothing is buffered - } + public boolean syncNeeded() { return totalOffset != lastSyncedOffset; } @Override public int totalOperations() { @@ -196,14 +201,29 @@ public class TranslogWriter extends TranslogReader { @Override public long sizeInBytes() { - return writtenOffset; + return totalOffset; } /** * Flushes the buffer if the translog is buffered. */ - protected void flush() throws IOException { + private final void flush() throws IOException { + assert writeLock.isHeldByCurrentThread(); + if (bufferCount > 0) { + ensureOpen(); + // we use the channel to write, since on windows, writing to the RAF might not be reflected + // when reading through the channel + final int bufferSize = bufferCount; + try { + Channels.writeToChannel(buffer, 0, bufferSize, channel); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } + writtenOffset += bufferSize; + bufferCount = 0; + } } /** @@ -292,13 +312,23 @@ public class TranslogWriter extends TranslogReader { } @Override - protected void readBytes(ByteBuffer buffer, long position) throws IOException { + protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException { try (ReleasableLock lock = readLock.acquire()) { - Channels.readFromFileChannelWithEofException(channel, position, buffer); + if (position >= writtenOffset) { + assert targetBuffer.hasArray() : "buffer must have array"; + final int sourcePosition = (int) (position - writtenOffset); + System.arraycopy(buffer, sourcePosition, + targetBuffer.array(), targetBuffer.position(), targetBuffer.limit()); + targetBuffer.position(targetBuffer.limit()); + return; + } } + // we don't have to have a read lock here because we only write ahead to the file, so all writes has been complete + // for the requested location. + Channels.readFromFileChannelWithEofException(channel, position, targetBuffer); } - protected synchronized void checkpoint(long lastSyncPosition, int operationCounter, ChannelReference channelReference) throws IOException { + private synchronized void checkpoint(long lastSyncPosition, int operationCounter, ChannelReference channelReference) throws IOException { channelReference.getChannel().force(false); writeCheckpoint(lastSyncPosition, operationCounter, channelReference.getPath().getParent(), channelReference.getGeneration(), StandardOpenOption.WRITE); } @@ -324,4 +354,19 @@ public class TranslogWriter extends TranslogReader { throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy); } } + + class WrapperOutputStream extends OutputStream { + + @Override + public void write(int b) throws IOException { + buffer[bufferCount++] = (byte) b; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + // we do safety checked when we decide to use this stream... + System.arraycopy(b, off, buffer, bufferCount, len); + bufferCount += len; + } + } } diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 72b951cadd5..a72c115835c 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -58,15 +58,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponentindices.memory.translog_buffer_size is a %, to set a floor on the actual size in bytes (default: 256 KB). */ - public static final String MIN_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_translog_buffer_size"; - - /** Only applies when indices.memory.translog_buffer_size is a %, to set a ceiling on the actual size in bytes (default: not set). */ - public static final String MAX_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_translog_buffer_size"; - /** Sets a floor on the per-shard translog buffer size (default: 2 KB). */ public static final String MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_shard_translog_buffer_size"; @@ -88,11 +79,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent maxTranslogBuffer.bytes()) { - translogBuffer = maxTranslogBuffer; - } - } else { - translogBuffer = ByteSizeValue.parseBytesSizeValue(translogBufferSetting, TRANSLOG_BUFFER_SIZE_SETTING); - } - this.translogBuffer = translogBuffer; - this.minShardTranslogBufferSize = this.settings.getAsBytesSize(MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(2, ByteSizeUnit.KB)); - this.maxShardTranslogBufferSize = this.settings.getAsBytesSize(MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(64, ByteSizeUnit.KB)); - // we need to have this relatively small to move a shard from inactive to active fast (enough) this.interval = this.settings.getAsTime(SHARD_INACTIVE_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(30)); @@ -192,14 +157,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent availableShards() { List availableShards = new ArrayList<>(); @@ -220,9 +177,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent maxShardTranslogBufferSize.bytes()) { - shardTranslogBufferSize = maxShardTranslogBufferSize; - } - - logger.debug("recalculating shard indexing buffer, total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize); + logger.debug("recalculating shard indexing buffer, total is [{}] with [{}] active shards, each shard set to indexing=[{}]", indexingBuffer, activeShardCount, shardIndexingBufferSize); for (IndexShard shard : activeShards) { - updateShardBuffers(shard, shardIndexingBufferSize, shardTranslogBufferSize); + updateShardBuffers(shard, shardIndexingBufferSize); } } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java deleted file mode 100644 index b021f3252d6..00000000000 --- a/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.translog; - -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.test.IndexSettingsModule; - -import java.nio.file.Path; - -/** - * - */ -public class BufferedTranslogTests extends TranslogTests { - - @Override - protected TranslogConfig getTranslogConfig(Path path) { - Settings build = Settings.settingsBuilder() - .put("index.translog.fs.type", TranslogWriter.Type.BUFFERED.name()) - .put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024), ByteSizeUnit.BYTES) - .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .build(); - return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null); - } -} diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 3173f7c5dc9..db9f212b568 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -36,6 +36,8 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -132,12 +134,12 @@ public class TranslogTests extends ESTestCase { return new Translog(getTranslogConfig(path)); } - protected TranslogConfig getTranslogConfig(Path path) { + private TranslogConfig getTranslogConfig(Path path) { Settings build = Settings.settingsBuilder() - .put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.SIMPLE.name()) .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) .build(); - return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null); + ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES); + return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null, bufferSize); } protected void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) throws IOException { @@ -1412,12 +1414,10 @@ public class TranslogTests extends ESTestCase { fail.set(true); try { Translog.Location location = translog.add(new Translog.Index("test", "2", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); - if (config.getType() == TranslogWriter.Type.BUFFERED) { // the buffered case will fail on the add if we exceed the buffer or will fail on the flush once we sync - if (randomBoolean()) { - translog.ensureSynced(location); - } else { - translog.sync(); - } + if (randomBoolean()) { + translog.ensureSynced(location); + } else { + translog.sync(); } //TODO once we have a mock FS that can simulate we can also fail on plain sync fail("WTF"); diff --git a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java index 83c7be0374b..d980c3c598d 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java @@ -45,7 +45,6 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { final static ByteSizeValue INACTIVE = new ByteSizeValue(-1); final Map indexingBuffers = new HashMap<>(); - final Map translogBuffers = new HashMap<>(); final Map lastIndexTimeNanos = new HashMap<>(); final Set activeShards = new HashSet<>(); @@ -63,17 +62,14 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { public void deleteShard(IndexShard id) { indexingBuffers.remove(id); - translogBuffers.remove(id); } - public void assertBuffers(IndexShard id, ByteSizeValue indexing, ByteSizeValue translog) { + public void assertBuffers(IndexShard id, ByteSizeValue indexing) { assertThat(indexingBuffers.get(id), equalTo(indexing)); - assertThat(translogBuffers.get(id), equalTo(translog)); } public void assertInactive(IndexShard id) { assertThat(indexingBuffers.get(id), equalTo(INACTIVE)); - assertThat(translogBuffers.get(id), equalTo(INACTIVE)); } @Override @@ -92,9 +88,8 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { } @Override - protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize) { indexingBuffers.put(shard, shardIndexingBufferSize); - translogBuffers.put(shard, shardTranslogBufferSize); } @Override @@ -105,7 +100,6 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { return true; } else if (currentTimeInNanos() - ns >= inactiveTime.nanos()) { indexingBuffers.put(shard, INACTIVE); - translogBuffers.put(shard, INACTIVE); activeShards.remove(shard); return true; } else { @@ -122,7 +116,6 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { if (indexingBuffers.containsKey(shard) == false) { // First time we are seeing this shard; start it off with inactive buffers as IndexShard does: indexingBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER); - translogBuffers.put(shard, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER); } activeShards.add(shard); forceCheck(); @@ -135,22 +128,21 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { IndexService test = indicesService.indexService("test"); MockController controller = new MockController(Settings.builder() - .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build()); + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb").build()); IndexShard shard0 = test.getShard(0); controller.simulateIndexing(shard0); - controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB)); // translog is maxed at 64K // add another shard IndexShard shard1 = test.getShard(1); controller.simulateIndexing(shard1); - controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB)); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB)); // remove first shard controller.deleteShard(shard0); controller.forceCheck(); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB)); // translog is maxed at 64K // remove second shard controller.deleteShard(shard1); @@ -159,7 +151,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { // add a new one IndexShard shard2 = test.getShard(2); controller.simulateIndexing(shard2); - controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB)); // translog is maxed at 64K } public void testActiveInactive() { @@ -169,7 +161,6 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { MockController controller = new MockController(Settings.builder() .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb") .put(IndexShard.INDEX_SHARD_INACTIVE_TIME_SETTING, "5s") .build()); @@ -177,8 +168,8 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { controller.simulateIndexing(shard0); IndexShard shard1 = test.getShard(1); controller.simulateIndexing(shard1); - controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); - controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard0, new ByteSizeValue(5, ByteSizeUnit.MB)); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB)); // index into both shards, move the clock and see that they are still active controller.simulateIndexing(shard0); @@ -193,12 +184,12 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { // index into one shard only, see it becomes active controller.simulateIndexing(shard0); - controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB)); controller.assertInactive(shard1); controller.incrementTimeSec(3); // increment but not enough to become inactive controller.forceCheck(); - controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertBuffers(shard0, new ByteSizeValue(10, ByteSizeUnit.MB)); controller.assertInactive(shard1); controller.incrementTimeSec(3); // increment some more @@ -209,13 +200,12 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { // index some and shard becomes immediately active controller.simulateIndexing(shard1); controller.assertInactive(shard0); - controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB)); } public void testMinShardBufferSizes() { MockController controller = new MockController(Settings.builder() .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") .put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb") .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build()); @@ -225,7 +215,6 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { public void testMaxShardBufferSizes() { MockController controller = new MockController(Settings.builder() .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb") .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build()); @@ -235,34 +224,26 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { public void testRelativeBufferSizes() { MockController controller = new MockController(Settings.builder() .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%") .build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB))); - assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); } public void testMinBufferSizes() { MockController controller = new MockController(Settings.builder() .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%") - .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb").build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); - assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); } public void testMaxBufferSizes() { MockController controller = new MockController(Settings.builder() .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%") - .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%") - .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb") - .put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb").build()); assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); - assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); } protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) { @@ -273,7 +254,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { controller.simulateIndexing(shard0); IndexShard shard1 = test.getShard(1); controller.simulateIndexing(shard1); - controller.assertBuffers(shard0, indexBufferSize, translogBufferSize); - controller.assertBuffers(shard1, indexBufferSize, translogBufferSize); + controller.assertBuffers(shard0, indexBufferSize); + controller.assertBuffers(shard1, indexBufferSize); } } diff --git a/docs/reference/index-modules/translog.asciidoc b/docs/reference/index-modules/translog.asciidoc index b8ada9a55a3..de72bed7ac8 100644 --- a/docs/reference/index-modules/translog.asciidoc +++ b/docs/reference/index-modules/translog.asciidoc @@ -70,25 +70,4 @@ update, or bulk request. This setting accepts the following parameters: `fsync` and commit in the background every `sync_interval`. In the event of hardware failure, all acknowledged writes since the last automatic commit will be discarded. --- - -`index.translog.fs.type`:: -+ --- - -Whether to buffer writes to the transaction log in memory or not. This -setting accepts the following parameters: - -`buffered`:: - - (default) Translog writes first go to a 64kB buffer in memory, - and are only written to the disk when the buffer is full, or when an - `fsync` is triggered by a write request or the `sync_interval`. - -`simple`:: - - Translog writes are written to the file system immediately, without - buffering. However, these writes will only be persisted to disk when an - `fsync` and commit is triggered by a write request or the `sync_interval`. - --- +-- \ No newline at end of file diff --git a/docs/reference/migration/migrate_3_0.asciidoc b/docs/reference/migration/migrate_3_0.asciidoc index 83107cbfa9f..8b6775b082c 100644 --- a/docs/reference/migration/migrate_3_0.asciidoc +++ b/docs/reference/migration/migrate_3_0.asciidoc @@ -203,7 +203,8 @@ cluster settings please use the settings update API and set their superseded key ==== Translog settings The `index.translog.flush_threshold_ops` setting is not supported anymore. In order to control flushes based on the transaction log -growth use `index.translog.flush_threshold_size` instead. +growth use `index.translog.flush_threshold_size` instead. Changing the translog type with `index.translog.fs.type` is not supported +anymore, the `buffered` implementation is now the only available option and uses a fixed `8kb` buffer. [[breaking_30_mapping_changes]] === Mapping changes diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index f2c7d7c4772..b28842b1976 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -522,7 +522,6 @@ public abstract class ESIntegTestCase extends ESTestCase { } if (random.nextBoolean()) { - builder.put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, RandomPicks.randomFrom(random, TranslogWriter.Type.values())); if (rarely(random)) { builder.put(TranslogConfig.INDEX_TRANSLOG_SYNC_INTERVAL, 0); // 0 has special meaning to sync each op } else {