From b12c2f61c5baeb7ba200748834ff69beec5351f5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 15 May 2018 18:00:53 -0400 Subject: [PATCH] Use exact numDocs in synced-flush and metadata snapshot (#30228) Since #29458, we use a searcher to calculate the number of documents for a commit stats. Sadly, that approach is flawed. The searcher might no longer point to the last commit if it's refreshed. As synced-flush requires an exact numDocs to work correctly, we have to exclude all soft-deleted docs. This commit makes synced-flush stop using CommitStats but read an exact numDocs directly from an index commit. Relates #29458 Relates #29530 --- .../elasticsearch/common/lucene/Lucene.java | 11 +++++ .../org/elasticsearch/index/store/Store.java | 12 +++++- .../indices/flush/SyncedFlushService.java | 17 +++++--- .../recovery/PeerRecoveryTargetService.java | 4 +- .../PeerRecoveryTargetServiceTests.java | 32 ++++++++++++++ .../test/InternalTestCluster.java | 43 +++++++++++++------ 6 files changed, 98 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 0fbb0c0e2b2..25138a29096 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -44,6 +44,7 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.FieldDoc; @@ -149,6 +150,16 @@ public class Lucene { return numDocs; } + /** + * Unlike {@link #getNumDocs(SegmentInfos)} this method returns a numDocs that always excludes soft-deleted docs. + * This method is expensive thus prefer using {@link #getNumDocs(SegmentInfos)} unless an exact numDocs is required. + */ + public static int getExactNumDocs(IndexCommit commit) throws IOException { + try (DirectoryReader reader = DirectoryReader.open(commit)) { + return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs(); + } + } + /** * Reads the segments infos from the given commit, failing if it fails to load */ diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index f97553895f3..e0dd31c03b2 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -862,7 +862,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref Map commitUserDataBuilder = new HashMap<>(); try { final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory); - numDocs = Lucene.getNumDocs(segmentCommitInfos); + numDocs = Lucene.getExactNumDocs(commit != null ? commit : findIndexCommit(directory, segmentCommitInfos)); commitUserDataBuilder.putAll(segmentCommitInfos.getUserData()); Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version. for (SegmentCommitInfo info : segmentCommitInfos) { @@ -945,6 +945,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref assert fileHash.length() == len : Integer.toString(fileHash.length()) + " != " + Integer.toString(len); } + private static IndexCommit findIndexCommit(Directory directory, SegmentInfos sis) throws IOException { + List commits = DirectoryReader.listCommits(directory); + for (IndexCommit commit : commits) { + if (commit.getSegmentsFileName().equals(sis.getSegmentsFileName())) { + return commit; + } + } + throw new IOException("Index commit [" + sis.getSegmentsFileName() + "] is not found"); + } + @Override public Iterator iterator() { return metadata.values().iterator(); diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index 52e0ac8ab86..dfe804cb144 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.flush; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -41,13 +42,13 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; @@ -467,15 +468,19 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL } } - private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) { + private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) throws IOException { IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true); logger.trace("{} performing pre sync flush", request.shardId()); indexShard.flush(flushRequest); - final CommitStats commitStats = indexShard.commitStats(); - final Engine.CommitId commitId = commitStats.getRawCommitId(); - logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs()); - return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId()); + try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) { + final SegmentInfos segmentInfos = Lucene.readSegmentInfos(commitRef.getIndexCommit()); + final int numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit()); + final Engine.CommitId commitId = new Engine.CommitId(segmentInfos.getId()); + final String syncId = segmentInfos.userData.get(Engine.SYNC_COMMIT_ID); + logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, numDocs); + return new PreSyncedFlushResponse(commitId, numDocs, syncId); + } } private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index cb49eed25f8..3e09312bec8 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -289,7 +289,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde * @param recoveryTarget the target of the recovery * @return a snapshot of the store metadata */ - private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) { + static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) { try { return recoveryTarget.indexShard().snapshotStoreMetadata(); } catch (final org.apache.lucene.index.IndexNotFoundException e) { @@ -312,7 +312,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde final StartRecoveryRequest request; logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); - final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget); + final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget); logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); final long startingSeqNo; diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 3b50fa64915..9c4c1c1e736 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.common.UUIDs; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -31,8 +32,10 @@ import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.hamcrest.Matchers.equalTo; @@ -108,4 +111,33 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { closeShards(replica); } } + + public void testExactNumDocsInStoreMetadataSnapshot() throws Exception { + final IndexShard replica = newShard(false); + recoveryEmptyReplica(replica); + long flushedDocs = 0; + final int numDocs = scaledRandomIntBetween(1, 20); + final Set docIds = new HashSet<>(); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + docIds.add(id); + indexDoc(replica, "_doc", id); + if (randomBoolean()) { + replica.flush(new FlushRequest()); + flushedDocs = docIds.size(); + } + } + for (String id : randomSubsetOf(docIds)) { + deleteDoc(replica, "_doc", id); + docIds.remove(id); + if (randomBoolean()) { + replica.flush(new FlushRequest()); + flushedDocs = docIds.size(); + } + } + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + assertThat(PeerRecoveryTargetService.getStoreMetadataSnapshot(logger, recoveryTarget).getNumDocs(), equalTo(flushedDocs)); + recoveryTarget.decRef(); + closeShards(replica); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 8e9fdeac76a..0c4ffd62cdd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -26,6 +26,8 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; @@ -76,7 +78,9 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineTestCase; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -1104,8 +1108,7 @@ public final class InternalTestCluster extends TestCluster { // ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures. assertNoPendingIndexOperations(); //check that shards that have same sync id also contain same number of documents - // norelease - AwaitsFix: https://github.com/elastic/elasticsearch/pull/30228 - // assertSameSyncIdSameDocs(); + assertSameSyncIdSameDocs(); assertOpenTranslogReferences(); } @@ -1116,16 +1119,16 @@ public final class InternalTestCluster extends TestCluster { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - CommitStats commitStats = indexShard.commitStats(); - if (commitStats != null) { // null if the engine is closed or if the shard is recovering - String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID); - if (syncId != null) { - long liveDocsOnShard = commitStats.getNumDocs(); - if (docsOnShards.get(syncId) != null) { - assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard)); - } else { - docsOnShards.put(syncId, liveDocsOnShard); - } + Tuple commitStats = commitStats(indexShard); + if (commitStats != null) { + String syncId = commitStats.v1(); + long liveDocsOnShard = commitStats.v2(); + if (docsOnShards.get(syncId) != null) { + assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), + equalTo(liveDocsOnShard)); + } else { + docsOnShards.put(syncId, liveDocsOnShard); } } } @@ -1133,6 +1136,22 @@ public final class InternalTestCluster extends TestCluster { } } + private Tuple commitStats(IndexShard indexShard) { + try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) { + final String syncId = commitRef.getIndexCommit().getUserData().get(Engine.SYNC_COMMIT_ID); + // Only read if sync_id exists + if (Strings.hasText(syncId)) { + return Tuple.tuple(syncId, Lucene.getExactNumDocs(commitRef.getIndexCommit())); + } else { + return null; + } + } catch (IllegalIndexShardStateException ex) { + return null; // Shard is closed or not started yet. + } catch (IOException ex) { + throw new AssertionError(ex); + } + } + private void assertNoPendingIndexOperations() throws Exception { assertBusy(() -> { final Collection nodesAndClients = nodes.values();