Use exact numDocs in synced-flush and metadata snapshot (#30228)

Since #29458, we use a searcher to calculate the number of documents for
a commit stats. Sadly, that approach is flawed. The searcher might no
longer point to the last commit if it's refreshed. As synced-flush
requires an exact numDocs to work correctly, we have to exclude all
soft-deleted docs.

This commit makes synced-flush stop using CommitStats but read an exact
numDocs directly from an index commit.

Relates #29458
Relates #29530
This commit is contained in:
Nhat Nguyen 2018-05-15 18:00:53 -04:00 committed by GitHub
parent 2a2c23be2f
commit b12c2f61c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 98 additions and 21 deletions

View File

@ -44,6 +44,7 @@ import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
@ -149,6 +150,16 @@ public class Lucene {
return numDocs;
}
/**
* Unlike {@link #getNumDocs(SegmentInfos)} this method returns a numDocs that always excludes soft-deleted docs.
* This method is expensive thus prefer using {@link #getNumDocs(SegmentInfos)} unless an exact numDocs is required.
*/
public static int getExactNumDocs(IndexCommit commit) throws IOException {
try (DirectoryReader reader = DirectoryReader.open(commit)) {
return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs();
}
}
/**
* Reads the segments infos from the given commit, failing if it fails to load
*/

View File

@ -862,7 +862,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
numDocs = Lucene.getNumDocs(segmentCommitInfos);
numDocs = Lucene.getExactNumDocs(commit != null ? commit : findIndexCommit(directory, segmentCommitInfos));
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version.
for (SegmentCommitInfo info : segmentCommitInfos) {
@ -945,6 +945,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
assert fileHash.length() == len : Integer.toString(fileHash.length()) + " != " + Integer.toString(len);
}
private static IndexCommit findIndexCommit(Directory directory, SegmentInfos sis) throws IOException {
List<IndexCommit> commits = DirectoryReader.listCommits(directory);
for (IndexCommit commit : commits) {
if (commit.getSegmentsFileName().equals(sis.getSegmentsFileName())) {
return commit;
}
}
throw new IOException("Index commit [" + sis.getSegmentsFileName() + "] is not found");
}
@Override
public Iterator<StoreFileMetaData> iterator() {
return metadata.values().iterator();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices.flush;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -41,13 +42,13 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@ -467,15 +468,19 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
}
}
private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) {
private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) throws IOException {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
logger.trace("{} performing pre sync flush", request.shardId());
indexShard.flush(flushRequest);
final CommitStats commitStats = indexShard.commitStats();
final Engine.CommitId commitId = commitStats.getRawCommitId();
logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
final SegmentInfos segmentInfos = Lucene.readSegmentInfos(commitRef.getIndexCommit());
final int numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit());
final Engine.CommitId commitId = new Engine.CommitId(segmentInfos.getId());
final String syncId = segmentInfos.userData.get(Engine.SYNC_COMMIT_ID);
logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, numDocs);
return new PreSyncedFlushResponse(commitId, numDocs, syncId);
}
}
private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {

View File

@ -289,7 +289,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
* @param recoveryTarget the target of the recovery
* @return a snapshot of the store metadata
*/
private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) {
try {
return recoveryTarget.indexShard().snapshotStoreMetadata();
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
@ -312,7 +312,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget);
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
final long startingSeqNo;

View File

@ -24,6 +24,7 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
@ -31,8 +32,10 @@ import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.Translog;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
@ -108,4 +111,33 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
closeShards(replica);
}
}
public void testExactNumDocsInStoreMetadataSnapshot() throws Exception {
final IndexShard replica = newShard(false);
recoveryEmptyReplica(replica);
long flushedDocs = 0;
final int numDocs = scaledRandomIntBetween(1, 20);
final Set<String> docIds = new HashSet<>();
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
docIds.add(id);
indexDoc(replica, "_doc", id);
if (randomBoolean()) {
replica.flush(new FlushRequest());
flushedDocs = docIds.size();
}
}
for (String id : randomSubsetOf(docIds)) {
deleteDoc(replica, "_doc", id);
docIds.remove(id);
if (randomBoolean()) {
replica.flush(new FlushRequest());
flushedDocs = docIds.size();
}
}
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
assertThat(PeerRecoveryTargetService.getStoreMetadataSnapshot(logger, recoveryTarget).getNumDocs(), equalTo(flushedDocs));
recoveryTarget.decRef();
closeShards(replica);
}
}

View File

@ -26,6 +26,8 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -76,7 +78,9 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@ -1104,8 +1108,7 @@ public final class InternalTestCluster extends TestCluster {
// ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures.
assertNoPendingIndexOperations();
//check that shards that have same sync id also contain same number of documents
// norelease - AwaitsFix: https://github.com/elastic/elasticsearch/pull/30228
// assertSameSyncIdSameDocs();
assertSameSyncIdSameDocs();
assertOpenTranslogReferences();
}
@ -1116,16 +1119,16 @@ public final class InternalTestCluster extends TestCluster {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
CommitStats commitStats = indexShard.commitStats();
if (commitStats != null) { // null if the engine is closed or if the shard is recovering
String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
if (syncId != null) {
long liveDocsOnShard = commitStats.getNumDocs();
if (docsOnShards.get(syncId) != null) {
assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
} else {
docsOnShards.put(syncId, liveDocsOnShard);
}
Tuple<String, Integer> commitStats = commitStats(indexShard);
if (commitStats != null) {
String syncId = commitStats.v1();
long liveDocsOnShard = commitStats.v2();
if (docsOnShards.get(syncId) != null) {
assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name +
". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId),
equalTo(liveDocsOnShard));
} else {
docsOnShards.put(syncId, liveDocsOnShard);
}
}
}
@ -1133,6 +1136,22 @@ public final class InternalTestCluster extends TestCluster {
}
}
private Tuple<String, Integer> commitStats(IndexShard indexShard) {
try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
final String syncId = commitRef.getIndexCommit().getUserData().get(Engine.SYNC_COMMIT_ID);
// Only read if sync_id exists
if (Strings.hasText(syncId)) {
return Tuple.tuple(syncId, Lucene.getExactNumDocs(commitRef.getIndexCommit()));
} else {
return null;
}
} catch (IllegalIndexShardStateException ex) {
return null; // Shard is closed or not started yet.
} catch (IOException ex) {
throw new AssertionError(ex);
}
}
private void assertNoPendingIndexOperations() throws Exception {
assertBusy(() -> {
final Collection<NodeAndClient> nodesAndClients = nodes.values();