Introduce translog generation rolling

This commit introduces a maximum size for a translog generation and
automatically rolls the translog when a generation exceeds the threshold
into a new generation. This threshold is configurable per index and
defaults to sixty-four megabytes. We introduce this constraint as
sequence numbers will require keeping around more than the current
generation (to ensure that we can rollback to the global
checkpoint). Without keeping the size of generations under control,
having to keep old generations around could consume excessive disk
space. A follow-up will enable commits to trim previous generations
based on the global checkpoint.

Relates #23606
This commit is contained in:
Jason Tedor 2017-03-27 16:43:54 -04:00 committed by GitHub
parent defd0452e7
commit b54a9e9c83
9 changed files with 441 additions and 113 deletions

View File

@ -22,16 +22,12 @@ package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -46,7 +42,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.apache.logging.log4j.core.pattern.ConverterKeys;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -302,15 +297,21 @@ public abstract class TransportWriteAction<
}
void run() {
// we either respond immediately ie. if we we don't fsync per request or wait for refresh
// OR we got an pass async operations on and wait for them to return to respond.
indexShard.maybeFlush();
maybeFinish(); // decrement the pendingOpts by one, if there is nothing else to do we just respond with success.
/*
* We either respond immediately (i.e., if we do not fsync per request or wait for
* refresh), or we there are past async operations and we wait for them to return to
* respond.
*/
indexShard.afterWriteOperation();
// decrement pending by one, if there is nothing else to do we just respond with success
maybeFinish();
if (waitUntilRefresh) {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(location, forcedRefresh -> {
if (forcedRefresh) {
logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request);
logger.warn(
"block until refresh ran out of slots and forced a refresh: [{}]",
request);
}
refreshed.set(forcedRefresh);
maybeFinish();

View File

@ -125,6 +125,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,

View File

@ -22,7 +22,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
@ -112,6 +111,16 @@ public final class IndexSettings {
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);
/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
*/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING =
Setting.byteSizeSetting(
"index.translog.generation_threshold_size",
new ByteSizeValue(64, ByteSizeUnit.MB),
new Property[]{Property.Dynamic, Property.IndexScope});
public static final Setting<TimeValue> INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL =
Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS),
new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope);
@ -156,6 +165,7 @@ public final class IndexSettings {
private volatile TimeValue refreshInterval;
private final TimeValue globalCheckpointInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile ByteSizeValue generationThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
private final IndexScopedSettings scopedSettings;
@ -250,6 +260,7 @@ public final class IndexSettings {
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
@ -281,6 +292,9 @@ public final class IndexSettings {
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize);
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
@ -290,6 +304,10 @@ public final class IndexSettings {
this.flushThresholdSize = byteSizeValue;
}
private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
this.generationThresholdSize = generationThresholdSize;
}
private void setGCDeletes(TimeValue timeValue) {
this.gcDeletesInMillis = timeValue.getMillis();
}
@ -461,6 +479,19 @@ public final class IndexSettings {
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }
/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from
* growing too large to avoid excessive disk space consumption. Therefore, the translog is
* automatically rolled to a new generation when the current generation exceeds this generation
* threshold size.
*
* @return the generation threshold size
*/
public ByteSizeValue getGenerationThresholdSize() {
return generationThresholdSize;
}
/**
* Returns the {@link MergeSchedulerConfig}
*/

View File

@ -771,27 +771,44 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return engine.syncFlush(syncId, expectedCommitId);
}
public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException {
boolean waitIfOngoing = request.waitIfOngoing();
boolean force = request.force();
if (logger.isTraceEnabled()) {
/**
* Executes the given flush request against the engine.
*
* @param request the flush request
* @return the commit ID
*/
public Engine.CommitId flush(FlushRequest request) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
}
// we allows flush while recovering, since we allow for operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
// we don't gc). Yet, we don't use flush internally to clear deletes and flush the indexwriter since
// we use #writeIndexingBuffer for this now.
/*
* We allow flushes while recovery since we allow operations to happen while recovering and
* we want to keep the translog under control (up to deletes, which we do not GC). Yet, we
* do not use flush internally to clear deletes and flush the index writer since we use
* Engine#writeIndexingBuffer for this now.
*/
verifyNotClosed();
Engine engine = getEngine();
final Engine engine = getEngine();
if (engine.isRecovering()) {
throw new IllegalIndexShardStateException(shardId(), state, "flush is only allowed if the engine is not recovery" +
" from translog");
throw new IllegalIndexShardStateException(
shardId(),
state,
"flush is only allowed if the engine is not recovery from translog");
}
long time = System.nanoTime();
Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
final long time = System.nanoTime();
final Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;
}
/**
* Rolls the tranlog generation.
*
* @throws IOException if any file operations on the translog throw an I/O exception
*/
private void rollTranslogGeneration() throws IOException {
final Engine engine = getEngine();
engine.getTranslog().rollGeneration();
}
public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
@ -1256,17 +1273,39 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
* Returns <code>true</code> iff this shard needs to be flushed due to too many translog operation or a too large transaction log.
* Otherwise <code>false</code>.
* Tests whether or not the translog should be flushed. This test is based on the current size
* of the translog comparted to the configured flush threshold size.
*
* @return {@code true} if the translog should be flushed
*/
boolean shouldFlush() {
Engine engine = getEngineOrNull();
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
Translog translog = engine.getTranslog();
return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes();
} catch (AlreadyClosedException ex) {
// that's fine we are already close - no need to flush
final Translog translog = engine.getTranslog();
return translog.shouldFlush();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
}
return false;
}
/**
* Tests whether or not the translog generation should be rolled to a new generation. This test
* is based on the size of the current generation compared to the configured generation
* threshold size.
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
boolean shouldRollTranslogGeneration() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
final Translog translog = engine.getTranslog();
return translog.shouldRollGeneration();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
}
return false;
@ -1810,28 +1849,31 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return indexSettings.getTranslogDurability();
}
private final AtomicBoolean asyncFlushRunning = new AtomicBoolean();
// we can not protect with a lock since we "release" on a different thread
private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();
/**
* Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the
* Flush thread-pool asynchronously.
*
* @return <code>true</code> if a new flush is scheduled otherwise <code>false</code>.
* Schedules a flush or translog generation roll if needed but will not schedule more than one
* concurrently. The operation will be executed asynchronously on the flush thread pool.
*/
public void afterWriteOperation() {
if (shouldFlush() || shouldRollTranslogGeneration()) {
if (flushOrRollRunning.compareAndSet(false, true)) {
/*
* We have to check again since otherwise there is a race when a thread passes the
* first check next to another thread which performs the operation quickly enough to
* finish before the current thread could flip the flag. In that situation, we have
* an extra operation.
*
* Additionally, a flush implicitly executes a translog generation roll so if we
* execute a flush then we do not need to check if we should roll the translog
* generation.
*/
public boolean maybeFlush() {
if (shouldFlush()) {
if (asyncFlushRunning.compareAndSet(false, true)) { // we can't use a lock here since we "release" in a different thread
if (shouldFlush() == false) {
// we have to check again since otherwise there is a race when a thread passes
// the first shouldFlush() check next to another thread which flushes fast enough
// to finish before the current thread could flip the asyncFlushRunning flag.
// in that situation we have an extra unexpected flush.
asyncFlushRunning.compareAndSet(true, false);
} else {
logger.debug("submitting async flush request");
final AbstractRunnable abstractRunnable = new AbstractRunnable() {
final AbstractRunnable flush = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush index", e);
}
@ -1844,16 +1886,38 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
@Override
public void onAfter() {
asyncFlushRunning.compareAndSet(true, false);
maybeFlush(); // fire a flush up again if we have filled up the limits such that shouldFlush() returns true
flushOrRollRunning.compareAndSet(true, false);
afterWriteOperation();
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable);
return true;
threadPool.executor(ThreadPool.Names.FLUSH).execute(flush);
} else if (shouldRollTranslogGeneration()) {
logger.debug("submitting async roll translog generation request");
final AbstractRunnable roll = new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to roll translog generation", e);
}
}
@Override
protected void doRun() throws Exception {
rollTranslogGeneration();
}
@Override
public void onAfter() {
flushOrRollRunning.compareAndSet(true, false);
afterWriteOperation();
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(roll);
} else {
flushOrRollRunning.compareAndSet(true, false);
}
}
}
return false;
}
/**

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.util.BigArrays;
@ -55,6 +56,7 @@ import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@ -329,7 +331,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* Returns the generation of the current transaction log.
*/
public long currentFileGeneration() {
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
return current.getGeneration();
}
}
@ -409,10 +411,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public Location add(final Operation operation) throws IOException {
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
try {
final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
final long start = out.position();
out.skip(Integer.BYTES);
writeOperationNoSize(checksumStreamOutput, operation);
writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation);
final long end = out.position();
final int operationSize = (int) (end - Integer.BYTES - start);
out.seek(start);
@ -442,6 +443,30 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
/**
* Tests whether or not the translog should be flushed. This test is based on the current size
* of the translog comparted to the configured flush threshold size.
*
* @return {@code true} if the translog should be flushed
*/
public boolean shouldFlush() {
final long size = this.sizeInBytes();
return size > this.indexSettings.getFlushThresholdSize().getBytes();
}
/**
* Tests whether or not the translog generation should be rolled to a new generation. This test
* is based on the size of the current generation compared to the configured generation
* threshold size.
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
public boolean shouldRollGeneration() {
final long size = this.current.sizeInBytes();
final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes();
return size > threshold;
}
/**
* The a {@linkplain Location} that will sort after the {@linkplain Location} returned by the last write but before any locations which
* can be returned by the next write.
@ -1322,44 +1347,63 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
out.writeInt((int) checksum);
}
@Override
public long prepareCommit() throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration != NOT_SET_GENERATION) {
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingGeneration);
}
currentCommittingGeneration = current.getGeneration();
TranslogReader currentCommittingTranslog = current.closeIntoReader();
readers.add(currentCommittingTranslog);
Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration();
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration()));
Files.copy(checkpoint, commitCheckpoint);
IOUtils.fsync(commitCheckpoint, false);
IOUtils.fsync(commitCheckpoint.getParent(), true);
// create a new translog file - this will sync it and update the checkpoint data;
/**
* Roll the current translog generation into a new generation. This does not commit the
* translog.
*
* @throws IOException if an I/O exception occurred during any file operations
*/
public void rollGeneration() throws IOException {
try (Releasable ignored = writeLock.acquire()) {
try {
final TranslogReader reader = current.closeIntoReader();
readers.add(reader);
final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
assert Checkpoint.read(checkpoint).generation == current.getGeneration();
final Path generationCheckpoint =
location.resolve(getCommitCheckpointFileName(current.getGeneration()));
Files.copy(checkpoint, generationCheckpoint);
IOUtils.fsync(generationCheckpoint, false);
IOUtils.fsync(generationCheckpoint.getParent(), true);
// create a new translog file; this will sync it and update the checkpoint data;
current = createWriter(current.getGeneration() + 1);
logger.trace("current translog set to [{}]", current.getGeneration());
} catch (Exception e) {
} catch (final Exception e) {
IOUtils.closeWhileHandlingException(this); // tragic event
throw e;
}
return 0L;
}
}
@Override
public long prepareCommit() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration != NOT_SET_GENERATION) {
final String message = String.format(
Locale.ROOT,
"already committing a translog with generation [%d]",
currentCommittingGeneration);
throw new IllegalStateException(message);
}
currentCommittingGeneration = current.getGeneration();
rollGeneration();
}
return 0;
}
@Override
public long commit() throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration == NOT_SET_GENERATION) {
prepareCommit();
}
assert currentCommittingGeneration != NOT_SET_GENERATION;
assert readers.stream().filter(r -> r.getGeneration() == currentCommittingGeneration).findFirst().isPresent()
: "reader list doesn't contain committing generation [" + currentCommittingGeneration + "]";
lastCommittedTranslogFileGeneration = current.getGeneration(); // this is important - otherwise old files will not be cleaned up
assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration)
: "readers missing committing generation [" + currentCommittingGeneration + "]";
// set the last committed generation otherwise old files will not be cleaned up
lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1;
currentCommittingGeneration = NOT_SET_GENERATION;
trimUnreferencedReaders();
}

