Sequence number commit data in Lucene uses the Iterable interface (#20793)

Sequence number related data (the maximum sequence number, the local checkpoint,
and the global checkpoint) is stored in Lucene on each commit. The logical
place to store this data is in each Lucene commit's user commit data
structure (see IndexWriter#setCommitData and its Lucene 6.2 replacement,
IndexWriter#setLiveCommitData). However, previously we did not store the
maximum sequence number in the commit data, because the commit data was
copied over before the Lucene IndexWriter flushed the documents to segments
in the commit. This meant that between the time the commit data was set on
the IndexWriter and the time the IndexWriter completed the commit, documents
with higher sequence numbers could have entered the commit. Hence, we instead
used FieldStats on the _seq_no field to obtain the maximum sequence number,
but that approach has a drawback: if the last sequence number in the commit
corresponded to a document deletion, that sequence number would not show up
in FieldStats, as there would be no corresponding document in Lucene.
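
To make the race concrete, here is a minimal sketch of the old, eager
approach (seqNoService stands in for the engine's SequenceNumbersService;
this is an illustration, not the engine code):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.lucene.index.IndexWriter;
    import org.elasticsearch.index.seqno.SequenceNumbersService;

    static void eagerCommit(IndexWriter writer, SequenceNumbersService seqNoService) throws IOException {
        final Map<String, String> commitData = new HashMap<>();
        commitData.put("local_checkpoint", Long.toString(seqNoService.getLocalCheckpoint()));
        writer.setCommitData(commitData); // values are snapshotted here...
        // ...but documents (and their sequence numbers) can still enter the
        // commit until commit() finishes flushing, so a max_seq_no captured at
        // this point could be lower than the highest sequence number that
        // actually made it into the commit.
        writer.commit();
    }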

In Lucene 6.2, the commit data API was changed to take an Iterable, so
that the commit data can be computed and retrieved *after* all documents
have been flushed, at the moment the commit data itself is set on the Lucene
commit. This commit takes advantage of that deferred computation: max_seq_no
is now stored in the commit data instead of being calculated from FieldStats,
by passing an Iterable whose iterator produces the value dynamically at
iteration time.
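
A minimal sketch of the deferred pattern, mirroring the commitIndexWriter
change in the diff below (with the same imports and seqNoService stand-in
as the sketch above):

    static void deferredCommit(IndexWriter writer, SequenceNumbersService seqNoService) throws IOException {
        // checkpoints must still be captured *before* segments are flushed...
        final String localCheckpoint = Long.toString(seqNoService.getLocalCheckpoint());
        writer.setLiveCommitData(() -> {
            // ...but IndexWriter pulls this iterator only after all documents
            // in the commit have been flushed, so getMaxSeqNo() is read late
            // enough that it can never be lower than any sequence number that
            // entered the commit.
            final Map<String, String> commitData = new HashMap<>();
            commitData.put("local_checkpoint", localCheckpoint);
            commitData.put("max_seq_no", Long.toString(seqNoService.getMaxSeqNo()));
            return commitData.entrySet().iterator();
        });
        writer.commit();
    }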

* Improvements to iterating over commit data (and better safety guarantees)

* Adds sequence number and checkpoint testing for document deletion
intertwined with document indexing.

* Improves the test code slightly.

* Removes caching of max_seq_no in the commit data iterator and inlines logging.

* Adds a test that concurrently indexes documents and commits segments
to Lucene, ensuring that the sequence number related commit data
in each Lucene commit point satisfies the invariant
localCheckpoint <= highest sequence number in commit <= maxSeqNo
(a condensed sketch of this check follows the list).

* Fixes comments.

* Addresses code review feedback.

* Adds clarification on checking commit data on recovery from the translog.

* Removes an unneeded method.
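
Condensed from the new test in the diff below, the per-commit-point check
looks roughly like the following sketch, where highestSeqNoIn() is a
hypothetical helper that scans the _seq_no doc values of a commit (the real
test builds a FixedBitSet for this):

    for (IndexCommit commit : DirectoryReader.listCommits(store.directory())) {
        // the commit user data must bracket the highest _seq_no in the commit
        final Map<String, String> userData = commit.getUserData();
        final long localCheckpoint = Long.parseLong(userData.get("local_checkpoint"));
        final long maxSeqNo = Long.parseLong(userData.get("max_seq_no"));
        final long highestSeqNo = highestSeqNoIn(commit); // hypothetical helper
        assertThat(highestSeqNo, greaterThanOrEqualTo(localCheckpoint));
        assertThat(highestSeqNo, lessThanOrEqualTo(maxSeqNo));
    }
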
Ali Beyad 2016-10-12 12:38:26 -04:00 committed by GitHub
parent 568033aba3
commit 7c2e761c87
3 changed files with 248 additions and 56 deletions

File: InternalEngine.java

@@ -43,7 +43,6 @@ import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
@@ -59,7 +58,6 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
@@ -86,8 +84,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED;
public class InternalEngine extends Engine {
/**
@@ -121,6 +117,7 @@ public class InternalEngine extends Engine {
private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no";
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
@@ -285,7 +282,7 @@
boolean success = false;
try {
commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
? writer.getCommitData().get(SYNC_COMMIT_ID) : null);
? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null);
success = true;
} finally {
if (success == false) {
@@ -310,7 +307,7 @@
private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) throws IOException {
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
final Map<String, String> commitUserData = writer.getCommitData();
final Map<String, String> commitUserData = commitDataAsMap(writer);
if (commitUserData.containsKey("translog_id")) {
assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false : "legacy commit contains translog UUID";
return new Translog.TranslogGeneration(null, Long.parseLong(commitUserData.get("translog_id")));
@@ -326,32 +323,20 @@
}
private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
final long maxSeqNo;
try (IndexReader reader = DirectoryReader.open(writer)) {
final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
if (stats != null) {
maxSeqNo = (long) stats.getMaxValue();
} else {
maxSeqNo = NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) {
maxSeqNo = Long.parseLong(entry.getValue());
}
}
final Map<String, String> commitUserData = writer.getCommitData();
final long localCheckpoint;
if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) {
localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY));
} else {
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
}
final long globalCheckpoint;
if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY));
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}
@@ -1323,23 +1308,39 @@
ensureCanFlush();
try {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final Map<String, String> commitData = new HashMap<>(5);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint());
commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService().getLocalCheckpoint()));
commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService().getGlobalCheckpoint()));
writer.setLiveCommitData(() -> {
/**
* The user data captured above contains values that must be evaluated *before* Lucene
* flushes segments, such as the local and global checkpoints.
* The maximum sequence number is different - we never want the maximum sequence number to be
* less than the last sequence number to go into a Lucene commit, otherwise we run the risk
* of re-using a sequence number for two different documents when restoring from this commit
* point and subsequently writing new documents to the index. Since we only know which Lucene
* documents made it into the final commit after the {@link IndexWriter#commit()} call flushes
* all documents, we defer computation of the max_seq_no to the time of invocation of the commit
* data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
return commitData.entrySet().iterator();
});
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
indexWriter.setCommitData(commitData);
writer.commit();
} catch (Exception ex) {
try {
@@ -1395,7 +1396,8 @@
public SequenceNumbersService seqNoService() {
return seqNoService;
}
@Override
public DocsStats getDocStats() {
final int numDocs = indexWriter.numDocs();
final int maxDoc = indexWriter.maxDoc();
@@ -1441,4 +1443,15 @@
public boolean isRecovering() {
return pendingTranslogRecovery.get();
}
/**
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
Map<String, String> commitData = new HashMap<>(6);
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
return commitData;
}
}

File: SequenceNumbersService.java

@@ -81,6 +81,13 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
return localCheckpointService.generateSeqNo();
}
/**
* Gets the maximum sequence number seen so far. See {@link LocalCheckpointService#getMaxSeqNo()} for details.
*/
public long getMaxSeqNo() {
return localCheckpointService.getMaxSeqNo();
}
/**
* marks the given seqNo as completed. See {@link LocalCheckpointService#markSeqNoAsCompleted(long)}
* more details

File: InternalEngineTests.java

@@ -32,13 +32,19 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoDeletionPolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
@@ -51,10 +57,13 @@ import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -87,6 +96,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
@@ -124,6 +134,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -138,7 +149,9 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@@ -295,6 +308,13 @@ public class InternalEngineTests extends ESTestCase {
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) {
return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(),
maxUnsafeAutoIdTimestamp, refreshListener);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
SnapshotDeletionPolicy deletionPolicy, long maxUnsafeAutoIdTimestamp,
ReferenceManager.RefreshListener refreshListener) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
final EngineConfig.OpenMode openMode;
@@ -313,7 +333,7 @@
// we don't need to notify anybody in this test
}
};
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, createSnapshotDeletionPolicy(),
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, deletionPolicy,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener,
@@ -583,6 +603,10 @@
assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(stats1.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.MAX_SEQ_NO)),
equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
maxSeqNo.set(rarely() ? SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024));
localCheckpoint.set(
@@ -608,6 +632,8 @@
assertThat(
Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint.get()));
assertThat(stats2.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
} finally {
IOUtils.close(engine);
}
@@ -1618,13 +1644,14 @@
}
public void testSeqNoAndCheckpoints() throws IOException {
// nocommit: does not test deletes
final int opCount = randomIntBetween(1, 256);
long primarySeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
final String[] ids = new String[]{"1", "2", "3"};
final Set<String> indexedIds = new HashSet<>();
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long replicaLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
InternalEngine initialEngine = null;
try {
@@ -1633,17 +1660,38 @@
.seqNoService()
.updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet());
for (int op = 0; op < opCount; op++) {
final String id = randomFrom(ids);
ParsedDocument doc = testParsedDocument("test#" + id, id, "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null);
final Engine.Index index = new Engine.Index(newUid("test#" + id), doc,
SequenceNumbersService.UNASSIGNED_SEQ_NO,
rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL,
PRIMARY, 0, -1, false);
try {
initialEngine.index(index);
final String id;
boolean versionConflict = false;
// mostly index, sometimes delete
if (rarely() && indexedIds.isEmpty() == false) {
// we have some docs indexed, so delete one of them
id = randomFrom(indexedIds);
final Engine.Delete delete = new Engine.Delete(
"test", id, newUid("test#" + id), SequenceNumbersService.UNASSIGNED_SEQ_NO,
rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0, false);
try {
initialEngine.delete(delete);
indexedIds.remove(id);
} catch (VersionConflictEngineException e) {
versionConflict = true;
}
} else {
// index a document
id = randomFrom(ids);
ParsedDocument doc = testParsedDocument("test#" + id, id, "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null);
final Engine.Index index = new Engine.Index(newUid("test#" + id), doc,
SequenceNumbersService.UNASSIGNED_SEQ_NO,
rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL,
PRIMARY, 0, -1, false);
try {
initialEngine.index(index);
indexedIds.add(id);
} catch (VersionConflictEngineException e) {
versionConflict = true;
}
}
if (versionConflict == false) {
primarySeqNo++;
} catch (VersionConflictEngineException e) {
}
replicaLocalCheckpoint =
@@ -1653,6 +1701,7 @@
if (rarely()) {
localCheckpoint = primarySeqNo;
maxSeqNo = primarySeqNo;
globalCheckpoint = replicaLocalCheckpoint;
initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
initialEngine.flush(true, true);
@@ -1661,6 +1710,7 @@
initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
assertEquals(primarySeqNo, initialEngine.seqNoService().getMaxSeqNo());
assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
assertThat(initialEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo));
assertThat(initialEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint));
@@ -1671,6 +1721,9 @@
assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint));
assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
equalTo(maxSeqNo));
} finally {
IOUtils.close(initialEngine);
@@ -1681,12 +1734,19 @@
recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
recoveringEngine.recoverFromTranslog();
assertEquals(primarySeqNo, recoveringEngine.seqNoService().getMaxSeqNo());
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
equalTo(primarySeqNo));
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint));
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
// after recovering from translog, all docs have been flushed to Lucene segments, so here we will assert
// that the committed max seq no is equivalent to what the current primary seq no is, as all data
// we have assigned sequence numbers to should be in the commit
equalTo(primarySeqNo));
assertThat(recoveringEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo));
assertThat(recoveringEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
assertThat(recoveringEngine.seqNoService().generateSeqNo(), equalTo(primarySeqNo + 1));
@@ -1695,6 +1755,118 @@
}
}
// this test writes documents to the engine while concurrently flushing/committing,
// ensuring that the commit points contain the correct sequence number data
public void testConcurrentWritesAndCommits() throws Exception {
try (final Store store = createStore();
final InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(),
new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
final int numIndexingThreads = scaledRandomIntBetween(3, 6);
final int numDocsPerThread = randomIntBetween(500, 1000);
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
final List<Thread> indexingThreads = new ArrayList<>();
// create N indexing threads to index documents simultaneously
for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
final int threadIdx = threadNum;
Thread indexingThread = new Thread() {
@Override
public void run() {
try {
barrier.await(); // wait for all threads to start at the same time
// index random number of docs
for (int i = 0; i < numDocsPerThread; i++) {
final String id = "thread" + threadIdx + "#" + i;
ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocument(), B_1, null);
engine.index(new Engine.Index(newUid(id), doc));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
indexingThreads.add(indexingThread);
}
// start the indexing threads
for (Thread thread : indexingThreads) {
thread.start();
}
barrier.await(); // wait for indexing threads to all be ready to start
// create random commit points
boolean doneIndexing;
do {
doneIndexing = indexingThreads.stream().filter(Thread::isAlive).count() == 0;
engine.flush(); // flush and commit
} while (doneIndexing == false);
// now, verify all the commits have the correct docs according to the user commit data
long prevLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long prevMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
for (IndexCommit commit : DirectoryReader.listCommits(store.directory())) {
Map<String, String> userData = commit.getUserData();
long localCheckpoint = userData.containsKey(InternalEngine.LOCAL_CHECKPOINT_KEY) ?
Long.parseLong(userData.get(InternalEngine.LOCAL_CHECKPOINT_KEY)) :
SequenceNumbersService.NO_OPS_PERFORMED;
long maxSeqNo = userData.containsKey(InternalEngine.MAX_SEQ_NO) ?
Long.parseLong(userData.get(InternalEngine.MAX_SEQ_NO)) :
SequenceNumbersService.UNASSIGNED_SEQ_NO;
// local checkpoint and max seq no shouldn't go backwards
assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint));
assertThat(maxSeqNo, greaterThanOrEqualTo(prevMaxSeqNo));
try (IndexReader reader = DirectoryReader.open(commit)) {
FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
final long highestSeqNo;
if (stats != null) {
highestSeqNo = (long) stats.getMaxValue();
} else {
highestSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
}
// make sure localCheckpoint <= highest seq no found <= maxSeqNo
assertThat(highestSeqNo, greaterThanOrEqualTo(localCheckpoint));
assertThat(highestSeqNo, lessThanOrEqualTo(maxSeqNo));
// make sure all sequence numbers up to and including the local checkpoint are in the index
FixedBitSet seqNosBitSet = getSeqNosSet(reader, highestSeqNo);
for (int i = 0; i <= localCheckpoint; i++) {
assertTrue("local checkpoint [" + localCheckpoint + "], _seq_no [" + i + "] should be indexed",
seqNosBitSet.get(i));
}
}
prevLocalCheckpoint = localCheckpoint;
prevMaxSeqNo = maxSeqNo;
}
}
}
private static FixedBitSet getSeqNosSet(final IndexReader reader, final long highestSeqNo) throws IOException {
// _seq_no are stored as doc values for the time being, so this is how we get them
// (as opposed to using an IndexSearcher or IndexReader)
final FixedBitSet bitSet = new FixedBitSet((int) highestSeqNo + 1);
final List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return bitSet;
}
for (int i = 0; i < leaves.size(); i++) {
final LeafReader leaf = leaves.get(i).reader();
final NumericDocValues values = leaf.getNumericDocValues(SeqNoFieldMapper.NAME);
if (values == null) {
continue;
}
final Bits bits = leaf.getLiveDocs();
for (int docID = 0; docID < leaf.maxDoc(); docID++) {
if (bits == null || bits.get(docID)) {
final long seqNo = values.get(docID);
assertFalse("should not have more than one document with the same seq_no[" + seqNo + "]", bitSet.get((int) seqNo));
bitSet.set((int) seqNo);
}
}
}
return bitSet;
}
// #8603: make sure we can separately log IFD's messages
public void testIndexWriterIFDInfoStream() throws IllegalAccessException {
assumeFalse("who tests the tester?", VERBOSE);