From c66bae39c34faf4a2228f332a8a3650d91dcbfeb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 23 Aug 2019 11:09:49 -0400 Subject: [PATCH] Update translog checkpoint after marking ops as persisted (#45634) If two translog syncs happen concurrently, then one can return before its operations are marked as persisted. In general, this should not be an issue; however, peer recoveries currently rely on a sync not returning before its operations are marked as persisted. Closes #29161 --- .../elasticsearch/index/shard/IndexShard.java | 8 ++- .../index/translog/Translog.java | 6 +- .../index/translog/TranslogWriter.java | 13 ++-- .../index/translog/TranslogTests.java | 64 +++++++++++++++++++ 4 files changed, 79 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ed73aca3acd..5e7009c2973 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2370,9 +2370,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) { assert shardRouting.primary() && shardRouting.isRelocationTarget() : "only primary relocation target can update allocation IDs from primary context: " + shardRouting; - assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) && - getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()) - .getLocalCheckpoint() || indexSettings().getTranslogDurability() == Translog.Durability.ASYNC; + assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) : + "primary context [" + primaryContext + "] does not contain relocation target [" + routingEntry() + "]"; + assert getLocalCheckpoint() == 
primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()) + .getLocalCheckpoint() || indexSettings().getTranslogDurability() == Translog.Durability.ASYNC : + "local checkpoint [" + getLocalCheckpoint() + "] does not match checkpoint from primary context [" + primaryContext + "]"; synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 02117e8a8ca..5f7f834c5e2 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -599,8 +599,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * @return the last synced checkpoint */ public long getLastSyncedGlobalCheckpoint() { + return getLastSyncedCheckpoint().globalCheckpoint; + } + + final Checkpoint getLastSyncedCheckpoint() { try (ReleasableLock ignored = readLock.acquire()) { - return current.getLastSyncedCheckpoint().globalCheckpoint; + return current.getLastSyncedCheckpoint(); } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 0695a2bf650..e2240977c94 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -350,15 +350,14 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { * * @return true if this call caused an actual sync operation */ - public boolean syncUpTo(long offset) throws IOException { - boolean synced = false; + final boolean syncUpTo(long offset) throws IOException { if (lastSyncedCheckpoint.offset < offset && syncNeeded()) { - LongArrayList 
flushedSequenceNumbers = null; synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait if (lastSyncedCheckpoint.offset < offset && syncNeeded()) { // double checked locking - we don't want to fsync unless we have to and now that we have // the lock we should check again since if this code is busy we might have fsynced enough already final Checkpoint checkpointToSync; + final LongArrayList flushedSequenceNumbers; synchronized (this) { ensureOpen(); try { @@ -380,17 +379,15 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { closeWithTragicEvent(ex); throw ex; } + flushedSequenceNumbers.forEach((LongProcedure) persistedSequenceNumberConsumer::accept); assert lastSyncedCheckpoint.offset <= checkpointToSync.offset : "illegal state: " + lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset; lastSyncedCheckpoint = checkpointToSync; // write protected by syncLock - synced = true; + return true; } } - if (flushedSequenceNumbers != null) { - flushedSequenceNumbers.forEach((LongProcedure) persistedSequenceNumberConsumer::accept); - } } - return synced; + return false; } @Override diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index bb3804d3aa4..32c64483c84 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -110,6 +110,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -124,6 +125,7 @@ import java.util.stream.Stream; import static 
org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.endsWith; @@ -3272,4 +3274,66 @@ public class TranslogTests extends ESTestCase { } } } + + public void testSyncConcurrently() throws Exception { + Path path = createTempDir("translog"); + TranslogConfig config = getTranslogConfig(path); + String translogUUID = Translog.createEmptyTranslog( + config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); + Set persistedSeqNos = ConcurrentCollections.newConcurrentSet(); + AtomicLong lastGlobalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + LongSupplier globalCheckpointSupplier = () -> { + if (randomBoolean()) { + return lastGlobalCheckpoint.addAndGet(randomIntBetween(1, 100)); + } else { + return lastGlobalCheckpoint.get(); + } + }; + try (Translog translog = new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), + globalCheckpointSupplier, primaryTerm::get, persistedSeqNos::add)) { + Thread[] threads = new Thread[between(2, 8)]; + Phaser phaser = new Phaser(threads.length); + AtomicLong nextSeqNo = new AtomicLong(); + for (int t = 0; t < threads.length; t++) { + threads[t] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + int iterations = randomIntBetween(10, 100); + for (int i = 0; i < iterations; i++) { + List ops = IntStream.range(0, between(1, 10)) + .mapToObj(n -> new Translog.Index("test", "1", nextSeqNo.incrementAndGet(), primaryTerm.get(), new byte[]{1})) + .collect(Collectors.toList()); + try { + Translog.Location location = null; + for (Translog.Operation op : ops) { + location = 
translog.add(op); + } + assertNotNull(location); + long globalCheckpoint = lastGlobalCheckpoint.get(); + if (randomBoolean()) { + translog.ensureSynced(location); + } else { + translog.sync(); + } + for (Translog.Operation op : ops) { + assertThat("seq# " + op.seqNo() + " was not marked as persisted", persistedSeqNos, hasItem(op.seqNo())); + } + Checkpoint checkpoint = translog.getLastSyncedCheckpoint(); + assertThat(checkpoint.offset, greaterThanOrEqualTo(location.translogLocation)); + assertThat(checkpoint.globalCheckpoint, greaterThanOrEqualTo(globalCheckpoint)); + for (Translog.Operation op : ops) { + assertThat(checkpoint.minSeqNo, lessThanOrEqualTo(op.seqNo())); + assertThat(checkpoint.maxSeqNo, greaterThanOrEqualTo(op.seqNo())); + } + } catch (Exception e) { + throw new AssertionError(e); + } + } + }); + threads[t].start(); + } + for (Thread thread : threads) { + thread.join(); + } + } + } }