Currently, we do not exclude soft-deleted documents when opening index reader in the FrozenEngine. Backport of #51192
This commit is contained in:
parent
79cf0894fa
commit
43ed244a04
|
@ -54,7 +54,7 @@ public final class NoOpEngine extends ReadOnlyEngine {
|
|||
super(config, null, null, true, Function.identity());
|
||||
this.stats = new SegmentsStats();
|
||||
Directory directory = store.directory();
|
||||
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
|
||||
try (DirectoryReader reader = openDirectory(directory, config.getIndexSettings())) {
|
||||
for (LeafReaderContext ctx : reader.getContext().leaves()) {
|
||||
SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
|
||||
fillSegmentStats(segmentReader, true, stats);
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.lucene.Lucene;
|
|||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
|
@ -68,7 +69,7 @@ public class ReadOnlyEngine extends Engine {
|
|||
* Reader attributes used for read only engines. These attributes prevent loading term dictionaries on-heap even if the field is an
|
||||
* ID field if we are reading form memory maps.
|
||||
*/
|
||||
public static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
|
||||
private static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
|
||||
BlockTreeTermsReader.FSTLoadMode.AUTO.name());
|
||||
private final SegmentInfos lastCommittedSegmentInfos;
|
||||
private final SeqNoStats seqNoStats;
|
||||
|
@ -534,4 +535,13 @@ public class ReadOnlyEngine extends Engine {
|
|||
assert maxSeqNoOfUpdatesOnPrimary <= getMaxSeqNoOfUpdatesOrDeletes() :
|
||||
maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes();
|
||||
}
|
||||
|
||||
protected DirectoryReader openDirectory(Directory dir, IndexSettings indexSettings) throws IOException {
|
||||
final DirectoryReader reader = DirectoryReader.open(dir, OFF_HEAP_READER_ATTRIBUTES);
|
||||
if (indexSettings.isSoftDeleteEnabled()) {
|
||||
return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
|
||||
} else {
|
||||
return reader;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public final class FrozenEngine extends ReadOnlyEngine {
|
|||
|
||||
boolean success = false;
|
||||
Directory directory = store.directory();
|
||||
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
|
||||
try (DirectoryReader reader = openDirectory(directory, config.getIndexSettings())) {
|
||||
canMatchReader = ElasticsearchDirectoryReader.wrap(new RewriteCachingDirectoryReader(directory, reader.leaves()),
|
||||
config.getShardId());
|
||||
// we record the segment stats here - that's what the reader needs when it's open and it give the user
|
||||
|
@ -167,7 +167,7 @@ public final class FrozenEngine extends ReadOnlyEngine {
|
|||
for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) {
|
||||
listeners.beforeRefresh();
|
||||
}
|
||||
final DirectoryReader dirReader = DirectoryReader.open(engineConfig.getStore().directory(), OFF_HEAP_READER_ATTRIBUTES);
|
||||
final DirectoryReader dirReader = openDirectory(engineConfig.getStore().directory(), engineConfig.getIndexSettings());
|
||||
reader = lastOpenedReader = wrapReader(dirReader, Function.identity());
|
||||
processReader(reader);
|
||||
reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
|
||||
|
|
|
@ -33,6 +33,8 @@ import java.util.concurrent.CyclicBarrier;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class FrozenEngineTests extends EngineTestCase {
|
||||
|
||||
public void testAcquireReleaseReset() throws IOException {
|
||||
|
@ -328,4 +330,30 @@ public class FrozenEngineTests extends EngineTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testSearchers() throws Exception {
|
||||
IOUtils.close(engine, store);
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
try (Store store = createStore()) {
|
||||
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null,
|
||||
globalCheckpoint::get, new NoneCircuitBreakerService());
|
||||
final int totalDocs;
|
||||
try (InternalEngine engine = createEngine(config)) {
|
||||
applyOperations(engine, generateHistoryOnReplica(between(10, 1000), false, randomBoolean(), randomBoolean()));
|
||||
globalCheckpoint.set(engine.getProcessedLocalCheckpoint());
|
||||
engine.syncTranslog();
|
||||
engine.flush();
|
||||
engine.refresh("test");
|
||||
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
|
||||
totalDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE).scoreDocs.length;
|
||||
}
|
||||
}
|
||||
try (FrozenEngine frozenEngine = new FrozenEngine(config)) {
|
||||
try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) {
|
||||
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
|
||||
assertThat(topDocs.scoreDocs.length, equalTo(totalDocs));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.action.document.RestGetAction;
|
||||
import org.elasticsearch.rest.action.document.RestIndexAction;
|
||||
|
@ -304,7 +305,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
|
|||
assertRollUpJob("rollup-job-test");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void testSlmStats() throws IOException {
|
||||
SnapshotLifecyclePolicy slmPolicy = new SnapshotLifecyclePolicy("test-policy", "test-policy", "* * * 31 FEB ? *", "test-repo",
|
||||
Collections.singletonMap("indices", Collections.singletonList("*")), null);
|
||||
|
@ -823,4 +824,40 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void testFrozenIndexAfterRestarted() throws Exception {
|
||||
final String index = "test_frozen_index";
|
||||
if (isRunningAgainstOldCluster()) {
|
||||
Settings.Builder settings = Settings.builder();
|
||||
if (minimumNodeVersion().onOrAfter(Version.V_6_5_0) && randomBoolean()) {
|
||||
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
|
||||
}
|
||||
createIndex(index, settings.build());
|
||||
ensureGreen(index);
|
||||
int numDocs = randomIntBetween(10, 500);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
int id = randomIntBetween(0, 100);
|
||||
final Request indexRequest = new Request("POST", "/" + index + "/" + "_doc/" + id);
|
||||
indexRequest.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("f", "v").endObject()));
|
||||
assertOK(client().performRequest(indexRequest));
|
||||
if (rarely()) {
|
||||
flush(index, randomBoolean());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ensureGreen(index);
|
||||
final int totalHits = (int) XContentMapValues.extractValue("hits.total.value",
|
||||
entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
|
||||
assertOK(client().performRequest(new Request("POST", index + "/_freeze")));
|
||||
ensureGreen(index);
|
||||
assertNoFileBasedRecovery(index, n -> true);
|
||||
final Request request = new Request("GET", "/" + index + "/_search");
|
||||
request.addParameter("ignore_throttled", "false");
|
||||
assertThat(XContentMapValues.extractValue("hits.total.value", entityAsMap(client().performRequest(request))),
|
||||
equalTo(totalHits));
|
||||
assertOK(client().performRequest(new Request("POST", index + "/_unfreeze")));
|
||||
ensureGreen(index);
|
||||
assertNoFileBasedRecovery(index, n -> true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue