Update translog checkpoint after marking ops as persisted (#45634)

If two translog syncs happen concurrently, then one can return before
its operations are marked as persisted. In general, this should not be
an issue; however, peer recoveries currently rely on this assumption.

Closes #29161
This commit is contained in:
Nhat Nguyen 2019-08-23 11:09:49 -04:00
parent f2e8b17696
commit c66bae39c3
4 changed files with 79 additions and 12 deletions

View File

@ -2370,9 +2370,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
assert shardRouting.primary() && shardRouting.isRelocationTarget() :
"only primary relocation target can update allocation IDs from primary context: " + shardRouting;
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) &&
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId())
.getLocalCheckpoint() || indexSettings().getTranslogDurability() == Translog.Durability.ASYNC;
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) :
"primary context [" + primaryContext + "] does not contain relocation target [" + routingEntry() + "]";
assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId())
.getLocalCheckpoint() || indexSettings().getTranslogDurability() == Translog.Durability.ASYNC :
"local checkpoint [" + getLocalCheckpoint() + "] does not match checkpoint from primary context [" + primaryContext + "]";
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
}

View File

@ -599,8 +599,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* @return the last synced checkpoint
*/
public long getLastSyncedGlobalCheckpoint() {
return getLastSyncedCheckpoint().globalCheckpoint;
}
final Checkpoint getLastSyncedCheckpoint() {
try (ReleasableLock ignored = readLock.acquire()) {
return current.getLastSyncedCheckpoint().globalCheckpoint;
return current.getLastSyncedCheckpoint();
}
}

View File

@ -350,15 +350,14 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
*
* @return <code>true</code> if this call caused an actual sync operation
*/
public boolean syncUpTo(long offset) throws IOException {
boolean synced = false;
final boolean syncUpTo(long offset) throws IOException {
if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
LongArrayList flushedSequenceNumbers = null;
synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait
if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
// double checked locking - we don't want to fsync unless we have to and now that we have
// the lock we should check again since if this code is busy we might have fsynced enough already
final Checkpoint checkpointToSync;
final LongArrayList flushedSequenceNumbers;
synchronized (this) {
ensureOpen();
try {
@ -380,17 +379,15 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
closeWithTragicEvent(ex);
throw ex;
}
flushedSequenceNumbers.forEach((LongProcedure) persistedSequenceNumberConsumer::accept);
assert lastSyncedCheckpoint.offset <= checkpointToSync.offset :
"illegal state: " + lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset;
lastSyncedCheckpoint = checkpointToSync; // write protected by syncLock
synced = true;
return true;
}
}
if (flushedSequenceNumbers != null) {
flushedSequenceNumbers.forEach((LongProcedure) persistedSequenceNumberConsumer::accept);
}
}
return synced;
return false;
}
@Override

View File

@ -110,6 +110,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -124,6 +125,7 @@ import java.util.stream.Stream;
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
@ -3272,4 +3274,66 @@ public class TranslogTests extends ESTestCase {
}
}
}
public void testSyncConcurrently() throws Exception {
Path path = createTempDir("translog");
TranslogConfig config = getTranslogConfig(path);
String translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
Set<Long> persistedSeqNos = ConcurrentCollections.newConcurrentSet();
AtomicLong lastGlobalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
LongSupplier globalCheckpointSupplier = () -> {
if (randomBoolean()) {
return lastGlobalCheckpoint.addAndGet(randomIntBetween(1, 100));
} else {
return lastGlobalCheckpoint.get();
}
};
try (Translog translog = new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()),
globalCheckpointSupplier, primaryTerm::get, persistedSeqNos::add)) {
Thread[] threads = new Thread[between(2, 8)];
Phaser phaser = new Phaser(threads.length);
AtomicLong nextSeqNo = new AtomicLong();
for (int t = 0; t < threads.length; t++) {
threads[t] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
int iterations = randomIntBetween(10, 100);
for (int i = 0; i < iterations; i++) {
List<Translog.Operation> ops = IntStream.range(0, between(1, 10))
.mapToObj(n -> new Translog.Index("test", "1", nextSeqNo.incrementAndGet(), primaryTerm.get(), new byte[]{1}))
.collect(Collectors.toList());
try {
Translog.Location location = null;
for (Translog.Operation op : ops) {
location = translog.add(op);
}
assertNotNull(location);
long globalCheckpoint = lastGlobalCheckpoint.get();
if (randomBoolean()) {
translog.ensureSynced(location);
} else {
translog.sync();
}
for (Translog.Operation op : ops) {
assertThat("seq# " + op.seqNo() + " was not marked as persisted", persistedSeqNos, hasItem(op.seqNo()));
}
Checkpoint checkpoint = translog.getLastSyncedCheckpoint();
assertThat(checkpoint.offset, greaterThanOrEqualTo(location.translogLocation));
assertThat(checkpoint.globalCheckpoint, greaterThanOrEqualTo(globalCheckpoint));
for (Translog.Operation op : ops) {
assertThat(checkpoint.minSeqNo, lessThanOrEqualTo(op.seqNo()));
assertThat(checkpoint.maxSeqNo, greaterThanOrEqualTo(op.seqNo()));
}
} catch (Exception e) {
throw new AssertionError(e);
}
}
});
threads[t].start();
}
for (Thread thread : threads) {
thread.join();
}
}
}
}