LUCENE-10422: Read-only monitor implementation (#679)

This commit adds a read-only monitor implementation that can
search the QueryIndex of another monitor without supporting adding
new queries.
This commit is contained in:
mogui 2022-03-21 17:42:03 +01:00 committed by GitHub
parent f239c0e03c
commit be99178956
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 948 additions and 405 deletions

View File

@ -56,7 +56,9 @@ API Changes
New Features
---------------------
(No changes)
* LUCENE-10422: Monitor Improvements: `Monitor` can use a custom `Directory`
implementation. `Monitor` can be created with a readonly `QueryIndex` in order to
have readonly `Monitor` instances. (Niko Usai)
Improvements
---------------------

View File

@ -26,8 +26,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.LeafReader;
@ -38,7 +36,6 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.NamedThreadFactory;
/**
* A Monitor contains a set of {@link Query} objects with associated IDs, and efficiently matches
@ -51,14 +48,8 @@ public class Monitor implements Closeable {
private final QueryIndex queryIndex;
private final List<MonitorUpdateListener> listeners = new ArrayList<>();
private final long commitBatchSize;
private final ScheduledExecutorService purgeExecutor;
private long lastPurged = -1;
/**
* Create a non-persistent Monitor instance with the default term-filtering Presearcher
*
@ -100,22 +91,11 @@ public class Monitor implements Closeable {
this.analyzer = analyzer;
this.presearcher = presearcher;
this.queryIndex = new QueryIndex(configuration, presearcher);
long purgeFrequency = configuration.getPurgeFrequency();
this.purgeExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("cache-purge"));
this.purgeExecutor.scheduleAtFixedRate(
() -> {
try {
purgeCache();
} catch (Throwable e) {
listeners.forEach(l -> l.onPurgeError(e));
}
},
purgeFrequency,
purgeFrequency,
configuration.getPurgeFrequencyUnits());
if (configuration.isReadOnly()) {
this.queryIndex = new ReadonlyQueryIndex(configuration);
} else {
this.queryIndex = new WritableQueryIndex(configuration, presearcher);
}
this.commitBatchSize = configuration.getQueryUpdateBufferSize();
}
@ -127,12 +107,13 @@ public class Monitor implements Closeable {
* @param listener listener to register
*/
public void addQueryIndexUpdateListener(MonitorUpdateListener listener) {
listeners.add(listener);
queryIndex.addListener(listener);
}
/** @return Statistics for the internal query index and cache */
public QueryCacheStats getQueryCacheStats() {
return new QueryCacheStats(queryIndex.numDocs(), queryIndex.cacheSize(), lastPurged);
public QueryCacheStats getQueryCacheStats() throws IOException {
return new QueryCacheStats(
queryIndex.numDocs(), queryIndex.cacheSize(), queryIndex.getLastPurged());
}
/** Statistics for the query cache and query index */
@ -159,17 +140,17 @@ public class Monitor implements Closeable {
*
* <p>This is normally called from a background thread at a rate set by configurePurgeFrequency().
*
* <p>When Monitor is in read-only mode, cache is NEVER purged automatically you MUST call it when
* you want new changes.
*
* @throws IOException on IO errors
*/
public void purgeCache() throws IOException {
queryIndex.purgeCache();
lastPurged = System.nanoTime();
listeners.forEach(MonitorUpdateListener::onPurge);
}
@Override
public void close() throws IOException {
purgeExecutor.shutdown();
queryIndex.close();
}
@ -192,7 +173,6 @@ public class Monitor implements Closeable {
private void commit(List<MonitorQuery> updates) throws IOException {
queryIndex.commit(updates);
listeners.forEach(l -> l.afterUpdate(updates));
}
/**
@ -213,7 +193,6 @@ public class Monitor implements Closeable {
*/
public void deleteById(List<String> queryIds) throws IOException {
queryIndex.deleteQueries(queryIds);
listeners.forEach(l -> l.afterDelete(queryIds));
}
/**
@ -233,7 +212,6 @@ public class Monitor implements Closeable {
*/
public void clear() throws IOException {
queryIndex.clear();
listeners.forEach(MonitorUpdateListener::afterClear);
}
/**
@ -287,7 +265,7 @@ public class Monitor implements Closeable {
}
/** @return the number of queries (after decomposition) stored in this Monitor */
public int getDisjunctCount() {
public int getDisjunctCount() throws IOException {
return queryIndex.numDocs();
}

View File

@ -27,6 +27,7 @@ import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.IOSupplier;
/** Encapsulates various configuration settings for a Monitor's query index */
public class MonitorConfiguration {
@ -35,8 +36,9 @@ public class MonitorConfiguration {
private long purgeFrequency = 5;
private TimeUnit purgeFrequencyUnits = TimeUnit.MINUTES;
private QueryDecomposer queryDecomposer = new QueryDecomposer();
private Path indexPath = null;
private MonitorQuerySerializer serializer;
private boolean readOnly = false;
private IOSupplier<Directory> directoryProvider = () -> new ByteBuffersDirectory();
private static IndexWriterConfig defaultIndexWriterConfig() {
IndexWriterConfig iwc = new IndexWriterConfig(new KeywordAnalyzer());
@ -47,16 +49,49 @@ public class MonitorConfiguration {
return iwc;
}
public MonitorConfiguration setIndexPath(Path indexPath, MonitorQuerySerializer serializer) {
this.indexPath = indexPath;
public boolean isReadOnly() {
return readOnly;
}
public IOSupplier<Directory> getDirectoryProvider() {
return directoryProvider;
}
/**
* Sets a custom directory, with a custom serializer.
*
* <p>You have also the chance to configure the Monitor as read-only.
*
* @param directoryProvider lambda to provide the index Directory implementation
* @param serializer the serializer used to store the queries
* @param readOnly set the monitor as read-only
* @return MonitorCOnfiguration
*/
public MonitorConfiguration setDirectoryProvider(
IOSupplier<Directory> directoryProvider,
MonitorQuerySerializer serializer,
Boolean readOnly) {
this.directoryProvider = directoryProvider;
this.serializer = serializer;
this.readOnly = readOnly;
return this;
}
public MonitorConfiguration setDirectoryProvider(
IOSupplier<Directory> directoryProvider, MonitorQuerySerializer serializer) {
this.directoryProvider = directoryProvider;
this.serializer = serializer;
return this;
}
public MonitorConfiguration setIndexPath(Path indexPath, MonitorQuerySerializer serializer) {
this.serializer = serializer;
this.directoryProvider = () -> FSDirectory.open(indexPath);
return this;
}
public IndexWriter buildIndexWriter() throws IOException {
Directory directory =
indexPath == null ? new ByteBuffersDirectory() : FSDirectory.open(indexPath);
return new IndexWriter(directory, getIndexWriterConfig());
return new IndexWriter(directoryProvider.get(), getIndexWriterConfig());
}
protected IndexWriterConfig getIndexWriterConfig() {

View File

@ -19,233 +19,32 @@ package org.apache.lucene.monitor;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.*;
import java.util.function.BiPredicate;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.SimpleCollector;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.index.*;
import org.apache.lucene.search.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.IOUtils;
class QueryIndex implements Closeable {
abstract class QueryIndex implements Closeable {
static final class FIELDS {
static final String query_id = "_query_id";
static final String cache_id = "_cache_id";
static final String mq = "_mq";
}
private final IndexWriter writer;
private final SearcherManager manager;
private final QueryDecomposer decomposer;
private final MonitorQuerySerializer serializer;
private final Presearcher presearcher;
/* Used to cache updates while a purge is ongoing */
private volatile Map<String, QueryCacheEntry> purgeCache = null;
/* Used to lock around the creation of the purgeCache */
private final ReadWriteLock purgeLock = new ReentrantReadWriteLock();
private final Object commitLock = new Object();
/* The current query cache */
private volatile ConcurrentMap<String, QueryCacheEntry> queries = new ConcurrentHashMap<>();
// NB this is not final because it can be replaced by purgeCache()
protected SearcherManager manager;
protected QueryDecomposer decomposer;
protected MonitorQuerySerializer serializer;
// package-private for testing
final Map<IndexReader.CacheKey, QueryTermFilter> termFilters = new HashMap<>();
QueryIndex(MonitorConfiguration config, Presearcher presearcher) throws IOException {
this.writer = config.buildIndexWriter();
this.manager = new SearcherManager(writer, true, true, new TermsHashBuilder());
this.decomposer = config.getQueryDecomposer();
this.serializer = config.getQuerySerializer();
this.presearcher = presearcher;
populateQueryCache(serializer, decomposer);
}
protected final List<MonitorUpdateListener> listeners = new ArrayList<>();
private void populateQueryCache(MonitorQuerySerializer serializer, QueryDecomposer decomposer)
throws IOException {
if (serializer == null) {
// No query serialization happening here - check that the cache is empty
IndexSearcher searcher = manager.acquire();
try {
if (searcher.count(new MatchAllDocsQuery()) != 0) {
throw new IllegalStateException(
"Attempting to open a non-empty monitor query index with no MonitorQuerySerializer");
}
} finally {
manager.release(searcher);
}
return;
}
Set<String> ids = new HashSet<>();
List<Exception> errors = new ArrayList<>();
purgeCache(
newCache ->
scan(
(id, cacheEntry, dataValues) -> {
if (ids.contains(id)) {
// this is a branch of a query that has already been reconstructed, but
// then split by decomposition - we don't need to parse it again
return;
}
ids.add(id);
try {
MonitorQuery mq = serializer.deserialize(dataValues.mq.binaryValue());
for (QueryCacheEntry entry : QueryCacheEntry.decompose(mq, decomposer)) {
newCache.put(entry.cacheId, entry);
}
} catch (Exception e) {
errors.add(e);
}
}));
if (errors.size() > 0) {
IllegalStateException e =
new IllegalStateException("Couldn't parse some queries from the index");
for (Exception parseError : errors) {
e.addSuppressed(parseError);
}
throw e;
}
}
abstract void commit(List<MonitorQuery> updates) throws IOException;
private class TermsHashBuilder extends SearcherFactory {
@Override
public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader)
throws IOException {
IndexSearcher searcher = super.newSearcher(reader, previousReader);
searcher.setQueryCache(null);
termFilters.put(reader.getReaderCacheHelper().getKey(), new QueryTermFilter(reader));
reader.getReaderCacheHelper().addClosedListener(termFilters::remove);
return searcher;
}
}
void commit(List<MonitorQuery> updates) throws IOException {
List<Indexable> indexables = buildIndexables(updates);
synchronized (commitLock) {
purgeLock.readLock().lock();
try {
if (indexables.size() > 0) {
Set<String> ids = new HashSet<>();
for (Indexable update : indexables) {
ids.add(update.queryCacheEntry.queryId);
}
for (String id : ids) {
writer.deleteDocuments(new Term(FIELDS.query_id, id));
}
for (Indexable update : indexables) {
this.queries.put(update.queryCacheEntry.cacheId, update.queryCacheEntry);
writer.addDocument(update.document);
if (purgeCache != null)
purgeCache.put(update.queryCacheEntry.cacheId, update.queryCacheEntry);
}
}
writer.commit();
manager.maybeRefresh();
} finally {
purgeLock.readLock().unlock();
}
}
}
private static class Indexable {
final QueryCacheEntry queryCacheEntry;
final Document document;
private Indexable(QueryCacheEntry queryCacheEntry, Document document) {
this.queryCacheEntry = queryCacheEntry;
this.document = document;
}
}
private static final BytesRef EMPTY = new BytesRef();
private List<Indexable> buildIndexables(List<MonitorQuery> updates) {
List<Indexable> indexables = new ArrayList<>();
for (MonitorQuery mq : updates) {
if (serializer != null && mq.getQueryString() == null) {
throw new IllegalArgumentException(
"Cannot add a MonitorQuery with a null string representation to a non-ephemeral Monitor");
}
BytesRef serialized = serializer == null ? EMPTY : serializer.serialize(mq);
for (QueryCacheEntry qce : QueryCacheEntry.decompose(mq, decomposer)) {
Document doc = presearcher.indexQuery(qce.matchQuery, mq.getMetadata());
doc.add(new StringField(FIELDS.query_id, qce.queryId, Field.Store.NO));
doc.add(new SortedDocValuesField(FIELDS.cache_id, new BytesRef(qce.cacheId)));
doc.add(new SortedDocValuesField(FIELDS.query_id, new BytesRef(qce.queryId)));
doc.add(new BinaryDocValuesField(FIELDS.mq, serialized));
indexables.add(new Indexable(qce, doc));
}
}
return indexables;
}
interface QueryBuilder {
Query buildQuery(BiPredicate<String, BytesRef> termAcceptor) throws IOException;
}
static class QueryTermFilter implements BiPredicate<String, BytesRef> {
private final Map<String, BytesRefHash> termsHash = new HashMap<>();
QueryTermFilter(IndexReader reader) throws IOException {
for (LeafReaderContext ctx : reader.leaves()) {
for (FieldInfo fi : ctx.reader().getFieldInfos()) {
BytesRefHash terms = termsHash.computeIfAbsent(fi.name, f -> new BytesRefHash());
Terms t = Terms.getTerms(ctx.reader(), fi.name);
TermsEnum te = t.iterator();
BytesRef term;
while ((term = te.next()) != null) {
terms.add(term);
}
}
}
}
@Override
public boolean test(String field, BytesRef term) {
BytesRefHash bytes = termsHash.get(field);
if (bytes == null) {
return false;
}
return bytes.find(term) != -1;
}
}
MonitorQuery getQuery(String queryId) throws IOException {
public MonitorQuery getQuery(String queryId) throws IOException {
if (serializer == null) {
throw new IllegalStateException(
"Cannot get queries from an index with no MonitorQuerySerializer");
@ -257,7 +56,7 @@ class QueryIndex implements Closeable {
return serializer.deserialize(bytesHolder[0]);
}
void scan(QueryCollector matcher) throws IOException {
public void scan(QueryCollector matcher) throws IOException {
search(new MatchAllDocsQuery(), matcher);
}
@ -266,122 +65,27 @@ class QueryIndex implements Closeable {
return search(builder, matcher);
}
long search(QueryBuilder queryBuilder, QueryCollector matcher) throws IOException {
IndexSearcher searcher = null;
try {
Map<String, QueryCacheEntry> queries;
abstract long search(QueryBuilder queryBuilder, QueryCollector matcher) throws IOException;
purgeLock.readLock().lock();
try {
searcher = manager.acquire();
queries = this.queries;
} finally {
purgeLock.readLock().unlock();
}
public abstract void purgeCache() throws IOException;
MonitorQueryCollector collector = new MonitorQueryCollector(queries, matcher);
long buildTime = System.nanoTime();
Query query =
queryBuilder.buildQuery(
termFilters.get(searcher.getIndexReader().getReaderCacheHelper().getKey()));
buildTime = System.nanoTime() - buildTime;
searcher.search(query, collector);
return buildTime;
} finally {
if (searcher != null) {
manager.release(searcher);
}
}
abstract void purgeCache(CachePopulator populator) throws IOException;
abstract int numDocs() throws IOException;
public abstract int cacheSize();
abstract void deleteQueries(List<String> ids) throws IOException;
abstract void clear() throws IOException;
public abstract long getLastPurged();
public void addListener(MonitorUpdateListener listener) {
listeners.add(listener);
}
interface CachePopulator {
void populateCacheWithIndex(Map<String, QueryCacheEntry> newCache) throws IOException;
}
void purgeCache() throws IOException {
purgeCache(
newCache ->
scan(
(id, query, dataValues) -> {
if (query != null) newCache.put(query.cacheId, query);
}));
}
/**
* Remove unused queries from the query cache.
*
* <p>This is normally called from a background thread at a rate set by configurePurgeFrequency().
*
* @throws IOException on IO errors
*/
private synchronized void purgeCache(CachePopulator populator) throws IOException {
// Note on implementation
// The purge works by scanning the query index and creating a new query cache populated
// for each query in the index. When the scan is complete, the old query cache is swapped
// for the new, allowing it to be garbage-collected.
// In order to not drop cached queries that have been added while a purge is ongoing,
// we use a ReadWriteLock to guard the creation and removal of an register log. Commits take
// the read lock. If the register log has been created, then a purge is ongoing, and queries
// are added to the register log within the read lock guard.
// The purge takes the write lock when creating the register log, and then when swapping out
// the old query cache. Within the second write lock guard, the contents of the register log
// are added to the new query cache, and the register log itself is removed.
final ConcurrentMap<String, QueryCacheEntry> newCache = new ConcurrentHashMap<>();
purgeLock.writeLock().lock();
try {
purgeCache = new ConcurrentHashMap<>();
} finally {
purgeLock.writeLock().unlock();
}
populator.populateCacheWithIndex(newCache);
purgeLock.writeLock().lock();
try {
newCache.putAll(purgeCache);
purgeCache = null;
queries = newCache;
} finally {
purgeLock.writeLock().unlock();
}
}
// ---------------------------------------------
// Proxy trivial operations...
// ---------------------------------------------
@Override
public void close() throws IOException {
IOUtils.close(manager, writer, writer.getDirectory());
}
int numDocs() {
return writer.getDocStats().numDocs;
}
int cacheSize() {
return queries.size();
}
void deleteQueries(Iterable<String> ids) throws IOException {
for (String id : ids) {
writer.deleteDocuments(new Term(FIELDS.query_id, id));
}
commit(Collections.emptyList());
}
void clear() throws IOException {
writer.deleteAll();
commit(Collections.emptyList());
}
interface QueryCollector {
public interface QueryCollector {
void matchQuery(String id, QueryCacheEntry query, DataValues dataValues) throws IOException;
@ -390,11 +94,15 @@ class QueryIndex implements Closeable {
}
}
// ---------------------------------------------
// Helper classes...
// ---------------------------------------------
interface QueryBuilder {
Query buildQuery(BiPredicate<String, BytesRef> termAcceptor) throws IOException;
}
static final class DataValues {
interface CachePopulator {
void populateCacheWithIndex(Map<String, QueryCacheEntry> newCache) throws IOException;
}
public static final class DataValues {
SortedDocValues queryId;
SortedDocValues cacheId;
BinaryDocValues mq;
@ -411,43 +119,33 @@ class QueryIndex implements Closeable {
}
}
/** A Collector that decodes the stored query for each document hit. */
static final class MonitorQueryCollector extends SimpleCollector {
static class QueryTermFilter implements BiPredicate<String, BytesRef> {
private final Map<String, QueryCacheEntry> queries;
private final QueryCollector matcher;
private final DataValues dataValues = new DataValues();
private final Map<String, BytesRefHash> termsHash = new HashMap<>();
MonitorQueryCollector(Map<String, QueryCacheEntry> queries, QueryCollector matcher) {
this.queries = queries;
this.matcher = matcher;
QueryTermFilter(IndexReader reader) throws IOException {
for (LeafReaderContext ctx : reader.leaves()) {
for (FieldInfo fi : ctx.reader().getFieldInfos()) {
BytesRefHash terms = termsHash.computeIfAbsent(fi.name, f -> new BytesRefHash());
Terms t = ctx.reader().terms(fi.name);
if (t != null) {
TermsEnum te = t.iterator();
BytesRef term;
while ((term = te.next()) != null) {
terms.add(term);
}
}
}
}
}
@Override
public void setScorer(Scorable scorer) {
this.dataValues.scorer = scorer;
}
@Override
public void collect(int doc) throws IOException {
dataValues.advanceTo(doc);
BytesRef cache_id = dataValues.cacheId.lookupOrd(dataValues.cacheId.ordValue());
BytesRef query_id = dataValues.queryId.lookupOrd(dataValues.queryId.ordValue());
QueryCacheEntry query = queries.get(cache_id.utf8ToString());
matcher.matchQuery(query_id.utf8ToString(), query, dataValues);
}
@Override
public void doSetNextReader(LeafReaderContext context) throws IOException {
this.dataValues.cacheId = context.reader().getSortedDocValues(FIELDS.cache_id);
this.dataValues.queryId = context.reader().getSortedDocValues(FIELDS.query_id);
this.dataValues.mq = context.reader().getBinaryDocValues(FIELDS.mq);
this.dataValues.ctx = context;
}
@Override
public ScoreMode scoreMode() {
return matcher.scoreMode();
public boolean test(String field, BytesRef term) {
BytesRefHash bytes = termsHash.get(field);
if (bytes == null) {
return false;
}
return bytes.find(term) != -1;
}
}
}

View File

@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.monitor;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.NamedThreadFactory;
class ReadonlyQueryIndex extends QueryIndex {
private final ScheduledExecutorService refreshExecutor;
public ReadonlyQueryIndex(MonitorConfiguration configuration) throws IOException {
if (configuration.getDirectoryProvider() == null) {
throw new IllegalStateException(
"You must specify a Directory when configuring a Monitor as read-only.");
}
Directory directory = configuration.getDirectoryProvider().get();
this.manager = new SearcherManager(directory, new TermsHashBuilder(termFilters));
this.decomposer = configuration.getQueryDecomposer();
this.serializer = configuration.getQuerySerializer();
this.refreshExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("cache-purge"));
long refreshFrequency = configuration.getPurgeFrequency();
this.refreshExecutor.scheduleAtFixedRate(
() -> {
try {
this.purgeCache();
} catch (IOException e) {
listeners.forEach(l -> l.onPurgeError(e));
}
},
refreshFrequency,
refreshFrequency,
configuration.getPurgeFrequencyUnits());
}
@Override
public void commit(List<MonitorQuery> updates) throws IOException {
throw new UnsupportedOperationException("Monitor is readOnly cannot commit");
}
@Override
public long search(QueryBuilder queryBuilder, QueryCollector matcher) throws IOException {
IndexSearcher searcher = null;
try {
searcher = manager.acquire();
LazyMonitorQueryCollector collector =
new LazyMonitorQueryCollector(matcher, serializer, decomposer);
long buildTime = System.nanoTime();
Query query =
queryBuilder.buildQuery(
termFilters.get(searcher.getIndexReader().getReaderCacheHelper().getKey()));
buildTime = System.nanoTime() - buildTime;
searcher.search(query, collector);
return buildTime;
} finally {
if (searcher != null) {
manager.release(searcher);
}
}
}
@Override
public void purgeCache() throws IOException {
manager.maybeRefresh();
listeners.forEach(MonitorUpdateListener::onPurge);
}
@Override
void purgeCache(CachePopulator populator) {
throw new UnsupportedOperationException("Monitor is readOnly, it has no cache");
}
@Override
public void close() throws IOException {
refreshExecutor.shutdown();
IOUtils.close(manager);
}
@Override
public int numDocs() throws IOException {
IndexSearcher searcher = null;
int numDocs;
try {
searcher = manager.acquire();
numDocs = searcher.getIndexReader().numDocs();
} finally {
if (searcher != null) {
manager.release(searcher);
}
}
return numDocs;
}
@Override
public int cacheSize() {
return -1;
}
@Override
public void deleteQueries(List<String> ids) throws IOException {
throw new UnsupportedOperationException("Monitor is readOnly cannot delete queries");
}
@Override
public void clear() throws IOException {
throw new UnsupportedOperationException("Monitor is readOnly cannot clear");
}
@Override
public long getLastPurged() {
return -1;
}
// ---------------------------------------------
// Helper classes...
// ---------------------------------------------
/** A Collector that decodes the stored query for each document hit reparsing them everytime. */
static final class LazyMonitorQueryCollector extends SimpleCollector {
private final QueryIndex.QueryCollector matcher;
private final QueryIndex.DataValues dataValues = new QueryIndex.DataValues();
private final MonitorQuerySerializer serializer;
private final QueryDecomposer decomposer;
LazyMonitorQueryCollector(
QueryIndex.QueryCollector matcher,
MonitorQuerySerializer serializer,
QueryDecomposer decomposer) {
this.matcher = matcher;
this.serializer = serializer;
this.decomposer = decomposer;
}
@Override
public void setScorer(Scorable scorer) {
this.dataValues.scorer = scorer;
}
@Override
public void collect(int doc) throws IOException {
dataValues.advanceTo(doc);
BytesRef cache_id = dataValues.cacheId.lookupOrd(dataValues.cacheId.ordValue());
BytesRef query_id = dataValues.queryId.lookupOrd(dataValues.queryId.ordValue());
MonitorQuery mq = serializer.deserialize(dataValues.mq.binaryValue());
QueryCacheEntry query =
QueryCacheEntry.decompose(mq, decomposer).stream()
.filter(queryCacheEntry -> queryCacheEntry.cacheId.equals(cache_id.utf8ToString()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("Cached queries not found"));
matcher.matchQuery(query_id.utf8ToString(), query, dataValues);
}
@Override
public void doSetNextReader(LeafReaderContext context) throws IOException {
this.dataValues.cacheId = context.reader().getSortedDocValues(QueryIndex.FIELDS.cache_id);
this.dataValues.queryId = context.reader().getSortedDocValues(QueryIndex.FIELDS.query_id);
this.dataValues.mq = context.reader().getBinaryDocValues(QueryIndex.FIELDS.mq);
this.dataValues.ctx = context;
}
@Override
public ScoreMode scoreMode() {
return matcher.scoreMode();
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.monitor;
import java.io.IOException;
import java.util.Map;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
class TermsHashBuilder extends SearcherFactory {
private final Map<IndexReader.CacheKey, QueryIndex.QueryTermFilter> termFilters;
TermsHashBuilder(Map<IndexReader.CacheKey, QueryIndex.QueryTermFilter> termFilters) {
this.termFilters = termFilters;
}
@Override
public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader)
throws IOException {
IndexSearcher searcher = super.newSearcher(reader, previousReader);
searcher.setQueryCache(null);
termFilters.put(reader.getReaderCacheHelper().getKey(), new QueryIndex.QueryTermFilter(reader));
reader.getReaderCacheHelper().addClosedListener(termFilters::remove);
return searcher;
}
}

View File

@ -0,0 +1,367 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.monitor;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.lucene.document.*;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.NamedThreadFactory;
class WritableQueryIndex extends QueryIndex {
private final IndexWriter writer;
private final Presearcher presearcher;
/* Used to cache updates while a purge is ongoing */
private volatile Map<String, QueryCacheEntry> purgeCache = null;
/* Used to lock around the creation of the purgeCache */
private final ReadWriteLock purgeLock = new ReentrantReadWriteLock();
private final Object commitLock = new Object();
private final ScheduledExecutorService purgeExecutor;
/* The current query cache */
// NB this is not final because it can be replaced by purgeCache()
protected volatile ConcurrentMap<String, QueryCacheEntry> queries;
protected long lastPurged = -1;
WritableQueryIndex(MonitorConfiguration configuration, Presearcher presearcher)
throws IOException {
this.writer = configuration.buildIndexWriter();
this.queries = new ConcurrentHashMap<>();
this.manager = new SearcherManager(writer, true, true, new TermsHashBuilder(termFilters));
this.decomposer = configuration.getQueryDecomposer();
this.serializer = configuration.getQuerySerializer();
this.presearcher = presearcher;
populateQueryCache(serializer, decomposer);
long purgeFrequency = configuration.getPurgeFrequency();
this.purgeExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("cache-purge"));
this.purgeExecutor.scheduleAtFixedRate(
() -> {
try {
purgeCache();
} catch (Throwable e) {
listeners.forEach(l -> l.onPurgeError(e));
}
},
purgeFrequency,
purgeFrequency,
configuration.getPurgeFrequencyUnits());
}
@Override
public void commit(List<MonitorQuery> updates) throws IOException {
commitWithoutNotify(updates);
listeners.forEach(l -> l.afterUpdate(updates));
}
private void commitWithoutNotify(List<MonitorQuery> updates) throws IOException {
List<Indexable> indexables = buildIndexables(updates);
synchronized (commitLock) {
purgeLock.readLock().lock();
try {
if (indexables.size() > 0) {
Set<String> ids = new HashSet<>();
for (Indexable update : indexables) {
ids.add(update.queryCacheEntry.queryId);
}
for (String id : ids) {
writer.deleteDocuments(new Term(FIELDS.query_id, id));
}
for (Indexable update : indexables) {
this.queries.put(update.queryCacheEntry.cacheId, update.queryCacheEntry);
writer.addDocument(update.document);
if (purgeCache != null)
purgeCache.put(update.queryCacheEntry.cacheId, update.queryCacheEntry);
}
}
writer.commit();
manager.maybeRefresh();
} finally {
purgeLock.readLock().unlock();
}
}
}
private static class Indexable {
final QueryCacheEntry queryCacheEntry;
final Document document;
private Indexable(QueryCacheEntry queryCacheEntry, Document document) {
this.queryCacheEntry = queryCacheEntry;
this.document = document;
}
}
private void populateQueryCache(MonitorQuerySerializer serializer, QueryDecomposer decomposer)
throws IOException {
if (serializer == null) {
// No query serialization happening here - check that the cache is empty
IndexSearcher searcher = manager.acquire();
try {
if (searcher.count(new MatchAllDocsQuery()) != 0) {
throw new IllegalStateException(
"Attempting to open a non-empty monitor query index with no MonitorQuerySerializer");
}
} finally {
manager.release(searcher);
}
return;
}
Set<String> ids = new HashSet<>();
List<Exception> errors = new ArrayList<>();
purgeCache(
newCache ->
scan(
(id, cacheEntry, dataValues) -> {
if (ids.contains(id)) {
// this is a branch of a query that has already been reconstructed, but
// then split by decomposition - we don't need to parse it again
return;
}
ids.add(id);
try {
MonitorQuery mq = serializer.deserialize(dataValues.mq.binaryValue());
for (QueryCacheEntry entry : QueryCacheEntry.decompose(mq, decomposer)) {
newCache.put(entry.cacheId, entry);
}
} catch (Exception e) {
errors.add(e);
}
}));
if (errors.size() > 0) {
IllegalStateException e =
new IllegalStateException("Couldn't parse some queries from the index");
for (Exception parseError : errors) {
e.addSuppressed(parseError);
}
throw e;
}
}
private static final BytesRef EMPTY = new BytesRef();
private List<Indexable> buildIndexables(List<MonitorQuery> updates) {
List<Indexable> indexables = new ArrayList<>();
for (MonitorQuery mq : updates) {
if (serializer != null && mq.getQueryString() == null) {
throw new IllegalArgumentException(
"Cannot add a MonitorQuery with a null string representation to a non-ephemeral Monitor");
}
BytesRef serialized = serializer == null ? EMPTY : serializer.serialize(mq);
for (QueryCacheEntry qce : QueryCacheEntry.decompose(mq, decomposer)) {
Document doc = presearcher.indexQuery(qce.matchQuery, mq.getMetadata());
doc.add(new StringField(FIELDS.query_id, qce.queryId, Field.Store.NO));
doc.add(new SortedDocValuesField(FIELDS.cache_id, new BytesRef(qce.cacheId)));
doc.add(new SortedDocValuesField(FIELDS.query_id, new BytesRef(qce.queryId)));
doc.add(new BinaryDocValuesField(FIELDS.mq, serialized));
indexables.add(new Indexable(qce, doc));
}
}
return indexables;
}
@Override
public long search(QueryBuilder queryBuilder, QueryCollector matcher) throws IOException {
IndexSearcher searcher = null;
try {
Map<String, QueryCacheEntry> queries;
purgeLock.readLock().lock();
try {
searcher = manager.acquire();
queries = this.queries;
} finally {
purgeLock.readLock().unlock();
}
MonitorQueryCollector collector = new MonitorQueryCollector(queries, matcher);
long buildTime = System.nanoTime();
Query query =
queryBuilder.buildQuery(
termFilters.get(searcher.getIndexReader().getReaderCacheHelper().getKey()));
buildTime = System.nanoTime() - buildTime;
searcher.search(query, collector);
return buildTime;
} finally {
if (searcher != null) {
manager.release(searcher);
}
}
}
@Override
public void purgeCache() throws IOException {
purgeCache(
newCache ->
scan(
(id, query, dataValues) -> {
if (query != null) newCache.put(query.cacheId, query);
}));
lastPurged = System.nanoTime();
listeners.forEach(MonitorUpdateListener::onPurge);
}
@Override
/**
* Remove unused queries from the query cache.
*
* <p>This is normally called from a background thread at a rate set by configurePurgeFrequency().
*
* @throws IOException on IO errors
*/
synchronized void purgeCache(CachePopulator populator) throws IOException {
// Note on implementation
// The purge works by scanning the query index and creating a new query cache populated
// for each query in the index. When the scan is complete, the old query cache is swapped
// for the new, allowing it to be garbage-collected.
// In order to not drop cached queries that have been added while a purge is ongoing,
// we use a ReadWriteLock to guard the creation and removal of an register log. Commits take
// the read lock. If the register log has been created, then a purge is ongoing, and queries
// are added to the register log within the read lock guard.
// The purge takes the write lock when creating the register log, and then when swapping out
// the old query cache. Within the second write lock guard, the contents of the register log
// are added to the new query cache, and the register log itself is removed.
final ConcurrentMap<String, QueryCacheEntry> newCache = new ConcurrentHashMap<>();
purgeLock.writeLock().lock();
try {
purgeCache = new ConcurrentHashMap<>();
} finally {
purgeLock.writeLock().unlock();
}
populator.populateCacheWithIndex(newCache);
purgeLock.writeLock().lock();
try {
newCache.putAll(purgeCache);
purgeCache = null;
queries = newCache;
} finally {
purgeLock.writeLock().unlock();
}
}
// ---------------------------------------------
// Proxy trivial operations...
// ---------------------------------------------
@Override
public void close() throws IOException {
purgeExecutor.shutdown();
IOUtils.close(manager, writer, writer.getDirectory());
}
@Override
public int numDocs() throws IOException {
return writer.getDocStats().numDocs;
}
@Override
public int cacheSize() {
return queries.size();
}
@Override
public void deleteQueries(List<String> ids) throws IOException {
for (String id : ids) {
writer.deleteDocuments(new Term(FIELDS.query_id, id));
}
commitWithoutNotify(Collections.emptyList());
listeners.forEach(l -> l.afterDelete(ids));
}
@Override
public void clear() throws IOException {
writer.deleteAll();
commitWithoutNotify(Collections.emptyList());
listeners.forEach(MonitorUpdateListener::afterClear);
}
@Override
public long getLastPurged() {
return lastPurged;
}
// ---------------------------------------------
// Helper classes...
// ---------------------------------------------
/** A Collector that decodes the stored query for each document hit. */
static final class MonitorQueryCollector extends SimpleCollector {
private final Map<String, QueryCacheEntry> queries;
private final QueryCollector matcher;
private final DataValues dataValues = new DataValues();
MonitorQueryCollector(Map<String, QueryCacheEntry> queries, QueryCollector matcher) {
this.queries = queries;
this.matcher = matcher;
}
@Override
public void setScorer(Scorable scorer) {
this.dataValues.scorer = scorer;
}
@Override
public void collect(int doc) throws IOException {
dataValues.advanceTo(doc);
BytesRef cache_id = dataValues.cacheId.lookupOrd(dataValues.cacheId.ordValue());
BytesRef query_id = dataValues.queryId.lookupOrd(dataValues.queryId.ordValue());
QueryCacheEntry query = queries.get(cache_id.utf8ToString());
matcher.matchQuery(query_id.utf8ToString(), query, dataValues);
}
@Override
public void doSetNextReader(LeafReaderContext context) throws IOException {
this.dataValues.cacheId = context.reader().getSortedDocValues(FIELDS.cache_id);
this.dataValues.queryId = context.reader().getSortedDocValues(FIELDS.query_id);
this.dataValues.mq = context.reader().getBinaryDocValues(FIELDS.mq);
this.dataValues.ctx = context;
}
@Override
public ScoreMode scoreMode() {
return matcher.scoreMode();
}
}
}

View File

@ -155,7 +155,12 @@ public class TestCachePurging extends MonitorTestBase {
@Override
public void onPurge() {
// It can sometimes take a couple of purge runs to get everything in sync
if (monitor.getQueryCacheStats().cachedQueries == 99) latch.countDown();
try {
if (monitor.getQueryCacheStats().cachedQueries == 99) latch.countDown();
} catch (IOException e) {
// Ignore
throw new RuntimeException(e);
}
}
});

View File

@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.monitor;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.FSDirectory;
import org.junit.Test;
public class TestMonitorReadonly extends MonitorTestBase {
private static final Analyzer ANALYZER = new WhitespaceAnalyzer();
@Test
public void testReadonlyMonitorThrowsOnInexistentIndex() {
Path indexDirectory = createTempDir();
MonitorConfiguration config =
new MonitorConfiguration()
.setDirectoryProvider(
() -> FSDirectory.open(indexDirectory),
MonitorQuerySerializer.fromParser(MonitorTestBase::parse),
true);
assertThrows(
IndexNotFoundException.class,
() -> {
new Monitor(ANALYZER, config);
});
}
@Test
public void testReadonlyMonitorThrowsWhenCallingWriteRequests() throws IOException {
Path indexDirectory = createTempDir();
MonitorConfiguration writeConfig =
new MonitorConfiguration()
.setIndexPath(
indexDirectory, MonitorQuerySerializer.fromParser(MonitorTestBase::parse));
// this will create the index
Monitor writeMonitor = new Monitor(ANALYZER, writeConfig);
writeMonitor.close();
MonitorConfiguration config =
new MonitorConfiguration()
.setDirectoryProvider(
() -> FSDirectory.open(indexDirectory),
MonitorQuerySerializer.fromParser(MonitorTestBase::parse),
true);
try (Monitor monitor = new Monitor(ANALYZER, config)) {
assertThrows(
UnsupportedOperationException.class,
() -> {
TermQuery query = new TermQuery(new Term(FIELD, "test"));
monitor.register(
new MonitorQuery("query1", query, query.toString(), Collections.emptyMap()));
});
assertThrows(
UnsupportedOperationException.class,
() -> {
monitor.deleteById("query1");
});
assertThrows(
UnsupportedOperationException.class,
() -> {
monitor.clear();
});
}
}
@Test
public void testSettingCustomDirectory() throws IOException {
Path indexDirectory = createTempDir();
Document doc = new Document();
doc.add(newTextField(FIELD, "This is a Foobar test document", Field.Store.NO));
MonitorConfiguration writeConfig =
new MonitorConfiguration()
.setDirectoryProvider(
() -> FSDirectory.open(indexDirectory),
MonitorQuerySerializer.fromParser(MonitorTestBase::parse));
try (Monitor writeMonitor = new Monitor(ANALYZER, writeConfig)) {
TermQuery query = new TermQuery(new Term(FIELD, "test"));
writeMonitor.register(
new MonitorQuery("query1", query, query.toString(), Collections.emptyMap()));
TermQuery query2 = new TermQuery(new Term(FIELD, "Foobar"));
writeMonitor.register(
new MonitorQuery("query2", query2, query.toString(), Collections.emptyMap()));
MatchingQueries<QueryMatch> matches = writeMonitor.match(doc, QueryMatch.SIMPLE_MATCHER);
assertNotNull(matches.getMatches());
assertEquals(2, matches.getMatchCount());
assertNotNull(matches.matches("query2"));
}
}
@Test
public void testMonitorReadOnlyCouldReadOnTheSameIndex() throws IOException {
Path indexDirectory = createTempDir();
Document doc = new Document();
doc.add(newTextField(FIELD, "This is a test document", Field.Store.NO));
MonitorConfiguration writeConfig =
new MonitorConfiguration()
.setDirectoryProvider(
() -> FSDirectory.open(indexDirectory),
MonitorQuerySerializer.fromParser(MonitorTestBase::parse));
try (Monitor writeMonitor = new Monitor(ANALYZER, writeConfig)) {
TermQuery query = new TermQuery(new Term(FIELD, "test"));
writeMonitor.register(
new MonitorQuery("query1", query, query.toString(), Collections.emptyMap()));
}
MonitorConfiguration readConfig =
new MonitorConfiguration()
.setDirectoryProvider(
() -> FSDirectory.open(indexDirectory),
MonitorQuerySerializer.fromParser(MonitorTestBase::parse),
true);
try (Monitor readMonitor1 = new Monitor(ANALYZER, readConfig)) {
MatchingQueries<QueryMatch> matches = readMonitor1.match(doc, QueryMatch.SIMPLE_MATCHER);
assertNotNull(matches.getMatches());
assertEquals(1, matches.getMatchCount());
assertNotNull(matches.matches("query1"));
}
try (Monitor readMonitor2 = new Monitor(ANALYZER, readConfig)) {
MatchingQueries<QueryMatch> matches = readMonitor2.match(doc, QueryMatch.SIMPLE_MATCHER);
assertNotNull(matches.getMatches());
assertEquals(1, matches.getMatchCount());
assertNotNull(matches.matches("query1"));
assertThrows(
UnsupportedOperationException.class,
() -> {
TermQuery query = new TermQuery(new Term(FIELD, "test"));
readMonitor2.register(
new MonitorQuery("query1", query, query.toString(), Collections.emptyMap()));
});
}
}
@Test
public void testReadonlyMonitorGetsRefreshed() throws IOException, InterruptedException {
Path indexDirectory = createTempDir();
Document doc = new Document();
doc.add(newTextField(FIELD, "This is a test document", Field.Store.NO));
MonitorConfiguration writeConfig =
new MonitorConfiguration()
.setDirectoryProvider(
() -> FSDirectory.open(indexDirectory),
MonitorQuerySerializer.fromParser(MonitorTestBase::parse));
try (Monitor writeMonitor = new Monitor(ANALYZER, writeConfig)) {
TermQuery query = new TermQuery(new Term(FIELD, "test"));
writeMonitor.register(
new MonitorQuery("query1", query, query.toString(), Collections.emptyMap()));
MonitorConfiguration readConfig =
new MonitorConfiguration()
.setPurgeFrequency(2, TimeUnit.SECONDS)
.setDirectoryProvider(
() -> FSDirectory.open(indexDirectory),
MonitorQuerySerializer.fromParser(MonitorTestBase::parse),
true);
try (Monitor readMonitor = new Monitor(ANALYZER, readConfig)) {
MatchingQueries<QueryMatch> matches = readMonitor.match(doc, QueryMatch.SIMPLE_MATCHER);
assertNotNull(matches.getMatches());
assertEquals(1, matches.getMatchCount());
assertNotNull(matches.matches("query1"));
TermQuery query2 = new TermQuery(new Term(FIELD, "test"));
writeMonitor.register(
new MonitorQuery("query2", query2, query2.toString(), Collections.emptyMap()));
// Index returns stale result until background refresh thread calls maybeRefresh
MatchingQueries<QueryMatch> matches2 = readMonitor.match(doc, QueryMatch.SIMPLE_MATCHER);
assertNotNull(matches2.getMatches());
assertEquals(1, matches2.getMatchCount());
CountDownLatch latch = new CountDownLatch(1);
readMonitor.addQueryIndexUpdateListener(
new MonitorUpdateListener() {
@Override
public void onPurge() {
latch.countDown();
}
});
assertTrue(
latch.await(readConfig.getPurgeFrequency() + 1, readConfig.getPurgeFrequencyUnits()));
// after frequency results are refreshed
MatchingQueries<QueryMatch> matches3 = readMonitor.match(doc, QueryMatch.SIMPLE_MATCHER);
assertNotNull(matches3.getMatches());
assertEquals(2, matches3.getMatchCount());
}
}
}
}

View File

@ -31,8 +31,8 @@ public class TestQueryTermFilter extends LuceneTestCase {
public void testFiltersAreRemoved() throws IOException {
try (QueryIndex qi =
new QueryIndex(new MonitorConfiguration(), new TermFilteredPresearcher())) {
try (WritableQueryIndex qi =
new WritableQueryIndex(new MonitorConfiguration(), new TermFilteredPresearcher())) {
qi.commit(
Collections.singletonList(new MonitorQuery("1", new TermQuery(new Term(FIELD, "term")))));
assertEquals(1, qi.termFilters.size());