mirror of https://github.com/apache/lucene.git
SOLR-5308: A new 'migrate' collection API to split all documents with a route key into another collection
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1541832 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 2033638560
commit 5d2d0e70f3
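For orientation before the diff: a minimal SolrJ sketch of invoking the new MIGRATE collection API, modeled on the MigrateRouteKeyTest added below. The base URL, collection names, and route key are illustrative; collection, split.key and target.collection are required parameters, and forward.timeout is optional (it defaults to 600 seconds in OverseerCollectionProcessor.migrate).

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.ModifiableSolrParams;

public class MigrateExample {
  public static void main(String[] args) throws Exception {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", "MIGRATE");                 // CollectionParams.CollectionAction.MIGRATE
    params.set("collection", "collection1");         // source collection (illustrative name)
    params.set("split.key", "a!");                   // route key whose docs are moved
    params.set("target.collection", "collection2");  // target collection (illustrative name)
    params.set("forward.timeout", 60);               // seconds to keep forwarding new updates

    SolrRequest request = new QueryRequest(params);
    request.setPath("/admin/collections");

    HttpSolrServer server = new HttpSolrServer("http://localhost:8983/solr"); // illustrative URL
    server.request(request);
    server.shutdown();
  }
}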
@@ -69,6 +69,12 @@ Upgrading from Solr 4.6.0

Detailed Change List
----------------------

New Features
----------------------

* SOLR-5308: A new 'migrate' collection API to split all documents with a
  route key into another collection (shalin)

Other Changes
----------------------
@@ -34,6 +34,7 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.RoutingRule;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;

@@ -53,6 +54,8 @@ public class Overseer {
  public static final String DELETECORE = "deletecore";
  public static final String REMOVECOLLECTION = "removecollection";
  public static final String REMOVESHARD = "removeshard";
  public static final String ADD_ROUTING_RULE = "addroutingrule";
  public static final String REMOVE_ROUTING_RULE = "removeroutingrule";

  private static final int STATE_UPDATE_DELAY = 1500;  // delay between cloud state updates

@@ -228,6 +231,10 @@ public class Overseer {
          clusterState = updateShardState(clusterState, message);
        } else if (OverseerCollectionProcessor.CREATECOLLECTION.equals(operation)) {
          clusterState = buildCollection(clusterState, message);
        } else if (Overseer.ADD_ROUTING_RULE.equals(operation)) {
          clusterState = addRoutingRule(clusterState, message);
        } else if (Overseer.REMOVE_ROUTING_RULE.equals(operation)) {
          clusterState = removeRoutingRule(clusterState, message);
        } else {
          throw new RuntimeException("unknown operation:" + operation
              + " contents:" + message.getProperties());

@@ -280,6 +287,72 @@ public class Overseer {
      return clusterState;
    }

    private ClusterState addRoutingRule(ClusterState clusterState, ZkNodeProps message) {
      String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
      String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
      String routeKey = message.getStr("routeKey");
      String range = message.getStr("range");
      String targetCollection = message.getStr("targetCollection");
      String targetShard = message.getStr("targetShard");
      String expireAt = message.getStr("expireAt");

      Slice slice = clusterState.getSlice(collection, shard);
      if (slice == null) {
        throw new RuntimeException("Overseer.addRoutingRule unknown collection: " + collection + " slice: " + shard);
      }

      Map<String, RoutingRule> routingRules = slice.getRoutingRules();
      if (routingRules == null)
        routingRules = new HashMap<String, RoutingRule>();
      RoutingRule r = routingRules.get(routeKey);
      if (r == null) {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("routeRanges", range);
        map.put("targetCollection", targetCollection);
        map.put("expireAt", expireAt);
        RoutingRule rule = new RoutingRule(routeKey, map);
        routingRules.put(routeKey, rule);
      } else {
        // a rule already exists for this route key; append the new range to it
        Map<String, Object> map = r.shallowCopy();
        map.put("routeRanges", map.get("routeRanges") + "," + range);
        map.put("expireAt", expireAt);
        routingRules.put(routeKey, new RoutingRule(routeKey, map));
      }

      Map<String, Object> props = slice.shallowCopy();
      props.put("routingRules", routingRules);

      Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
      clusterState = updateSlice(clusterState, collection, newSlice);
      return clusterState;
    }

    private ClusterState removeRoutingRule(ClusterState clusterState, ZkNodeProps message) {
      String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
      String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
      String routeKeyStr = message.getStr("routeKey");

      log.info("Overseer.removeRoutingRule invoked for collection: " + collection
          + " shard: " + shard + " routeKey: " + routeKeyStr);

      Slice slice = clusterState.getSlice(collection, shard);
      if (slice == null) {
        log.warn("Unknown collection: " + collection + " shard: " + shard);
        return clusterState;
      }
      Map<String, RoutingRule> routingRules = slice.getRoutingRules();
      if (routingRules != null) {
        routingRules.remove(routeKeyStr);
        Map<String, Object> props = slice.shallowCopy();
        props.put("routingRules", routingRules);
        Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
        clusterState = updateSlice(clusterState, collection, newSlice);
      }

      return clusterState;
    }

    private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) {
      String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
      String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
@@ -37,6 +37,7 @@ import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.PlainIdRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.RoutingRule;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;

@@ -59,8 +60,8 @@ import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

@@ -106,6 +107,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {

  public static final String DELETEREPLICA = "deletereplica";

  public static final String MIGRATE = "migrate";

  public static final String COLL_CONF = "collection.configName";

@@ -243,6 +246,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
        deleteShard(zkStateReader.getClusterState(), message, results);
      } else if (DELETEREPLICA.equals(operation)) {
        deleteReplica(zkStateReader.getClusterState(), message, results);
      } else if (MIGRATE.equals(operation)) {
        migrate(zkStateReader.getClusterState(), message, results);
      } else {
        throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
            + operation);

@@ -871,18 +876,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {

    log.info("Successfully created all replica shards for all sub-slices " + subSlices);

    log.info("Calling soft commit to make sub shard updates visible");
    String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
    // HttpShardHandler is hard coded to send a QueryRequest hence we go direct
    // and we force open a searcher so that we have documents to show upon switching states
    UpdateResponse updateResponse = null;
    try {
      updateResponse = softCommit(coreUrl);
      processResponse(results, null, coreUrl, updateResponse, slice);
    } catch (Exception e) {
      processResponse(results, e, coreUrl, updateResponse, slice);
      throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib softCommit on: " + coreUrl, e);
    }
    commit(results, slice, parentShardLeader);

    if (repFactor == 1) {
      // switch sub shard states to 'active'

@@ -919,6 +913,21 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
    }
  }

  private void commit(NamedList results, String slice, Replica parentShardLeader) {
    log.info("Calling soft commit to make sub shard updates visible");
    String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
    // HttpShardHandler is hard coded to send a QueryRequest hence we go direct
    // and we force open a searcher so that we have documents to show upon switching states
    UpdateResponse updateResponse = null;
    try {
      updateResponse = softCommit(coreUrl);
      processResponse(results, null, coreUrl, updateResponse, slice);
    } catch (Exception e) {
      processResponse(results, e, coreUrl, updateResponse, slice);
      throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib softCommit on: " + coreUrl, e);
    }
  }

  static UpdateResponse softCommit(String url) throws SolrServerException, IOException {
    HttpSolrServer server = null;
    try {

@@ -1053,6 +1062,218 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
    }
  }

  private void migrate(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
    String sourceCollectionName = message.getStr("collection");
    String splitKey = message.getStr("split.key");
    String targetCollectionName = message.getStr("target.collection");
    int timeout = message.getInt("forward.timeout", 10 * 60) * 1000;

    DocCollection sourceCollection = clusterState.getCollection(sourceCollectionName);
    if (sourceCollection == null) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown source collection: " + sourceCollectionName);
    }
    DocCollection targetCollection = clusterState.getCollection(targetCollectionName);
    if (targetCollection == null) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown target collection: " + targetCollectionName);
    }
    if (!(sourceCollection.getRouter() instanceof CompositeIdRouter)) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Source collection must use a compositeId router");
    }
    if (!(targetCollection.getRouter() instanceof CompositeIdRouter)) {
      throw new SolrException(ErrorCode.BAD_REQUEST, "Target collection must use a compositeId router");
    }
    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
    CompositeIdRouter targetRouter = (CompositeIdRouter) targetCollection.getRouter();
    Collection<Slice> sourceSlices = sourceRouter.getSearchSlicesSingle(splitKey, null, sourceCollection);
    if (sourceSlices.isEmpty()) {
      throw new SolrException(ErrorCode.BAD_REQUEST,
          "No active slices available in source collection: " + sourceCollection + " for given split.key: " + splitKey);
    }
    Collection<Slice> targetSlices = targetRouter.getSearchSlicesSingle(splitKey, null, targetCollection);
    if (targetSlices.isEmpty()) {
      throw new SolrException(ErrorCode.BAD_REQUEST,
          "No active slices available in target collection: " + targetCollection + " for given split.key: " + splitKey);
    }

    for (Slice sourceSlice : sourceSlices) {
      for (Slice targetSlice : targetSlices) {
        log.info("Migrating source shard: {} to target shard: {} for split.key = {}", sourceSlice, targetSlice, splitKey);
        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results);
      }
    }
  }

  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice, DocCollection targetCollection, Slice targetSlice, String splitKey, int timeout, NamedList results) throws KeeperException, InterruptedException {
    String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
    if (clusterState.getCollectionStates().containsKey(tempSourceCollectionName)) {
      log.info("Deleting temporary collection: " + tempSourceCollectionName);
      Map<String, Object> props = ZkNodeProps.makeMap(
          QUEUE_OPERATION, DELETECOLLECTION,
          "name", tempSourceCollectionName);
      try {
        deleteCollection(new ZkNodeProps(props), results);
      } catch (Exception e) {
        log.warn("Unable to clean up existing temporary collection: " + tempSourceCollectionName, e);
      }
    }

    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
    DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);

    log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
    // intersect source range, keyHashRange and target range
    // this is the range that has to be split from source and transferred to target
    DocRouter.Range splitRange = intersect(targetSlice.getRange(), intersect(sourceSlice.getRange(), keyHashRange));
    if (splitRange == null) {
      log.info("No common hashes between source shard: {} and target shard: {}", sourceSlice.getName(), targetSlice.getName());
      return;
    }
    log.info("Common hash range between source shard: {} and target shard: {} = {}", sourceSlice.getName(), targetSlice.getName(), splitRange);

    Replica targetLeader = targetSlice.getLeader();

    log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
        + targetLeader.getStr("core") + " to buffer updates");
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
    sendShardRequest(targetLeader.getNodeName(), params);
    collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates");

    ZkNodeProps m = new ZkNodeProps(
        Overseer.QUEUE_OPERATION, Overseer.ADD_ROUTING_RULE,
        COLLECTION_PROP, sourceCollection.getName(),
        SHARD_ID_PROP, sourceSlice.getName(),
        "routeKey", splitKey,
        "range", splitRange.toString(),
        "targetCollection", targetCollection.getName(),
        "expireAt", String.valueOf(System.currentTimeMillis() + timeout));
    log.info("Adding routing rule: " + m);
    Overseer.getInQueue(zkStateReader.getZkClient()).offer(
        ZkStateReader.toJSON(m));

    // wait for a while until we see the new rule
    log.info("Waiting to see routing rule updated in clusterstate");
    long waitUntil = System.currentTimeMillis() + 60000;
    boolean added = false;
    while (System.currentTimeMillis() < waitUntil) {
      Thread.sleep(100);
      Map<String, RoutingRule> rules = zkStateReader.getClusterState().getSlice(sourceCollection.getName(), sourceSlice.getName()).getRoutingRules();
      if (rules != null) {
        RoutingRule rule = rules.get(splitKey);
        if (rule != null && rule.getRouteRanges().contains(splitRange)) {
          added = true;
          break;
        }
      }
    }
    if (!added) {
      throw new SolrException(ErrorCode.SERVER_ERROR, "Could not add routing rule: " + m);
    }

    log.info("Routing rule added successfully");

    // Create temp core on source shard
    Replica sourceLeader = sourceSlice.getLeader();

    // create a temporary collection with just one node on the shard leader
    String sourceLeaderUrl = zkStateReader.getZkClient().getBaseUrlForNodeName(sourceLeader.getNodeName());
    if (sourceLeaderUrl.startsWith("http://")) sourceLeaderUrl = sourceLeaderUrl.substring(7);
    Map<String, Object> props = ZkNodeProps.makeMap(
        QUEUE_OPERATION, CREATECOLLECTION,
        "name", tempSourceCollectionName,
        REPLICATION_FACTOR, 1,
        NUM_SLICES, 1,
        CREATE_NODE_SET, sourceLeader.getNodeName());
    log.info("Creating temporary collection: " + props);
    createCollection(clusterState, new ZkNodeProps(props), results);
    // refresh cluster state
    clusterState = zkStateReader.getClusterState();
    Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
    Replica tempSourceLeader = clusterState.getLeader(tempSourceCollectionName, tempSourceSlice.getName());

    log.info("Asking source leader to split index");
    params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
    params.set(CoreAdminParams.CORE, sourceLeader.getStr("core"));
    params.add(CoreAdminParams.TARGET_CORE, tempSourceLeader.getStr("core"));
    params.set(CoreAdminParams.RANGES, splitRange.toString());
    params.set("split.key", splitKey);

    sendShardRequest(sourceLeader.getNodeName(), params);
    collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command");

    log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
        tempSourceCollectionName, targetLeader.getNodeName());
    params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
    String tempCollectionReplica2 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica2";
    params.set(CoreAdminParams.NAME, tempCollectionReplica2);
    params.set(CoreAdminParams.COLLECTION, tempSourceCollectionName);
    params.set(CoreAdminParams.SHARD, tempSourceSlice.getName());
    sendShardRequest(targetLeader.getNodeName(), params);

    String coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
        zkStateReader.getZkClient().getBaseUrlForNodeName(targetLeader.getNodeName()), tempCollectionReplica2);
    // wait for the replicas to be seen as active on temp source leader
    log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
    CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
    cmd.setCoreName(tempSourceLeader.getStr("core"));
    cmd.setNodeName(targetLeader.getNodeName());
    cmd.setCoreNodeName(coreNodeName);
    cmd.setState(ZkStateReader.ACTIVE); // todo introduce asynchronous actions
    cmd.setCheckLive(true);
    cmd.setOnlyIfLeader(true);
    sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()));

    collectShardResponses(results, true,
        "MIGRATE failed to create temp collection replica or timed out waiting for them to come up");

    log.info("Successfully created replica of temp source collection on target leader node");

    log.info("Requesting merge of temp source collection replica to target leader");
    params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminAction.MERGEINDEXES.toString());
    params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
    params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
    sendShardRequest(targetLeader.getNodeName(), params);
    collectShardResponses(results, true,
        "MIGRATE failed to merge " + tempCollectionReplica2 + " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName());

    log.info("Asking target leader to apply buffered updates");
    params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
    sendShardRequest(targetLeader.getNodeName(), params);
    collectShardResponses(results, true,
        "MIGRATE failed to request node to apply buffered updates");

    try {
      log.info("Deleting temporary collection: " + tempSourceCollectionName);
      props = ZkNodeProps.makeMap(
          QUEUE_OPERATION, DELETECOLLECTION,
          "name", tempSourceCollectionName);
      deleteCollection(new ZkNodeProps(props), results);
    } catch (Exception e) {
      log.error("Unable to delete temporary collection: " + tempSourceCollectionName
          + ". Please remove it manually", e);
    }
  }

  private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
    if (a == null || b == null || !a.overlaps(b)) {
      return null;
    } else if (a.isSubsetOf(b))
      return a;
    else if (b.isSubsetOf(a))
      return b;
    else if (b.includes(a.max)) {
      return new DocRouter.Range(b.min, a.max);
    } else {
      return new DocRouter.Range(a.min, b.max);
    }
  }

  private void sendShardRequest(String nodeName, ModifiableSolrParams params) {
    ShardRequest sreq = new ShardRequest();
    params.set("qt", adminPath);
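To make the range arithmetic in intersect() concrete, here is a small self-contained sketch of the same logic on inclusive [min, max] int ranges instead of DocRouter.Range; the numbers are illustrative. migrateKey applies it twice: the key's hash range is clipped to the source shard's range, and that result is clipped to the target shard's range.

// Standalone illustration of the intersect() logic above; assumes inclusive
// [min, max] ranges, mirroring overlaps()/isSubsetOf()/includes().
public class RangeIntersectDemo {
  static int[] intersect(int[] a, int[] b) {
    if (a == null || b == null || a[1] < b[0] || b[1] < a[0]) {
      return null;                       // no overlap
    } else if (b[0] <= a[0] && a[1] <= b[1]) {
      return a;                          // a is a subset of b
    } else if (a[0] <= b[0] && b[1] <= a[1]) {
      return b;                          // b is a subset of a
    } else if (b[0] <= a[1] && a[1] <= b[1]) {
      return new int[]{b[0], a[1]};      // b includes a's max
    } else {
      return new int[]{a[0], b[1]};      // a includes b's max
    }
  }

  public static void main(String[] args) {
    // key hash range [0,100] clipped to source shard [40,150], then to target shard [50,200]
    int[] split = intersect(new int[]{50, 200}, intersect(new int[]{40, 150}, new int[]{0, 100}));
    System.out.println(split[0] + ".." + split[1]); // prints 50..100
  }
}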
@@ -164,6 +164,8 @@ public final class SolrCore implements SolrInfoMBean {
  private IndexReaderFactory indexReaderFactory;
  private final Codec codec;

  private final ReentrantLock ruleExpiryLock;

  public long getStartTime() { return startTime; }

  static int boolean_query_max_clause_count = Integer.MIN_VALUE;

@@ -646,6 +648,7 @@ public final class SolrCore implements SolrInfoMBean {
    this.updateProcessorChains = null;
    this.infoRegistry = null;
    this.codec = null;
    this.ruleExpiryLock = null;

    solrCoreState = null;
  }

@@ -861,6 +864,8 @@ public final class SolrCore implements SolrInfoMBean {
    // For debugging
    // numOpens.incrementAndGet();
    // openHandles.put(this, new RuntimeException("unclosed core - name:" + getName() + " refs: " + refCount.get()));

    ruleExpiryLock = new ReentrantLock();
  }

  private Codec initCodec(SolrConfig solrConfig, final IndexSchema schema) {

@@ -2215,6 +2220,10 @@ public final class SolrCore implements SolrInfoMBean {
    return solrDelPolicy;
  }

  public ReentrantLock getRuleExpiryLock() {
    return ruleExpiryLock;
  }

  /////////////////////////////////////////////////////////////////////
  // SolrInfoMBean stuff: Statistics and Module Info
  /////////////////////////////////////////////////////////////////////
@@ -161,7 +161,8 @@ public class CollectionsHandler extends RequestHandlerBase {
      case DELETESHARD: {
        this.handleDeleteShardAction(req, rsp);
        break;
      }case CREATESHARD: {
      }
      case CREATESHARD: {
        this.handleCreateShard(req, rsp);
        break;
      }

@@ -169,7 +170,10 @@ public class CollectionsHandler extends RequestHandlerBase {
        this.handleRemoveReplica(req, rsp);
        break;
      }

      case MIGRATE: {
        this.handleMigrate(req, rsp);
        break;
      }
      default: {
        throw new RuntimeException("Unknown action: " + action);
      }

@@ -422,6 +426,16 @@ public class CollectionsHandler extends RequestHandlerBase {
    handleResponse(OverseerCollectionProcessor.SPLITSHARD, m, rsp, DEFAULT_ZK_TIMEOUT * 5);
  }

  private void handleMigrate(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
    log.info("Migrate action invoked: " + req.getParamString());
    req.getParams().required().check("collection", "split.key", "target.collection");
    Map<String,Object> props = new HashMap<String,Object>();
    props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.MIGRATE);
    copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout");
    ZkNodeProps m = new ZkNodeProps(props);
    handleResponse(OverseerCollectionProcessor.MIGRATE, m, rsp, DEFAULT_ZK_TIMEOUT * 20);
  }

  public static ModifiableSolrParams params(String... params) {
    ModifiableSolrParams msp = new ModifiableSolrParams();
    for (int i=0; i<params.length; i+=2) {
@@ -214,7 +214,10 @@ public class CoreAdminHandler extends RequestHandlerBase {
        this.handleRequestApplyUpdatesAction(req, rsp);
        break;
      }

      case REQUESTBUFFERUPDATES: {
        this.handleRequestBufferUpdatesAction(req, rsp);
        break;
      }
      default: {
        this.handleCustomAction(req, rsp);
        break;

@@ -963,6 +966,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
    SolrParams params = req.getParams();
    String cname = params.get(CoreAdminParams.NAME, "");
    SolrCore core = coreContainer.getCore(cname);
    log.info("Applying buffered updates on core: " + cname);
    try {
      UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
      if (updateLog.getState() != UpdateLog.State.BUFFERING) {

@@ -999,6 +1003,31 @@ public class CoreAdminHandler extends RequestHandlerBase {
  }

  private void handleRequestBufferUpdatesAction(SolrQueryRequest req, SolrQueryResponse rsp) {
    SolrParams params = req.getParams();
    String cname = params.get(CoreAdminParams.NAME, "");
    SolrCore core = coreContainer.getCore(cname);
    log.info("Starting to buffer updates on core: " + cname);
    try {
      UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
      if (updateLog.getState() != UpdateLog.State.ACTIVE) {
        throw new SolrException(ErrorCode.SERVER_ERROR, "Core " + cname + " not in active state");
      }
      updateLog.bufferUpdates();
      rsp.add("core", cname);
      rsp.add("status", "BUFFERING");
    } catch (Throwable e) {
      if (e instanceof SolrException)
        throw (SolrException) e;
      else
        throw new SolrException(ErrorCode.SERVER_ERROR, "Could not start buffering updates", e);
    } finally {
      if (req != null) req.close();
      if (core != null)
        core.close();
    }
  }

  /**
   * Returns the core status for a particular core.
   * @param cores - the enclosing core container
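As a usage sketch, the new REQUESTBUFFERUPDATES core admin action can also be exercised directly; the base URL and core name below are illustrative, not taken from this commit. A successful call answers with the core name and status=BUFFERING, as handleRequestBufferUpdatesAction above shows.

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.ModifiableSolrParams;

public class BufferUpdatesExample {
  public static void main(String[] args) throws Exception {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", "REQUESTBUFFERUPDATES");      // CoreAdminAction added in this commit
    params.set("name", "collection1_shard1_replica1"); // illustrative core name

    SolrRequest request = new QueryRequest(params);
    request.setPath("/admin/cores");

    HttpSolrServer server = new HttpSolrServer("http://localhost:8983/solr"); // illustrative URL
    System.out.println(server.request(request));       // expect status=BUFFERING in the response
    server.shutdown();
  }
}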
@@ -214,7 +214,7 @@ public class SolrIndexSplitter {
    return docSets;
  }

  private String getRouteKey(String idString) {
  public static String getRouteKey(String idString) {
    int idx = idString.indexOf(CompositeIdRouter.separator);
    if (idx <= 0) return null;
    String part1 = idString.substring(0, idx);
@@ -24,17 +24,23 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.RoutingRule;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;

@@ -45,6 +51,7 @@ import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.handler.component.ShardHandler;

@@ -61,11 +68,13 @@ import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.Node;
import org.apache.solr.update.SolrCmdDistributor.RetryNode;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.VersionBucket;
import org.apache.solr.update.VersionInfo;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -80,6 +89,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;

@@ -210,7 +221,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
      forwardToLeader = false;
      return nodes;
    }

    String coreName = req.getCore().getName();

    ClusterState cstate = zkController.getClusterState();

@@ -256,8 +266,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {

    doDefensiveChecks(phase);

    // if the request is coming from another collection then we want it to be sent to all replicas
    // even if its phase is FROMLEADER
    String fromCollection = updateCommand.getReq().getParams().get("distrib.from.collection");

    if (DistribPhase.FROMLEADER == phase && !isSubShardLeader) {
    if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) {
      // we are coming from the leader, just go local - add no urls
      forwardToLeader = false;
    } else if (isLeader || isSubShardLeader) {

@@ -360,6 +373,88 @@
    return nodes;
  }

  private List<Node> getNodesByRoutingRules(ClusterState cstate, DocCollection coll, String id, SolrInputDocument doc) {
    DocRouter router = coll.getRouter();
    List<Node> nodes = null;
    if (router instanceof CompositeIdRouter) {
      CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
      String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
      Slice slice = coll.getSlice(myShardId);
      Map<String, RoutingRule> routingRules = slice.getRoutingRules();
      if (routingRules != null) {

        // delete by query case
        if (id == null) {
          for (Entry<String, RoutingRule> entry : routingRules.entrySet()) {
            String targetCollectionName = entry.getValue().getTargetCollectionName();
            Collection<Slice> activeSlices = cstate.getActiveSlices(targetCollectionName);
            if (activeSlices != null && !activeSlices.isEmpty()) {
              Slice any = activeSlices.iterator().next();
              if (nodes == null) nodes = new ArrayList<Node>();
              nodes.add(new StdNode(new ZkCoreNodeProps(any.getLeader())));
            }
          }
          return nodes;
        }

        String routeKey = SolrIndexSplitter.getRouteKey(id);
        if (routeKey != null) {
          RoutingRule rule = routingRules.get(routeKey + "!");
          if (rule != null) {
            if (rule.getExpireAt() >= System.currentTimeMillis()) {
              List<DocRouter.Range> ranges = rule.getRouteRanges();
              if (ranges != null && !ranges.isEmpty()) {
                int hash = compositeIdRouter.sliceHash(id, doc, null, coll);
                for (DocRouter.Range range : ranges) {
                  if (range.includes(hash)) {
                    if (nodes == null) nodes = new ArrayList<Node>();
                    Collection<Slice> activeSlices = cstate.getActiveSlices(rule.getTargetCollectionName());
                    if (activeSlices == null || activeSlices.isEmpty()) {
                      throw new SolrException(ErrorCode.SERVER_ERROR,
                          "No active slices found for target collection: " + rule.getTargetCollectionName());
                    }
                    // it doesn't matter where we forward it so just choose the first one
                    // todo this can be optimized
                    Replica targetLeader = cstate.getLeader(rule.getTargetCollectionName(), activeSlices.iterator().next().getName());
                    nodes.add(new StdNode(new ZkCoreNodeProps(targetLeader)));
                    break;
                  }
                }
              }
            } else {
              ReentrantLock ruleExpiryLock = req.getCore().getRuleExpiryLock();
              if (!ruleExpiryLock.isLocked()) {
                try {
                  if (ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) {
                    log.info("Going to expire routing rule");
                    try {
                      Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, Overseer.REMOVE_ROUTING_RULE,
                          ZkStateReader.COLLECTION_PROP, collection,
                          ZkStateReader.SHARD_ID_PROP, myShardId,
                          "routeKey", routeKey + "!");
                      SolrZkClient zkClient = req.getCore().getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
                      DistributedQueue queue = Overseer.getInQueue(zkClient);
                      queue.offer(ZkStateReader.toJSON(map));
                    } catch (KeeperException e) {
                      log.warn("Exception while removing routing rule for route key: " + routeKey, e);
                    } catch (Exception e) {
                      log.error("Exception while removing routing rule for route key: " + routeKey, e);
                    } finally {
                      ruleExpiryLock.unlock();
                    }
                  }
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                }
              }
            }
          }
        }
      }
    }
    return nodes;
  }

  private void doDefensiveChecks(DistribPhase phase) {
    boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
    if (isReplayOrPeersync) return;

@@ -385,10 +480,13 @@
            "Request says it is coming from parent shard leader but parent hash range is not superset of my range");
      }
    } else {
      String fromCollection = req.getParams().get("distrib.from.collection"); // is it because of a routing rule?
      if (fromCollection == null) {
        log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
      }
    }
  }

  if ((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) {
    log.error("ClusterState says we are the leader, but locally we don't think so");

@@ -468,6 +566,18 @@
      cmdDistrib.distribAdd(cmd, Collections.singletonList(subShardLeader), params, true);
    }
  }
  List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument());
  if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
    ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
    params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
    params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
        zkController.getBaseUrl(), req.getCore().getName()));
    params.set("distrib.from.collection", req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
    params.set("distrib.from.shard", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
    for (Node nodesByRoutingRule : nodesByRoutingRules) {
      cmdDistrib.distribAdd(cmd, Collections.singletonList(nodesByRoutingRule), params, true);
    }
  }
}

ModifiableSolrParams params = null;

@@ -635,7 +745,7 @@

  boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
  boolean leaderLogic = isLeader && !isReplayOrPeersync;

  boolean forwardedFromCollection = cmd.getReq().getParams().get("distrib.from.collection") != null;

  VersionBucket bucket = vinfo.bucket(bucketHash);

@@ -659,8 +769,26 @@

  if (leaderLogic) {

    if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
      // forwarded from a collection but we are not buffering so strip original version and apply our own
      // see SOLR-5308
      log.info("Removing version field from doc: " + cmd.getPrintableId());
      cmd.solrDoc.remove(VERSION_FIELD);
      versionOnUpdate = 0;
    }

    boolean updated = getUpdatedDocument(cmd, versionOnUpdate);

    // leaders can also be in buffering state during the "migrate" API call, see SOLR-5308
    if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
        && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
      // we're not in an active state, and this update isn't from a replay, so buffer it.
      log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId());
      cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
      ulog.add(cmd);
      return true;
    }

    if (versionOnUpdate != 0) {
      Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
      long foundVersion = lastVersion == null ? -1 : lastVersion;

@@ -867,6 +995,19 @@
    params.set("distrib.from.parent", cloudDesc.getShardId());
    cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
  }

  List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null);
  if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
    ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
    params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
    params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
        zkController.getBaseUrl(), req.getCore().getName()));
    params.set("distrib.from.collection", req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
    params.set("distrib.from.shard", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
    for (Node nodesByRoutingRule : nodesByRoutingRules) {
      cmdDistrib.distribDelete(cmd, Collections.singletonList(nodesByRoutingRule), params, true);
    }
  }
}

@@ -1090,6 +1231,16 @@
  if (subShardLeaders != null) {
    cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
  }
  List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, null);
  if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
    params = new ModifiableSolrParams(filterParams(req.getParams()));
    params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
    params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
        zkController.getBaseUrl(), req.getCore().getName()));
    params.set("distrib.from.collection", req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
    params.set("distrib.from.shard", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
    cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true);
  }
  if (replicas != null) {
    cmdDistrib.distribDelete(cmd, replicas, params);
    someReplicas = true;

@@ -1182,6 +1333,7 @@

  boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
  boolean leaderLogic = isLeader && !isReplayOrPeersync;
  boolean forwardedFromCollection = cmd.getReq().getParams().get("distrib.from.collection") != null;

  if (!leaderLogic && versionOnUpdate == 0) {
    throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");

@@ -1198,6 +1350,23 @@

  if (leaderLogic) {

    if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
      // forwarded from a collection but we are not buffering so strip original version and apply our own
      // see SOLR-5308
      log.info("Removing version field from doc: " + cmd.getId());
      versionOnUpdate = signedVersionOnUpdate = 0;
    }

    // leaders can also be in buffering state during the "migrate" API call, see SOLR-5308
    if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
        && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
      // we're not in an active state, and this update isn't from a replay, so buffer it.
      log.info("Leader logic applied but update log is buffering: " + cmd.getId());
      cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
      ulog.delete(cmd);
      return true;
    }

    if (signedVersionOnUpdate != 0) {
      Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
      long foundVersion = lastVersion == null ? -1 : lastVersion;
@@ -0,0 +1,229 @@
package org.apache.solr.cloud;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.RoutingRule;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.update.DirectUpdateHandler2;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;

public class MigrateRouteKeyTest extends BasicDistributedZkTest {

  public MigrateRouteKeyTest() {
    schemaString = "schema15.xml";      // we need a string id
  }

  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();
    System.setProperty("numShards", Integer.toString(sliceCount));
    System.setProperty("solr.xml.persist", "true");
  }
  @Override
  @After
  public void tearDown() throws Exception {
    super.tearDown();

    if (VERBOSE || printLayoutOnTearDown) {
      super.printLayout();
    }
    if (controlClient != null) {
      controlClient.shutdown();
    }
    if (cloudClient != null) {
      cloudClient.shutdown();
    }
    if (controlClientCloud != null) {
      controlClientCloud.shutdown();
    }

    System.clearProperty("zkHost");
    System.clearProperty("numShards");
    System.clearProperty("solr.xml.persist");

    // insurance
    DirectUpdateHandler2.commitOnClose = true;
  }
  @Override
  public void doTest() throws Exception {
    waitForThingsToLevelOut(15);

    final String splitKey = "a!";
    final int[] splitKeyCount = new int[1];
    for (int id = 0; id < 26 * 3; id++) {
      String shardKey = "" + (char) ('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", shardKey + "!" + id);
      doc.addField("n_ti", id);
      cloudClient.add(doc);
      if (splitKey.equals(shardKey + "!"))
        splitKeyCount[0]++;
    }
    assertTrue(splitKeyCount[0] > 0);

    String targetCollection = "migrate_routekey_test_targetCollection";

    HashMap<String, List<Integer>> collectionInfos = new HashMap<String, List<Integer>>();
    CloudSolrServer client = null;
    try {
      client = createCloudClient(null);
      Map<String, Object> props = ZkNodeProps.makeMap(
          REPLICATION_FACTOR, 1,
          MAX_SHARDS_PER_NODE, 5,
          NUM_SLICES, 1);

      createCollection(collectionInfos, targetCollection, props, client);
    } finally {
      if (client != null) client.shutdown();
    }

    List<Integer> list = collectionInfos.get(targetCollection);
    checkForCollection(targetCollection, list, null);

    waitForRecoveriesToFinish(targetCollection, false);

    class Indexer extends Thread {
      final int seconds;

      public Indexer(int seconds) {
        this.seconds = seconds;
      }

      @Override
      public void run() {
        long start = System.currentTimeMillis();
        for (int id = 26 * 3; id < 500 && System.currentTimeMillis() - start <= seconds * 1000; id++) {
          String shardKey = "" + (char) ('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
          SolrInputDocument doc = new SolrInputDocument();
          doc.addField("id", shardKey + "!" + id);
          doc.addField("n_ti", id);
          try {
            cloudClient.add(doc);
            if (splitKey.equals(shardKey + "!"))
              splitKeyCount[0]++;
          } catch (Exception e) {
            log.error("Exception while adding document id: " + doc.getField("id"), e);
          }
          try {
            Thread.sleep(50);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      }
    }
    Thread indexer = new Indexer(30);
    indexer.start();

    String url = CustomCollectionTest.getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), targetCollection);
    HttpSolrServer collectionClient = new HttpSolrServer(url);

    SolrQuery solrQuery = new SolrQuery("*:*");
    assertEquals("DocCount on target collection does not match", 0, collectionClient.query(solrQuery).getResults().getNumFound());

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", CollectionParams.CollectionAction.MIGRATE.toString());
    params.set("collection", AbstractDistribZkTestBase.DEFAULT_COLLECTION);
    params.set("target.collection", targetCollection);
    params.set("split.key", splitKey);
    params.set("forward.timeout", 45);

    SolrRequest request = new QueryRequest(params);
    request.setPath("/admin/collections");

    String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
        .getBaseURL();
    baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());

    HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
    baseServer.setConnectionTimeout(15000);
    baseServer.setSoTimeout(60000 * 5);
    baseServer.request(request);
    baseServer.shutdown();
    long finishTime = System.currentTimeMillis();

    indexer.join();

    try {
      cloudClient.deleteById("a!104");
      splitKeyCount[0]--;
    } catch (Exception e) {
      log.warn("Error deleting document a!104", e);
    }
    cloudClient.commit();
    collectionClient.commit();

    getCommonCloudSolrServer().getZkStateReader().updateClusterState(true);
    ClusterState state = getCommonCloudSolrServer().getZkStateReader().getClusterState();
    Slice slice = state.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD2);
    assertNotNull("Routing rule map is null", slice.getRoutingRules());
    assertFalse("Routing rule map is empty", slice.getRoutingRules().isEmpty());
    assertNotNull("No routing rule exists for route key: " + splitKey, slice.getRoutingRules().get(splitKey));

    boolean ruleRemoved = false;
    while (System.currentTimeMillis() - finishTime < 60000) {
      getCommonCloudSolrServer().getZkStateReader().updateClusterState(true);
      state = getCommonCloudSolrServer().getZkStateReader().getClusterState();
      slice = state.getSlice(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD2);
      Map<String, RoutingRule> routingRules = slice.getRoutingRules();
      if (routingRules == null || routingRules.isEmpty() || !routingRules.containsKey(splitKey)) {
        ruleRemoved = true;
        break;
      }
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", splitKey + System.currentTimeMillis());
      cloudClient.add(doc);
      Thread.sleep(1000);
    }

    assertTrue("Routing rule was not expired", ruleRemoved);

    solrQuery = new SolrQuery("*:*").setRows(1000);
    QueryResponse response = collectionClient.query(solrQuery);
    log.info("Response from target collection: " + response);
    assertEquals("DocCount on target collection does not match", splitKeyCount[0], response.getResults().getNumFound());

    printLayout();
  }
}
@@ -0,0 +1,72 @@
package org.apache.solr.common.cloud;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.noggit.JSONUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Used for routing docs with particular keys into another collection
 */
public class RoutingRule extends ZkNodeProps {
  private final List<DocRouter.Range> routeRanges;
  private final String routeRangesStr;
  private final String targetCollectionName;
  private final Long expireAt;

  public RoutingRule(String routeKey, Map<String, Object> propMap) {
    super(propMap);
    this.routeRangesStr = (String) propMap.get("routeRanges");
    String[] rangesArr = this.routeRangesStr.split(",");
    if (rangesArr != null && rangesArr.length > 0) {
      this.routeRanges = new ArrayList<DocRouter.Range>();
      for (String r : rangesArr) {
        routeRanges.add(DocRouter.DEFAULT.fromString(r));
      }
    } else {
      this.routeRanges = null;
    }
    this.targetCollectionName = (String) propMap.get("targetCollection");
    this.expireAt = Long.parseLong((String) propMap.get("expireAt"));
  }

  public List<DocRouter.Range> getRouteRanges() {
    return routeRanges;
  }

  public String getTargetCollectionName() {
    return targetCollectionName;
  }

  public Long getExpireAt() {
    return expireAt;
  }

  public String getRouteRangesStr() {
    return routeRangesStr;
  }

  @Override
  public String toString() {
    return JSONUtil.toJSON(propMap, -1);
  }
}
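As a small usage sketch, a RoutingRule is built from a property map shaped like the one Overseer.addRoutingRule constructs above; the hex range, route key, and collection name below are illustrative values.

import org.apache.solr.common.cloud.RoutingRule;

import java.util.HashMap;
import java.util.Map;

public class RoutingRuleExample {
  public static void main(String[] args) {
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("routeRanges", "80000000-bfffffff");  // comma-separated ranges in DocRouter.Range syntax
    map.put("targetCollection", "collection2");   // collection that now owns these hashes
    map.put("expireAt", String.valueOf(System.currentTimeMillis() + 60000)); // forwarding deadline

    RoutingRule rule = new RoutingRule("a!", map);
    System.out.println(rule.getTargetCollectionName() + " until " + rule.getExpireAt());
  }
}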
@@ -46,6 +46,7 @@ public class Slice extends ZkNodeProps {
  private final Replica leader;
  private final String state;
  private final String parent;
  private final Map<String, RoutingRule> routingRules;

  /**
   * @param name  The name of the slice

@@ -89,6 +90,23 @@ public class Slice extends ZkNodeProps {
    this.replicas = replicas != null ? replicas : makeReplicas((Map<String,Object>)propMap.get(REPLICAS));
    propMap.put(REPLICAS, this.replicas);

    Map<String, Object> rules = (Map<String, Object>) propMap.get("routingRules");
    if (rules != null) {
      this.routingRules = new HashMap<String, RoutingRule>();
      for (Map.Entry<String, Object> entry : rules.entrySet()) {
        Object o = entry.getValue();
        if (o instanceof Map) {
          Map map = (Map) o;
          RoutingRule rule = new RoutingRule(entry.getKey(), map);
          routingRules.put(entry.getKey(), rule);
        } else {
          routingRules.put(entry.getKey(), (RoutingRule) o);
        }
      }
    } else {
      this.routingRules = null;
    }

    leader = findLeader();
  }

@@ -162,6 +180,10 @@ public class Slice extends ZkNodeProps {
    return parent;
  }

  public Map<String, RoutingRule> getRoutingRules() {
    return routingRules;
  }

  @Override
  public String toString() {
    return name + ':' + JSONUtil.toJSON(propMap);
@@ -28,7 +28,7 @@ public interface CollectionParams

  public enum CollectionAction {
    CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS, SPLITSHARD, DELETESHARD, CREATESHARD, DELETEREPLICA;
    CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS, SPLITSHARD, DELETESHARD, CREATESHARD, DELETEREPLICA, MIGRATE;

    public static CollectionAction get( String p )
    {
@@ -124,6 +124,7 @@ public abstract class CoreAdminParams
    REQUESTSYNCSHARD,
    CREATEALIAS,
    DELETEALIAS,
    REQUESTBUFFERUPDATES,
    REQUESTAPPLYUPDATES,
    LOAD_ON_STARTUP,
    TRANSIENT;