Use separate searchers for "search visibility" vs "move indexing buffer to disk" (#26972)

Today, when ES detects that it is using too much heap relative to the
configured indexing buffer (default: 10% of the JVM heap), it opens a new
searcher to force Lucene to move the bytes to disk, clear the version map, etc.

But this has the unexpected side effect of making newly indexed/deleted
documents visible to future searches, which is a problem for users who are
explicitly trying to prevent that, e.g. #3593.
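
To make this concrete, here is a minimal plain-Lucene sketch of the coupling
(illustrative only, not the engine code; the class, names and setup are made
up): with a single SearcherManager, the NRT reopen that moves the indexing
buffer to disk is the same operation that publishes the new documents.

    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.SearcherManager;
    import org.apache.lucene.store.RAMDirectory;

    public class SingleManagerSketch {
        public static void main(String[] args) throws Exception {
            IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig());
            SearcherManager manager = new SearcherManager(writer, null);

            writer.addDocument(new Document()); // buffered on heap, not yet searchable
            manager.maybeRefreshBlocking();     // frees the buffer, but also publishes the doc

            IndexSearcher searcher = manager.acquire();
            try {
                // prints 1: the document is visible although no user-level refresh happened
                System.out.println(searcher.getIndexReader().numDocs());
            } finally {
                manager.release(searcher);
            }
        }
    }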

This is also an indirect spin-off of #26802, where we potentially pay a big
price rebuilding caches etc. when updates / realtime GET are used. We
currently refresh the internal reader for realtime GETs, which causes, for
instance, global ordinals to be rebuilt. I think we can gain quite a bit if
we use a reader that serves only GETs and is never used for searches; that
way we can also solve the problem of searchers being refreshed unexpectedly,
aside from replica recovery / relocation.
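
In sketch form, the idea is to keep two SearcherManager instances over the
same IndexWriter and refresh them independently (a minimal illustration of
the approach under that assumption; the names below are made up and do not
match the actual InternalEngine fields):

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.SearcherManager;

    import java.io.IOException;

    public class DualManagerSketch {
        enum SearcherScope { EXTERNAL, INTERNAL }

        private final SearcherManager external; // backs user searches
        private final SearcherManager internal; // backs realtime GET / version map

        public DualManagerSketch(IndexWriter writer) throws IOException {
            // both managers reopen against the same writer, so the second refresh
            // can reuse the segments the first one already flushed
            this.external = new SearcherManager(writer, null);
            this.internal = new SearcherManager(writer, null);
        }

        public IndexSearcher acquire(SearcherScope scope) throws IOException {
            return (scope == SearcherScope.INTERNAL ? internal : external).acquire();
        }

        // frees the indexing buffer and lets the version map be pruned without
        // changing what searches can see
        public void writeIndexingBuffer() throws IOException {
            internal.maybeRefreshBlocking();
        }

        // a user-visible refresh: only this makes new documents searchable
        public void refresh() throws IOException {
            external.maybeRefreshBlocking();
        }
    }

A realtime GET can then be served from the internal manager, while the
external manager is only refreshed on the user's schedule (refresh_interval
or the refresh API), which is the split the diff below introduces.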

Closes #15768
Closes #26912
Simon Willnauer 2017-10-12 17:19:43 +02:00 committed by GitHub
parent e1679bfe5e
commit 21eb9bdf6a
8 changed files with 220 additions and 94 deletions


@@ -305,9 +305,9 @@ public class TermVectorsResponse extends ActionResponse implements ToXContentObj
long sumDocFreq = curTerms.getSumDocFreq();
int docCount = curTerms.getDocCount();
long sumTotalTermFrequencies = curTerms.getSumTotalTermFreq();
if (docCount > 0) {
assert ((sumDocFreq > 0)) : "docCount >= 0 but sumDocFreq ain't!";
assert ((sumTotalTermFrequencies > 0)) : "docCount >= 0 but sumTotalTermFrequencies ain't!";
if (docCount >= 0) {
assert ((sumDocFreq >= 0)) : "docCount >= 0 but sumDocFreq ain't!";
assert ((sumTotalTermFrequencies >= 0)) : "docCount >= 0 but sumTotalTermFrequencies ain't!";
builder.startObject(FieldStrings.FIELD_STATISTICS);
builder.field(FieldStrings.SUM_DOC_FREQ, sumDocFreq);
builder.field(FieldStrings.DOC_COUNT, docCount);


@@ -90,7 +90,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.BiFunction;
public abstract class Engine implements Closeable {
@@ -465,8 +465,9 @@
PENDING_OPERATIONS
}
protected final GetResult getFromSearcher(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
final Searcher searcher = searcherFactory.apply("get");
protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory,
SearcherScope scope) throws EngineException {
final Searcher searcher = searcherFactory.apply("get", scope);
final DocIdAndVersion docIdAndVersion;
try {
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid());
@@ -494,23 +495,40 @@
}
}
public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException;
public abstract GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException;
/**
* Returns a new searcher instance. The consumer of this
* API is responsible for releasing the returned searcher in a
* safe manner, preferably in a try/finally block.
*
* @param source the source API or routing that triggers this searcher acquire
*
* @see Searcher#close()
*/
public final Searcher acquireSearcher(String source) throws EngineException {
return acquireSearcher(source, SearcherScope.EXTERNAL);
}
/**
* Returns a new searcher instance. The consumer of this
* API is responsible for releasing the returned searcher in a
* safe manner, preferably in a try/finally block.
*
* @param source the source API or routing that triggers this searcher acquire
* @param scope the scope of this searcher ie. if the searcher will be used for get or search purposes
*
* @see Searcher#close()
*/
public final Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
boolean success = false;
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
store.incRef();
try {
final SearcherManager manager = getSearcherManager(); // can never be null
final SearcherManager manager = getSearcherManager(source, scope); // can never be null
/* This might throw NPE but that's fine we will run ensureOpen()
* in the catch block and throw the right exception */
final IndexSearcher searcher = manager.acquire();
@@ -536,6 +554,10 @@
}
}
public enum SearcherScope {
EXTERNAL, INTERNAL
}
/** returns the translog for this engine */
public abstract Translog getTranslog();
@@ -768,7 +790,7 @@
the store is closed so we need to make sure we increment it here
*/
try {
return getSearcherManager().isSearcherCurrent() == false;
return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false;
} catch (IOException e) {
logger.error("failed to access searcher manager", e);
failEngine("failed to access searcher manager", e);
@@ -1306,7 +1328,7 @@
}
}
protected abstract SearcherManager getSearcherManager();
protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope);
/**
* Method to close the engine while the write lock is held.


@@ -93,7 +93,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
public class InternalEngine extends Engine {
@@ -108,20 +108,18 @@
private final IndexWriter indexWriter;
private final SearcherFactory searcherFactory;
private final SearcherManager searcherManager;
private final SearcherManager externalSearcherManager;
private final SearcherManager internalSearcherManager;
private final Lock flushLock = new ReentrantLock();
private final ReentrantLock optimizeLock = new ReentrantLock();
// A uid (in the form of BytesRef) to the version map
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
private final LiveVersionMap versionMap;
private final LiveVersionMap versionMap = new LiveVersionMap();
private final KeyedLock<BytesRef> keyedLock = new KeyedLock<>();
private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
private volatile SegmentInfos lastCommittedSegmentInfos;
private final IndexThrottle throttle;
@@ -153,7 +151,6 @@
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
this.versionMap = new LiveVersionMap();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
@@ -163,7 +160,8 @@
store.incRef();
IndexWriter writer = null;
Translog translog = null;
SearcherManager manager = null;
SearcherManager externalSearcherManager = null;
SearcherManager internalSearcherManager = null;
EngineMergeScheduler scheduler = null;
boolean success = false;
try {
@@ -171,7 +169,6 @@
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
try {
final SeqNoStats seqNoStats;
switch (openMode) {
@@ -215,20 +212,21 @@
throw e;
}
}
manager = createSearcherManager();
this.searcherManager = manager;
this.versionMap.setManager(searcherManager);
internalSearcherManager = createSearcherManager(new SearcherFactory(), false);
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true);
this.internalSearcherManager = internalSearcherManager;
this.externalSearcherManager = externalSearcherManager;
internalSearcherManager.addListener(versionMap);
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
// don't allow commits until we are done with recovering
pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
for (ReferenceManager.RefreshListener listener: engineConfig.getRefreshListeners()) {
searcherManager.addListener(listener);
this.externalSearcherManager.addListener(listener);
}
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(writer, translog, manager, scheduler);
versionMap.clear();
IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler);
if (isClosed.get() == false) {
// failure we need to dec the store reference
store.decRef();
@@ -345,6 +343,7 @@
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
flush(true, true);
refresh("translog_recovery");
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
@@ -441,14 +440,16 @@
return uuid;
}
private SearcherManager createSearcherManager() throws EngineException {
private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
try {
try {
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
searcherManager = new SearcherManager(directoryReader, searcherFactory);
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
if (readSegmentsInfo) {
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
}
success = true;
return searcherManager;
} catch (IOException e) {
@@ -468,10 +469,11 @@
}
@Override
public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
assert Objects.equals(get.uid().field(), uidField) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
SearcherScope scope;
if (get.realtime()) {
VersionValue versionValue = versionMap.getUnderLock(get.uid());
if (versionValue != null) {
@ -482,12 +484,16 @@ public class InternalEngine extends Engine {
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
get.versionType().explainConflictForReads(versionValue.version, get.version()));
}
refresh("realtime_get");
refresh("realtime_get", SearcherScope.INTERNAL);
}
scope = SearcherScope.INTERNAL;
} else {
// we expose what has been externally exposed in a point in time snapshot via an explicit refresh
scope = SearcherScope.EXTERNAL;
}
// no version, get the version from the index, we know that we refresh on flush
return getFromSearcher(get, searcherFactory);
return getFromSearcher(get, searcherFactory, scope);
}
}
@@ -1187,17 +1193,34 @@
@Override
public void refresh(String source) throws EngineException {
refresh(source, SearcherScope.EXTERNAL);
}
final void refresh(String source, SearcherScope scope) throws EngineException {
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
searcherManager.maybeRefreshBlocking();
switch (scope) {
case EXTERNAL:
// even though we maintain 2 managers we really do the heavy-lifting only once.
// the second refresh will only do the extra work we have to do for warming caches etc.
externalSearcherManager.maybeRefreshBlocking();
// the break here is intentional; we never refresh both internal / external together
break;
case INTERNAL:
internalSearcherManager.maybeRefreshBlocking();
break;
default:
throw new IllegalArgumentException("unknown scope: " + scope);
}
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("refresh failed", e);
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
@@ -1208,36 +1231,20 @@
// We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
// for a long time:
maybePruneDeletedTombstones();
versionMapRefreshPending.set(false);
mergeScheduler.refreshConfig();
}
@Override
public void writeIndexingBuffer() throws EngineException {
// we obtain a read lock here, since we don't want a flush to happen while we are writing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
// TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
// searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking
// refresh API), and another for version map interactions. See #15768.
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
final long indexingBufferBytes = indexWriter.ramBytesUsed();
final boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes / 4 < versionMapBytes);
if (useRefresh) {
// The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
refresh("write indexing buffer");
} else {
// Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush:
logger.debug("use IndexWriter.flush to write indexing buffer (heap size=[{}]) since version map is small (heap size=[{}])",
new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
indexWriter.flush();
}
logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
refresh("write indexing buffer", SearcherScope.INTERNAL);
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
@@ -1302,10 +1309,11 @@
maybeFailEngine("renew sync commit", ex);
throw new EngineException(shardId, "failed to renew sync commit", ex);
}
if (renewed) { // refresh outside of the write lock
refresh("renew sync commit");
if (renewed) {
// refresh outside of the write lock
// we have to refresh internal searcher here to ensure we release unreferenced segments.
refresh("renew sync commit", SearcherScope.INTERNAL);
}
return renewed;
}
@@ -1347,7 +1355,7 @@
commitIndexWriter(indexWriter, translog, null);
logger.trace("finished commit for flush");
// we need to refresh in order to clear older version values
refresh("version_table_flush");
refresh("version_table_flush", SearcherScope.INTERNAL);
translog.trimUnreferencedReaders();
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
@@ -1651,8 +1659,9 @@
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself";
try {
this.versionMap.clear();
internalSearcherManager.removeListener(versionMap);
try {
IOUtils.close(searcherManager);
IOUtils.close(externalSearcherManager, internalSearcherManager);
} catch (Exception e) {
logger.warn("Failed to close SearcherManager", e);
}
@@ -1684,8 +1693,15 @@
}
@Override
protected SearcherManager getSearcherManager() {
return searcherManager;
protected SearcherManager getSearcherManager(String source, SearcherScope scope) {
switch (scope) {
case INTERNAL:
return internalSearcherManager;
case EXTERNAL:
return externalSearcherManager;
default:
throw new IllegalStateException("unknown scope: " + scope);
}
}
private Releasable acquireLock(BytesRef uid) {
@@ -1698,7 +1714,7 @@
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
assert incrementIndexVersionLookup();
try (Searcher searcher = acquireSearcher("load_version")) {
try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid);
}
}


@@ -59,8 +59,6 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
private volatile Maps maps = new Maps();
private ReferenceManager<?> mgr;
/** Bytes consumed for each BytesRef UID:
* In this base value, we account for the {@link BytesRef} object itself as
* well as the header of the byte[] array it holds, and some lost bytes due
@@ -98,21 +96,6 @@
/** Tracks bytes used by tombstones (deletes) */
final AtomicLong ramBytesUsedTombstones = new AtomicLong();
/** Sync'd because we replace old mgr. */
synchronized void setManager(ReferenceManager<?> newMgr) {
if (mgr != null) {
mgr.removeListener(this);
}
mgr = newMgr;
// In case InternalEngine closes & opens a new IndexWriter/SearcherManager, all deletes are made visible, so we clear old and
// current here. This is safe because caller holds writeLock here (so no concurrent adds/deletes can be happening):
maps = new Maps();
// So we are notified when reopen starts and finishes
mgr.addListener(this);
}
@Override
public void beforeRefresh() throws IOException {
// Start sending all updates after this point to the new
@@ -249,11 +232,6 @@
// and this will lead to an assert trip. Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index
// is being closed:
//ramBytesUsedTombstones.set(0);
if (mgr != null) {
mgr.removeListener(this);
mgr = null;
}
}
@Override


@@ -1005,6 +1005,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
final long time = System.nanoTime();
final Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
engine.refresh("flush"); // TODO this is technically wrong we should remove this in 7.0
flushMetric.inc(System.nanoTime() - time);
return commitId;
}
@@ -1032,8 +1033,12 @@
if (logger.isTraceEnabled()) {
logger.trace("force merge with {}", forceMerge);
}
getEngine().forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(),
Engine engine = getEngine();
engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(),
forceMerge.onlyExpungeDeletes(), false, false);
if (forceMerge.flush()) {
engine.refresh("force_merge"); // TODO this is technically wrong we should remove this in 7.0
}
}
/**
@@ -1046,9 +1051,12 @@
}
org.apache.lucene.util.Version previousVersion = minimumCompatibleVersion();
// we just want to upgrade the segments, not actually forge merge to a single segment
getEngine().forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
final Engine engine = getEngine();
engine.forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment
false, true, upgrade.upgradeOnlyAncientSegments());
engine.refresh("upgrade"); // TODO this is technically wrong we should remove this in 7.0
org.apache.lucene.util.Version version = minimumCompatibleVersion();
if (logger.isTraceEnabled()) {
logger.trace("upgraded segments for {} from version {} to version {}", shardId, previousVersion, version);
@@ -1127,11 +1135,14 @@
// fail the engine. This will cause this shard to also be removed from the node's index service.
getEngine().failEngine(reason, e);
}
public Engine.Searcher acquireSearcher(String source) {
return acquireSearcher(source, Engine.SearcherScope.EXTERNAL);
}
private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) {
readAllowed();
final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source);
final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
boolean success = false;
try {
final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(searcher);


@@ -86,6 +86,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
@@ -942,7 +943,7 @@
engine.index(indexForDoc(doc));
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
latestGetResult.set(engine.get(newGet(true, doc), searcherFactory));
final AtomicBoolean flushFinished = new AtomicBoolean(false);
final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -977,7 +978,7 @@
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
// create a document
Document document = testDocumentWithTextField();
@@ -1002,6 +1003,12 @@
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
// but a non-realtime get is not yet visible
getResult = engine.get(newGet(false, doc), searcherFactory);
assertThat(getResult.exists(), equalTo(false));
getResult.release();
// refresh and it should be there
engine.refresh("test");
@@ -1237,6 +1244,7 @@
assertTrue(engine.tryRenewSyncCommit());
assertEquals(1, engine.segments(false).size());
} else {
engine.refresh("test");
assertBusy(() -> assertEquals(1, engine.segments(false).size()));
}
assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
@@ -1311,6 +1319,38 @@
assertThat(indexResult.getVersion(), equalTo(1L));
}
/**
* simulates what an upsert / update API does
*/
public void testVersionedUpdate() throws IOException {
final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null);
Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED);
Engine.IndexResult indexResult = engine.index(create);
assertThat(indexResult.getVersion(), equalTo(1L));
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) {
assertEquals(1, get.version());
}
Engine.Index update_1 = new Engine.Index(newUid(doc), doc, 1);
Engine.IndexResult update_1_result = engine.index(update_1);
assertThat(update_1_result.getVersion(), equalTo(2L));
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) {
assertEquals(2, get.version());
}
Engine.Index update_2 = new Engine.Index(newUid(doc), doc, 2);
Engine.IndexResult update_2_result = engine.index(update_2);
assertThat(update_2_result.getVersion(), equalTo(3L));
try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) {
assertEquals(3, get.version());
}
}
public void testVersioningNewIndex() throws IOException {
ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null);
Engine.Index index = indexForDoc(doc);
@@ -1337,12 +1377,14 @@
assertEquals(numDocs, test.reader().numDocs());
}
engine.forceMerge(true, 1, false, false, false);
engine.refresh("test");
assertEquals(engine.segments(true).size(), 1);
ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), B_1, null);
Engine.Index index = indexForDoc(doc);
engine.delete(new Engine.Delete(index.type(), index.id(), index.uid()));
engine.forceMerge(true, 10, true, false, false); //expunge deletes
engine.refresh("test");
assertEquals(engine.segments(true).size(), 1);
try (Engine.Searcher test = engine.acquireSearcher("test")) {
@@ -1354,7 +1396,7 @@
index = indexForDoc(doc);
engine.delete(new Engine.Delete(index.type(), index.id(), index.uid()));
engine.forceMerge(true, 10, false, false, false); //expunge deletes
engine.refresh("test");
assertEquals(engine.segments(true).size(), 1);
try (Engine.Searcher test = engine.acquireSearcher("test")) {
assertEquals(numDocs - 2, test.reader().numDocs());
@@ -1561,6 +1603,7 @@
}
if (randomBoolean()) {
engine.flush();
engine.refresh("test");
}
firstOp = false;
}
@@ -1716,11 +1759,12 @@
}
if (randomBoolean()) {
engine.flush();
engine.refresh("test");
}
if (rarely()) {
// simulate GC deletes
engine.refresh("gc_simulation");
engine.refresh("gc_simulation", Engine.SearcherScope.INTERNAL);
engine.clearDeletedTombstones();
if (docDeleted) {
lastOpVersion = Versions.NOT_FOUND;
@@ -1805,6 +1849,7 @@
}
if (randomBoolean()) {
engine.flush();
engine.refresh("test");
}
}
@@ -1884,7 +1929,7 @@
ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null);
final Term uidTerm = newUid(doc);
engine.index(indexForDoc(doc));
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
for (int i = 0; i < thread.length; i++) {
thread[i] = new Thread(() -> {
startGun.countDown();
@@ -2314,7 +2359,7 @@
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
engine.config().setEnableGcDeletes(false);
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
// Add document
Document document = testDocument();
@@ -2644,6 +2689,7 @@
assertThat(indexResult.getVersion(), equalTo(1L));
if (flush) {
engine.flush();
engine.refresh("test");
}
doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
@@ -3847,7 +3893,7 @@
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
final Term uid = newUid(doc);
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final BiFunction<String, Engine.SearcherScope, Searcher> searcherFactory = engine::acquireSearcher;
for (int i = 0; i < numberOfOperations; i++) {
if (randomBoolean()) {
final Engine.Index index = new Engine.Index(
@@ -4203,4 +4249,58 @@
IOUtils.close(recoveringEngine);
}
}
public void assertSameReader(Searcher left, Searcher right) {
List<LeafReaderContext> leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves();
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
assertEquals(rightLeaves.size(), leftLeaves.size());
for (int i = 0; i < leftLeaves.size(); i++) {
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader());
}
}
public void assertNotSameReader(Searcher left, Searcher right) {
List<LeafReaderContext> leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves();
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
if (rightLeaves.size() == leftLeaves.size()) {
for (int i = 0; i < leftLeaves.size(); i++) {
if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) {
return; // all is well
}
}
fail("readers are same");
}
}
public void testRefreshScopedSearcher() throws IOException {
try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
assertSameReader(getSearcher, searchSearcher);
}
for (int i = 0; i < 10; i++) {
final String docId = Integer.toString(i);
final ParsedDocument doc =
testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null);
Engine.Index primaryResponse = indexForDoc(doc);
engine.index(primaryResponse);
}
assertTrue(engine.refreshNeeded());
engine.refresh("test", Engine.SearcherScope.INTERNAL);
try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
assertEquals(10, getSearcher.reader().numDocs());
assertEquals(0, searchSearcher.reader().numDocs());
assertNotSameReader(getSearcher, searchSearcher);
}
engine.refresh("test", Engine.SearcherScope.EXTERNAL);
try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
assertEquals(10, getSearcher.reader().numDocs());
assertEquals(10, searchSearcher.reader().numDocs());
assertSameReader(getSearcher, searchSearcher);
}
}
}


@@ -1160,7 +1160,7 @@ public class IndexShardTests extends IndexShardTestCase {
indexDoc(shard, "test", "test");
try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test",
new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) {
assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount + 1));
assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount));
}
closeShards(shard);
}


@@ -270,7 +270,6 @@
* Uses a bunch of threads to index, wait for refresh, and non-realtime get documents to validate that they are visible after waiting
* regardless of what crazy sequence of events causes the refresh listener to fire.
*/
@TestLogging("_root:debug,org.elasticsearch.index.engine.Engine.DW:trace")
public void testLotsOfThreads() throws Exception {
int threadCount = between(3, 10);
maxListeners = between(1, threadCount * 2);