SOLR-13749 Cross Collection Join Filter

This commit is contained in:
Gus Heck 2020-01-25 00:48:22 -05:00
parent 6501577eb5
commit dbe38b97f5
13 changed files with 1389 additions and 116 deletions

View File

@ -81,6 +81,8 @@ New Features
* SOLR-14130: Add postlogs command line tool for indexing Solr logs (Joel Bernstein)
* SOLR-13749: New cross collection join filter (XCJF) (Dan Fox, Kevin Watters, via Gus Heck)
Improvements
---------------------
* SOLR-14120: Define JavaScript methods 'includes' and 'startsWith' to ensure AdminUI can be displayed when using

View File

@ -29,6 +29,8 @@ import org.apache.solr.search.join.BlockJoinChildQParserPlugin;
import org.apache.solr.search.join.BlockJoinParentQParserPlugin;
import org.apache.solr.search.join.FiltersQParserPlugin;
import org.apache.solr.search.join.GraphQParserPlugin;
import org.apache.solr.search.join.HashRangeQParserPlugin;
import org.apache.solr.search.join.XCJFQParserPlugin;
import org.apache.solr.search.mlt.MLTQParserPlugin;
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
@ -85,6 +87,8 @@ public abstract class QParserPlugin implements NamedListInitializedPlugin, SolrI
map.put(PayloadCheckQParserPlugin.NAME, new PayloadCheckQParserPlugin());
map.put(BoolQParserPlugin.NAME, new BoolQParserPlugin());
map.put(MinHashQParserPlugin.NAME, new MinHashQParserPlugin());
map.put(XCJFQParserPlugin.NAME, new XCJFQParserPlugin());
map.put(HashRangeQParserPlugin.NAME, new HashRangeQParserPlugin());
standardPlugins = Collections.unmodifiableMap(map);
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search.join;
import org.apache.lucene.search.Query;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.QParser;
import org.apache.solr.search.SyntaxError;
/**
 * Parser for the {@code hash_range} query type. Reads the field name and the inclusive lower and
 * upper hash bounds from the local params and builds a {@link HashRangeQuery} from them.
 */
@SuppressWarnings("WeakerAccess")
public class HashRangeQParser extends QParser {

  /** Local param naming the field whose value is hashed. */
  public static final String FIELD = "f";
  /** Local param holding the lower bound of the hash range. */
  public static final String LOWER_BOUND = "l";
  /** Local param holding the upper bound of the hash range. */
  public static final String UPPER_BOUND = "u";

  public HashRangeQParser(String qstr, SolrParams localParams, SolrParams params, SolrQueryRequest req) {
    super(qstr, localParams, params, req);
  }

  /**
   * Builds the {@link HashRangeQuery} described by the local params.
   *
   * @return a query over the field {@code f} restricted to the hash range {@code [l, u]}
   * @throws SyntaxError if local params are absent or any of {@code f}, {@code l}, {@code u} is missing
   */
  @Override
  public Query parse() throws SyntaxError {
    if (localParams == null) {
      throw new SyntaxError("hash_range requires the local parameters '"
          + FIELD + "', '" + LOWER_BOUND + "' and '" + UPPER_BOUND + "'");
    }

    String field = localParams.get(FIELD);
    // getInt(String) returns a nullable Integer; unboxing it directly would throw a
    // NullPointerException for a missing bound instead of a meaningful parse error.
    Integer lower = localParams.getInt(LOWER_BOUND);
    Integer upper = localParams.getInt(UPPER_BOUND);

    if (field == null) {
      throw new SyntaxError("Missing required local parameter '" + FIELD + "' for hash_range");
    }
    if (lower == null) {
      throw new SyntaxError("Missing required local parameter '" + LOWER_BOUND + "' for hash_range");
    }
    if (upper == null) {
      throw new SyntaxError("Missing required local parameter '" + UPPER_BOUND + "' for hash_range");
    }

    return new HashRangeQuery(field, lower, upper);
  }
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search.join;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.QParser;
import org.apache.solr.search.QParserPlugin;
/**
 * Plugin that creates {@link HashRangeQParser} instances. The resulting queries match documents
 * where the specified field hashes to a value within the given range.
 * <br>Can be used to create a filter that will only match documents falling within a certain
 * shard's hash range.
 */
public class HashRangeQParserPlugin extends QParserPlugin {

  /** Name reported by {@link #getName()}. */
  public static final String NAME = "hash_range";

  @Override
  public String getName() {
    return NAME;
  }

  /** Hands every argument straight through to a new {@link HashRangeQParser}. */
  @Override
  public QParser createParser(String qstr, SolrParams localParams, SolrParams params, SolrQueryRequest req) {
    final QParser parser = new HashRangeQParser(qstr, localParams, params, req);
    return parser;
  }
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search.join;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.*;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.util.Hash;
import org.apache.solr.search.SolrCache;
import org.apache.solr.search.SolrIndexSearcher;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
/**
 * Constant-score query that matches documents whose sorted doc value in {@link #field} hashes
 * (murmurhash3_x86_32, seed 0) to a value in the inclusive range {@code [lower, upper]}.
 *
 * <p>If the searcher is a {@link SolrIndexSearcher} exposing a cache named
 * {@link #CACHE_KEY_PREFIX} followed by the field name, the per-segment hashes are computed once
 * and stored in that cache; otherwise each candidate document is hashed on the fly.
 */
public class HashRangeQuery extends Query {

  protected final String field;
  protected final int lower;
  protected final int upper;

  /** Prefix of the name of the cache looked up on the searcher, completed by the field name. */
  public static final String CACHE_KEY_PREFIX = "hash_";

  /**
   * @param field name of the field whose doc value is hashed
   * @param lower inclusive lower bound of the accepted hash values
   * @param upper inclusive upper bound of the accepted hash values
   */
  public HashRangeQuery(String field, int lower, int upper) {
    this.field = field;
    this.lower = lower;
    this.upper = upper;
  }

