diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 338634d977e..a5e5ecd8aa6 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -69,6 +69,7 @@ import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; @@ -81,6 +82,8 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -168,33 +171,43 @@ public class IndexShardIT extends ESSingleNodeTestCase { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("test")); IndexShard shard = test.getShardOrNull(0); - shard.checkIdle(Long.MIN_VALUE); + Translog translog = ShardUtilsTests.getShardEngine(shard).getTranslog(); + Predicate needsSync = (tlog) -> { + // we can't use tlog.needsSync() here since it also takes the global checkpoint into account + // we explicitly want to check here if our durability checks are taken into account so we only + // check if we are synced upto the current write location + Translog.Location lastWriteLocation = tlog.getLastWriteLocation(); + try { + // the lastWriteLocaltion has a Integer.MAX_VALUE size so we have to create a new one + return tlog.ensureSynced(new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; setDurability(shard, Translog.Durability.REQUEST); - assertBusy(() -> assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded())); + assertFalse(needsSync.test(translog)); setDurability(shard, Translog.Durability.ASYNC); client().prepareIndex("test", "bar", "2").setSource("{}", XContentType.JSON).get(); - assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); + assertTrue(needsSync.test(translog)); setDurability(shard, Translog.Durability.REQUEST); client().prepareDelete("test", "bar", "1").get(); - shard.checkIdle(Long.MIN_VALUE); - assertBusy(() -> assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded())); + assertFalse(needsSync.test(translog)); setDurability(shard, Translog.Durability.ASYNC); client().prepareDelete("test", "bar", "2").get(); - assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); + assertTrue(translog.syncNeeded()); setDurability(shard, Translog.Durability.REQUEST); assertNoFailures(client().prepareBulk() .add(client().prepareIndex("test", "bar", "3").setSource("{}", XContentType.JSON)) .add(client().prepareDelete("test", "bar", "1")).get()); - shard.checkIdle(Long.MIN_VALUE); - assertBusy(() -> assertFalse(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded())); + assertFalse(needsSync.test(translog)); setDurability(shard, Translog.Durability.ASYNC); assertNoFailures(client().prepareBulk() .add(client().prepareIndex("test", "bar", "4").setSource("{}", XContentType.JSON)) .add(client().prepareDelete("test", "bar", "3")).get()); setDurability(shard, Translog.Durability.REQUEST); - assertTrue(ShardUtilsTests.getShardEngine(shard).getTranslog().syncNeeded()); + assertTrue(needsSync.test(translog)); } private void setDurability(IndexShard shard, Translog.Durability durability) {