mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
add internal source to acquire searcher
add a source indicating why the searcher was acquired
This commit is contained in:
parent
167538ef0d
commit
78af818d72
@ -157,7 +157,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
|
||||
// shardStatus.estimatedFlushableMemorySize = indexShard.estimateFlushableMemorySize();
|
||||
shardStatus.translogId = indexShard.translog().currentId();
|
||||
shardStatus.translogOperations = indexShard.translog().estimatedNumberOfOperations();
|
||||
Engine.Searcher searcher = indexShard.acquireSearcher();
|
||||
Engine.Searcher searcher = indexShard.acquireSearcher("indices_status");
|
||||
try {
|
||||
shardStatus.docs = new DocsStatus();
|
||||
shardStatus.docs.numDocs = searcher.reader().numDocs();
|
||||
|
@ -179,7 +179,7 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
|
||||
} else {
|
||||
SearchContext.setCurrent(new DefaultSearchContext(0,
|
||||
new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()),
|
||||
null, indexShard.acquireSearcher(), indexService, indexShard,
|
||||
null, indexShard.acquireSearcher("validate_query"), indexService, indexShard,
|
||||
scriptService, cacheRecycler));
|
||||
try {
|
||||
ParsedQuery parsedQuery = queryParserService.parse(request.querySource());
|
||||
|
@ -163,7 +163,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
|
||||
new ShardSearchRequest().types(request.types())
|
||||
.filteringAliases(request.filteringAliases())
|
||||
.nowInMillis(request.nowInMillis()),
|
||||
shardTarget, indexShard.acquireSearcher(), indexService, indexShard,
|
||||
shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
|
||||
scriptService, cacheRecycler);
|
||||
SearchContext.setCurrent(context);
|
||||
|
||||
|
@ -149,7 +149,7 @@ public class TransportSuggestAction extends TransportBroadcastOperationAction<Su
|
||||
protected ShardSuggestResponse shardOperation(ShardSuggestRequest request) throws ElasticSearchException {
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.index());
|
||||
IndexShard indexShard = indexService.shardSafe(request.shardId());
|
||||
final Engine.Searcher searcher = indexShard.acquireSearcher();
|
||||
final Engine.Searcher searcher = indexShard.acquireSearcher("suggest");
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
BytesReference suggest = request.suggest();
|
||||
|
@ -82,13 +82,13 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||
GetResult get(Get get) throws EngineException;
|
||||
|
||||
/**
|
||||
* Retruns a new searcher instance. The consumer of this
|
||||
* Returns a new searcher instance. The consumer of this
|
||||
* API is responsible for releasing the returned seacher in a
|
||||
* safe manner, preferrablly in a try/finally block.
|
||||
* safe manner, preferably in a try/finally block.
|
||||
*
|
||||
* @see Searcher#release()
|
||||
*/
|
||||
Searcher acquireSearcher() throws EngineException;
|
||||
Searcher acquireSearcher(String source) throws EngineException;
|
||||
|
||||
List<Segment> segments();
|
||||
|
||||
@ -160,6 +160,11 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||
|
||||
static interface Searcher extends Releasable {
|
||||
|
||||
/**
|
||||
* The source that caused this searcher to be acquired.
|
||||
*/
|
||||
String source();
|
||||
|
||||
IndexReader reader();
|
||||
|
||||
IndexSearcher searcher();
|
||||
@ -167,12 +172,19 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||
|
||||
static class SimpleSearcher implements Searcher {
|
||||
|
||||
private final String source;
|
||||
private final IndexSearcher searcher;
|
||||
|
||||
public SimpleSearcher(IndexSearcher searcher) {
|
||||
public SimpleSearcher(String source, IndexSearcher searcher) {
|
||||
this.source = source;
|
||||
this.searcher = searcher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String source() {
|
||||
return this.source();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexReader reader() {
|
||||
return searcher.getIndexReader();
|
||||
|
@ -337,7 +337,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
||||
}
|
||||
|
||||
// no version, get the version from the index, we know that we refresh on flush
|
||||
Searcher searcher = acquireSearcher();
|
||||
Searcher searcher = acquireSearcher("get");
|
||||
final Versions.DocIdAndVersion docIdAndVersion;
|
||||
try {
|
||||
docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
|
||||
@ -676,19 +676,19 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Searcher acquireSearcher() throws EngineException {
|
||||
public final Searcher acquireSearcher(String source) throws EngineException {
|
||||
SearcherManager manager = this.searcherManager;
|
||||
try {
|
||||
IndexSearcher searcher = manager.acquire();
|
||||
return newSearcher(searcher, manager);
|
||||
return newSearcher(source, searcher, manager);
|
||||
} catch (IOException ex) {
|
||||
logger.error("failed to accquire searcher for shard [{}]", ex, shardId);
|
||||
logger.error("failed to acquire searcher, source {}", ex, source);
|
||||
throw new EngineException(shardId, ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
protected Searcher newSearcher(IndexSearcher searcher, SearcherManager manager) {
|
||||
return new RobinSearcher(searcher, manager);
|
||||
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
|
||||
return new RobinSearcher(source, searcher, manager);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1128,7 +1128,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
||||
Map<String, Segment> segments = new HashMap<String, Segment>();
|
||||
|
||||
// first, go over and compute the search ones...
|
||||
Searcher searcher = acquireSearcher();
|
||||
Searcher searcher = acquireSearcher("segments");
|
||||
try {
|
||||
for (AtomicReaderContext reader : searcher.reader().leaves()) {
|
||||
assert reader.reader() instanceof SegmentReader;
|
||||
@ -1279,7 +1279,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
||||
}
|
||||
|
||||
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
|
||||
Searcher searcher = acquireSearcher();
|
||||
Searcher searcher = acquireSearcher("load_version");
|
||||
try {
|
||||
return Versions.loadVersion(searcher.reader(), uid);
|
||||
} finally {
|
||||
@ -1402,14 +1402,21 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
||||
|
||||
static class RobinSearcher implements Searcher {
|
||||
|
||||
private final String source;
|
||||
private final IndexSearcher searcher;
|
||||
private final SearcherManager manager;
|
||||
|
||||
private RobinSearcher(IndexSearcher searcher, SearcherManager manager) {
|
||||
private RobinSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
|
||||
this.source = source;
|
||||
this.searcher = searcher;
|
||||
this.manager = manager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String source() {
|
||||
return this.source;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexReader reader() {
|
||||
return searcher.getIndexReader();
|
||||
@ -1478,7 +1485,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
||||
// fresh index writer, just do on all of it
|
||||
newSearcher = searcher;
|
||||
} else {
|
||||
currentSearcher = acquireSearcher();
|
||||
currentSearcher = acquireSearcher("search_factory");
|
||||
// figure out the newSearcher, with only the new readers that are relevant for us
|
||||
List<IndexReader> readers = Lists.newArrayList();
|
||||
for (AtomicReaderContext newReaderContext : searcher.getIndexReader().leaves()) {
|
||||
@ -1502,8 +1509,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
||||
|
||||
if (newSearcher != null) {
|
||||
IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId,
|
||||
new SimpleSearcher(searcher),
|
||||
new SimpleSearcher(newSearcher));
|
||||
new SimpleSearcher("warmer", searcher),
|
||||
new SimpleSearcher("warmer", newSearcher));
|
||||
warmer.warm(context);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
|
@ -227,7 +227,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
|
||||
private void loadQueries(IndexShard shard) {
|
||||
try {
|
||||
shard.refresh(new Engine.Refresh().force(true).source("percolator_load_queries"));
|
||||
Engine.Searcher searcher = shard.acquireSearcher();
|
||||
Engine.Searcher searcher = shard.acquireSearcher("percolator_load_queries");
|
||||
try {
|
||||
Query query = new XConstantScoreQuery(
|
||||
indexCache.filter().cache(
|
||||
|
@ -98,7 +98,7 @@ public interface IndexShard extends IndexShardComponent {
|
||||
|
||||
FieldDataStats fieldDataStats(String... fields);
|
||||
|
||||
CompletionStats completionStats(String ... fields);
|
||||
CompletionStats completionStats(String... fields);
|
||||
|
||||
PercolatorQueriesRegistry percolateRegistry();
|
||||
|
||||
@ -136,7 +136,7 @@ public interface IndexShard extends IndexShardComponent {
|
||||
|
||||
void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException;
|
||||
|
||||
Engine.Searcher acquireSearcher();
|
||||
Engine.Searcher acquireSearcher(String source);
|
||||
|
||||
/**
|
||||
* Returns <tt>true</tt> if this shard can ignore a recovery attempt made to it (since the already doing/done it)
|
||||
|
@ -457,7 +457,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||
@Override
|
||||
public DocsStats docStats() {
|
||||
try {
|
||||
final Engine.Searcher searcher = acquireSearcher();
|
||||
final Engine.Searcher searcher = acquireSearcher("doc_stats");
|
||||
try {
|
||||
return new DocsStats(searcher.reader().numDocs(), searcher.reader().numDeletedDocs());
|
||||
} finally {
|
||||
@ -533,7 +533,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||
public CompletionStats completionStats(String... fields) {
|
||||
CompletionStats completionStats = new CompletionStats();
|
||||
try {
|
||||
final Engine.Searcher currentSearcher = acquireSearcher();
|
||||
final Engine.Searcher currentSearcher = acquireSearcher("completion_stats");
|
||||
try {
|
||||
PostingsFormat postingsFormat = this.codecService.postingsFormatService().get(Completion090PostingsFormat.CODEC_NAME).get();
|
||||
if (postingsFormat instanceof Completion090PostingsFormat) {
|
||||
@ -591,9 +591,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||
}
|
||||
|
||||
@Override
|
||||
public Engine.Searcher acquireSearcher() {
|
||||
public Engine.Searcher acquireSearcher(String source) {
|
||||
readAllowed();
|
||||
return engine.acquireSearcher();
|
||||
return engine.acquireSearcher(source);
|
||||
}
|
||||
|
||||
public void close(String reason) {
|
||||
|
@ -58,7 +58,7 @@ public class ShardTermVectorService extends AbstractIndexShardComponent {
|
||||
}
|
||||
|
||||
public TermVectorResponse getTermVector(TermVectorRequest request) {
|
||||
final Engine.Searcher searcher = indexShard.acquireSearcher();
|
||||
final Engine.Searcher searcher = indexShard.acquireSearcher("term_vector");
|
||||
IndexReader topLevelReader = searcher.reader();
|
||||
final TermVectorResponse termVectorResponse = new TermVectorResponse(request.index(), request.type(), request.id());
|
||||
final Term uidTerm = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(request.type(), request.id()));
|
||||
|
@ -179,7 +179,7 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ
|
||||
private void purgeShards(List<IndexShard> shardsToPurge) {
|
||||
for (IndexShard shardToPurge : shardsToPurge) {
|
||||
Query query = NumericRangeQuery.newLongRange(TTLFieldMapper.NAME, null, System.currentTimeMillis(), false, true);
|
||||
Engine.Searcher searcher = shardToPurge.acquireSearcher();
|
||||
Engine.Searcher searcher = shardToPurge.acquireSearcher("indices_ttl");
|
||||
try {
|
||||
logger.debug("[{}][{}] purging shard", shardToPurge.routingEntry().index(), shardToPurge.routingEntry().id());
|
||||
ExpiredDocsCollector expiredDocsCollector = new ExpiredDocsCollector(shardToPurge.routingEntry().index());
|
||||
|
@ -117,6 +117,11 @@ public class PercolateContext extends SearchContext {
|
||||
final IndexReader topLevelReader = docSearcher.getIndexReader();
|
||||
AtomicReaderContext readerContext = topLevelReader.leaves().get(0);
|
||||
docEngineSearcher = new Engine.Searcher() {
|
||||
@Override
|
||||
public String source() {
|
||||
return "percolate";
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexReader reader() {
|
||||
return topLevelReader;
|
||||
|
@ -112,7 +112,7 @@ public class PercolatorService extends AbstractComponent {
|
||||
return new ExtendedMemoryIndex(true, maxReuseBytes);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
percolatorTypes = new TByteObjectHashMap<PercolatorType>(6);
|
||||
percolatorTypes.put(countPercolator.id(), countPercolator);
|
||||
percolatorTypes.put(queryCountPercolator.id(), queryCountPercolator);
|
||||
@ -416,7 +416,7 @@ public class PercolatorService extends AbstractComponent {
|
||||
@Override
|
||||
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
|
||||
long count = 0;
|
||||
Engine.Searcher percolatorSearcher = context.indexShard().acquireSearcher();
|
||||
Engine.Searcher percolatorSearcher = context.indexShard().acquireSearcher("percolate");
|
||||
try {
|
||||
Count countCollector = count(logger, context);
|
||||
queryBasedPercolating(percolatorSearcher, context, countCollector);
|
||||
@ -450,7 +450,8 @@ public class PercolatorService extends AbstractComponent {
|
||||
|
||||
// Use a custom impl of AbstractBigArray for Object[]?
|
||||
List<PercolateResponse.Match> finalMatches = new ArrayList<PercolateResponse.Match>(requestedSize == 0 ? numMatches : requestedSize);
|
||||
outer: for (PercolateShardResponse response : shardResults) {
|
||||
outer:
|
||||
for (PercolateShardResponse response : shardResults) {
|
||||
Text index = new StringText(response.getIndex());
|
||||
for (int i = 0; i < response.matches().length; i++) {
|
||||
float score = response.scores().length == 0 ? NO_SCORE : response.scores()[i];
|
||||
@ -515,7 +516,7 @@ public class PercolatorService extends AbstractComponent {
|
||||
|
||||
@Override
|
||||
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
|
||||
Engine.Searcher percolatorSearcher = context.indexShard().acquireSearcher();
|
||||
Engine.Searcher percolatorSearcher = context.indexShard().acquireSearcher("percolate");
|
||||
try {
|
||||
Match match = match(logger, context, highlightPhase);
|
||||
queryBasedPercolating(percolatorSearcher, context, match);
|
||||
@ -548,7 +549,7 @@ public class PercolatorService extends AbstractComponent {
|
||||
|
||||
@Override
|
||||
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
|
||||
Engine.Searcher percolatorSearcher = context.indexShard().acquireSearcher();
|
||||
Engine.Searcher percolatorSearcher = context.indexShard().acquireSearcher("percolate");
|
||||
try {
|
||||
MatchAndScore matchAndScore = matchAndScore(logger, context, highlightPhase);
|
||||
queryBasedPercolating(percolatorSearcher, context, matchAndScore);
|
||||
@ -658,7 +659,7 @@ public class PercolatorService extends AbstractComponent {
|
||||
|
||||
@Override
|
||||
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
|
||||
Engine.Searcher percolatorSearcher = context.indexShard().acquireSearcher();
|
||||
Engine.Searcher percolatorSearcher = context.indexShard().acquireSearcher("percolate");
|
||||
try {
|
||||
MatchAndSort matchAndSort = QueryCollector.matchAndSort(logger, context);
|
||||
queryBasedPercolating(percolatorSearcher, context, matchAndSort);
|
||||
|
@ -456,7 +456,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
||||
|
||||
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
|
||||
|
||||
Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher() : searcher;
|
||||
Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;
|
||||
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, cacheRecycler);
|
||||
SearchContext.setCurrent(context);
|
||||
try {
|
||||
@ -566,7 +566,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static final int[] EMPTY_DOC_IDS = new int[0];
|
||||
|
||||
/**
|
||||
|
@ -299,7 +299,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
|
||||
@Test
|
||||
public void testSimpleOperations() throws Exception {
|
||||
Engine.Searcher searchResult = engine.acquireSearcher();
|
||||
Engine.Searcher searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
||||
searchResult.release();
|
||||
|
||||
@ -310,7 +310,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
engine.create(new Engine.Create(null, newUid("1"), doc));
|
||||
|
||||
// its not there...
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
|
||||
searchResult.release();
|
||||
@ -321,7 +321,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
assertThat(getResult.source().source.toBytesArray(), equalTo(B_1.toBytesArray()));
|
||||
assertThat(getResult.docIdAndVersion(), nullValue());
|
||||
getResult.release();
|
||||
|
||||
|
||||
// but, not there non realtime
|
||||
getResult = engine.get(new Engine.Get(false, newUid("1")));
|
||||
assertThat(getResult.exists(), equalTo(false));
|
||||
@ -330,7 +330,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
engine.refresh(new Engine.Refresh().force(false));
|
||||
|
||||
// now its there...
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
|
||||
searchResult.release();
|
||||
@ -340,7 +340,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
assertThat(getResult.exists(), equalTo(true));
|
||||
assertThat(getResult.docIdAndVersion(), notNullValue());
|
||||
getResult.release();
|
||||
|
||||
|
||||
// now do an update
|
||||
document = testDocument();
|
||||
document.add(new TextField("value", "test1", Field.Store.YES));
|
||||
@ -349,7 +349,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
engine.index(new Engine.Index(null, newUid("1"), doc));
|
||||
|
||||
// its not updated yet...
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
|
||||
@ -361,11 +361,11 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
assertThat(getResult.source().source.toBytesArray(), equalTo(B_2.toBytesArray()));
|
||||
assertThat(getResult.docIdAndVersion(), nullValue());
|
||||
getResult.release();
|
||||
|
||||
|
||||
// refresh and it should be updated
|
||||
engine.refresh(new Engine.Refresh().force(false));
|
||||
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1));
|
||||
@ -375,7 +375,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
engine.delete(new Engine.Delete("test", "1", newUid("1")));
|
||||
|
||||
// its not deleted yet
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1));
|
||||
@ -385,11 +385,11 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
getResult = engine.get(new Engine.Get(true, newUid("1")));
|
||||
assertThat(getResult.exists(), equalTo(false));
|
||||
getResult.release();
|
||||
|
||||
|
||||
// refresh and it should be deleted
|
||||
engine.refresh(new Engine.Refresh().force(false));
|
||||
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
|
||||
@ -402,7 +402,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
engine.create(new Engine.Create(null, newUid("1"), doc));
|
||||
|
||||
// its not there...
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
|
||||
@ -412,7 +412,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
engine.refresh(new Engine.Refresh().force(false));
|
||||
|
||||
// now its there...
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
|
||||
@ -436,7 +436,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
engine.index(new Engine.Index(null, newUid("1"), doc));
|
||||
|
||||
// its not updated yet...
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
|
||||
@ -445,7 +445,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
// refresh and it should be updated
|
||||
engine.refresh(new Engine.Refresh().force(false));
|
||||
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1));
|
||||
@ -456,7 +456,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
|
||||
@Test
|
||||
public void testSearchResultRelease() throws Exception {
|
||||
Engine.Searcher searchResult = engine.acquireSearcher();
|
||||
Engine.Searcher searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
||||
searchResult.release();
|
||||
|
||||
@ -465,7 +465,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
engine.create(new Engine.Create(null, newUid("1"), doc));
|
||||
|
||||
// its not there...
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
|
||||
searchResult.release();
|
||||
@ -474,7 +474,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
engine.refresh(new Engine.Refresh().force(false));
|
||||
|
||||
// now its there...
|
||||
searchResult = engine.acquireSearcher();
|
||||
searchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
|
||||
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
|
||||
// don't release the search result yet...
|
||||
@ -482,7 +482,7 @@ public class RobinEngineTests extends ElasticSearchTestCase {
|
||||
// delete, refresh and do a new search, it should not be there
|
||||
engine.delete(new Engine.Delete("test", "1", newUid("1")));
|
||||
engine.refresh(new Engine.Refresh().force(false));
|
||||
Engine.Searcher updateSearchResult = engine.acquireSearcher();
|
||||
Engine.Searcher updateSearchResult = engine.acquireSearcher("test");
|
||||
MatcherAssert.assertThat(updateSearchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
|
||||
updateSearchResult.release();
|
||||
|
||||
|
@ -54,16 +54,17 @@ import java.util.concurrent.ConcurrentMap;
|
||||
public final class MockRobinEngine extends RobinEngine implements Engine {
|
||||
public static final ConcurrentMap<AssertingSearcher, RuntimeException> INFLIGHT_ENGINE_SEARCHERS = new ConcurrentHashMap<AssertingSearcher, RuntimeException>();
|
||||
private final Random random;
|
||||
|
||||
@Inject
|
||||
public MockRobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
|
||||
IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer, Store store,
|
||||
SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider,
|
||||
MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService,
|
||||
CodecService codecService) throws EngineException {
|
||||
super(shardId, indexSettings, threadPool, indexSettingsService, indexingService, warmer, store,
|
||||
IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer, Store store,
|
||||
SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider,
|
||||
MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService,
|
||||
CodecService codecService) throws EngineException {
|
||||
super(shardId, indexSettings, threadPool, indexSettingsService, indexingService, warmer, store,
|
||||
deletionPolicy, translog, mergePolicyProvider, mergeScheduler, analysisService, similarityService, codecService);
|
||||
final long seed = indexSettings.getAsLong(ElasticSearchTestCase.INDEX_SEED_SETTING, 0l);
|
||||
if (logger.isTraceEnabled()){
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Using [{}] for shard [{}] seed: [{}]", this.getClass().getName(), shardId, seed);
|
||||
}
|
||||
random = new Random(seed);
|
||||
@ -82,30 +83,35 @@ public final class MockRobinEngine extends RobinEngine implements Engine {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected Searcher newSearcher(IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
||||
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
||||
// this executes basic query checks and asserts that weights are normalized only once etc.
|
||||
final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(random, searcher.getTopReaderContext());
|
||||
assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
|
||||
return new AssertingSearcher(super.newSearcher(assertingIndexSearcher, manager), shardId);
|
||||
return new AssertingSearcher(super.newSearcher(source, assertingIndexSearcher, manager), shardId);
|
||||
}
|
||||
|
||||
public static final class AssertingSearcher implements Searcher {
|
||||
private final Searcher searcher;
|
||||
private final ShardId shardId;
|
||||
|
||||
|
||||
public AssertingSearcher(Searcher searcher, ShardId shardId) {
|
||||
this.searcher = searcher;
|
||||
this.shardId = shardId;
|
||||
INFLIGHT_ENGINE_SEARCHERS.put(this, new RuntimeException("Unreleased Searcher"));
|
||||
INFLIGHT_ENGINE_SEARCHERS.put(this, new RuntimeException("Unreleased Searcher, source [" + searcher.source() + "]"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String source() {
|
||||
return searcher.source();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release() throws ElasticSearchException {
|
||||
RuntimeException remove = INFLIGHT_ENGINE_SEARCHERS.remove(this);
|
||||
assert remove != null : "Released Searcher more than once";
|
||||
return searcher.release();
|
||||
assert remove != null : "Released Searcher more than once, source [" + searcher.source() + "]";
|
||||
return searcher.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -117,7 +123,7 @@ public final class MockRobinEngine extends RobinEngine implements Engine {
|
||||
public IndexSearcher searcher() {
|
||||
return searcher.searcher();
|
||||
}
|
||||
|
||||
|
||||
public ShardId shardId() {
|
||||
return shardId;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user