  @Override
  public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
    return new ConstantScoreWeight(this, boost) {

      @Override
      public boolean isCacheable(LeafReaderContext context) {
        return DocValues.isCacheable(context, field);
      }

      @Override
      public Scorer scorer(LeafReaderContext context) throws IOException {
        SortedDocValues docValues = context.reader().getSortedDocValues(field);
        if (docValues == null) {
          // No sorted doc values for this field in this segment: nothing can match here.
          // Without this guard TwoPhaseIterator would reject the null approximation.
          return null;
        }
        int[] cache = getCache(context);

        // The doc values iterator is the approximation, so matches() is only consulted for
        // documents that actually carry a value in the field.
        TwoPhaseIterator iterator = new TwoPhaseIterator(docValues) {
          @Override
          public boolean matches() throws IOException {
            int hash = cache != null ? cache[docValues.docID()] : hash(docValues);
            return hash >= lower && hash <= upper;
          }

          @Override
          public float matchCost() {
            // An array lookup is far cheaper than reading and hashing the value.
            return cache != null ? 2 : 100;
          }
        };

        return new ConstantScoreScorer(this, boost, scoreMode, iterator);
      }

      /**
       * Returns the per-document hashes for the segment, computing and caching them on first
       * use, or {@code null} when no suitable cache is available.
       */
      private int[] getCache(LeafReaderContext context) throws IOException {
        IndexReader.CacheHelper cacheHelper = context.reader().getReaderCacheHelper();
        if (cacheHelper == null) {
          return null;
        }
        if (!(searcher instanceof SolrIndexSearcher)) {
          // Named caches only exist on Solr searchers; fall back to hashing on the fly.
          return null;
        }

        @SuppressWarnings("unchecked")
        final SolrCache<IndexReader.CacheKey, int[]> cache =
            ((SolrIndexSearcher) searcher).getCache(CACHE_KEY_PREFIX + field);
        if (cache == null) {
          return null;
        }

        IndexReader.CacheKey cacheKey = cacheHelper.getKey();
        // Lock on the reader's cache key so the hash array is built once per key at a time.
        synchronized (cacheKey) {
          int[] hashes = cache.get(cacheKey);
          if (hashes == null) {
            hashes = new int[context.reader().maxDoc()];
            // A fresh iterator is needed here; the one used by the scorer must stay unpositioned.
            SortedDocValues docValues = context.reader().getSortedDocValues(field);
            if (docValues != null) {
              int doc;
              while ((doc = docValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
                hashes[doc] = hash(docValues);
              }
            }
            cache.put(cacheKey, hashes);
          }
          return hashes;
        }
      }

      /** Hashes the value of the document the iterator is currently positioned on. */
      private int hash(SortedDocValues docValues) throws IOException {
        BytesRef bytesRef = docValues.binaryValue();
        return Hash.murmurhash3_x86_32(bytesRef.bytes, bytesRef.offset, bytesRef.length, 0);
      }
    };
  }

  @Override
  public void visit(QueryVisitor visitor) {
    visitor.visitLeaf(this);
  }

  @Override
  public String toString(String field) {
    return String.format(Locale.ROOT, "{!hash_range f=%s l=%d u=%d}", this.field, lower, upper);
  }

  @Override
  public boolean equals(Object other) {
    return sameClassAs(other) &&
        equalsTo(getClass().cast(other));
  }

  private boolean equalsTo(HashRangeQuery other) {
    return Objects.equals(field, other.field) &&
        lower == other.lower &&
        upper == other.upper;
  }

  @Override
  public int hashCode() {
    final int prime = 31;
    int result = classHash();
    result = prime * result + Objects.hashCode(field);
    result = prime * result + Integer.hashCode(lower);
    result = prime * result + Integer.hashCode(upper);
    return result;
  }
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search.join;
import org.apache.lucene.search.Query;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.QParser;
import org.apache.solr.search.QueryParsing;
import org.apache.solr.search.SyntaxError;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
 * Parser for the cross-collection join filter. Collects the join configuration from the local
 * params and builds an {@link XCJFQuery}; local params the parser does not consume itself are
 * handed to the query as additional parameters.
 */
@SuppressWarnings("WeakerAccess")
public class XCJFQParser extends QParser {

  public static final String ZK_HOST = "zkHost";
  public static final String SOLR_URL = "solrUrl";
  public static final String COLLECTION = "collection";
  public static final String FROM = "from";
  public static final String TO = "to";
  public static final String ROUTED_BY_JOIN_KEY = "routed";
  public static final String TTL = "ttl";

  public static final int TTL_DEFAULT = 60 * 60; // in seconds

  /** Local params consumed by this parser; everything else is passed through to the query. */
  private static final Set<String> OWN_PARAMS = new HashSet<>(Arrays.asList(
      QueryParsing.TYPE, QueryParsing.V, ZK_HOST, SOLR_URL, COLLECTION, FROM, TO, ROUTED_BY_JOIN_KEY, TTL));

  private final String routerField;
  private final Set<String> solrUrlWhitelist;

  public XCJFQParser(String qstr, SolrParams localParams, SolrParams params, SolrQueryRequest req, String routerField, Set<String> solrUrlWhiteList) {
    super(qstr, localParams, params, req);
    this.routerField = routerField;
    // If specified in the config, this will limit which solr url's the parser can connect to.
    this.solrUrlWhitelist = solrUrlWhiteList;
  }

  /**
   * Builds the {@link XCJFQuery} described by the local params.
   *
   * @throws SyntaxError if local params are absent, if {@code collection}, {@code from} or
   *         {@code to} is missing, or if {@code solrUrl} is given but not permitted by the whitelist
   */
  @Override
  public Query parse() throws SyntaxError {
    if (localParams == null) {
      throw new SyntaxError("Cross collection join filter requires the local parameters '"
          + COLLECTION + "', '" + FROM + "' and '" + TO + "'");
    }

    String query = localParams.get(QueryParsing.V);
    String zkHost = localParams.get(ZK_HOST);
    String solrUrl = localParams.get(SOLR_URL);
    // Test if this is a valid solr url.
    if (solrUrl != null) {
      if (solrUrlWhitelist == null) {
        throw new SyntaxError("White list must be configured to use solrUrl parameter.");
      }
      if (!solrUrlWhitelist.contains(solrUrl)) {
        throw new SyntaxError("Solr Url was not in the whitelist.  Please check your configuration.");
      }
    }

    // Fail fast with a parse error rather than a NullPointerException here (toField.equals)
    // or a failure later when the remote request is built.
    String collection = requireParam(COLLECTION);
    String fromField = requireParam(FROM);
    String toField = requireParam(TO);

    // Unless stated explicitly, assume routing by join key when joining on the configured router field.
    boolean routedByJoinKey = localParams.getBool(ROUTED_BY_JOIN_KEY, toField.equals(routerField));
    int ttl = localParams.getInt(TTL, TTL_DEFAULT);

    ModifiableSolrParams otherParams = new ModifiableSolrParams();
    for (Iterator<String> it = localParams.getParameterNamesIterator(); it.hasNext(); ) {
      String paramName = it.next();
      if (!OWN_PARAMS.contains(paramName)) {
        otherParams.set(paramName, localParams.getParams(paramName));
      }
    }

    return new XCJFQuery(query, zkHost, solrUrl, collection, fromField, toField, routedByJoinKey, ttl, otherParams);
  }

