mirror of https://github.com/apache/lucene.git
SOLR-8436: filters for realtime-get
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1721585 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3430150819
commit
c5e2b3508e
|
@ -121,6 +121,8 @@ New Features
|
|||
* SOLR-6398: Add IterativeMergeStrategy to support running Parallel Iterative Algorithms inside of Solr
|
||||
(Joel Bernstein)
|
||||
|
||||
* SOLR-8436: Real-time get now supports filters. (yonik)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
* SOLR-8386: Add field option in the new admin UI schema page loads up even when no schemaFactory has been
|
||||
|
|
|
@ -27,10 +27,13 @@ import java.util.Map;
|
|||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.StorableField;
|
||||
import org.apache.lucene.index.StoredDocument;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.Scorer;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
|
@ -44,6 +47,7 @@ import org.apache.solr.common.cloud.ClusterState;
|
|||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.ShardParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
|
@ -58,9 +62,11 @@ import org.apache.solr.response.transform.DocTransformer;
|
|||
import org.apache.solr.schema.FieldType;
|
||||
import org.apache.solr.schema.IndexSchema;
|
||||
import org.apache.solr.schema.SchemaField;
|
||||
import org.apache.solr.search.QParser;
|
||||
import org.apache.solr.search.ReturnFields;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
import org.apache.solr.search.SolrReturnFields;
|
||||
import org.apache.solr.search.SyntaxError;
|
||||
import org.apache.solr.update.DocumentBuilder;
|
||||
import org.apache.solr.update.PeerSync;
|
||||
import org.apache.solr.update.UpdateLog;
|
||||
|
@ -131,6 +137,28 @@ public class RealTimeGetComponent extends SearchComponent
|
|||
return;
|
||||
}
|
||||
|
||||
// parse any existing filters
|
||||
try {
|
||||
String[] fqs = req.getParams().getParams(CommonParams.FQ);
|
||||
if (fqs!=null && fqs.length!=0) {
|
||||
List<Query> filters = rb.getFilters();
|
||||
// if filters already exists, make a copy instead of modifying the original
|
||||
filters = filters == null ? new ArrayList<Query>(fqs.length) : new ArrayList<>(filters);
|
||||
for (String fq : fqs) {
|
||||
if (fq != null && fq.trim().length()!=0) {
|
||||
QParser fqp = QParser.getParser(fq, null, req);
|
||||
filters.add(fqp.getQuery());
|
||||
}
|
||||
}
|
||||
if (!filters.isEmpty()) {
|
||||
rb.setFilters( filters );
|
||||
}
|
||||
}
|
||||
} catch (SyntaxError e) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
|
||||
}
|
||||
|
||||
|
||||
String[] allIds = id==null ? new String[0] : id;
|
||||
|
||||
if (ids != null) {
|
||||
|
@ -173,6 +201,20 @@ public class RealTimeGetComponent extends SearchComponent
|
|||
int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
|
||||
switch (oper) {
|
||||
case UpdateLog.ADD:
|
||||
|
||||
if (rb.getFilters() != null) {
|
||||
// we have filters, so we need to check those against the indexed form of the doc
|
||||
if (searcherHolder != null) {
|
||||
// close handles to current searchers
|
||||
searcher = null;
|
||||
searcherHolder.decref();
|
||||
searcherHolder = null;
|
||||
}
|
||||
ulog.openRealtimeSearcher(); // force open a new realtime searcher
|
||||
o = null; // pretend we never found this record and fall through to use the searcher
|
||||
break;
|
||||
}
|
||||
|
||||
SolrDocument doc = toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), core.getLatestSchema());
|
||||
if(transformer!=null) {
|
||||
transformer.transform(doc, -1, 0); // unknown docID
|
||||
|
@ -184,7 +226,7 @@ public class RealTimeGetComponent extends SearchComponent
|
|||
default:
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
|
||||
}
|
||||
continue;
|
||||
if (o != null) continue;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -194,9 +236,26 @@ public class RealTimeGetComponent extends SearchComponent
|
|||
searcher = searcherHolder.get();
|
||||
}
|
||||
|
||||
// SolrCore.verbose("RealTimeGet using searcher ", searcher);
|
||||
int docid = -1;
|
||||
long segAndId = searcher.lookupId(idBytes.get());
|
||||
if (segAndId >= 0) {
|
||||
int segid = (int) segAndId;
|
||||
LeafReaderContext ctx = searcher.getTopReaderContext().leaves().get((int) (segAndId >> 32));
|
||||
docid = segid + ctx.docBase;
|
||||
|
||||
if (rb.getFilters() != null) {
|
||||
for (Query q : rb.getFilters()) {
|
||||
Scorer scorer = searcher.createWeight(q, false).scorer(ctx);
|
||||
if (scorer == null || segid != scorer.iterator().advance(segid)) {
|
||||
// filter doesn't match.
|
||||
docid = -1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int docid = searcher.getFirstMatch(new Term(idField.getName(), idBytes.get()));
|
||||
if (docid < 0) continue;
|
||||
StoredDocument luceneDocument = searcher.doc(docid, rsp.getReturnFields().getLuceneFieldNames());
|
||||
SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema());
|
||||
|
|
|
@ -444,23 +444,10 @@ public class UpdateLog implements PluginInfoInitialized {
|
|||
}
|
||||
|
||||
} else {
|
||||
// replicate the deleteByQuery logic. See deleteByQuery for comments.
|
||||
|
||||
if (map != null) map.clear();
|
||||
if (prevMap != null) prevMap.clear();
|
||||
if (prevMap2 != null) prevMap2.clear();
|
||||
|
||||
try {
|
||||
RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
|
||||
holder.decref();
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
|
||||
}
|
||||
|
||||
openRealtimeSearcher();
|
||||
if (trace) {
|
||||
log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " clearCaches=true");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -508,32 +495,33 @@ public class UpdateLog implements PluginInfoInitialized {
|
|||
if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
|
||||
// given that we just did a delete-by-query, we don't know what documents were
|
||||
// affected and hence we must purge our caches.
|
||||
if (map != null) map.clear();
|
||||
if (prevMap != null) prevMap.clear();
|
||||
if (prevMap2 != null) prevMap2.clear();
|
||||
|
||||
openRealtimeSearcher();
|
||||
trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
|
||||
|
||||
// oldDeletes.clear();
|
||||
|
||||
// We must cause a new IndexReader to be opened before anything looks at these caches again
|
||||
// so that a cache miss will read fresh data.
|
||||
//
|
||||
// TODO: FUTURE: open a new searcher lazily for better throughput with delete-by-query commands
|
||||
try {
|
||||
RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
|
||||
holder.decref();
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
|
||||
if (trace) {
|
||||
LogPtr ptr = new LogPtr(pos, cmd.getVersion());
|
||||
log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Opens a new realtime searcher and clears the id caches */
|
||||
public void openRealtimeSearcher() {
|
||||
synchronized (this) {
|
||||
// We must cause a new IndexReader to be opened before anything looks at these caches again
|
||||
// so that a cache miss will read fresh data.
|
||||
try {
|
||||
RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
|
||||
holder.decref();
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Error opening realtime searcher", e);
|
||||
return;
|
||||
}
|
||||
|
||||
LogPtr ptr = new LogPtr(pos, cmd.getVersion());
|
||||
|
||||
if (trace) {
|
||||
log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
|
||||
}
|
||||
if (map != null) map.clear();
|
||||
if (prevMap != null) prevMap.clear();
|
||||
if (prevMap2 != null) prevMap2.clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.solr.search;
|
|||
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
import org.noggit.ObjectBuilder;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
|
@ -108,6 +110,64 @@ public class TestRealTimeGet extends TestRTGBase {
|
|||
);
|
||||
|
||||
|
||||
SolrQueryRequest req = req();
|
||||
RefCounted<SolrIndexSearcher> realtimeHolder = req.getCore().getRealtimeSearcher();
|
||||
|
||||
//
|
||||
// filters
|
||||
//
|
||||
assertU(adoc("id", "12"));
|
||||
assertU(adoc("id", "13"));
|
||||
|
||||
// this should not need to open another realtime searcher
|
||||
assertJQ(req("qt","/get","id","11", "fl","id", "fqX","id:11") // nocommit
|
||||
,"=={doc:{id:'11'}}"
|
||||
);
|
||||
|
||||
// assert that the same realtime searcher is still in effect (i.e. that we didn't
|
||||
// open a new searcher when we didn't have to).
|
||||
RefCounted<SolrIndexSearcher> realtimeHolder2 = req.getCore().getRealtimeSearcher();
|
||||
assertEquals(realtimeHolder.get(), realtimeHolder2.get()); // Autocommit could possibly cause this to fail?
|
||||
realtimeHolder2.decref();
|
||||
|
||||
// filter most likely different segment
|
||||
assertJQ(req("qt","/get","id","12", "fl","id", "fq","id:11")
|
||||
,"=={doc:null}"
|
||||
);
|
||||
|
||||
// filter most likely same different segment
|
||||
assertJQ(req("qt","/get","id","12", "fl","id", "fq","id:13")
|
||||
,"=={doc:null}"
|
||||
);
|
||||
|
||||
assertJQ(req("qt","/get","id","12", "fl","id", "fq","id:12")
|
||||
,"=={doc:{id:'12'}}"
|
||||
);
|
||||
|
||||
assertU(adoc("id", "14"));
|
||||
assertU(adoc("id", "15"));
|
||||
|
||||
// id list, with some in index and some not, first id from index. Also test mutiple fq params.
|
||||
assertJQ(req("qt","/get","ids","12,14,13,15", "fl","id", "fq","id:[10 TO 14]", "fq","id:[13 TO 19]")
|
||||
,"/response/docs==[{id:'14'},{id:'13'}]"
|
||||
);
|
||||
|
||||
assertU(adoc("id", "16"));
|
||||
assertU(adoc("id", "17"));
|
||||
|
||||
// id list, with some in index and some not, first id from tlog
|
||||
assertJQ(req("qt","/get","ids","17,16,15,14", "fl","id", "fq","id:[15 TO 16]")
|
||||
,"/response/docs==[{id:'16'},{id:'15'}]"
|
||||
);
|
||||
|
||||
// more complex filter
|
||||
assertJQ(req("qt","/get","ids","17,16,15,14", "fl","id", "fq","{!frange l=15 u=16}id")
|
||||
,"/response/docs==[{id:'16'},{id:'15'}]"
|
||||
);
|
||||
|
||||
realtimeHolder.decref();
|
||||
req.close();
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -411,6 +471,7 @@ public class TestRealTimeGet extends TestRTGBase {
|
|||
final int deleteByQueryPercent = 1+random().nextInt(5);
|
||||
final int optimisticPercent = 1+random().nextInt(50); // percent change that an update uses optimistic locking
|
||||
final int optimisticCorrectPercent = 25+random().nextInt(70); // percent change that a version specified will be correct
|
||||
final int filteredGetPercent = random().nextInt( random().nextInt(20)+1 ); // percent of time that a get will be filtered... we normally don't want too high.
|
||||
final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
|
||||
int nWriteThreads = 5 + random().nextInt(25);
|
||||
|
||||
|
@ -640,9 +701,17 @@ public class TestRealTimeGet extends TestRTGBase {
|
|||
if (VERBOSE) {
|
||||
verbose("querying id", id);
|
||||
}
|
||||
|
||||
boolean filteredOut = false;
|
||||
SolrQueryRequest sreq;
|
||||
if (realTime) {
|
||||
sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
|
||||
ModifiableSolrParams p = params("wt","json", "qt","/get", "ids",Integer.toString(id));
|
||||
if (rand.nextInt(100) < filteredGetPercent) {
|
||||
int idToFilter = rand.nextBoolean() ? id : rand.nextInt(ndocs);
|
||||
filteredOut = idToFilter != id;
|
||||
p.add("fq", "id:"+idToFilter);
|
||||
}
|
||||
sreq = req(p);
|
||||
} else {
|
||||
sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
|
||||
}
|
||||
|
@ -652,11 +721,12 @@ public class TestRealTimeGet extends TestRTGBase {
|
|||
List doclist = (List)(((Map)rsp.get("response")).get("docs"));
|
||||
if (doclist.size() == 0) {
|
||||
// there's no info we can get back with a delete, so not much we can check without further synchronization
|
||||
// This is also correct when filteredOut==true
|
||||
} else {
|
||||
assertEquals(1, doclist.size());
|
||||
long foundVal = (Long)(((Map)doclist.get(0)).get(field));
|
||||
long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
|
||||
if (foundVal < Math.abs(info.val)
|
||||
if (filteredOut || foundVal < Math.abs(info.val)
|
||||
|| (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must
|
||||
verbose("ERROR, id=", id, "found=",response,"model",info);
|
||||
assertTrue(false);
|
||||
|
|
Loading…
Reference in New Issue