Prune only gc deletes below local checkpoint (#28790)
Once a document is deleted and Lucene is refreshed, we will not be able to look up the version/seq# associated with that delete in Lucene. As conflicting operations can still be indexed, we need another mechanism to remember these deletes. Therefore deletes should still be stored in the Version Map, even after Lucene is refreshed. Obviously, we can't remember all deletes forever, so a trimming mechanism is needed.

Currently, we remember deletes for at least one minute (the default GC deletes cycle) and clean them periodically. This is, at the moment, the best we can do on the primary for user-facing APIs, but this arbitrary time limit is problematic for replicas. Furthermore, we can't rely on the primary and replicas doing the trimming in a synchronized manner; failing to do so results in the replica and the primary making different decisions.

The following scenario can cause inconsistency between primary and replica:

1. Primary indexes a doc (index, id=1, v2).
2. A network packet issue causes the index operation to back off and wait.
3. Primary deletes the doc (delete, id=1, v3).
4. Replica processes the delete (delete, id=1, v3).
5. More than one minute passes (the GC deletes cycle runs on the replica).
6. The indexing op is finally sent to the replica, which now processes it because it has forgotten about the delete.

We can rely on sequence numbers to prevent this issue. If we prune only deletes whose seq# is at most the local checkpoint, a replica will correctly remember what it needs. The correctness argument is as follows. Suppose o1 and o2 are two operations on the same document with seq#(o1) < seq#(o2), and o2 arrives before o1 on the replica. o2 is processed normally since it arrives first; when o1 arrives it should be discarded:

1. If seq#(o1) <= LCP (the local checkpoint), then it will not be added to Lucene, as it was already previously added.
2. If seq#(o1) > LCP, then it depends on the nature of o2:
   - If o2 is a delete, then its seq# is recorded in the Version Map; since seq#(o2) > seq#(o1) > LCP, a lookup can find it and determine that o1 is stale.
   - If o2 is an indexing operation, then its seq# is either in Lucene (if refreshed) or in the Version Map (if not refreshed yet), so a real-time lookup can find it and determine that o1 is stale.

In this PR, we prefer to deploy a single trimming strategy, which satisfies both requirements, on the primary and the replicas, because:

- It's simpler: there is no need to distinguish whether an engine is running in primary mode, in replica mode, or being promoted.
- If a replica is subsequently promoted, the user experience is fully maintained, as that replica remembers deletes for the last GC cycle.

However, the version map may consume less memory if we deploy two different trimming strategies for the primary and the replicas.
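To make the new rule concrete, here is a minimal stand-alone sketch with hypothetical names; the real logic lives in LiveVersionMap#canRemoveTombstone in the diff below, which additionally requires that the tombstone is no longer tracked by the current, not-yet-refreshed maps. A delete tombstone may be dropped only when it is both older than the GC deletes interval and at or below the local checkpoint:

public class TombstonePruneSketch {
    // Illustrative only, not the engine code.
    static boolean canPrune(long tombstoneTimestamp, long tombstoneSeqNo,
                            long nowMillis, long gcDeletesMillis, long localCheckpoint) {
        final boolean oldEnough = tombstoneTimestamp < nowMillis - gcDeletesMillis;  // time-based condition (as before)
        final boolean belowLocalCheckpoint = tombstoneSeqNo <= localCheckpoint;      // new seq#-based condition
        return oldEnough && belowLocalCheckpoint;
    }

    public static void main(String[] args) {
        // Scenario from the message above: say the delete has seq# 2 and the delayed index op has
        // seq# 1, so the replica's local checkpoint is stuck at 0 until the index op arrives.
        long now = 120_000, gcDeletes = 60_000;
        System.out.println(canPrune(10_000, 2, now, gcDeletes, 0)); // false: keep the tombstone, however old it is
        // Once the gap is filled and the local checkpoint advances past the delete, it becomes prunable.
        System.out.println(canPrune(10_000, 2, now, gcDeletes, 2)); // true
    }
}

With the previous time-only rule the first call would have returned true, and the replica could later apply the stale index operation; the seq# guard is what closes that window.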
parent bca264699a
commit 87957603c0
InternalEngine.java
@@ -80,6 +80,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -1540,15 +1541,41 @@ public class InternalEngine extends Engine {
     }

     private void pruneDeletedTombstones() {
+        /*
+         * We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
+         * are remembered for at least one GC delete cycle and trimmed periodically. This is, at the moment, the best we can do on
+         * primary for user facing APIs but this arbitrary time limit is problematic for replicas. On replicas however we should
+         * trim only deletes whose seqno at most the local checkpoint. This requirement is explained as follows.
+         *
+         * Suppose o1 and o2 are two operations on the same document with seq#(o1) < seq#(o2), and o2 arrives before o1 on the replica.
+         * o2 is processed normally since it arrives first; when o1 arrives it should be discarded:
+         * - If seq#(o1) <= LCP, then it will be not be added to Lucene, as it was already previously added.
+         * - If seq#(o1) > LCP, then it depends on the nature of o2:
+         *   *) If o2 is a delete then its seq# is recorded in the VersionMap, since seq#(o2) > seq#(o1) > LCP,
+         *      so a lookup can find it and determine that o1 is stale.
+         *   *) If o2 is an indexing then its seq# is either in Lucene (if refreshed) or the VersionMap (if not refreshed yet),
+         *      so a real-time lookup can find it and determine that o1 is stale.
+         *
+         * Here we prefer to deploy a single trimming strategy, which satisfies two constraints, on both primary and replicas because:
+         * - It's simpler - no need to distinguish if an engine is running at primary mode or replica mode or being promoted.
+         * - If a replica subsequently is promoted, user experience is maintained as that replica remembers deletes for the last GC cycle.
+         *
+         * However, the version map may consume less memory if we deploy two different trimming strategies for primary and replicas.
+         */
         final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
-        versionMap.pruneTombstones(timeMSec, engineConfig.getIndexSettings().getGcDeletesInMillis());
+        final long maxTimestampToPrune = timeMSec - engineConfig.getIndexSettings().getGcDeletesInMillis();
+        versionMap.pruneTombstones(maxTimestampToPrune, localCheckpointTracker.getCheckpoint());
         lastDeleteVersionPruneTimeMSec = timeMSec;
     }

     // testing
     void clearDeletedTombstones() {
-        // clean with current time Long.MAX_VALUE and interval 0 since we use a greater than relationship here.
-        versionMap.pruneTombstones(Long.MAX_VALUE, 0);
+        versionMap.pruneTombstones(Long.MAX_VALUE, localCheckpointTracker.getMaxSeqNo());
     }

+    // for testing
+    final Collection<DeleteVersionValue> getDeletedTombstones() {
+        return versionMap.getAllTombstones().values();
+    }
+
     @Override
LiveVersionMap.java
@@ -375,21 +375,25 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
         }
     }

-    private boolean canRemoveTombstone(long currentTime, long pruneInterval, DeleteVersionValue versionValue) {
-        // check if the value is old enough to be removed
-        final boolean isTooOld = currentTime - versionValue.time > pruneInterval;
+    private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrune, DeleteVersionValue versionValue) {
+        // check if the value is old enough and safe to be removed
+        final boolean isTooOld = versionValue.time < maxTimestampToPrune;
+        final boolean isSafeToPrune = versionValue.seqNo <= maxSeqNoToPrune;
         // version value can't be removed it's
         // not yet flushed to lucene ie. it's part of this current maps object
         final boolean isNotTrackedByCurrentMaps = versionValue.time < maps.getMinDeleteTimestamp();
-        return isTooOld && isNotTrackedByCurrentMaps;
+        return isTooOld && isSafeToPrune && isNotTrackedByCurrentMaps;
     }

-    void pruneTombstones(long currentTime, long pruneInterval) {
+    /**
+     * Try to prune tombstones whose timestamp is less than maxTimestampToPrune and seqno at most the maxSeqNoToPrune.
+     */
+    void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) {
         for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) {
             // we do check before we actually lock the key - this way we don't need to acquire the lock for tombstones that are not
             // prune-able. If the tombstone changes concurrently we will re-read and step out below since if we can't collect it now w
             // we won't collect the tombstone below since it must be newer than this one.
-            if (canRemoveTombstone(currentTime, pruneInterval, entry.getValue())) {
+            if (canRemoveTombstone(maxTimestampToPrune, maxSeqNoToPrune, entry.getValue())) {
                 final BytesRef uid = entry.getKey();
                 try (Releasable lock = keyedLock.tryAcquire(uid)) {
                     // we use tryAcquire here since this is a best effort and we try to be least disruptive
@@ -399,7 +403,7 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta
                     // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
                     final DeleteVersionValue versionValue = tombstones.get(uid);
                     if (versionValue != null) {
-                        if (canRemoveTombstone(currentTime, pruneInterval, versionValue)) {
+                        if (canRemoveTombstone(maxTimestampToPrune, maxSeqNoToPrune, versionValue)) {
                             removeTombstoneUnderLock(uid);
                         }
                     }
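One detail worth calling out in the hunk above: the timestamp half of the new predicate is the old age check rewritten against a precomputed cutoff (the InternalEngine caller now passes timeMSec - gcDeletesInMillis as maxTimestampToPrune), while the seq# comparison is the genuinely new constraint. A quick self-contained check of that equivalence, with made-up values:

public class PruneCutoffCheck {
    public static void main(String[] args) {
        long currentTime = 100_000, gcDeletes = 60_000, tombstoneTime = 30_000; // hypothetical millis
        long maxTimestampToPrune = currentTime - gcDeletes;                     // cutoff the engine now passes in
        boolean oldCheck = currentTime - tombstoneTime > gcDeletes;             // pre-change: age greater than interval
        boolean newCheck = tombstoneTime < maxTimestampToPrune;                 // post-change: timestamp before cutoff
        System.out.println(oldCheck == newCheck);                               // prints true, the two forms agree
    }
}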
InternalEngineTests.java
@@ -76,6 +76,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
@@ -163,6 +164,8 @@ import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTr
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
@@ -173,6 +176,8 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;

 public class InternalEngineTests extends EngineTestCase {

@@ -4464,4 +4469,65 @@ public class InternalEngineTests extends EngineTestCase {
             }
         }
     }
+
+    public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception {
+        final AtomicLong clock = new AtomicLong(0);
+        threadPool = spy(threadPool);
+        when(threadPool.relativeTimeInMillis()).thenAnswer(invocation -> clock.get());
+        final EngineConfig config = engine.config();
+        final long gcInterval = randomIntBetween(0, 10);
+        final IndexSettings indexSettings = engine.config().getIndexSettings();
+        final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
+            .settings(Settings.builder().put(indexSettings.getSettings())
+                .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(gcInterval).getStringRep())).build();
+        indexSettings.updateIndexMetaData(indexMetaData);
+
+        try (Store store = createStore();
+             InternalEngine engine = createEngine(new EngineConfig(config.getShardId(), config.getAllocationId(), threadPool,
+                 indexSettings, config.getWarmer(), store, config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
+                 new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
+                 config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(),
+                 config.getIndexSort(), config.getTranslogRecoveryRunner(), config.getCircuitBreakerService(),
+                 config.getGlobalCheckpointSupplier()))) {
+            engine.config().setEnableGcDeletes(false);
+            for (int i = 0, docs = scaledRandomIntBetween(0, 10); i < docs; i++) {
+                index(engine, i);
+            }
+            final long deleteBatch = between(10, 20);
+            final long gapSeqNo = randomLongBetween(
+                engine.getLocalCheckpointTracker().getMaxSeqNo() + 1, engine.getLocalCheckpointTracker().getMaxSeqNo() + deleteBatch);
+            for (int i = 0; i < deleteBatch; i++) {
+                final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
+                if (seqno != gapSeqNo) {
+                    if (randomBoolean()) {
+                        clock.incrementAndGet();
+                    }
+                    engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, threadPool.relativeTimeInMillis()));
+                }
+            }
+            List<DeleteVersionValue> tombstones = new ArrayList<>(engine.getDeletedTombstones());
+            engine.config().setEnableGcDeletes(true);
+            // Prune tombstones whose seqno < gap_seqno and timestamp < clock-gcInterval.
+            clock.set(randomLongBetween(gcInterval, deleteBatch + gcInterval));
+            engine.refresh("test");
+            tombstones.removeIf(v -> v.seqNo < gapSeqNo && v.time < clock.get() - gcInterval);
+            assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray()));
+            // Prune tombstones whose seqno at most the local checkpoint (eg. seqno < gap_seqno).
+            clock.set(randomLongBetween(deleteBatch + gcInterval * 4/3, 100)); // Need a margin for gcInterval/4.
+            engine.refresh("test");
+            tombstones.removeIf(v -> v.seqNo < gapSeqNo);
+            assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray()));
+            // Fill the seqno gap - should prune all tombstones.
+            clock.set(between(0, 100));
+            if (randomBoolean()) {
+                engine.index(replicaIndexForDoc(testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, gapSeqNo, false));
+            } else {
+                engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), Versions.MATCH_ANY, gapSeqNo, threadPool.relativeTimeInMillis()));
+            }
+            clock.set(randomLongBetween(100 + gcInterval * 4/3, Long.MAX_VALUE)); // Need a margin for gcInterval/4.
+            engine.refresh("test");
+            assertThat(engine.getDeletedTombstones(), empty());
+        }
+    }
+
 }
LiveVersionMapTests.java
@@ -37,7 +37,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.StreamSupport;
+
+import static org.hamcrest.Matchers.empty;

 public class LiveVersionMapTests extends ESTestCase {

@@ -106,7 +107,6 @@ public class LiveVersionMapTests extends ESTestCase {
         map.afterRefresh(randomBoolean());
         assertNull(map.getUnderLock(uid("test")));

-
         map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1));
         assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
         map.beforeRefresh();
@@ -114,6 +114,8 @@ public class LiveVersionMapTests extends ESTestCase {
         map.afterRefresh(randomBoolean());
         assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
         map.pruneTombstones(2, 0);
+        assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test")));
+        map.pruneTombstones(2, 1);
         assertNull(map.getUnderLock(uid("test")));
     }
 }
@@ -134,8 +136,10 @@ public class LiveVersionMapTests extends ESTestCase {
         CountDownLatch startGun = new CountDownLatch(numThreads);
         CountDownLatch done = new CountDownLatch(numThreads);
         int randomValuesPerThread = randomIntBetween(5000, 20000);
-        AtomicLong clock = new AtomicLong(0);
-        AtomicLong lastPrunedTimestamp = new AtomicLong(-1);
+        final AtomicLong clock = new AtomicLong(0);
+        final AtomicLong lastPrunedTimestamp = new AtomicLong(-1);
+        final AtomicLong maxSeqNo = new AtomicLong();
+        final AtomicLong lastPrunedSeqNo = new AtomicLong();
         for (int j = 0; j < threads.length; j++) {
             threads[j] = new Thread(() -> {
                 startGun.countDown();
@@ -148,29 +152,31 @@ public class LiveVersionMapTests extends ESTestCase {
                 try {
                     for (int i = 0; i < randomValuesPerThread; ++i) {
                         BytesRef bytesRef = randomFrom(random(), keyList);
-                        final long clockTick = clock.get();
                         try (Releasable r = map.acquireLock(bytesRef)) {
                             VersionValue versionValue = values.computeIfAbsent(bytesRef,
-                                v -> new VersionValue(randomLong(), randomLong(), randomLong()));
+                                v -> new VersionValue(randomLong(), maxSeqNo.incrementAndGet(), randomLong()));
                             boolean isDelete = versionValue instanceof DeleteVersionValue;
                             if (isDelete) {
                                 map.removeTombstoneUnderLock(bytesRef);
                                 deletes.remove(bytesRef);
                             }
                             if (isDelete == false && rarely()) {
-                                versionValue = new DeleteVersionValue(versionValue.version + 1, versionValue.seqNo + 1,
+                                versionValue = new DeleteVersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(),
                                     versionValue.term, clock.getAndIncrement());
                                 deletes.put(bytesRef, (DeleteVersionValue) versionValue);
                             } else {
-                                versionValue = new VersionValue(versionValue.version + 1, versionValue.seqNo + 1, versionValue.term);
+                                versionValue = new VersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term);
                             }
                             values.put(bytesRef, versionValue);
                             map.putUnderLock(bytesRef, versionValue);
                         }
                         if (rarely()) {
-                            map.pruneTombstones(clockTick, 0);
-                            // timestamp we pruned the deletes
-                            lastPrunedTimestamp.updateAndGet(prev -> Math.max(clockTick, prev)); // make sure we track the latest
+                            final long pruneSeqNo = randomLongBetween(0, maxSeqNo.get());
+                            final long clockTick = randomLongBetween(0, clock.get());
+                            map.pruneTombstones(clockTick, pruneSeqNo);
+                            // make sure we track the latest timestamp and seqno we pruned the deletes
+                            lastPrunedTimestamp.updateAndGet(prev -> Math.max(clockTick, prev));
+                            lastPrunedSeqNo.updateAndGet(prev -> Math.max(pruneSeqNo, prev));
                         }
                     }
                 } finally {
@@ -234,15 +240,17 @@ public class LiveVersionMapTests extends ESTestCase {
                 VersionValue value = map.getUnderLock(e.getKey());
                 // here we keep track of the deletes and ensure that all deletes that are not visible anymore ie. not in the map
                 // have a timestamp that is smaller or equal to the maximum timestamp that we pruned on
+                final DeleteVersionValue delete = e.getValue();
                 if (value == null) {
-                    assertTrue(e.getValue().time + " > " + lastPrunedTimestamp.get(), e.getValue().time <= lastPrunedTimestamp.get());
+                    assertTrue(delete.time + " > " + lastPrunedTimestamp.get() + "," + delete.seqNo + " > " + lastPrunedSeqNo.get(),
+                        delete.time <= lastPrunedTimestamp.get() && delete.seqNo <= lastPrunedSeqNo.get());
                 } else {
-                    assertEquals(value, e.getValue());
+                    assertEquals(value, delete);
                 }
             }
         });
-        map.pruneTombstones(clock.incrementAndGet(), 0);
-        assertEquals(0, StreamSupport.stream(map.getAllTombstones().entrySet().spliterator(), false).count());
+        map.pruneTombstones(clock.incrementAndGet(), maxSeqNo.get());
+        assertThat(map.getAllTombstones().entrySet(), empty());
     }

     public void testCarryOnSafeAccess() throws IOException {
ESIndexLevelReplicationTestCase.java
@@ -22,6 +22,7 @@ package org.elasticsearch.index.replication;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.bulk.BulkItemRequest;
@@ -30,11 +31,13 @@ import org.elasticsearch.action.bulk.BulkShardRequest;
 import org.elasticsearch.action.bulk.BulkShardResponse;
 import org.elasticsearch.action.bulk.TransportShardBulkAction;
 import org.elasticsearch.action.bulk.TransportShardBulkActionTests;
+import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.resync.ResyncReplicationRequest;
 import org.elasticsearch.action.resync.ResyncReplicationResponse;
 import org.elasticsearch.action.resync.TransportResyncReplicationAction;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
 import org.elasticsearch.action.support.replication.ReplicationOperation;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -617,6 +620,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         return result;
     }

+    private <Request extends ReplicatedWriteRequest & DocWriteRequest>
+    BulkShardRequest executeReplicationRequestOnPrimary(IndexShard primary, Request request) throws Exception {
+        final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(),
+            new BulkItemRequest[]{new BulkItemRequest(0, request)});
+        return executeShardBulkOnPrimary(primary, bulkShardRequest).replicaRequest();
+    }
+
     private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard replica, long operationPrimaryTerm, long globalCheckpointOnPrimary) throws Exception {
         final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
         replica.acquireReplicaOperationPermit(operationPrimaryTerm, globalCheckpointOnPrimary, permitAcquiredFuture, ThreadPool.Names.SAME, request);
@@ -631,13 +641,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
      * indexes the given requests on the supplied primary, modifying it for replicas
      */
     BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
-        final BulkItemRequest bulkItemRequest = new BulkItemRequest(0, request);
-        BulkItemRequest[] bulkItemRequests = new BulkItemRequest[1];
-        bulkItemRequests[0] = bulkItemRequest;
-        final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), bulkItemRequests);
-        final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =
-            executeShardBulkOnPrimary(primary, bulkShardRequest);
-        return result.replicaRequest();
+        return executeReplicationRequestOnPrimary(primary, request);
+    }
+
+    /**
+     * Executes the delete request on the primary, and modifies it for replicas.
+     */
+    BulkShardRequest deleteOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
+        return executeReplicationRequestOnPrimary(primary, request);
     }

     /**
@@ -647,6 +658,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         executeShardBulkOnReplica(request, replica, group.primary.getPrimaryTerm(), group.primary.getGlobalCheckpoint());
     }

+    /**
+     * Executes the delete request on the given replica shard.
+     */
+    void deleteOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
+        executeShardBulkOnReplica(request, replica, group.primary.getPrimaryTerm(), group.primary.getGlobalCheckpoint());
+    }
+
     class GlobalCheckpointSync extends ReplicationAction<
             GlobalCheckpointSyncAction.Request,
             GlobalCheckpointSyncAction.Request,
IndexLevelReplicationTests.java
@@ -26,9 +26,14 @@ import org.apache.lucene.search.TopDocs;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkShardRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineFactory;
@@ -43,6 +48,8 @@ import org.elasticsearch.index.shard.IndexShardTests;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.recovery.RecoveryTarget;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.hamcrest.Matcher;

 import java.io.IOException;
@@ -52,13 +59,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;

 import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -368,6 +375,50 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
         }
     }

+    /**
+     * This test ensures the consistency between primary and replica with late and out of order delivery on the replica.
+     * An index operation on the primary is followed by a delete operation. The delete operation is delivered first
+     * and processed on the replica but the index is delayed with an interval that is even longer the gc deletes cycle.
+     * This makes sure that that replica still remembers the delete operation and correctly ignores the stale index operation.
+     */
+    public void testLateDeliveryAfterGCTriggeredOnReplica() throws Exception {
+        ThreadPool.terminate(this.threadPool, 10, TimeUnit.SECONDS);
+        this.threadPool = new TestThreadPool(getClass().getName(),
+            Settings.builder().put(threadPoolSettings()).put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0).build());
+
+        try (ReplicationGroup shards = createGroup(1)) {
+            shards.startAll();
+            final IndexShard primary = shards.getPrimary();
+            final IndexShard replica = shards.getReplicas().get(0);
+            final TimeValue gcInterval = TimeValue.timeValueMillis(between(1, 10));
+            // I think we can just set this to something very small (10ms?) and also set ThreadPool#ESTIMATED_TIME_INTERVAL_SETTING to 0?
+
+            updateGCDeleteCycle(replica, gcInterval);
+            final BulkShardRequest indexRequest = indexOnPrimary(
+                new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON), primary);
+            final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", "d1"), primary);
+            deleteOnReplica(deleteRequest, shards, replica); // delete arrives on replica first.
+            final long deleteTimestamp = threadPool.relativeTimeInMillis();
+            replica.refresh("test");
+            assertBusy(() ->
+                assertThat(threadPool.relativeTimeInMillis() - deleteTimestamp, greaterThan(gcInterval.millis()))
+            );
+            getEngine(replica).maybePruneDeletes();
+            indexOnReplica(indexRequest, shards, replica); // index arrives on replica lately.
+            shards.assertAllEqual(0);
+        }
+    }
+
+    private void updateGCDeleteCycle(IndexShard shard, TimeValue interval) {
+        IndexMetaData.Builder builder = IndexMetaData.builder(shard.indexSettings().getIndexMetaData());
+        builder.settings(Settings.builder()
+            .put(shard.indexSettings().getSettings())
+            .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), interval.getStringRep())
+        );
+        shard.indexSettings().updateIndexMetaData(builder.build());
+        shard.onSettingsChanged();
+    }
+
     /** Throws <code>documentFailure</code> on every indexing operation */
     static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
         final String documentFailureMessage;
EngineTestCase.java
@@ -485,4 +485,8 @@ public abstract class EngineTestCase extends ESTestCase {
             IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
     }

+    protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, long startTime) {
+        return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, VersionType.EXTERNAL,
+            Engine.Operation.Origin.REPLICA, startTime);
+    }
 }