  /** Returns the named local param, or throws a {@link SyntaxError} when it is absent. */
  private String requireParam(String name) throws SyntaxError {
    String value = localParams.get(name);
    if (value == null) {
      throw new SyntaxError("Missing required local parameter '" + name + "' for cross collection join filter");
    }
    return value;
  }
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search.join;
import java.util.HashSet;
import java.util.List;
import org.apache.solr.common.StringUtils;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.QParser;
import org.apache.solr.search.QParserPlugin;
/**
 * Plugin for the cross-collection join filter: a query is run against a remote Solr collection
 * to obtain a set of join keys, and that set is then applied as a filter on the local collection.
 * <br>Example: {!xcjf collection="remoteCollection" from="fromField" to="toField" v="*:*"}
 */
public class XCJFQParserPlugin extends QParserPlugin {

  public static final String NAME = "xcjf";

  /** Value of the {@code routerField} init arg; stays null when not configured. */
  private String routerField;
  /** Non-empty entries of the {@code solrUrl} init arg; null when that arg is absent. */
  private HashSet<String> solrUrlWhitelist;

  @Override
  public String getName() {
    return NAME;
  }

  @Override
  public QParser createParser(String qstr, SolrParams localParams, SolrParams params, SolrQueryRequest req) {
    return new XCJFQParser(qstr, localParams, params, req, routerField, solrUrlWhitelist);
  }

  /**
   * Reads {@code routerField} and the {@code solrUrl} list from the plugin's init args.
   * An absent {@code solrUrl} arg leaves the whitelist null.
   */
  @Override
  public void init(NamedList args) {
    routerField = (String) args.get("routerField");

    final Object configuredUrls = args.get("solrUrl");
    if (configuredUrls == null) {
      solrUrlWhitelist = null;
      return;
    }

    solrUrlWhitelist = new HashSet<>();
    @SuppressWarnings("unchecked")
    final List<String> urls = (List<String>) configuredUrls;
    for (String url : urls) {
      if (StringUtils.isEmpty(url)) {
        continue; // blank entries are ignored
      }
      solrUrlWhitelist.add(url);
    }
  }
}

View File

@ -0,0 +1,380 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search.join;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.ConstantScoreScorer;
import org.apache.lucene.search.ConstantScoreWeight;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryVisitor;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.FixedBitSet;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.UniqueStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.schema.FieldType;
import org.apache.solr.search.BitDocSet;
import org.apache.solr.search.DocSet;
import org.apache.solr.search.DocSetUtil;
import org.apache.solr.search.Filter;
import org.apache.solr.search.SolrIndexSearcher;
/**
 * Cross-collection join filter query. When first scored, it streams the distinct values of
 * {@link #fromField} for the documents matching {@link #query} in the remote
 * {@link #collection}, and matches the local documents whose {@link #toField} holds one of
 * those values.
 *
 * <p>Two instances compare equal only when their join configuration matches and they were
 * created less than {@code min(ttl, other.ttl)} seconds apart; see {@link #equals(Object)}.
 */
public class XCJFQuery extends Query {

  protected final String query;
  protected final String zkHost;
  protected final String solrUrl;
  protected final String collection;
  protected final String fromField;
  protected final String toField;
  protected final boolean routedByJoinKey;

  /** {@link System#nanoTime()} at construction; only used by {@link #equals(Object)}. */
  protected final long timestamp;
  /** Maximum age difference, in seconds, at which two otherwise identical queries are equal. */
  protected final int ttl;

  protected SolrParams otherParams;
  protected String otherParamsString;

  public XCJFQuery(String query, String zkHost, String solrUrl, String collection, String fromField, String toField,
                   boolean routedByJoinKey, int ttl, SolrParams otherParams) {
    this.query = query;
    this.zkHost = zkHost;
    this.solrUrl = solrUrl;
    this.collection = collection;
    this.fromField = fromField;
    this.toField = toField;
    this.routedByJoinKey = routedByJoinKey;
    this.timestamp = System.nanoTime();
    this.ttl = ttl;

    this.otherParams = otherParams;
    // SolrParams doesn't implement equals(), so use this string to compare them
    if (otherParams != null) {
      this.otherParamsString = otherParams.toString();
    }
  }

  /** Accumulates join key values and turns them into the set of matching local documents. */
  private interface JoinKeyCollector {
    void collect(Object value) throws IOException;
    DocSet getDocSet() throws IOException;
  }

  /** Collector for term-indexed fields: looks each key up in the terms dictionary. */
  private static class TermsJoinKeyCollector implements JoinKeyCollector {

    FieldType fieldType;
    SolrIndexSearcher searcher;

    TermsEnum termsEnum;
    BytesRefBuilder bytes;
    PostingsEnum postingsEnum;

    FixedBitSet bitSet;

    public TermsJoinKeyCollector(FieldType fieldType, Terms terms, SolrIndexSearcher searcher) throws IOException {
      this.fieldType = fieldType;
      this.searcher = searcher;

      termsEnum = terms.iterator();
      bytes = new BytesRefBuilder();

      bitSet = new FixedBitSet(searcher.maxDoc());
    }

    @Override
    public void collect(Object value) throws IOException {
      fieldType.readableToIndexed((String) value, bytes);
      if (termsEnum.seekExact(bytes.get())) {
        postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
        bitSet.or(postingsEnum);
      }
    }

    @Override
    public DocSet getDocSet() throws IOException {
      // Postings still include deleted documents; mask them out when there are any.
      if (searcher.getIndexReader().hasDeletions()) {
        bitSet.and(searcher.getLiveDocSet().getBits());
      }
      return new BitDocSet(bitSet);
    }
  }