View File

@ -370,6 +370,27 @@ public class IndexSettingsTests extends ESTestCase {
assertEquals(actualNewTranslogFlushThresholdSize, settings.getFlushThresholdSize());
}
public void testTranslogGenerationSizeThreshold() {
final ByteSizeValue size = new ByteSizeValue(Math.abs(randomInt()));
final String key = IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey();
final ByteSizeValue actualValue =
ByteSizeValue.parseBytesSizeValue(size.toString(), key);
final IndexMetaData metaData =
newIndexMeta(
"index",
Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(key, size.toString())
.build());
final IndexSettings settings = new IndexSettings(metaData, Settings.EMPTY);
assertEquals(actualValue, settings.getGenerationThresholdSize());
final ByteSizeValue newSize = new ByteSizeValue(Math.abs(randomInt()));
final ByteSizeValue actual = ByteSizeValue.parseBytesSizeValue(newSize.toString(), key);
settings.updateIndexMetaData(
newIndexMeta("index", Settings.builder().put(key, newSize.toString()).build()));
assertEquals(actual, settings.getGenerationThresholdSize());
}
public void testArchiveBrokenIndexSettings() {
Settings settings =
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS.archiveUnknownOrInvalidSettings(

View File

@ -363,49 +363,104 @@ public class IndexShardIT extends ESSingleNodeTestCase {
assertEquals(0, shard.getEngine().getTranslog().totalOperations());
}
public void testStressMaybeFlush() throws Exception {
public void testMaybeRollTranslogGeneration() throws Exception {
final int generationThreshold = randomIntBetween(1, 512);
final Settings settings =
Settings
.builder()
.put("index.number_of_shards", 1)
.put("index.translog.generation_threshold_size", generationThreshold + "b")
.put()
.build();
createIndex("test", settings);
ensureGreen("test");
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0);
int rolls = 0;
final Translog translog = shard.getEngine().getTranslog();
final long generation = translog.currentFileGeneration();
for (int i = 0; i < randomIntBetween(32, 128); i++) {
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
final ParsedDocument doc = testParsedDocument(
"1",
"test",
null,
SequenceNumbersService.UNASSIGNED_SEQ_NO,
new ParseContext.Document(),
new BytesArray(new byte[]{1}), XContentType.JSON, null);
final Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc);
final Engine.IndexResult result = shard.index(index);
final Translog.Location location = result.getTranslogLocation();
shard.afterWriteOperation();
if (location.translogLocation + location.size > generationThreshold) {
// wait until the roll completes
assertBusy(() -> assertFalse(shard.shouldRollTranslogGeneration()));
rolls++;
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
}
}
}
public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test"));
final IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
new ByteSizeValue(117/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}", XContentType.JSON)
.setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
final String key;
final boolean flush = randomBoolean();
if (flush) {
key = "index.translog.flush_threshold_size";
} else {
key = "index.translog.generation_threshold_size";
}
// size of the operation plus header and footer
final Settings settings = Settings.builder().put(key, "117b").build();
client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get();
client().prepareIndex("test", "test", "0")
.setSource("{}", XContentType.JSON)
.setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE)
.get();
assertFalse(shard.shouldFlush());
final AtomicBoolean running = new AtomicBoolean(true);
final int numThreads = randomIntBetween(2, 4);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
final Thread[] threads = new Thread[numThreads];
final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
@Override
public void run() {
threads[i] = new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
} catch (final InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
while (running.get()) {
shard.maybeFlush();
shard.afterWriteOperation();
}
}
};
});
threads[i].start();
}
barrier.await();
FlushStats flushStats = shard.flushStats();
long total = flushStats.getTotal();
final Runnable check;
if (flush) {
final FlushStats flushStats = shard.flushStats();
final long total = flushStats.getTotal();
client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertEquals(total + 1, shard.flushStats().getTotal()));
check = () -> assertEquals(total + 1, shard.flushStats().getTotal());
} else {
final long generation = shard.getEngine().getTranslog().currentFileGeneration();
client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
check = () -> assertEquals(
generation + 1,
shard.getEngine().getTranslog().currentFileGeneration());
}
assertBusy(check);
running.set(false);
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
assertEquals(total + 1, shard.flushStats().getTotal());
check.run();
}
public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable {

View File

@ -41,16 +41,18 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.Engine.Operation.Origin;
@ -100,6 +102,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ -156,12 +159,25 @@ public class TranslogTests extends ESTestCase {
return new Translog(getTranslogConfig(path), null, () -> globalCheckpoint.get());
}
private TranslogConfig getTranslogConfig(Path path) {
Settings build = Settings.builder()
private TranslogConfig getTranslogConfig(final Path path) {
final Settings settings = Settings
.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.getIndex(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize);
return getTranslogConfig(path, settings);
}
private TranslogConfig getTranslogConfig(final Path path, final Settings settings) {
final ByteSizeValue bufferSize;
if (randomBoolean()) {
bufferSize = TranslogConfig.DEFAULT_BUFFER_SIZE;
} else {
bufferSize = new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
}
final IndexSettings indexSettings =
IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings);
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize);
}
protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException {
@ -2073,4 +2089,93 @@ public class TranslogTests extends ESTestCase {
Translog.Delete serializedDelete = new Translog.Delete(in);
assertEquals(delete, serializedDelete);
}
public void testRollGeneration() throws IOException {
final long generation = translog.currentFileGeneration();
final int rolls = randomIntBetween(1, 16);
int totalOperations = 0;
int seqNo = 0;
for (int i = 0; i < rolls; i++) {
final int operations = randomIntBetween(1, 128);
for (int j = 0; j < operations; j++) {
translog.add(new Translog.NoOp(seqNo++, 0, "test"));
totalOperations++;
}
try (ReleasableLock ignored = translog.writeLock.acquire()) {
translog.rollGeneration();
}
assertThat(translog.currentFileGeneration(), equalTo(generation + i + 1));
assertThat(translog.totalOperations(), equalTo(totalOperations));
}
for (int i = 0; i <= rolls; i++) {
assertFileIsPresent(translog, generation + i);
}
translog.commit();
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1));
assertThat(translog.totalOperations(), equalTo(0));
for (int i = 0; i <= rolls; i++) {
assertFileDeleted(translog, generation + i);
}
assertFileIsPresent(translog, generation + rolls + 1);
}
public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException {
final long generation = translog.currentFileGeneration();
int seqNo = 0;
final int rollsBefore = randomIntBetween(0, 16);
for (int r = 1; r <= rollsBefore; r++) {
final int operationsBefore = randomIntBetween(1, 256);
for (int i = 0; i < operationsBefore; i++) {
translog.add(new Translog.NoOp(seqNo++, 0, "test"));
}
try (Releasable ignored = translog.writeLock.acquire()) {
translog.rollGeneration();
}
assertThat(translog.currentFileGeneration(), equalTo(generation + r));
for (int i = 0; i <= r; i++) {
assertFileIsPresent(translog, generation + r);
}
}
assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore));
translog.prepareCommit();
assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore + 1));
for (int i = 0; i <= rollsBefore + 1; i++) {
assertFileIsPresent(translog, generation + i);
}
final int rollsBetween = randomIntBetween(0, 16);
for (int r = 1; r <= rollsBetween; r++) {
final int operationsBetween = randomIntBetween(1, 256);
for (int i = 0; i < operationsBetween; i++) {
translog.add(new Translog.NoOp(seqNo++, 0, "test"));
}
try (Releasable ignored = translog.writeLock.acquire()) {
translog.rollGeneration();
}
assertThat(
translog.currentFileGeneration(),
equalTo(generation + rollsBefore + 1 + r));
for (int i = 0; i <= rollsBefore + 1 + r; i++) {
assertFileIsPresent(translog, generation + i);
}
}
translog.commit();
for (int i = 0; i <= rollsBefore; i++) {
assertFileDeleted(translog, generation + i);
}
for (int i = rollsBefore + 1; i <= rollsBefore + 1 + rollsBetween; i++) {
assertFileIsPresent(translog, generation + i);
}
}
}

