diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index cc5c4799479..70cbebb774e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -80,6 +80,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -1540,15 +1541,41 @@ public class InternalEngine extends Engine { } private void pruneDeletedTombstones() { + /* + * We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary + * are remembered for at least one GC delete cycle and trimmed periodically. This is, at the moment, the best we can do on + * primary for user facing APIs but this arbitrary time limit is problematic for replicas. On replicas however we should + * trim only deletes whose seqno is at most the local checkpoint. This requirement is explained as follows. + * + * Suppose o1 and o2 are two operations on the same document with seq#(o1) < seq#(o2), and o2 arrives before o1 on the replica. + * o2 is processed normally since it arrives first; when o1 arrives it should be discarded: + * - If seq#(o1) <= LCP, then it will not be added to Lucene, as it was already previously added. + * - If seq#(o1) > LCP, then it depends on the nature of o2: + * *) If o2 is a delete then its seq# is recorded in the VersionMap, since seq#(o2) > seq#(o1) > LCP, + * so a lookup can find it and determine that o1 is stale. + * *) If o2 is an indexing then its seq# is either in Lucene (if refreshed) or the VersionMap (if not refreshed yet), + * so a real-time lookup can find it and determine that o1 is stale.
+ * + * Here we prefer to deploy a single trimming strategy, which satisfies two constraints, on both primary and replicas because: + * - It's simpler - no need to distinguish if an engine is running at primary mode or replica mode or being promoted. + * - If a replica subsequently is promoted, user experience is maintained as that replica remembers deletes for the last GC cycle. + * + * However, the version map may consume less memory if we deploy two different trimming strategies for primary and replicas. + */ final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); - versionMap.pruneTombstones(timeMSec, engineConfig.getIndexSettings().getGcDeletesInMillis()); + final long maxTimestampToPrune = timeMSec - engineConfig.getIndexSettings().getGcDeletesInMillis(); + versionMap.pruneTombstones(maxTimestampToPrune, localCheckpointTracker.getCheckpoint()); lastDeleteVersionPruneTimeMSec = timeMSec; } // testing void clearDeletedTombstones() { - // clean with current time Long.MAX_VALUE and interval 0 since we use a greater than relationship here. 
- versionMap.pruneTombstones(Long.MAX_VALUE, 0); + versionMap.pruneTombstones(Long.MAX_VALUE, localCheckpointTracker.getMaxSeqNo()); + } + + // for testing + final Collection<DeleteVersionValue> getDeletedTombstones() { + return versionMap.getAllTombstones().values(); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index fc62f1fb32e..7c5dcfa5c90 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -375,21 +375,25 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta } } - private boolean canRemoveTombstone(long currentTime, long pruneInterval, DeleteVersionValue versionValue) { - // check if the value is old enough to be removed - final boolean isTooOld = currentTime - versionValue.time > pruneInterval; + private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrune, DeleteVersionValue versionValue) { + // check if the value is old enough and safe to be removed + final boolean isTooOld = versionValue.time < maxTimestampToPrune; + final boolean isSafeToPrune = versionValue.seqNo <= maxSeqNoToPrune; // version value can't be removed it's // not yet flushed to lucene ie. it's part of this current maps object final boolean isNotTrackedByCurrentMaps = versionValue.time < maps.getMinDeleteTimestamp(); - return isTooOld && isNotTrackedByCurrentMaps; + return isTooOld && isSafeToPrune && isNotTrackedByCurrentMaps; } - void pruneTombstones(long currentTime, long pruneInterval) { + /** + * Try to prune tombstones whose timestamp is less than maxTimestampToPrune and whose seqno is at most maxSeqNoToPrune.
+ */ + void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) { for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) { // we do check before we actually lock the key - this way we don't need to acquire the lock for tombstones that are not // prune-able. If the tombstone changes concurrently we will re-read and step out below since if we can't collect it now w // we won't collect the tombstone below since it must be newer than this one. - if (canRemoveTombstone(currentTime, pruneInterval, entry.getValue())) { + if (canRemoveTombstone(maxTimestampToPrune, maxSeqNoToPrune, entry.getValue())) { final BytesRef uid = entry.getKey(); try (Releasable lock = keyedLock.tryAcquire(uid)) { // we use tryAcquire here since this is a best effort and we try to be least disruptive @@ -399,7 +403,7 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator: final DeleteVersionValue versionValue = tombstones.get(uid); if (versionValue != null) { - if (canRemoveTombstone(currentTime, pruneInterval, versionValue)) { + if (canRemoveTombstone(maxTimestampToPrune, maxSeqNoToPrune, versionValue)) { removeTombstoneUnderLock(uid); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c48a9ee8a2d..b0d701bdfda 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -76,6 +76,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import
org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; @@ -163,6 +164,8 @@ import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTr import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; @@ -173,6 +176,8 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class InternalEngineTests extends EngineTestCase { @@ -4464,4 +4469,65 @@ public class InternalEngineTests extends EngineTestCase { } } } + + public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { + final AtomicLong clock = new AtomicLong(0); + threadPool = spy(threadPool); + when(threadPool.relativeTimeInMillis()).thenAnswer(invocation -> clock.get()); + final EngineConfig config = engine.config(); + final long gcInterval = randomIntBetween(0, 10); + final IndexSettings indexSettings = engine.config().getIndexSettings(); + final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) + .settings(Settings.builder().put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(gcInterval).getStringRep())).build(); + indexSettings.updateIndexMetaData(indexMetaData); + + try (Store store = createStore(); + InternalEngine engine = createEngine(new EngineConfig(config.getShardId(), config.getAllocationId(), threadPool, + indexSettings, config.getWarmer(), store, config.getMergePolicy(), 
config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), + config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), + config.getIndexSort(), config.getTranslogRecoveryRunner(), config.getCircuitBreakerService(), + config.getGlobalCheckpointSupplier()))) { + engine.config().setEnableGcDeletes(false); + for (int i = 0, docs = scaledRandomIntBetween(0, 10); i < docs; i++) { + index(engine, i); + } + final long deleteBatch = between(10, 20); + final long gapSeqNo = randomLongBetween( + engine.getLocalCheckpointTracker().getMaxSeqNo() + 1, engine.getLocalCheckpointTracker().getMaxSeqNo() + deleteBatch); + for (int i = 0; i < deleteBatch; i++) { + final long seqno = engine.getLocalCheckpointTracker().generateSeqNo(); + if (seqno != gapSeqNo) { + if (randomBoolean()) { + clock.incrementAndGet(); + } + engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, threadPool.relativeTimeInMillis())); + } + } + List<DeleteVersionValue> tombstones = new ArrayList<>(engine.getDeletedTombstones()); + engine.config().setEnableGcDeletes(true); + // Prune tombstones whose seqno < gap_seqno and timestamp < clock-gcInterval. + clock.set(randomLongBetween(gcInterval, deleteBatch + gcInterval)); + engine.refresh("test"); + tombstones.removeIf(v -> v.seqNo < gapSeqNo && v.time < clock.get() - gcInterval); + assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray())); + // Prune tombstones whose seqno is at most the local checkpoint (e.g. seqno < gap_seqno). + clock.set(randomLongBetween(deleteBatch + gcInterval * 4/3, 100)); // Need a margin for gcInterval/4. + engine.refresh("test"); + tombstones.removeIf(v -> v.seqNo < gapSeqNo); + assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray())); + // Fill the seqno gap - should prune all tombstones.
+ clock.set(between(0, 100)); + if (randomBoolean()) { + engine.index(replicaIndexForDoc(testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, gapSeqNo, false)); + } else { + engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), Versions.MATCH_ANY, gapSeqNo, threadPool.relativeTimeInMillis())); + } + clock.set(randomLongBetween(100 + gcInterval * 4/3, Long.MAX_VALUE)); // Need a margin for gcInterval/4. + engine.refresh("test"); + assertThat(engine.getDeletedTombstones(), empty()); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java index 8c5973e8750..ce3ddff00da 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java @@ -37,7 +37,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.empty; public class LiveVersionMapTests extends ESTestCase { @@ -106,7 +107,6 @@ public class LiveVersionMapTests extends ESTestCase { map.afterRefresh(randomBoolean()); assertNull(map.getUnderLock(uid("test"))); - map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1)); assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test"))); map.beforeRefresh(); @@ -114,6 +114,8 @@ public class LiveVersionMapTests extends ESTestCase { map.afterRefresh(randomBoolean()); assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test"))); map.pruneTombstones(2, 0); + assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test"))); + map.pruneTombstones(2, 1); assertNull(map.getUnderLock(uid("test"))); } } @@ -134,8 +136,10 @@ public class LiveVersionMapTests extends 
ESTestCase { CountDownLatch startGun = new CountDownLatch(numThreads); CountDownLatch done = new CountDownLatch(numThreads); int randomValuesPerThread = randomIntBetween(5000, 20000); - AtomicLong clock = new AtomicLong(0); - AtomicLong lastPrunedTimestamp = new AtomicLong(-1); + final AtomicLong clock = new AtomicLong(0); + final AtomicLong lastPrunedTimestamp = new AtomicLong(-1); + final AtomicLong maxSeqNo = new AtomicLong(); + final AtomicLong lastPrunedSeqNo = new AtomicLong(); for (int j = 0; j < threads.length; j++) { threads[j] = new Thread(() -> { startGun.countDown(); @@ -148,29 +152,31 @@ public class LiveVersionMapTests extends ESTestCase { try { for (int i = 0; i < randomValuesPerThread; ++i) { BytesRef bytesRef = randomFrom(random(), keyList); - final long clockTick = clock.get(); try (Releasable r = map.acquireLock(bytesRef)) { VersionValue versionValue = values.computeIfAbsent(bytesRef, - v -> new VersionValue(randomLong(), randomLong(), randomLong())); + v -> new VersionValue(randomLong(), maxSeqNo.incrementAndGet(), randomLong())); boolean isDelete = versionValue instanceof DeleteVersionValue; if (isDelete) { map.removeTombstoneUnderLock(bytesRef); deletes.remove(bytesRef); } if (isDelete == false && rarely()) { - versionValue = new DeleteVersionValue(versionValue.version + 1, versionValue.seqNo + 1, + versionValue = new DeleteVersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term, clock.getAndIncrement()); deletes.put(bytesRef, (DeleteVersionValue) versionValue); } else { - versionValue = new VersionValue(versionValue.version + 1, versionValue.seqNo + 1, versionValue.term); + versionValue = new VersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term); } values.put(bytesRef, versionValue); map.putUnderLock(bytesRef, versionValue); } if (rarely()) { - map.pruneTombstones(clockTick, 0); - // timestamp we pruned the deletes - lastPrunedTimestamp.updateAndGet(prev -> Math.max(clockTick, 
prev)); // make sure we track the latest + final long pruneSeqNo = randomLongBetween(0, maxSeqNo.get()); + final long clockTick = randomLongBetween(0, clock.get()); + map.pruneTombstones(clockTick, pruneSeqNo); + // make sure we track the latest timestamp and seqno we pruned the deletes + lastPrunedTimestamp.updateAndGet(prev -> Math.max(clockTick, prev)); + lastPrunedSeqNo.updateAndGet(prev -> Math.max(pruneSeqNo, prev)); } } } finally { @@ -234,15 +240,17 @@ public class LiveVersionMapTests extends ESTestCase { VersionValue value = map.getUnderLock(e.getKey()); // here we keep track of the deletes and ensure that all deletes that are not visible anymore ie. not in the map // have a timestamp that is smaller or equal to the maximum timestamp that we pruned on + final DeleteVersionValue delete = e.getValue(); if (value == null) { - assertTrue(e.getValue().time + " > " + lastPrunedTimestamp.get(), e.getValue().time <= lastPrunedTimestamp.get()); + assertTrue(delete.time + " > " + lastPrunedTimestamp.get() + "," + delete.seqNo + " > " + lastPrunedSeqNo.get(), + delete.time <= lastPrunedTimestamp.get() && delete.seqNo <= lastPrunedSeqNo.get()); } else { - assertEquals(value, e.getValue()); + assertEquals(value, delete); } } }); - map.pruneTombstones(clock.incrementAndGet(), 0); - assertEquals(0, StreamSupport.stream(map.getAllTombstones().entrySet().spliterator(), false).count()); + map.pruneTombstones(clock.incrementAndGet(), maxSeqNo.get()); + assertThat(map.getAllTombstones().entrySet(), empty()); } public void testCarryOnSafeAccess() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index ad046dddc0c..ba5b43b1d92 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ 
b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.replication; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkItemRequest; @@ -30,11 +31,13 @@ import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.BulkShardResponse; import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkActionTests; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.resync.ResyncReplicationRequest; import org.elasticsearch.action.resync.ResyncReplicationResponse; import org.elasticsearch.action.resync.TransportResyncReplicationAction; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -617,6 +620,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase return result; } + private <Request extends ReplicatedWriteRequest & DocWriteRequest> + BulkShardRequest executeReplicationRequestOnPrimary(IndexShard primary, Request request) throws Exception { + final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), + new BulkItemRequest[]{new BulkItemRequest(0, request)}); + return executeShardBulkOnPrimary(primary, bulkShardRequest).replicaRequest(); + } + private void executeShardBulkOnReplica(BulkShardRequest request,
IndexShard replica, long operationPrimaryTerm, long globalCheckpointOnPrimary) throws Exception { final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); replica.acquireReplicaOperationPermit(operationPrimaryTerm, globalCheckpointOnPrimary, permitAcquiredFuture, ThreadPool.Names.SAME, request); @@ -631,13 +641,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase * indexes the given requests on the supplied primary, modifying it for replicas */ BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception { - final BulkItemRequest bulkItemRequest = new BulkItemRequest(0, request); - BulkItemRequest[] bulkItemRequests = new BulkItemRequest[1]; - bulkItemRequests[0] = bulkItemRequest; - final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), bulkItemRequests); - final TransportWriteAction.WritePrimaryResult result = - executeShardBulkOnPrimary(primary, bulkShardRequest); - return result.replicaRequest(); + return executeReplicationRequestOnPrimary(primary, request); + } + + /** + * Executes the delete request on the primary, and modifies it for replicas. + */ + BulkShardRequest deleteOnPrimary(DeleteRequest request, IndexShard primary) throws Exception { + return executeReplicationRequestOnPrimary(primary, request); } /** @@ -647,6 +658,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase executeShardBulkOnReplica(request, replica, group.primary.getPrimaryTerm(), group.primary.getGlobalCheckpoint()); } + /** + * Executes the delete request on the given replica shard. 
+ */ + void deleteOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception { + executeShardBulkOnReplica(request, replica, group.primary.getPrimaryTerm(), group.primary.getGlobalCheckpoint()); + } + class GlobalCheckpointSync extends ReplicationAction< GlobalCheckpointSyncAction.Request, GlobalCheckpointSyncAction.Request, diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 86436d8d88a..4762c23319a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -26,9 +26,14 @@ import org.apache.lucene.search.TopDocs; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; @@ -43,6 +48,8 @@ import org.elasticsearch.index.shard.IndexShardTests; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryTarget; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matcher; import java.io.IOException; @@ -52,13 +59,13 @@ import 
java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -368,6 +375,50 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase } } + /** + * This test ensures the consistency between primary and replica with late and out of order delivery on the replica. + * An index operation on the primary is followed by a delete operation. The delete operation is delivered first + * and processed on the replica but the index is delayed with an interval that is even longer than the gc deletes cycle. + * This makes sure that the replica still remembers the delete operation and correctly ignores the stale index operation. + */ + public void testLateDeliveryAfterGCTriggeredOnReplica() throws Exception { + ThreadPool.terminate(this.threadPool, 10, TimeUnit.SECONDS); + this.threadPool = new TestThreadPool(getClass().getName(), + Settings.builder().put(threadPoolSettings()).put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0).build()); + + try (ReplicationGroup shards = createGroup(1)) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + final TimeValue gcInterval = TimeValue.timeValueMillis(between(1, 10)); + // The GC deletes interval is kept very small (1-10ms); ThreadPool#ESTIMATED_TIME_INTERVAL_SETTING is set to 0 above.
+ + updateGCDeleteCycle(replica, gcInterval); + final BulkShardRequest indexRequest = indexOnPrimary( + new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON), primary); + final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", "d1"), primary); + deleteOnReplica(deleteRequest, shards, replica); // delete arrives on replica first. + final long deleteTimestamp = threadPool.relativeTimeInMillis(); + replica.refresh("test"); + assertBusy(() -> + assertThat(threadPool.relativeTimeInMillis() - deleteTimestamp, greaterThan(gcInterval.millis())) + ); + getEngine(replica).maybePruneDeletes(); + indexOnReplica(indexRequest, shards, replica); // index arrives on replica lately. + shards.assertAllEqual(0); + } + } + + private void updateGCDeleteCycle(IndexShard shard, TimeValue interval) { + IndexMetaData.Builder builder = IndexMetaData.builder(shard.indexSettings().getIndexMetaData()); + builder.settings(Settings.builder() + .put(shard.indexSettings().getSettings()) + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), interval.getStringRep()) + ); + shard.indexSettings().updateIndexMetaData(builder.build()); + shard.onSettingsChanged(); + } + /** Throws documentFailure on every indexing operation */ static class ThrowingDocumentFailureEngineFactory implements EngineFactory { final String documentFailureMessage; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f0e46cf0223..8a9ad3d2a76 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -485,4 +485,8 @@ public abstract class EngineTestCase extends ESTestCase { IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); } + protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, long 
startTime) { + return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, VersionType.EXTERNAL, + Engine.Operation.Origin.REPLICA, startTime); + } }