  /** Collector for point fields: gathers the numeric keys and resolves them with one query. */
  private class PointJoinKeyCollector extends GraphPointsCollector implements JoinKeyCollector {

    SolrIndexSearcher searcher;

    public PointJoinKeyCollector(SolrIndexSearcher searcher) {
      super(searcher.getSchema().getField(toField), null, null);
      this.searcher = searcher;
    }

    @Override
    public void collect(Object value) throws IOException {
      if (value instanceof Long || value instanceof Integer) {
        set.add(((Number) value).longValue());
      } else {
        throw new UnsupportedOperationException("Unsupported field type for XCJFQuery");
      }
    }

    @Override
    public DocSet getDocSet() throws IOException {
      Query query = getResultQuery(searcher.getSchema().getField(toField), false);
      if (query == null) {
        return DocSet.EMPTY;
      }
      return DocSetUtil.createDocSet(searcher, query, null);
    }
  }

  private class XCJFQueryWeight extends ConstantScoreWeight {

    private SolrIndexSearcher searcher;
    private ScoreMode scoreMode;
    /** Result of the remote join, fetched on the first call to {@link #scorer}. */
    private Filter filter;

    public XCJFQueryWeight(SolrIndexSearcher searcher, ScoreMode scoreMode, float score) {
      super(XCJFQuery.this, score);
      this.scoreMode = scoreMode;
      this.searcher = searcher;
    }

    /**
     * Builds a {@code hash_range} filter query restricting the remote results to join keys that
     * hash into this core's shard range, or returns {@code null} when no such restriction is
     * possible. Omitting the filter only makes the remote result set larger.
     */
    private String createHashRangeFq() {
      if (!routedByJoinKey) {
        return null;
      }
      ClusterState clusterState = searcher.getCore().getCoreContainer().getZkController().getClusterState();
      CloudDescriptor desc = searcher.getCore().getCoreDescriptor().getCloudDescriptor();
      Slice slice = clusterState.getCollection(desc.getCollectionName()).getSlicesMap().get(desc.getShardId());
      DocRouter.Range range = slice == null ? null : slice.getRange();
      if (range == null) {
        // No hash range known for this shard, so the remote query cannot be narrowed.
        return null;
      }

      // In CompositeIdRouter, the routing prefix only affects the top 16 bits
      int min = range.min & 0xffff0000;
      int max = range.max | 0x0000ffff;

      return String.format(Locale.ROOT, "{!hash_range f=%s l=%d u=%d}", fromField, min, max);
    }

    /** Builds a stream over {@code /export} of the remote collection, located through ZooKeeper. */
    private TupleStream createCloudSolrStream(SolrClientCache solrClientCache) throws IOException {
      String streamZkHost;
      if (zkHost != null) {
        streamZkHost = zkHost;
      } else {
        streamZkHost = searcher.getCore().getCoreContainer().getZkController().getZkServerAddress();
      }

      ModifiableSolrParams params = new ModifiableSolrParams(otherParams);
      params.set(CommonParams.Q, query);
      String fq = createHashRangeFq();
      if (fq != null) {
        params.add(CommonParams.FQ, fq);
      }
      params.set(CommonParams.FL, fromField);
      params.set(CommonParams.SORT, fromField + " asc");
      params.set(CommonParams.QT, "/export");
      params.set(CommonParams.WT, CommonParams.JAVABIN);

      StreamContext streamContext = new StreamContext();
      streamContext.setSolrClientCache(solrClientCache);

      TupleStream cloudSolrStream = new CloudSolrStream(streamZkHost, collection, params);
      TupleStream uniqueStream = new UniqueStream(cloudSolrStream, new FieldEqualitor(fromField));
      uniqueStream.setStreamContext(streamContext);
      return uniqueStream;
    }

    /** Builds a stream that sends a {@code unique(search(...))} expression to {@code solrUrl}. */
    private TupleStream createSolrStream() {
      StreamExpression searchExpr = new StreamExpression("search")
          .withParameter(collection)
          .withParameter(new StreamExpressionNamedParameter(CommonParams.Q, query));
      String fq = createHashRangeFq();
      if (fq != null) {
        searchExpr.withParameter(new StreamExpressionNamedParameter(CommonParams.FQ, fq));
      }
      searchExpr.withParameter(new StreamExpressionNamedParameter(CommonParams.FL, fromField))
          .withParameter(new StreamExpressionNamedParameter(CommonParams.SORT, fromField + " asc"))
          .withParameter(new StreamExpressionNamedParameter(CommonParams.QT, "/export"));

      // The constructor accepts null for otherParams, so it must not be iterated blindly.
      if (otherParams != null) {
        for (Map.Entry<String,String[]> entry : otherParams) {
          for (String value : entry.getValue()) {
            searchExpr.withParameter(new StreamExpressionNamedParameter(entry.getKey(), value));
          }
        }
      }

      StreamExpression uniqueExpr = new StreamExpression("unique");
      uniqueExpr.withParameter(searchExpr)
          .withParameter(new StreamExpressionNamedParameter("over", fromField));

      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set("expr", uniqueExpr.toString());
      params.set(CommonParams.QT, "/stream");
      params.set(CommonParams.WT, CommonParams.JAVABIN);

      return new SolrStream(solrUrl + "/" + collection, params);
    }

    /**
     * Chooses the collector matching the type of the local join field, or returns {@code null}
     * when the field has no terms, in which case nothing can match.
     */
    private JoinKeyCollector createCollector() throws IOException {
      FieldType fieldType = searcher.getSchema().getFieldType(toField);
      if (fieldType.isPointField()) {
        return new PointJoinKeyCollector(searcher);
      }
      Terms terms = searcher.getSlowAtomicReader().terms(toField);
      if (terms == null) {
        return null;
      }
      return new TermsJoinKeyCollector(fieldType, terms, searcher);
    }

