SOLR-7775: \{\!join score=.. fromIndex=...\} supports single-sharded replicated SolrCloudollection

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1697157 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mikhail Khludnev 2015-08-23 09:02:38 +00:00
parent bc9da338af
commit e31c84e061
4 changed files with 186 additions and 135 deletions

View File

@ -123,6 +123,10 @@ New Features
Example: description:HDTV OR filter(+promotion:tv +promotion_date:[NOW/DAY TO NOW/DAY+7DAY])
(yonik)
* SOLR-7775: Allow fromIndex parameter to ScoreJoinQParserPlugin {!join score=.. fromIndex=..}..
to refer to a single-sharded collection that has a replica on all nodes where there is a
replica in the to index (Andrei Beliakov via Mikhail Khludnev)
Bug Fixes
----------------------

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexReader;
@ -44,12 +43,7 @@ import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.StringHelper;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@ -85,10 +79,12 @@ public class JoinQParserPlugin extends QParserPlugin {
}
Query parseJoin() throws SyntaxError {
String fromField = getParam("from");
String fromIndex = getParam("fromIndex");
String toField = getParam("to");
String v = localParams.get("v");
final String fromField = getParam("from");
final String fromIndex = getParam("fromIndex");
final String toField = getParam("to");
final String v = localParams.get("v");
final String coreName;
Query fromQuery;
long fromCoreOpenTime = 0;
@ -96,42 +92,13 @@ public class JoinQParserPlugin extends QParserPlugin {
CoreContainer container = req.getCore().getCoreDescriptor().getCoreContainer();
// if in SolrCloud mode, fromIndex should be the name of a single-sharded collection
if (container.isZooKeeperAware()) {
ZkController zkController = container.getZkController();
if (!zkController.getClusterState().hasCollection(fromIndex)) {
// collection not found ... but it might be an alias?
String resolved = null;
Aliases aliases = zkController.getZkStateReader().getAliases();
if (aliases != null) {
Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
resolved = (collectionAliases != null) ? collectionAliases.get(fromIndex) : null;
if (resolved != null) {
// ok, was an alias, but if the alias points to multiple collections, then we don't support that yet
if (resolved.split(",").length > 1)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: Collection alias '" + fromIndex +
"' maps to multiple collections ("+resolved+
"), which is not currently supported for joins.");
}
}
coreName = ScoreJoinQParserPlugin.getCoreName(fromIndex, container);
if (resolved == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: Collection '" + fromIndex + "' not found!");
// ok, resolved to an alias
fromIndex = resolved;
}
// the fromIndex is a local replica for a single-sharded collection with replicas
// across all nodes that have replicas for the collection we're joining with
fromIndex = findLocalReplicaForFromIndex(zkController, fromIndex);
}
final SolrCore fromCore = container.getCore(fromIndex);
if (fromCore == null)
final SolrCore fromCore = container.getCore(coreName);
if (fromCore == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Cross-core join: no such core " + fromIndex);
"Cross-core join: no such core " + coreName);
}
RefCounted<SolrIndexSearcher> fromHolder = null;
LocalSolrQueryRequest otherReq = new LocalSolrQueryRequest(fromCore, params);
@ -146,48 +113,18 @@ public class JoinQParserPlugin extends QParserPlugin {
if (fromHolder != null) fromHolder.decref();
}
} else {
coreName = null;
QParser fromQueryParser = subQuery(v, null);
fromQuery = fromQueryParser.getQuery();
}
JoinQuery jq = new JoinQuery(fromField, toField, fromIndex, fromQuery);
JoinQuery jq = new JoinQuery(fromField, toField, coreName == null ? fromIndex : coreName, fromQuery);
jq.fromCoreOpenTime = fromCoreOpenTime;
return jq;
}
};
}
protected String findLocalReplicaForFromIndex(ZkController zkController, String fromIndex) {
String fromReplica = null;
String nodeName = zkController.getNodeName();
for (Slice slice : zkController.getClusterState().getActiveSlices(fromIndex)) {
if (fromReplica != null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: multiple shards not yet supported " + fromIndex);
for (Replica replica : slice.getReplicas()) {
if (replica.getNodeName().equals(nodeName)) {
fromReplica = replica.getStr(ZkStateReader.CORE_NAME_PROP);
// found local replica, but is it Active?
if (replica.getState() != Replica.State.ACTIVE)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: "+fromIndex+" has a local replica ("+fromReplica+
") on "+nodeName+", but it is "+replica.getState());
break;
}
}
}
if (fromReplica == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: No active replicas for "+fromIndex+
" found in node " + nodeName);
return fromReplica;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.solr.search.join;
import java.io.IOException;
import java.util.Map;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexReader;
@ -25,7 +26,12 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.join.JoinUtil;
import org.apache.lucene.search.join.ScoreMode;
import org.apache.lucene.uninverting.UninvertingReader;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
@ -59,10 +65,9 @@ import org.apache.solr.util.RefCounted;
* Thus, it only supports {@link DocValuesType#SORTED}, {@link DocValuesType#SORTED_SET}, {@link DocValuesType#BINARY}. </li>
* <li>fromIndex - optional parameter, a core name where subordinate query should run (and <code>from</code> values are collected) rather than current core.
* <br>Example:<code>q={!join from=manu_id_s to=id score=total fromIndex=products}foo</code>
* <br>Follow up <a href="https://issues.apache.org/jira/browse/SOLR-7775">SOLR-7775</a> for SolrCloud collections support.</li>
* <li>to - "primary key" field name which is searched for values collected from subordinate query.
* it should be declared as <code>indexed="true"</code>. Now it's treated as a single value field.</li>
* <li>score - one of {@link ScoreMode}: None,Avg,Total,Max. Lowercase is also accepted.</li>
* <li>score - one of {@link ScoreMode}: <code>none,avg,total,max,min</code>. Capital case is also accepted.</li>
* </ul>
*/
public class ScoreJoinQParserPlugin extends QParserPlugin {
@ -235,11 +240,12 @@ public class ScoreJoinQParserPlugin extends QParserPlugin {
if (fromIndex != null && (!fromIndex.equals(myCore) || byPassShortCircutCheck)) {
CoreContainer container = req.getCore().getCoreDescriptor().getCoreContainer();
final SolrCore fromCore = container.getCore(fromIndex);
final String coreName = getCoreName(fromIndex, container);
final SolrCore fromCore = container.getCore(coreName);
RefCounted<SolrIndexSearcher> fromHolder = null;
if (fromCore == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cross-core join: no such core " + fromIndex);
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cross-core join: no such core " + coreName);
}
long fromCoreOpenTime = 0;
@ -253,7 +259,7 @@ public class ScoreJoinQParserPlugin extends QParserPlugin {
if (fromHolder != null) {
fromCoreOpenTime = fromHolder.get().getOpenNanoTime();
}
return new OtherCoreJoinQuery(fromQuery, fromField, fromIndex, fromCoreOpenTime,
return new OtherCoreJoinQuery(fromQuery, fromField, coreName, fromCoreOpenTime,
scoreMode, toField);
} finally {
otherReq.close();
@ -268,6 +274,84 @@ public class ScoreJoinQParserPlugin extends QParserPlugin {
}
};
}
/**
* Returns an String with the name of a core.
* <p>
* This method searches the core with fromIndex name in the core's container.
* If fromIndex isn't name of collection or alias it's returns fromIndex without changes.
* If fromIndex is name of alias but if the alias points to multiple collections it's throw
* SolrException.ErrorCode.BAD_REQUEST because multiple shards not yet supported.
*
* @param fromIndex name of the index
* @param container the core container for searching the core with fromIndex name or alias
* @return the string with name of core
*/
public static String getCoreName(final String fromIndex, CoreContainer container) {
if (container.isZooKeeperAware()) {
ZkController zkController = container.getZkController();
final String resolved =
zkController.getClusterState().hasCollection(fromIndex)
? fromIndex : resolveAlias(fromIndex, zkController);
if (resolved == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: Collection '" + fromIndex + "' not found!");
}
return findLocalReplicaForFromIndex(zkController, resolved);
}
return fromIndex;
}
private static String resolveAlias(String fromIndex, ZkController zkController) {
final Aliases aliases = zkController.getZkStateReader().getAliases();
if (aliases != null) {
final String resolved;
Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
resolved = (collectionAliases != null) ? collectionAliases.get(fromIndex) : null;
if (resolved != null) {
if (resolved.split(",").length > 1) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: Collection alias '" + fromIndex +
"' maps to multiple collections (" + resolved +
"), which is not currently supported for joins.");
}
return resolved;
}
}
return null;
}
private static String findLocalReplicaForFromIndex(ZkController zkController, String fromIndex) {
String fromReplica = null;
String nodeName = zkController.getNodeName();
for (Slice slice : zkController.getClusterState().getActiveSlices(fromIndex)) {
if (fromReplica != null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: multiple shards not yet supported " + fromIndex);
for (Replica replica : slice.getReplicas()) {
if (replica.getNodeName().equals(nodeName)) {
fromReplica = replica.getStr(ZkStateReader.CORE_NAME_PROP);
// found local replica, but is it Active?
if (replica.getState() != Replica.State.ACTIVE)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: "+fromIndex+" has a local replica ("+fromReplica+
") on "+nodeName+", but it is "+replica.getState());
break;
}
}
}
if (fromReplica == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: No active replicas for "+fromIndex+
" found in node " + nodeName);
return fromReplica;
}
}

View File

@ -17,13 +17,7 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
@ -36,31 +30,16 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.junit.After;
import org.junit.Before;
import org.apache.commons.lang.StringUtils;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.*;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Tests using fromIndex that points to a collection in SolrCloud mode.
@ -114,38 +93,11 @@ public class DistribJoinFromCollectionTest extends AbstractFullDistribZkTestBase
Thread.sleep(1000); // so the commits fire
// verify the join with fromIndex works
String joinQ = "{!join from=join_s fromIndex="+fromColl+" to=join_s}match_s:c";
QueryRequest qr = new QueryRequest(params("collection", toColl, "q", joinQ, "fl", "id,get_s"));
QueryResponse rsp = new QueryResponse(cloudClient.request(qr), cloudClient);
SolrDocumentList hits = rsp.getResults();
assertTrue("Expected 1 doc", hits.getNumFound() == 1);
SolrDocument doc = hits.get(0);
assertEquals(toDocId, doc.getFirstValue("id"));
assertEquals("b", doc.getFirstValue("get_s"));
//without score
testJoins(toColl, fromColl, toDocId, false);
// create an alias for the fromIndex and then query through the alias
String alias = fromColl+"Alias";
CollectionAdminRequest.CreateAlias request = new CollectionAdminRequest.CreateAlias();
request.setAliasName(alias);
request.setAliasedCollections(fromColl);
request.process(cloudClient);
joinQ = "{!join from=join_s fromIndex="+alias+" to=join_s}match_s:c";
qr = new QueryRequest(params("collection", toColl, "q", joinQ, "fl", "id,get_s"));
rsp = new QueryResponse(cloudClient.request(qr), cloudClient);
hits = rsp.getResults();
assertTrue("Expected 1 doc", hits.getNumFound() == 1);
doc = hits.get(0);
assertEquals(toDocId, doc.getFirstValue("id"));
assertEquals("b", doc.getFirstValue("get_s"));
// verify join doesn't work if no match in the "from" index
joinQ = "{!join from=join_s fromIndex="+fromColl+" to=join_s}match_s:d";
qr = new QueryRequest(params("collection", toColl, "q", joinQ, "fl", "id,get_s"));
rsp = new QueryResponse(cloudClient.request(qr), cloudClient);
hits = rsp.getResults();
assertTrue("Expected no hits", hits.getNumFound() == 0);
//with score
testJoins(toColl, fromColl, toDocId, true);
log.info("DistribJoinFromCollectionTest logic complete ... deleting the " + toColl + " and " + fromColl + " collections");
@ -157,13 +109,87 @@ public class DistribJoinFromCollectionTest extends AbstractFullDistribZkTestBase
req.process(cloudClient);
} catch (Exception e) {
// don't fail the test
log.warn("Could not delete collection {} after test completed due to: "+e, c);
log.warn("Could not delete collection {} after test completed due to: " + e, c);
}
}
log.info("DistribJoinFromCollectionTest succeeded ... shutting down now!");
}
private void testJoins(String toColl, String fromColl, Integer toDocId, boolean isScoresTest)
throws SolrServerException, IOException {
// verify the join with fromIndex works
final String[] scoreModes = {"avg","max","min","total"};
String joinQ = "{!join " + anyScoreMode(isScoresTest, scoreModes)
+ "from=join_s fromIndex=" + fromColl + " to=join_s}match_s:c";
QueryRequest qr = new QueryRequest(params("collection", toColl, "q", joinQ, "fl", "id,get_s,score"));
QueryResponse rsp = new QueryResponse(cloudClient.request(qr), cloudClient);
SolrDocumentList hits = rsp.getResults();
assertTrue("Expected 1 doc", hits.getNumFound() == 1);
SolrDocument doc = hits.get(0);
assertEquals(toDocId, doc.getFirstValue("id"));
assertEquals("b", doc.getFirstValue("get_s"));
assertScore(isScoresTest, doc);
//negative test before creating an alias
checkAbsentFromIndex(fromColl, toColl, isScoresTest, scoreModes);
// create an alias for the fromIndex and then query through the alias
String alias = fromColl+"Alias";
CollectionAdminRequest.CreateAlias request = new CollectionAdminRequest.CreateAlias();
request.setAliasName(alias);
request.setAliasedCollections(fromColl);
request.process(cloudClient);
joinQ = "{!join " + anyScoreMode(isScoresTest, scoreModes)
+ "from=join_s fromIndex=" + alias + " to=join_s}match_s:c";
qr = new QueryRequest(params("collection", toColl, "q", joinQ, "fl", "id,get_s,score"));
rsp = new QueryResponse(cloudClient.request(qr), cloudClient);
hits = rsp.getResults();
assertTrue("Expected 1 doc", hits.getNumFound() == 1);
doc = hits.get(0);
assertEquals(toDocId, doc.getFirstValue("id"));
assertEquals("b", doc.getFirstValue("get_s"));
assertScore(isScoresTest, doc);
//negative test after creating an alias
checkAbsentFromIndex(fromColl, toColl, isScoresTest, scoreModes);
// verify join doesn't work if no match in the "from" index
joinQ = "{!join " + (anyScoreMode(isScoresTest, scoreModes))
+ "from=join_s fromIndex=" + fromColl + " to=join_s}match_s:d";
qr = new QueryRequest(params("collection", toColl, "q", joinQ, "fl", "id,get_s,score"));
rsp = new QueryResponse(cloudClient.request(qr), cloudClient);
hits = rsp.getResults();
assertTrue("Expected no hits", hits.getNumFound() == 0);
assertScore(isScoresTest, doc);
}
private void assertScore(boolean isScoresTest, SolrDocument doc) {
if (isScoresTest) {
assertThat(doc.getFirstValue("score").toString(), not("1.0"));
} else {
assertEquals("1.0", doc.getFirstValue("score").toString());
}
}
private String anyScoreMode(boolean isScoresTest, String[] scoreModes) {
return isScoresTest ? "score=" + (scoreModes[random().nextInt(scoreModes.length)]) + " " : "";
}
private void checkAbsentFromIndex(String fromColl, String toColl, boolean isScoresTest, String[] scoreModes) throws SolrServerException, IOException {
final String wrongName = fromColl + "WrongName";
final String joinQ = "{!join " + (anyScoreMode(isScoresTest, scoreModes))
+ "from=join_s fromIndex=" + wrongName + " to=join_s}match_s:c";
final QueryRequest qr = new QueryRequest(params("collection", toColl, "q", joinQ, "fl", "id,get_s,score"));
try {
cloudClient.request(qr);
} catch (HttpSolrClient.RemoteSolrException ex) {
assertEquals(SolrException.ErrorCode.BAD_REQUEST.code, ex.code());
assertTrue(ex.getMessage().contains(wrongName));
}
}
protected Integer indexDoc(String collection, int id, String joinField, String matchField, String getField) throws Exception {
UpdateRequest up = new UpdateRequest();
up.setCommitWithin(50);