SOLR-4905: Allow fromIndex parameter to JoinQParserPlugin to refer to a single-sharded collection that has a replica on all nodes

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1656622 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy Potter 2015-02-03 03:07:26 +00:00
parent 8ac45d4479
commit 2dfb0a02a8
3 changed files with 263 additions and 4 deletions

View File

@ -93,6 +93,10 @@ New Features
deprecated in favour of close(). (Mark Miller, Tomás Fernández Löbbe, Alan
Woodward)
* SOLR-4905: Allow fromIndex parameter to JoinQParserPlugin to refer to a single-sharded
collection that has a replica on all nodes where there is a replica in the to index
(Jack Lo, Timothy Potter)
Bug Fixes
----------------------

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.index.LeafReaderContext;
@ -44,7 +45,13 @@ import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.StringHelper;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@ -80,13 +87,45 @@ public class JoinQParserPlugin extends QParserPlugin {
if (fromIndex != null && !fromIndex.equals(req.getCore().getCoreDescriptor().getName()) ) {
CoreContainer container = req.getCore().getCoreDescriptor().getCoreContainer();
final SolrCore fromCore = container.getCore(fromIndex);
RefCounted<SolrIndexSearcher> fromHolder = null;
if (fromCore == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cross-core join: no such core " + fromIndex);
// if in SolrCloud mode, fromIndex should be the name of a single-sharded collection
if (container.isZooKeeperAware()) {
ZkController zkController = container.getZkController();
if (!zkController.getClusterState().hasCollection(fromIndex)) {
// collection not found ... but it might be an alias?
String resolved = null;
Aliases aliases = zkController.getZkStateReader().getAliases();
if (aliases != null) {
Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
resolved = (collectionAliases != null) ? collectionAliases.get(fromIndex) : null;
if (resolved != null) {
// ok, was an alias, but if the alias points to multiple collections, then we don't support that yet
if (resolved.split(",").length > 1)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: Collection alias '" + fromIndex +
"' maps to multiple collections ("+resolved+
"), which is not currently supported for joins.");
}
}
if (resolved == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: Collection '" + fromIndex + "' not found!");
// ok, resolved to an alias
fromIndex = resolved;
}
// the fromIndex is a local replica for a single-sharded collection with replicas
// across all nodes that have replicas for the collection we're joining with
fromIndex = findLocalReplicaForFromIndex(zkController, fromIndex);
}
final SolrCore fromCore = container.getCore(fromIndex);
if (fromCore == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Cross-core join: no such core " + fromIndex);
RefCounted<SolrIndexSearcher> fromHolder = null;
LocalSolrQueryRequest otherReq = new LocalSolrQueryRequest(fromCore, params);
try {
QParser parser = QParser.getParser(v, "lucene", otherReq);
@ -109,6 +148,39 @@ public class JoinQParserPlugin extends QParserPlugin {
}
};
}
protected String findLocalReplicaForFromIndex(ZkController zkController, String fromIndex) {
String fromReplica = null;
String nodeName = zkController.getNodeName();
for (Slice slice : zkController.getClusterState().getActiveSlices(fromIndex)) {
if (fromReplica != null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: multiple shards not yet supported " + fromIndex);
for (Replica replica : slice.getReplicas()) {
if (replica.getNodeName().equals(nodeName)) {
fromReplica = replica.getStr(ZkStateReader.CORE_NAME_PROP);
// found local replica, but is it Active?
ZkCoreNodeProps replicaCoreProps = new ZkCoreNodeProps(replica);
if (!ZkStateReader.ACTIVE.equals(replicaCoreProps.getState()))
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: "+fromIndex+" has a local replica ("+fromReplica+
") on "+nodeName+", but it is "+replicaCoreProps.getState());
break;
}
}
}
if (fromReplica == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: No active replicas for "+fromIndex+
" found in node " + nodeName);
return fromReplica;
}
}

View File

@ -0,0 +1,183 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.junit.After;
import org.junit.Before;
import org.apache.commons.lang.StringUtils;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Tests using fromIndex that points to a collection in SolrCloud mode.
*/
public class DistribJoinFromCollectionTest extends AbstractFullDistribZkTestBase {
public DistribJoinFromCollectionTest() {
super();
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
System.setProperty("numShards", Integer.toString(sliceCount));
}
@Override
@After
public void tearDown() throws Exception {
try {
super.tearDown();
} catch (Exception exc) {}
resetExceptionIgnores();
}
@Test
public void test() throws Exception {
// create a collection holding data for the "to" side of the JOIN
String toColl = "to_2x2";
createCollection(toColl, 2, 2, 2);
ensureAllReplicasAreActive(toColl, "shard1", 2, 2, 30);
ensureAllReplicasAreActive(toColl, "shard2", 2, 2, 30);
// get the set of nodes where replicas for the "to" collection exist
Set<String> nodeSet = new HashSet<>();
ClusterState cs = cloudClient.getZkStateReader().getClusterState();
for (Slice slice : cs.getActiveSlices(toColl))
for (Replica replica : slice.getReplicas())
nodeSet.add(replica.getNodeName());
assertTrue(nodeSet.size() > 0);
// deploy the "from" collection to all nodes where the "to" collection exists
String fromColl = "from_1x2";
createCollection(null, fromColl, 1, nodeSet.size(), 1, null, StringUtils.join(nodeSet,","));
ensureAllReplicasAreActive(fromColl, "shard1", 1, nodeSet.size(), 30);
// both to and from collections are up and active, index some docs ...
Integer toDocId = indexDoc(toColl, 1001, "a", null, "b");
indexDoc(fromColl, 2001, "a", "c", null);
Thread.sleep(1000); // so the commits fire
// verify the join with fromIndex works
String joinQ = "{!join from=join_s fromIndex="+fromColl+" to=join_s}match_s:c";
QueryRequest qr = new QueryRequest(params("collection", toColl, "q", joinQ, "fl", "id,get_s"));
QueryResponse rsp = new QueryResponse(cloudClient.request(qr), cloudClient);
SolrDocumentList hits = rsp.getResults();
assertTrue("Expected 1 doc", hits.getNumFound() == 1);
SolrDocument doc = hits.get(0);
assertEquals(toDocId, doc.getFirstValue("id"));
assertEquals("b", doc.getFirstValue("get_s"));
// create an alias for the fromIndex and then query through the alias
String alias = fromColl+"Alias";
CollectionAdminRequest.CreateAlias request = new CollectionAdminRequest.CreateAlias();
request.setAliasName(alias);
request.setAliasedCollections(fromColl);
request.process(cloudClient);
joinQ = "{!join from=join_s fromIndex="+alias+" to=join_s}match_s:c";
qr = new QueryRequest(params("collection", toColl, "q", joinQ, "fl", "id,get_s"));
rsp = new QueryResponse(cloudClient.request(qr), cloudClient);
hits = rsp.getResults();
assertTrue("Expected 1 doc", hits.getNumFound() == 1);
doc = hits.get(0);
assertEquals(toDocId, doc.getFirstValue("id"));
assertEquals("b", doc.getFirstValue("get_s"));
// verify join doesn't work if no match in the "from" index
joinQ = "{!join from=join_s fromIndex="+fromColl+" to=join_s}match_s:d";
qr = new QueryRequest(params("collection", toColl, "q", joinQ, "fl", "id,get_s"));
rsp = new QueryResponse(cloudClient.request(qr), cloudClient);
hits = rsp.getResults();
assertTrue("Expected no hits", hits.getNumFound() == 0);
log.info("DistribJoinFromCollectionTest logic complete ... deleting the " + toColl + " and " + fromColl + " collections");
// try to clean up
for (String c : new String[]{ toColl, fromColl }) {
try {
CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
req.setCollectionName(c);
req.process(cloudClient);
} catch (Exception e) {
// don't fail the test
log.warn("Could not delete collection {} after test completed due to: "+e, c);
}
}
log.info("DistribJoinFromCollectionTest succeeded ... shutting down now!");
}
protected Integer indexDoc(String collection, int id, String joinField, String matchField, String getField) throws Exception {
UpdateRequest up = new UpdateRequest();
up.setCommitWithin(50);
up.setParam("collection", collection);
SolrInputDocument doc = new SolrInputDocument();
Integer docId = new Integer(id);
doc.addField("id", docId);
doc.addField("join_s", joinField);
if (matchField != null)
doc.addField("match_s", matchField);
if (getField != null)
doc.addField("get_s", getField);
up.add(doc);
cloudClient.request(up);
return docId;
}
}