    /** Runs the remote query and returns the local documents matching the returned join keys. */
    private DocSet getDocSet() throws IOException {
      JoinKeyCollector collector = createCollector();
      if (collector == null) {
        return DocSet.EMPTY;
      }

      // Created only once the remote request is certain to happen, and closed in its own
      // finally so a failure while closing the stream cannot skip it.
      SolrClientCache solrClientCache = new SolrClientCache();
      try {
        TupleStream solrStream;
        if (zkHost != null || solrUrl == null) {
          solrStream = createCloudSolrStream(solrClientCache);
        } else {
          solrStream = createSolrStream();
        }

        try {
          solrStream.open();
          while (true) {
            Tuple tuple = solrStream.read();
            if (tuple.EXCEPTION) {
              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, tuple.getException());
            }
            if (tuple.EOF) {
              break;
            }

            Object value = tuple.get(fromField);
            if (value != null) {
              // A tuple without a join key cannot match any local document.
              collector.collect(value);
            }
          }
        } catch (IOException e) {
          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
        } finally {
          solrStream.close();
        }
      } finally {
        solrClientCache.close();
      }

      return collector.getDocSet();
    }

    @Override
    public Scorer scorer(LeafReaderContext context) throws IOException {
      if (filter == null) {
        filter = getDocSet().getTopFilter();
      }

      DocIdSet readerSet = filter.getDocIdSet(context, null);
      if (readerSet == null) {
        return null;
      }
      DocIdSetIterator readerSetIterator = readerSet.iterator();
      if (readerSetIterator == null) {
        return null;
      }
      return new ConstantScoreScorer(this, score(), scoreMode, readerSetIterator);
    }

    @Override
    public boolean isCacheable(LeafReaderContext ctx) {
      return false;
    }
  }

  @Override
  public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
    return new XCJFQueryWeight((SolrIndexSearcher) searcher, scoreMode, boost);
  }

  @Override
  public void visit(QueryVisitor visitor) {
    visitor.visitLeaf(this);
  }

  @Override
  public int hashCode() {
    final int prime = 31;
    int result = classHash();
    result = prime * result + Objects.hashCode(query);
    result = prime * result + Objects.hashCode(zkHost);
    result = prime * result + Objects.hashCode(solrUrl);
    result = prime * result + Objects.hashCode(collection);
    result = prime * result + Objects.hashCode(fromField);
    result = prime * result + Objects.hashCode(toField);
    result = prime * result + Boolean.hashCode(routedByJoinKey);
    result = prime * result + Objects.hashCode(otherParamsString);
    // timestamp and ttl should not be included in hash code
    return result;
  }

  /**
   * Two queries are equal when their join configuration matches and they were created less than
   * {@code min(ttl, other.ttl)} seconds apart.
   */
  @Override
  public boolean equals(Object other) {
    return sameClassAs(other) &&
        equalsTo(getClass().cast(other));
  }

  private boolean equalsTo(XCJFQuery other) {
    return Objects.equals(query, other.query) &&
        Objects.equals(zkHost, other.zkHost) &&
        Objects.equals(solrUrl, other.solrUrl) &&
        Objects.equals(collection, other.collection) &&
        Objects.equals(fromField, other.fromField) &&
        Objects.equals(toField, other.toField) &&
        routedByJoinKey == other.routedByJoinKey &&
        Objects.equals(otherParamsString, other.otherParamsString) &&
        TimeUnit.SECONDS.convert(Math.abs(timestamp - other.timestamp), TimeUnit.NANOSECONDS) < Math.min(ttl, other.ttl);
  }

  @Override
  public String toString(String field) {
    // The query string is passed as-is so that a null query prints as "null" instead of throwing.
    return String.format(Locale.ROOT, "{!xcjf collection=%s from=%s to=%s routed=%b ttl=%d}%s",
        collection, fromField, toField, routedByJoinKey, ttl, query);
  }
}

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Schema named "xcjf": one string type and two point types, all with docValues enabled. -->
<schema name="xcjf" version="1.1">
<!-- Field types; every one of them sets docValues="true". -->
<fieldType name="string" class="solr.StrField" docValues="true"/>
<fieldType name="pint" class="solr.IntPointField" docValues="true"/>
<fieldType name="plong" class="solr.LongPointField" docValues="true"/>
<!-- Explicit fields: the required unique key and the version field. -->
<field name="id" type="string" required="true"/>
<field name="_version_" type="plong" indexed="true" stored="true"/>
<!-- Dynamic fields, typed by suffix: *_i is pint, *_l is plong, *_s is string. -->
<dynamicField name="*_i" type="pint"/>
<dynamicField name="*_l" type="plong"/>
<dynamicField name="*_s" type="string"/>
<uniqueKey>id</uniqueKey>
</schema>

View File

@ -0,0 +1,72 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<config>
  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
  <dataDir>${solr.data.dir:}</dataDir>
  <requestHandler name="/select" class="solr.SearchHandler" />

  <query>
    <filterCache class="solr.CaffeineCache"
                 size="512"
                 initialSize="512"
                 autowarmCount="0" />
    <queryResultCache class="solr.CaffeineCache"
                      size="512"
                      initialSize="512"
                      autowarmCount="0" />
    <documentCache class="solr.CaffeineCache"
                   size="512"
                   initialSize="512"
                   autowarmCount="0" />

    <!-- User cache for the hash_range query on product_id_s. The name is the prefix "hash_"
         followed by the field name. User caches are only read from inside the query section;
         declared directly under config this element would be ignored. -->
    <cache name="hash_product_id_s"
           class="solr.CaffeineCache"
           size="128"
           initialSize="0"
           regenerator="solr.NoOpRegenerator"/>
  </query>

  <!-- Join parser with a router field and a whitelist of Solr URLs supplied by the test. -->
  <queryParser name="xcjf" class="org.apache.solr.search.join.XCJFQParserPlugin">
    <str name="routerField">product_id_s</str>
    <arr name="solrUrl">
      <str>${test.xcjf.solr.url.1:}</str>
      <str>${test.xcjf.solr.url.2:}</str>
      <str>${test.xcjf.solr.url.3:}</str>
    </arr>
  </queryParser>

  <!-- Join parser without a router field and without a whitelist. -->
  <queryParser name="xcjf_nonrouted" class="org.apache.solr.search.join.XCJFQParserPlugin" />

