mirror of https://github.com/apache/lucene.git

SOLR-2592: deleteByQuery routing support

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1418814 13f79535-47bb-0310-9956-ffa450edef68

parent 7ebb85ab6d
commit 7eb16a69a5
@@ -41,7 +41,6 @@ import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;

@@ -407,16 +406,8 @@ public class HttpShardHandler extends ShardHandler {

  private void addSlices(Map<String,Slice> target, ClusterState state, SolrParams params, String collectionName, String shardKeys, boolean multiCollection) {
    DocCollection coll = state.getCollection(collectionName);
    if (shardKeys != null) {
      List<String> shardKeyList = StrUtils.splitSmart(shardKeys, ",", true);
      for (String oneShardKey : shardKeyList) {
        Collection<Slice> someSlices = coll.getRouter().getSearchSlices(oneShardKey, params, coll);
        ClientUtils.addSlices(target, collectionName, someSlices, multiCollection);
      }
    } else {
      Collection<Slice> someSlices = coll.getRouter().getSearchSlices(null, params, coll);
      ClientUtils.addSlices(target, collectionName, someSlices, multiCollection);
    }
    Collection<Slice> slices = coll.getRouter().getSearchSlices(shardKeys, params, coll);
    ClientUtils.addSlices(target, collectionName, slices, multiCollection);
  }
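In the addSlices hunk above, HttpShardHandler no longer splits shard.keys itself: the per-key if/else loop is superseded by a single call that hands the raw, possibly comma separated value to DocRouter.getSearchSlices. A hedged SolrJ sketch of a query that exercises this path (not part of this commit; the ZooKeeper address and collection name are assumptions, and the shard.keys parameter name follows ShardParams.SHARD_KEYS used in the hunk):

```java
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;

// Sketch only: a query carrying shard.keys is narrowed by HttpShardHandler to the
// slices the router returns for "b!" and "c!" instead of fanning out to every shard.
public class RoutedQueryExample {
  public static void main(String[] args) throws Exception {
    CloudSolrServer client = new CloudSolrServer("localhost:2181"); // ZK address is an assumption
    client.setDefaultCollection("collection1");                     // collection name is an assumption

    SolrQuery q = new SolrQuery("*:*");
    q.set("shard.keys", "b!,c!");   // comma separated keys, handled by DocRouter.getSearchSlices
    QueryResponse rsp = client.query(q);
    System.out.println("hits: " + rsp.getResults().getNumFound());

    client.shutdown();
  }
}
```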
@@ -20,6 +20,7 @@ package org.apache.solr.update.processor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

@@ -41,11 +42,11 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.Hash;

@@ -761,22 +762,19 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
    if (zkEnabled && DistribPhase.NONE == phase) {
      boolean leaderForAnyShard = false;  // start off by assuming we are not a leader for any shard

      Map<String,Slice> slices = zkController.getClusterState().getSlicesMap(collection);
      if (slices == null) {
        throw new SolrException(ErrorCode.BAD_REQUEST,
            "Cannot find collection:" + collection + " in "
                + zkController.getClusterState().getCollections());
      }
      ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
      outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());

      ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
      params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
      DocCollection coll = zkController.getClusterState().getCollection(collection);
      SolrParams params = req.getParams();
      Collection<Slice> slices = coll.getRouter().getSearchSlices(params.get(ShardParams.SHARD_KEYS), params, coll);

      List<Node> leaders = new ArrayList<Node>(slices.size());
      for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
        String sliceName = sliceEntry.getKey();
        ZkNodeProps leaderProps;
      for (Slice slice : slices) {
        String sliceName = slice.getName();
        Replica leader;
        try {
          leaderProps = zkController.getZkStateReader().getLeaderProps(collection, sliceName);
          leader = zkController.getZkStateReader().getLeaderProps(collection, sliceName);
        } catch (InterruptedException e) {
          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e);
        }

@@ -785,7 +783,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
        // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?

        // Am I the leader for this slice?
        ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leaderProps);
        ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader);
        String leaderNodeName = coreLeaderProps.getCoreNodeName();
        String coreName = req.getCore().getName();
        String coreNodeName = zkController.getNodeName() + "_" + coreName;

@@ -799,8 +797,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
        }
      }

      params.remove("commit"); // this will be distributed from the local commit
      cmdDistrib.distribDelete(cmd, leaders, params);
      outParams.remove("commit"); // this will be distributed from the local commit
      cmdDistrib.distribDelete(cmd, leaders, outParams);

      if (!leaderForAnyShard) {
        return;
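The doDeleteByQuery hunks above replace the getSlicesMap lookup over every slice with the router's getSearchSlices result for shard.keys, so the delete is forwarded only to the leaders of the routed slices. Below is a minimal, self-contained sketch of that decision, with plain strings and a lookup function standing in for Slice, ClusterState and ZkStateReader; all names in it are illustrative, not from the commit.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Sketch of the routing decision in the new doDeleteByQuery: only the leaders of the
// slices selected by the router for shard.keys receive the forwarded delete.
class DeleteByQueryRoutingSketch {
  static List<String> remoteLeaders(Collection<String> routedSlices,
                                    Function<String, String> leaderLookup, // stand-in for getLeaderProps(...)
                                    String myCoreNodeName) {
    List<String> forwardTo = new ArrayList<>();
    for (String slice : routedSlices) {
      String leaderNodeName = leaderLookup.apply(slice);
      if (!leaderNodeName.equals(myCoreNodeName)) {
        forwardTo.add(leaderNodeName);  // the real code sends these via cmdDistrib.distribDelete(...)
      }
      // when this node is the leader of a routed slice, the real code also applies the
      // delete locally (the leaderForAnyShard flag); otherwise it returns early
    }
    return forwardTo;
  }

  public static void main(String[] args) {
    List<String> slices = List.of("shard1", "shard3"); // e.g. what getSearchSlices("b!,c!", ...) returned
    Function<String, String> leaders = s -> s.equals("shard1") ? "nodeA_core1" : "nodeB_core1";
    System.out.println(remoteLeaders(slices, leaders, "nodeA_core1")); // -> [nodeB_core1]
  }
}
```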
@@ -165,8 +165,28 @@ public class ShardRoutingTest extends AbstractFullDistribZkTestBase {

    doQuery("b!doc1,c!doc2", "q","*:*", shardKeys,"b!,c!");

    doQuery("b!doc1,c!doc2,d!doc3,e!doc4", "q","*:*", shardKeys,"foo/0!");

    doDBQ("*:*", shardKeys,"b!");
    commit();
    doQuery("c!doc2,d!doc3,e!doc4", "q","*:*");
    doAddDoc("b!doc1");

    doDBQ("*:*", shardKeys,"c!");
    commit();
    doQuery("b!doc1,d!doc3,e!doc4", "q","*:*");
    doAddDoc("c!doc2");

    doDBQ("*:*", shardKeys,"c!");
    commit();
    doQuery("b!doc1,d!doc3,e!doc4", "q","*:*");
    doAddDoc("c!doc2");

    doDBQ("*:*", shardKeys,"d!,e!");
    commit();
    doQuery("b!doc1,c!doc2", "q","*:*");
    doAddDoc("d!doc3");
    doAddDoc("e!doc4");
  }

  void doAddDoc(String id) throws Exception {

@@ -174,6 +194,7 @@ public class ShardRoutingTest extends AbstractFullDistribZkTestBase {
    // todo - target diff servers and use cloud clients as well as non-cloud clients
  }

  // TODO: refactor some of this stuff up into a base class for use by other tests
  void doQuery(String expectedDocs, String... queryParams) throws Exception {
    Set<String> expectedIds = new HashSet<String>( StrUtils.splitSmart(expectedDocs, ",", true) );

@@ -200,6 +221,14 @@ public class ShardRoutingTest extends AbstractFullDistribZkTestBase {
    assertEquals(expectedIds, obtainedIds);
  }

  // TODO: refactor some of this stuff into the SolrJ client... it should be easier to use
  void doDBQ(String q, String... reqParams) throws Exception {
    UpdateRequest req = new UpdateRequest();
    req.deleteByQuery(q);
    req.setParams(params(reqParams));
    req.process(cloudClient);
  }

  @Override
  public void tearDown() throws Exception {
    super.tearDown();
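The new test lines issue a deleteByQuery with shard.keys limited to one or more routing prefixes, verify that only those prefixes' documents disappear, then re-add them. A hedged SolrJ sketch of the same doDBQ pattern outside the test harness (the ZooKeeper address and collection name are assumptions):

```java
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;

// Sketch only: a routed deleteByQuery, restricted to the shard(s) that own the "b!" prefix.
public class RoutedDeleteExample {
  public static void main(String[] args) throws Exception {
    CloudSolrServer client = new CloudSolrServer("localhost:2181"); // ZK address is an assumption
    client.setDefaultCollection("collection1");                     // collection name is an assumption

    UpdateRequest req = new UpdateRequest();
    req.deleteByQuery("*:*");          // only documents on the routed shard(s) are deleted
    req.setParam("shard.keys", "b!");  // route the DBQ like the composite id prefix "b!"
    req.process(client);

    client.commit();
    client.shutdown();
  }
}
```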
@@ -23,12 +23,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;

import org.apache.http.impl.client.RoutedRequest;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.DocRouter.Range;
import org.apache.solr.common.cloud.PlainIdRouter;
import org.apache.solr.common.cloud.Slice;
@@ -87,7 +87,7 @@ public class CompositeIdRouter extends HashBasedRouter {
  }

  @Override
  public Collection<Slice> getSearchSlices(String shardKey, SolrParams params, DocCollection collection) {
  public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
    if (shardKey == null) {
      // search across whole collection
      // TODO: this may need modification in the future when shard splitting could cause an overlap
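CompositeIdRouter's single-key lookup keys off the portion of the id before the '!' separator, which is why "b!doc1" and a shard.keys value of "b!" are routed together in the test above; the rename here simply frees the getSearchSlices name for the multi-key wrapper added in DocRouter. An illustrative stand-in sketch of the prefix idea (String.hashCode replaces the real murmur3 and DocRouter.Range machinery):

```java
// Sketch only: extract the routing prefix before '!' and hash it. Same prefix means
// the same routing bucket, so ids and shard.keys values sharing a prefix route together.
class CompositeKeySketch {
  static int routingHash(String idOrShardKey) {
    int bang = idOrShardKey.indexOf('!');
    String keyPart = bang >= 0 ? idOrShardKey.substring(0, bang) : idOrShardKey;
    return keyPart.hashCode(); // stand-in hash; the real router uses org.apache.solr.common.util.Hash
  }

  public static void main(String[] args) {
    // "b!doc1" (a document id) and "b!" (a shard.keys value) share the prefix "b",
    // so they land in the same routing bucket, which is what ShardRoutingTest relies on.
    System.out.println(routingHash("b!doc1") == routingHash("b!")); // true
  }
}
```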
@@ -22,11 +22,13 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.StrUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

@@ -157,9 +159,29 @@ public abstract class DocRouter {

  /** This method is consulted to determine what slices should be queried for a request when
   * an explicit shards parameter was not used.
   * shardKey (normally from shard.keys) and params may be null.
   * This method only accepts a single shard key (or null). If you have a comma separated list of shard keys,
   * call getSearchSlices
   **/
  public abstract Collection<Slice> getSearchSlices(String shardKey, SolrParams params, DocCollection collection);
  public abstract Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection);


  /** This method is consulted to determine what slices should be queried for a request when
   * an explicit shards parameter was not used.
   * This method accepts a multi-valued shardKeys parameter (normally comma separated from the shard.keys request parameter)
   * and aggregates the slices returned by getSearchSlicesSingle for each shardKey.
   **/
  public Collection<Slice> getSearchSlices(String shardKeys, SolrParams params, DocCollection collection) {
    if (shardKeys == null || shardKeys.indexOf(',') < 0) {
      return getSearchSlicesSingle(shardKeys, params, collection);
    }

    List<String> shardKeyList = StrUtils.splitSmart(shardKeys, ",", true);
    HashSet<Slice> allSlices = new HashSet<Slice>();
    for (String shardKey : shardKeyList) {
      allSlices.addAll( getSearchSlicesSingle(shardKey, params, collection) );
    }
    return allSlices;
  }

}
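The new public getSearchSlices splits a comma separated shard.keys value with StrUtils.splitSmart and unions the per-key results of getSearchSlicesSingle in a HashSet, so a slice selected by more than one key appears only once. A self-contained sketch of that aggregation, with strings standing in for Slice and a function standing in for getSearchSlicesSingle:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Sketch of the multi-key aggregation shown above; split(",") stands in for
// StrUtils.splitSmart and the function argument stands in for getSearchSlicesSingle.
class GetSearchSlicesSketch {
  static Set<String> getSearchSlices(String shardKeys, Function<String, Collection<String>> single) {
    if (shardKeys == null || shardKeys.indexOf(',') < 0) {
      return new HashSet<>(single.apply(shardKeys));  // null or a single key: defer directly
    }
    Set<String> all = new HashSet<>();                // the set de-duplicates shared slices
    for (String key : shardKeys.split(",")) {
      if (!key.trim().isEmpty()) all.addAll(single.apply(key.trim()));
    }
    return all;
  }

  public static void main(String[] args) {
    Function<String, Collection<String>> single = k ->
        k == null ? List.of("shard1", "shard2", "shard3")
                  : (k.startsWith("b") ? List.of("shard1") : List.of("shard1", "shard3"));
    System.out.println(getSearchSlices("b!,c!", single)); // union of the two per-key lookups
    System.out.println(getSearchSlices(null, single));    // whole collection
  }
}
```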
@@ -54,7 +54,7 @@ public abstract class HashBasedRouter extends DocRouter {


  @Override
  public Collection<Slice> getSearchSlices(String shardKey, SolrParams params, DocCollection collection) {
  public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
    if (shardKey == null) {
      // search across whole collection
      // TODO: this may need modification in the future when shard splitting could cause an overlap
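HashBasedRouter keeps the same behaviour under the new name: a null shard key still means the whole collection, while a non-null key is hashed and matched against the slices' hash ranges. A stand-in sketch of that range lookup (the real code uses murmur3 hashing and DocRouter.Range; the types and values here are illustrative):

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: each "slice" owns a [min,max] hash range and a non-null shard key is
// routed to the slice whose range contains the key's hash.
class HashRangeSketch {
  static class SliceRange {
    final String name; final int min; final int max;
    SliceRange(String name, int min, int max) { this.name = name; this.min = min; this.max = max; }
    boolean includes(int hash) { return hash >= min && hash <= max; }
  }

  static String sliceFor(int hash, List<SliceRange> slices) {
    for (SliceRange s : slices) {
      if (s.includes(hash)) return s.name;
    }
    return null; // no slice covers this hash (should not happen for a complete range set)
  }

  public static void main(String[] args) {
    List<SliceRange> slices = Arrays.asList(
        new SliceRange("shard1", Integer.MIN_VALUE, -1),
        new SliceRange("shard2", 0, Integer.MAX_VALUE));
    int hash = "b".hashCode();  // stand-in for the murmur3 hash of the shard key
    System.out.println(sliceFor(hash, slices));
  }
}
```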
@@ -54,7 +54,7 @@ public class ImplicitDocRouter extends DocRouter {
  }

  @Override
  public Collection<Slice> getSearchSlices(String shardKey, SolrParams params, DocCollection collection) {
  public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
    if (shardKey == null) {
      return collection.getSlices();
    }
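For the implicit router only the null branch is visible in this hunk: a null shard key selects every slice. The sketch below additionally assumes, purely for illustration, that a non-null key is treated as a literal slice name rather than hashed; that assumption is not shown in the diff.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Sketch only: no hashing. A null key selects every slice; a non-null key is assumed
// (not shown in the hunk) to be looked up by name. Strings/maps stand in for Slice
// and DocCollection.
class ImplicitRoutingSketch {
  static Collection<String> searchSlices(String shardKey, Map<String, String> slicesByName) {
    if (shardKey == null) {
      return slicesByName.values();             // search across the whole collection
    }
    String slice = slicesByName.get(shardKey);  // assumed: key taken as the slice name itself
    return slice == null ? List.of() : List.of(slice);
  }

  public static void main(String[] args) {
    Map<String, String> slices = Map.of("shard1", "shard1", "shard2", "shard2");
    System.out.println(searchSlices(null, slices));      // both slices
    System.out.println(searchSlices("shard2", slices));  // [shard2]
  }
}
```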