View File

@ -93,7 +93,11 @@ public class UpdateSettingsIT extends ESIntegTestCase {
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.refresh_interval", -1).put("index.translog.flush_threshold_size", "1024b"))
.setSettings(
Settings.builder()
.put("index.refresh_interval", -1)
.put("index.translog.flush_threshold_size", "1024b")
.put("index.translog.generation_threshold_size", "4096b"))
.execute()
.actionGet();
IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
@ -103,6 +107,7 @@ public class UpdateSettingsIT extends ESIntegTestCase {
if (indexService != null) {
assertEquals(indexService.getIndexSettings().getRefreshInterval().millis(), -1);
assertEquals(indexService.getIndexSettings().getFlushThresholdSize().getBytes(), 1024);
assertEquals(indexService.getIndexSettings().getGenerationThresholdSize().getBytes(), 4096);
}
}
client()
@ -119,6 +124,7 @@ public class UpdateSettingsIT extends ESIntegTestCase {
if (indexService != null) {
assertEquals(indexService.getIndexSettings().getRefreshInterval().millis(), 1000);
assertEquals(indexService.getIndexSettings().getFlushThresholdSize().getBytes(), 1024);
assertEquals(indexService.getIndexSettings().getGenerationThresholdSize().getBytes(), 4096);
}
}
}