  <!-- Same settings as "xcjf", registered under a second name. -->
  <queryParser name="xcjf_whitelist" class="org.apache.solr.search.join.XCJFQParserPlugin">
    <str name="routerField">product_id_s</str>
    <arr name="solrUrl">
      <str>${test.xcjf.solr.url.1:}</str>
      <str>${test.xcjf.solr.url.2:}</str>
      <str>${test.xcjf.solr.url.3:}</str>
    </arr>
  </queryParser>
</config>

View File

@ -1291,6 +1291,18 @@ public class QueryEqualityTest extends SolrTestCaseJ4 {
);
}
/**
 * Passes two spellings of the same xcjf join to {@code assertQueryEquals}: one with the inner
 * query after the local params, one with it in the {@code v} local param.
 */
public void testXCJFQuery() throws Exception {
  final String trailingQueryForm = "{!xcjf collection=abc from=x_id to=x_id}*:*";
  final String localParamForm = "{!xcjf collection=abc from=x_id to=x_id v='*:*'}";

  assertQueryEquals("xcjf", trailingQueryForm, localParamForm);
}
/**
 * Passes two spellings of the same hash_range query to {@code assertQueryEquals}: unquoted
 * values in one order, quoted values in another.
 */
public void testHashRangeQuery() throws Exception {
  final String unquotedForm = "{!hash_range f=x_id l=107347968 u=214695935}";
  final String quotedReorderedForm = "{!hash_range l='107347968' u='214695935' f='x_id'}";

  assertQueryEquals("hash_range", unquotedForm, quotedReorderedForm);
}
// Override req to add df param
public static SolrQueryRequest req(String... q) {
return SolrTestCaseJ4.req(q, "df", "text");

View File

@ -0,0 +1,280 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * SolrCloud test for the cross collection join filter ({@code xcjf}) query parser.
 *
 * <p>Two collections, "products" and "parts", are created from the "xcjf" configset. Every test
 * queries "parts" with an xcjf filter that joins against "products" and checks how many "parts"
 * documents match.
 */
public class XCJFQueryTest extends SolrCloudTestCase {

  private static final int NUM_NODES = 3;
  private static final int NUM_SHARDS = 3;
  private static final int NUM_REPLICAS = 1;
  private static final int NUM_PRODUCTS = 200;
  private static final String[] SIZES = new String[]{"S", "M", "L", "XL"};

  /** Prefix of the numbered system properties that carry the whitelisted Solr URLs. */
  private static final String SOLR_URL_PROPERTY_PREFIX = "test.xcjf.solr.url.";

  /** Starts the cluster and creates the "products" and "parts" collections. */
  @BeforeClass
  public static void setupCluster() throws Exception {
    configureCluster(NUM_NODES)
        .addConfig("xcjf", configset("xcjf"))
        .withSolrXml(TEST_PATH().resolve("solr.xml"))
        .configure();

    CollectionAdminRequest.createCollection("products", "xcjf", NUM_SHARDS, NUM_REPLICAS)
        .process(cluster.getSolrClient());
    CollectionAdminRequest.createCollection("parts", "xcjf", NUM_SHARDS, NUM_REPLICAS)
        .process(cluster.getSolrClient());
  }

  /**
   * Empties both collections, re-indexes them and verifies the document counts.
   *
   * @param routeByKey if true, document ids are prefixed with {@code productId + "!"}
   */
  public static void setupIndexes(boolean routeByKey) throws IOException, SolrServerException {
    clearCollection("products");
    clearCollection("parts");

    buildIndexes(routeByKey);

    assertResultCount("products", "*:*", NUM_PRODUCTS, true);
    // Every group of 4 consecutive products (S, M, L, XL) contributes 1 + 2 + 3 + 4 = 10 parts.
    assertResultCount("parts", "*:*", NUM_PRODUCTS * 10 / 4, true);
  }

  /** Sends a delete-by-query for all documents to the given collection (no explicit commit). */
  private static void clearCollection(String collection) throws IOException, SolrServerException {
    UpdateRequest update = new UpdateRequest();
    update.deleteByQuery("*:*");
    update.process(cluster.getSolrClient(), collection);
  }

  /**
   * Indexes {@link #NUM_PRODUCTS} product documents and their part documents, then commits both
   * collections.
   */
  private static void buildIndexes(boolean routeByKey) throws IOException, SolrServerException {
    List<SolrInputDocument> productDocs = new ArrayList<>();
    List<SolrInputDocument> partDocs = new ArrayList<>();

    for (int productId = 0; productId < NUM_PRODUCTS; ++productId) {
      int sizeNum = productId % SIZES.length;
      String size = SIZES[sizeNum];

      productDocs.add(new SolrInputDocument(
          "id", buildId(productId, String.valueOf(productId), routeByKey),
          "product_id_i", String.valueOf(productId),
          "product_id_l", String.valueOf(productId),
          "product_id_s", String.valueOf(productId),
          "size_s", size));

      // Index 1 parts document for each small product, 2 for each medium, 3 for each large, etc.
      for (int partNum = 0; partNum <= sizeNum; partNum++) {
        String partId = String.format(Locale.ROOT, "%d_%d", productId, partNum);
        partDocs.add(new SolrInputDocument(
            "id", buildId(productId, partId, routeByKey),
            "product_id_i", String.valueOf(productId),
            "product_id_l", String.valueOf(productId),
            "product_id_s", String.valueOf(productId)));
      }
    }

    indexDocs("products", productDocs);
    cluster.getSolrClient().commit("products");

    indexDocs("parts", partDocs);
    cluster.getSolrClient().commit("parts");
  }

  /** Returns {@code id}, prefixed with {@code productId + "!"} when {@code routeByKey} is true. */
  private static String buildId(int productId, String id, boolean routeByKey) {
    return routeByKey ? productId + "!" + id : id;
  }

  /** Sends all given documents to the collection in one update request (no commit). */
  private static void indexDocs(String collection, Collection<SolrInputDocument> docs) throws IOException, SolrServerException {
    UpdateRequest update = new UpdateRequest();
    update.add(docs);
    update.process(cluster.getSolrClient(), collection);
  }

  /** Returns the base URL of a randomly chosen node of the test cluster. */
  private String getSolrUrl() {
    List<JettySolrRunner> runners = cluster.getJettySolrRunners();
    JettySolrRunner runner = runners.get(random().nextInt(runners.size()));
    return runner.getBaseUrl().toString();
  }

  /**
   * Publishes the base URL of every cluster node as system property
   * {@code test.xcjf.solr.url.N}, with N counting from 1.
   */
  private static void setSolrUrlProperties() {
    int i = 0;
    for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
      i++;
      System.setProperty(SOLR_URL_PROPERTY_PREFIX + i, runner.getBaseUrl().toString());
    }
  }

  /**
   * Removes exactly the properties written by {@link #setSolrUrlProperties()}. The counter starts
   * at 1 again so the same property names are cleared and nothing leaks into later tests.
   */
  private static void clearSolrUrlProperties() {
    int runnerCount = cluster.getJettySolrRunners().size();
    for (int i = 1; i <= runnerCount; i++) {
      System.clearProperty(SOLR_URL_PROPERTY_PREFIX + i);
    }
  }

  /**
   * Re-uploads the "xcjf" configset and reloads both collections. Called after the solr url
   * system properties have been set, now that valid urls for the cluster are known.
   */
  private static void uploadConfigAndReloadCollections() throws Exception {
    CloudSolrClient client = cluster.getSolrClient();
    ((ZkClientClusterStateProvider) client.getClusterStateProvider()).uploadConfig(configset("xcjf"), "xcjf");
    // reload the cores with the updated whitelisted solr url config.
    CollectionAdminRequest.Reload.reloadCollection("products").process(client);
    CollectionAdminRequest.Reload.reloadCollection("parts").process(client);
  }

  @Test
  public void testXcjfRoutedCollection() throws Exception {
    setupIndexes(true);
    testXcjfQuery("{!xcjf collection=products from=product_id_i to=product_id_i}size_s:M", true);

    setSolrUrlProperties();
    try {
      uploadConfigAndReloadCollections();
      // NOTE(review): fixed pause kept from the original test; presumably it gives the reloaded
      // cores time to settle -- confirm whether it is still required.
      Thread.sleep(10000);

      // Join on the int, long and string flavours of the key, both via the local cluster and via
      // an explicit solrUrl.
      for (String joinField : new String[]{"product_id_i", "product_id_l", "product_id_s"}) {
        testXcjfQuery(String.format(Locale.ROOT,
            "{!xcjf collection=products from=%s to=%s}size_s:M", joinField, joinField),
            true);
        testXcjfQuery(String.format(Locale.ROOT,
            "{!xcjf solrUrl=\"%s\" collection=products from=%s to=%s}size_s:M",
            getSolrUrl(), joinField, joinField),
            true);
      }

      testXcjfQuery(String.format(Locale.ROOT,
          "{!xcjf zkHost=\"%s\" collection=products from=product_id_s to=product_id_s}size_s:M",
          cluster.getSolrClient().getZkHost()),
          true);

      // Test the ability to set other parameters on xcjf and have them passed through.
      // Product 1 has size M and therefore exactly 2 parts.
      assertResultCount("parts",
          "{!xcjf collection=products from=product_id_s to=product_id_s fq=product_id_s:1}size_s:M",
          2, true);
      assertResultCount("parts",
          String.format(Locale.ROOT,
              "{!xcjf solrUrl=\"%s\" collection=products from=product_id_s to=product_id_s fq=product_id_s:1}size_s:M",
              getSolrUrl()), 2, true);
    } finally {
      clearSolrUrlProperties();
    }
  }

  @Test
  public void testXcjfNonroutedCollection() throws Exception {
    setupIndexes(false);

    // This query will expect the collection to have been routed on product_id, so it should return
    // incomplete results.
    testXcjfQuery("{!xcjf collection=products from=product_id_s to=product_id_s}size_s:M",
        false);
    // Now if we set routed=false we should get a complete set of results.
    testXcjfQuery("{!xcjf collection=products from=product_id_s to=product_id_s routed=false}size_s:M",
        true);
    // The xcjf_nonrouted query parser doesn't assume that the collection was routed on product_id,
    // so we should get the full set of results.
    testXcjfQuery("{!xcjf_nonrouted collection=products from=product_id_s to=product_id_s}size_s:M",
        true);
    // But if we set routed=true, we are now assuming again that the collection was routed on product_id,
    // so we should get incomplete results.
    testXcjfQuery("{!xcjf_nonrouted collection=products from=product_id_s to=product_id_s routed=true}size_s:M",
        false);
  }

  @Test
  public void testSolrUrlWhitelist() throws Exception {
    setupIndexes(false);

    // programmatically add the current jetty solr urls to the solrUrl whitelist property in the solrconfig.xml
    setSolrUrlProperties();
    try {
      uploadConfigAndReloadCollections();

      // A solrUrl that is not in the whitelist must be rejected.
      try {
        testXcjfQuery(String.format(Locale.ROOT,
            "{!xcjf_whitelist solrUrl=\"%s\" collection=products from=product_id_i to=product_id_i}size_s:M",
            "http://bogus.example.com:8983/solr"),
            true);
        // fail() throws an AssertionError, which the catch (Exception) below does not swallow.
        fail("The query invovling bogus.example.com should not succeed");
      } catch (Exception e) {
        // should get here.
        String message = e.getMessage();
        assertTrue("message was " + message,
            message != null && message.contains("SyntaxError: Solr Url was not in the whitelist"));
      }

      // verify the xcjf_whitelist definition has the current valid urls and works.
      testXcjfQuery(String.format(Locale.ROOT,
          "{!xcjf_whitelist solrUrl=\"%s\" collection=products from=product_id_i to=product_id_i}size_s:M",
          getSolrUrl()),
          true);
    } finally {
      clearSolrUrlProperties();
    }
  }

  /**
   * Runs {@code query} against "parts" and checks it against the number of parts that belong to
   * size-M products ({@code NUM_PRODUCTS / 2}).
   *
   * @param query the query string to run
   * @param expectFullResults true to require exactly that count, false to require fewer
   */
  public void testXcjfQuery(String query, boolean expectFullResults) throws Exception {
    assertResultCount("parts", query, NUM_PRODUCTS / 2, expectFullResults);
  }

  /**
   * Runs {@code query} with rows=0 against {@code collection} and checks numFound.
   *
   * @param expectedCount the complete result count
   * @param expectFullResults if true numFound must equal {@code expectedCount}; if false it must
   *        be strictly smaller
   */
  private static void assertResultCount(String collection, String query, long expectedCount, boolean expectFullResults)
      throws IOException, SolrServerException {

    final ModifiableSolrParams params = new ModifiableSolrParams();
    params.add("q", query);
    params.add("rows", "0");
    QueryResponse resp = cluster.getSolrClient().query(collection, params);

    long numFound = resp.getResults().getNumFound();
    if (expectFullResults) {
      assertEquals(expectedCount, numFound);
    } else {
      assertTrue("expected fewer than " + expectedCount + " results but found " + numFound,
          numFound < expectedCount);
    }
  }
}

View File

@ -549,6 +549,45 @@ http://localhost:8983/solr/alt_graph/query?fl=id&q={!graph+from=id+to=out_edge+m
{ "id":"H" } ]
}
----
== Hash Range Query Parser
The hash range query parser will return documents that have a field that contains a value that would be hashed to a particular range. This is used by the XCJF query parser. This query parser has a per segment cache for each field that this query parser will operate on.
When specifying a min/max hash range and a field name with the hash range query parser, only documents that contain a field value that hashes into that range will be returned. If you want to query for a very large result set, you can query for various hash ranges to return a fraction of the documents with each range request. In the XCJF case, the hash_range query parser is used to ensure that each shard only gets the set of join keys that would end up on that shard.
This query parser uses the MurmurHash3_x86_32. This is the same as the default hashing for the default composite ID router in Solr.
=== Hash Range Parameters
`f`::
The field name to operate on. This field should have docValues enabled and should be single-valued
`l`::
The lower bound of the hash range for the query
`u`::
The upper bound for the hash range for the query
=== Hash Range Example
[source,text]
----
{!hash_range f="field_name" l="0" u="12345"}
----
=== Hash Range Cache Config
The hash range query parser uses a special cache to speed up queries. The following should be added to the solrconfig.xml for each field that you want to perform the hash range query on. Note that the name of the cache should be the field name prefixed by "hash_".
[source,xml]
----
<cache name="hash_field_name"
class="solr.LRUCache"
size="128"
initialSize="0"
regenerator="solr.NoOpRegenerator"/>
----
== Join Query Parser
@ -1018,6 +1057,71 @@ An optional parameter used to determine which of several query implementations s
{!terms f=categoryId method=booleanQuery separator=" "}8 6 7 5309
----
== XCJF Query Parser
The Cross Collection Join filter is a query parser plugin that will execute a query against a remote Solr collection to get back a set of join keys that will be used as a filter query against the local Solr collection. The XCJF query parser will create an XCJFQuery object. The XCJFQuery will first query a remote solr collection and get back a streaming expression result of the join keys. As the join keys are streamed to the node, a bitset of the matching documents in the local index is built up. This avoids keeping the full set of join keys in memory at any given time. This bitset is then inserted into the filter cache upon successful execution as with the normal behavior of the solr filter cache.
If the local index is sharded according to the join key field, the XCJF query can leverage a secondary query parser called the "hash_range" query parser. The hash_range query parser is responsible for returning only the documents that hash to a given range of values. This allows the XCJFQuery to query the remote solr collection and return only the join keys that would match a specific shard in the local solr collection. This has the benefit of making sure that network traffic doesn't increase as the number of shards increases and allows for much greater scalability.
The XCJF parser works with both String and Point types of fields. The fields that are being used for the join key must be single-valued and have docValues enabled. It's advised to shard the local collection by the join key as this allows for the optimization mentioned above to be utilized. The XCJF should generally not be used as part of the "q", but rather it is designed to be used as a filter query "fq" parameter to ensure proper caching. The remote solr collection that is being queried should have a single-valued field for the join key with docValues enabled. The remote solr collection does not have any specific sharding requirements.
=== XCJF Query Parser definition in solrconfig.xml
The XCJF has some configuration options that can be specified in the solrconfig.xml
`routerField`::
If the documents are routed to shards using the CompositeID router by the join field, then that field name should be specified in the configuration here. This will allow the parser to optimize the resulting HashRange query.
`solrUrl`::
If specified, this array of strings specifies the white listed Solr URLs that you can pass to the solrUrl query parameter. Without this configuration the solrUrl parameter cannot be used. This restriction is necessary to prevent an attacker from using solr to explore the network.
[source,xml]
----
<queryParser name="xcjf" class="org.apache.solr.search.join.XCJFQParserPlugin">
<!-- Define which field has the routing information for the collection to use the hash range query parser. -->
<str name="routerField">joinfield_id_s</str>
<!-- Demo only, most users will want to remove this parameter -->
<arr name="solrUrl">
<str>http://othersolr.example.com:8983/solr</str>
</arr>
</queryParser>
----
=== XCJF Query Parameters
`collection`::
The name of the external Solr collection to be queried to retrieve the set of join key values ( required )
`zkHost`::
The connection string to be used to connect to Zookeeper. zkHost and solrUrl are both optional parameters, and at most one of them should be specified. If neither zkHost nor solrUrl is specified, the local Zookeeper cluster will be used. ( optional )
`solrUrl`::
The URL of the external Solr node to be queried. Must be a character for character exact match of a whitelisted url. ( optional, disabled by default for security )
`from`::
The join key field name in the external collection ( required )
`to`::
The join key field name in the local collection
`v`::
The query substituted in as a local param. This is the query string that will match documents in the remote collection.
`routed`::
true / false. If true, the XCJF query will use each shard's hash range to determine the set of join keys to retrieve for that shard. This parameter improves the performance of the cross-collection join, but it depends on the local collection being routed by the toField. If this parameter is not specified, the XCJF query will try to determine the correct value automatically.
`ttl`::
The length of time that an XCJF query in the cache will be considered valid, in seconds. Defaults to 3600 (one hour). The XCJF query will not be aware of changes to the remote collection, so if the remote collection is updated, cached XCJF queries may give inaccurate results. After the ttl period has expired, the XCJF query will re-execute the join against the remote collection.
`All others`::
Any normal Solr parameter can also be specified/passed through as a local param.
=== XCJF Query Examples
[source,text]
----
http://localhost:8983/solr/localCollection/query?fl=id&q={!xcjf collection="otherCollection" from="fromField" to="toField" v="*:*"}
----
== XML Query Parser
The {solr-javadocs}/solr-core/org/apache/solr/search/XmlQParserPlugin.html[XmlQParserPlugin] extends the {solr-javadocs}/solr-core/org/apache/solr/search/QParserPlugin.html[QParserPlugin] and supports the creation of queries from XML. Example: