Fix IndexShardIT#testDurableFlagHasEffect to only test if operations have been synced

With global checkpoints we also take into account if a global checkpoint must be fsynced.
Yet, with recent addition of inlining global checkpoints into indexing operations from a
test perspective unnecessary fsyncs might be reported if `Translog#syncNeeded` is checked.
Now the test only check if the last write location triggers an fsync instead.

Closes #24600
This commit is contained in:
Simon Willnauer 2017-05-11 15:09:26 +02:00
parent 840da4aebf
commit 64d0d9184d
1 changed files with 22 additions and 9 deletions

View File

@ -69,6 +69,7 @@ import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.InternalSettingsPlugin;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
@ -81,6 +82,8 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
@ -168,33 +171,43 @@ public class IndexShardIT extends ESSingleNodeTestCase {
IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("test")); IndexService test = indicesService.indexService(resolveIndex("test"));
IndexShard shard = test.getShardOrNull(0); IndexShard shard = test.getShardOrNull(0);
shard.checkIdle(Long.MIN_VALUE); Translog translog = ShardUtilsTests.getShardEngine(shard).getTranslog();
Predicate<Translog> needsSync = (tlog) -> {
// we can't use tlog.needsSync() here since it also takes the global checkpoint into account
// we explicitly want to check here if our durability checks are taken into account so we only
// check if we are synced upto the current write location
Translog.Location lastWriteLocation = tlog.getLastWriteLocation();
try {
// the lastWriteLocaltion has a Integer.MAX_VALUE size so we have to create a new one
return tlog.ensureSynced(new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
setDurability(shard, Translog.Durability.REQUEST); setDurability(shard, Translog.Durability.REQUEST);
assertBusy(() -> assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded())); assertFalse(needsSync.test(translog));
setDurability(shard, Translog.Durability.ASYNC); setDurability(shard, Translog.Durability.ASYNC);
client().prepareIndex("test", "bar", "2").setSource("{}", XContentType.JSON).get(); client().prepareIndex("test", "bar", "2").setSource("{}", XContentType.JSON).get();
assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); assertTrue(needsSync.test(translog));
setDurability(shard, Translog.Durability.REQUEST); setDurability(shard, Translog.Durability.REQUEST);
client().prepareDelete("test", "bar", "1").get(); client().prepareDelete("test", "bar", "1").get();
shard.checkIdle(Long.MIN_VALUE); assertFalse(needsSync.test(translog));
assertBusy(() -> assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()));
setDurability(shard, Translog.Durability.ASYNC); setDurability(shard, Translog.Durability.ASYNC);
client().prepareDelete("test", "bar", "2").get(); client().prepareDelete("test", "bar", "2").get();
assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); assertTrue(translog.syncNeeded());
setDurability(shard, Translog.Durability.REQUEST); setDurability(shard, Translog.Durability.REQUEST);
assertNoFailures(client().prepareBulk() assertNoFailures(client().prepareBulk()
.add(client().prepareIndex("test", "bar", "3").setSource("{}", XContentType.JSON)) .add(client().prepareIndex("test", "bar", "3").setSource("{}", XContentType.JSON))
.add(client().prepareDelete("test", "bar", "1")).get()); .add(client().prepareDelete("test", "bar", "1")).get());
shard.checkIdle(Long.MIN_VALUE); assertFalse(needsSync.test(translog));
assertBusy(() -> assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()));
setDurability(shard, Translog.Durability.ASYNC); setDurability(shard, Translog.Durability.ASYNC);
assertNoFailures(client().prepareBulk() assertNoFailures(client().prepareBulk()
.add(client().prepareIndex("test", "bar", "4").setSource("{}", XContentType.JSON)) .add(client().prepareIndex("test", "bar", "4").setSource("{}", XContentType.JSON))
.add(client().prepareDelete("test", "bar", "3")).get()); .add(client().prepareDelete("test", "bar", "3")).get());
setDurability(shard, Translog.Durability.REQUEST); setDurability(shard, Translog.Durability.REQUEST);
assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); assertTrue(needsSync.test(translog));
} }
private void setDurability(IndexShard shard, Translog.Durability durability) { private void setDurability(IndexShard shard, Translog.Durability durability) {