testConcurrentWriteViewsAndSnapshot: writers should expose the local checkpoint to readers before trimming the translog

This commit is contained in:
Boaz Leskes 2017-07-09 11:29:35 +02:00
parent cb3674c5ee
commit 1f4d8a05d1
1 changed files with 7 additions and 6 deletions

View File

@ -756,7 +756,7 @@ public class TranslogTests extends ESTestCase {
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
// any errors on threads
final List<Exception> errors = new CopyOnWriteArrayList<>();
logger.debug("using [{}] readers. [{}] writers. flushing every ~[{}] ops.", readers.length, writers.length, flushEveryOps);
logger.info("using [{}] readers. [{}] writers. flushing every ~[{}] ops.", readers.length, writers.length, flushEveryOps);
for (int i = 0; i < writers.length; i++) {
final String threadName = "writer_" + i;
final int threadId = i;
@ -797,12 +797,13 @@ public class TranslogTests extends ESTestCase {
synchronized (flushMutex) {
// we need not do this concurrently as we need to make sure that the generation
// we're committing - is still present when we're committing
long localCheckpoint = tracker.getCheckpoint() + 1;
long localCheckpoint = tracker.getCheckpoint();
translog.rollGeneration();
// expose the new checkpoint (simulating a commit), before we trim the translog
lastCommittedLocalCheckpoint.set(localCheckpoint);
deletionPolicy.setMinTranslogGenerationForRecovery(
translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration);
translog.trimUnreferencedReaders();
lastCommittedLocalCheckpoint.set(localCheckpoint);
}
}
if (id % 7 == 0) {
@ -812,7 +813,7 @@ public class TranslogTests extends ESTestCase {
}
counter++;
}
logger.debug("--> [{}] done. wrote [{}] ops.", threadName, counter);
logger.info("--> [{}] done. wrote [{}] ops.", threadName, counter);
}
@Override
@ -854,7 +855,7 @@ public class TranslogTests extends ESTestCase {
// captures the last committed checkpoint, while holding the view, simulating
// recovery logic which captures a view and gets a lucene commit
committedLocalCheckpointAtView = lastCommittedLocalCheckpoint.get();
logger.debug("--> [{}] opened view from [{}]", threadId, view.viewGenToRelease);
logger.info("--> [{}] opened view from [{}]", threadId, view.viewGenToRelease);
}
@Override
@ -896,7 +897,7 @@ public class TranslogTests extends ESTestCase {
}
}
closeView();
logger.debug("--> [{}] done. tested [{}] snapshots", threadId, iter);
logger.info("--> [{}] done. tested [{}] snapshots", threadId, iter);
}
}, threadId);
readers[i].start();