SOLR-4078: Allow custom naming of SolrCloud nodes so that a new host:port combination can take over for a previous shard.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1450012 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-02-26 04:09:52 +00:00
parent c1b8dcfb0f
commit d0d2ff9209
23 changed files with 172 additions and 112 deletions

View File

@ -87,6 +87,9 @@ New Features
* SOLR-4481: SwitchQParserPlugin registered by default as 'switch' using
syntax: {!switch case=XXX case.foo=YYY case.bar=ZZZ default=QQQ}foo
(hossman)
* SOLR-4078: Allow custom naming of SolrCloud nodes so that a new host:port
combination can take over for a previous shard. (Mark Miller)
Bug Fixes
----------------------

View File

@ -259,7 +259,7 @@ sb.append("(group_name=").append(tg.getName()).append(")");
private Map<String,Object> getReplicaProps(ZkController zkController, SolrCore core) {
final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
Replica replica = zkController.getClusterState().getReplica(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
Replica replica = zkController.getClusterState().getReplica(collection, zkController.getCoreNodeName(core.getCoreDescriptor()));
if(replica!=null) {
return replica.getProperties();
}

View File

@ -85,6 +85,8 @@ public class JettySolrRunner {
private boolean stopAtShutdown;
private String coreNodeName;
public static class DebugFilter implements Filter {
public int requestsToKeep = 10;
private AtomicLong nRequests = new AtomicLong();
@ -333,6 +335,9 @@ public class JettySolrRunner {
if(shards != null) {
System.setProperty("shard", shards);
}
if (coreNodeName != null) {
System.setProperty("coreNodeName", coreNodeName);
}
if (!server.isRunning()) {
server.start();
@ -349,6 +354,7 @@ public class JettySolrRunner {
System.clearProperty("shard");
System.clearProperty("solr.data.dir");
System.clearProperty("coreNodeName");
}
public void stop() throws Exception {
@ -452,6 +458,10 @@ public class JettySolrRunner {
public void setDataDir(String dataDir) {
this.dataDir = dataDir;
}
public void setCoreNodeName(String coreNodeName) {
this.coreNodeName = coreNodeName;
}
}
class NoLog implements Logger {

View File

@ -25,7 +25,8 @@ public class CloudDescriptor {
private SolrParams params;
private String roles = null;
private Integer numShards;
private String nodeName = null;
volatile boolean isLeader = false;
volatile String lastPublished;
@ -78,5 +79,13 @@ public class CloudDescriptor {
public void setNumShards(int numShards) {
this.numShards = numShards;
}
public String getCoreNodeName() {
return nodeName;
}
public void setCoreNodeName(String nodeName) {
this.nodeName = nodeName;
}
}

View File

@ -94,7 +94,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
CreateMode.EPHEMERAL, true);
assert shardId != null;
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, "leader",
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
collection, ZkStateReader.BASE_URL_PROP, leaderProps.getProperties()
@ -343,7 +343,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
cancelElection();
try {
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getName());
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
} catch (Throwable t) {
SolrException.log(log, "Error trying to start recovery", t);
}

View File

@ -17,11 +17,14 @@ package org.apache.solr.cloud;
* the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClosableThread;
@ -213,7 +216,11 @@ public class Overseer {
*/
private ClusterState updateState(ClusterState state, final ZkNodeProps message) {
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
final String zkCoreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
if (coreNodeName == null) {
// it must be the default then
coreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
}
Integer numShards = message.getStr(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.getStr(ZkStateReader.NUM_SHARDS_PROP)):null;
log.info("Update state numShards={} message={}", numShards, message);
//collection does not yet exist, create placeholders if num shards is specified
@ -225,9 +232,9 @@ public class Overseer {
// use the provided non null shardId
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
if (sliceName == null) {
String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
//String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
//get shardId from ClusterState
sliceName = getAssignedId(state, nodeName, message);
sliceName = getAssignedId(state, coreNodeName, message);
if (sliceName != null) {
log.info("shard=" + sliceName + " is already registered");
}
@ -249,7 +256,7 @@ public class Overseer {
replicaProps.putAll(message.getProperties());
// System.out.println("########## UPDATE MESSAGE: " + JSONUtil.toJSON(message));
if (slice != null) {
Replica oldReplica = slice.getReplicasMap().get(zkCoreNodeName);
Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
if (oldReplica != null && oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
}
@ -258,9 +265,22 @@ public class Overseer {
// we don't put num_shards in the clusterstate
replicaProps.remove(ZkStateReader.NUM_SHARDS_PROP);
replicaProps.remove(QUEUE_OPERATION);
// remove any props with null values
Set<Entry<String,Object>> entrySet = replicaProps.entrySet();
List<String> removeKeys = new ArrayList<String>();
for (Entry<String,Object> entry : entrySet) {
if (entry.getValue() == null) {
removeKeys.add(entry.getKey());
}
}
for (String removeKey : removeKeys) {
replicaProps.remove(removeKey);
}
replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
Replica replica = new Replica(zkCoreNodeName, replicaProps);
Replica replica = new Replica(coreNodeName, replicaProps);
// TODO: where do we get slice properties in this message? or should there be a separate create-slice message if we want that?
@ -322,11 +342,10 @@ public class Overseer {
*/
private String getAssignedId(final ClusterState state, final String nodeName,
final ZkNodeProps coreState) {
final String key = coreState.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.getStr(ZkStateReader.CORE_NAME_PROP);
Collection<Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
if (slices != null) {
for (Slice slice : slices) {
if (slice.getReplicasMap().get(key) != null) {
if (slice.getReplicasMap().get(nodeName) != null) {
return slice.getName();
}
}
@ -424,7 +443,12 @@ public class Overseer {
*/
private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) {
final String coreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
if (cnn == null) {
// it must be the default then
cnn = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
}
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
@ -436,10 +460,10 @@ public class Overseer {
Map<String, Slice> newSlices = new LinkedHashMap<String, Slice>();
for (Slice slice : coll.getSlices()) {
Replica replica = slice.getReplica(coreNodeName);
Replica replica = slice.getReplica(cnn);
if (replica != null) {
Map<String, Replica> newReplicas = slice.getReplicasCopy();
newReplicas.remove(coreNodeName);
newReplicas.remove(cnn);
// TODO TODO TODO!!! if there are no replicas left for the slice, and the slice has no hash range, remove it
// if (newReplicas.size() == 0 && slice.getRange() == null) {
// if there are no replicas left for the slice remove it

View File

@ -83,15 +83,15 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
private boolean recoveringAfterStartup;
private CoreContainer cc;
public RecoveryStrategy(CoreContainer cc, String name, RecoveryListener recoveryListener) {
public RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
this.cc = cc;
this.coreName = name;
this.coreName = cd.getName();
this.recoveryListener = recoveryListener;
setName("RecoveryThread");
zkController = cc.getZkController();
zkStateReader = zkController.getZkStateReader();
baseUrl = zkController.getBaseUrl();
coreZkNodeName = zkController.getNodeName() + "_" + coreName;
coreZkNodeName = zkController.getCoreNodeName(cd);
}
public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {

View File

@ -37,6 +37,7 @@ import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
@ -142,7 +143,7 @@ public class SyncStrategy {
if (success) {
log.info("Sync Success - now sync replicas to me");
syncToMe(zkController, collection, shardId, leaderProps);
syncToMe(zkController, collection, shardId, leaderProps, core.getCoreDescriptor());
} else {
SolrException.log(log, "Sync Failed");
@ -160,7 +161,7 @@ public class SyncStrategy {
ZkNodeProps props, String collection, String shardId) {
List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
.getReplicaProps(collection, shardId,
props.getStr(ZkStateReader.NODE_NAME_PROP),
zkController.getCoreNodeName(core.getCoreDescriptor()),
props.getStr(ZkStateReader.CORE_NAME_PROP));
if (nodes == null) {
@ -181,14 +182,14 @@ public class SyncStrategy {
}
private void syncToMe(ZkController zkController, String collection,
String shardId, ZkNodeProps leaderProps) {
String shardId, ZkNodeProps leaderProps, CoreDescriptor cd) {
// sync everyone else
// TODO: we should do this in parallel at least
List<ZkCoreNodeProps> nodes = zkController
.getZkStateReader()
.getReplicaProps(collection, shardId,
leaderProps.getStr(ZkStateReader.NODE_NAME_PROP),
zkController.getCoreNodeName(cd),
leaderProps.getStr(ZkStateReader.CORE_NAME_PROP));
if (nodes == null) {
log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + " has no replicas");

View File

@ -258,8 +258,7 @@ public final class ZkController {
// before registering as live, make sure everyone is in a
// down state
for (CoreDescriptor descriptor : descriptors) {
final String coreZkNodeName = getNodeName() + "_"
+ descriptor.getName();
final String coreZkNodeName = getCoreNodeName(descriptor);
try {
descriptor.getCloudDescriptor().isLeader = false;
publish(descriptor, ZkStateReader.DOWN, updateLastPublished);
@ -543,6 +542,7 @@ public final class ZkController {
if (replica.getNodeName().equals(getNodeName())
&& !(replica.getStr(ZkStateReader.STATE_PROP)
.equals(ZkStateReader.DOWN))) {
assert replica.getStr(ZkStateReader.SHARD_ID_PROP) != null;
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
ZkStateReader.BASE_URL_PROP, getBaseUrl(),
@ -554,7 +554,8 @@ public final class ZkController {
ZkStateReader.SHARD_ID_PROP,
replica.getStr(ZkStateReader.SHARD_ID_PROP),
ZkStateReader.COLLECTION_PROP,
replica.getStr(ZkStateReader.COLLECTION_PROP));
replica.getStr(ZkStateReader.COLLECTION_PROP),
ZkStateReader.CORE_NODE_NAME_PROP, replica.getName());
updatedNodes.add(replica.getStr(ZkStateReader.CORE_NAME_PROP));
overseerJobQueue.offer(ZkStateReader.toJSON(m));
}
@ -732,7 +733,7 @@ public final class ZkController {
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
final String collection = cloudDesc.getCollectionName();
final String coreZkNodeName = getNodeName() + "_" + coreName;
final String coreZkNodeName = getCoreNodeName(desc);
String shardId = cloudDesc.getShardId();
@ -915,7 +916,7 @@ public final class ZkController {
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
final String coreZkNodeName = getNodeName() + "_" + cd.getName();
final String coreZkNodeName = getCoreNodeName(cd);
ZkNodeProps ourProps = new ZkNodeProps(props);
String collection = cd.getCloudDescriptor()
.getCollectionName();
@ -950,7 +951,7 @@ public final class ZkController {
if (doRecovery) {
log.info("Core needs to recover:" + core.getName());
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, coreName);
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
return true;
}
} else {
@ -981,6 +982,8 @@ public final class ZkController {
numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
}
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
//assert cd.getCloudDescriptor().getShardId() != null;
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, state,
ZkStateReader.BASE_URL_PROP, getBaseUrl(),
@ -991,17 +994,19 @@ public final class ZkController {
ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor()
.getCollectionName(),
ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString()
: null,
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName != null ? coreNodeName
: null);
cd.getCloudDescriptor().lastPublished = state;
overseerJobQueue.offer(ZkStateReader.toJSON(m));
}
private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
final ClusterState state, final String shardZkNodeName) {
final ClusterState state, final String coreNodeName) {
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
final String shardId = state.getShardId(shardZkNodeName);
final String shardId = state.getShardId(coreNodeName);
if (shardId != null) {
cloudDesc.setShardId(shardId);
@ -1010,15 +1015,15 @@ public final class ZkController {
return true;
}
public void unregister(String coreName, CloudDescriptor cloudDesc)
public void unregister(String coreName, CoreDescriptor cd)
throws InterruptedException, KeeperException {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
"deletecore", ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.COLLECTION_PROP, cloudDesc.getCollectionName());
ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor().getCollectionName());
overseerJobQueue.offer(ZkStateReader.toJSON(m));
final String zkNodeName = getNodeName() + "_" + coreName;
final String zkNodeName = getCoreNodeName(cd);
ElectionContext context = electionContexts.remove(zkNodeName);
if (context != null) {
context.cancelElection();
@ -1190,8 +1195,8 @@ public final class ZkController {
return zkStateReader;
}
private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor) {
final String shardZkNodeName = getNodeName() + "_" + coreName;
private String doGetShardIdProcess(String coreName, CoreDescriptor descriptor) {
final String shardZkNodeName = getCoreNodeName(descriptor);
int retryCount = 320;
while (retryCount-- > 0) {
final String shardId = zkStateReader.getClusterState().getShardId(
@ -1244,9 +1249,15 @@ public final class ZkController {
}
private String getCoreNodeName(CoreDescriptor descriptor){
return getNodeName() + "_"
+ descriptor.getName();
public String getCoreNodeName(CoreDescriptor descriptor){
String coreNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
System.out.println("coreNodeName:" + coreNodeName);
if (coreNodeName == null) {
// it's the default
return getNodeName() + "_" + descriptor.getName();
}
return coreNodeName;
}
public static void uploadConfigDir(SolrZkClient zkClient, File dir, String configName) throws IOException, KeeperException, InterruptedException {
@ -1264,7 +1275,7 @@ public final class ZkController {
String shardZkNodeName = getCoreNodeName(cd);
if (cd.getCloudDescriptor().getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getClusterState(), shardZkNodeName)) {
String shardId;
shardId = doGetShardIdProcess(cd.getName(), cd.getCloudDescriptor());
shardId = doGetShardIdProcess(cd.getName(), cd);
cd.getCloudDescriptor().setShardId(shardId);
}

View File

@ -117,6 +117,7 @@ public class CoreContainer
private static final String CORE_SHARD = "shard";
private static final String CORE_COLLECTION = "collection";
private static final String CORE_ROLES = "roles";
private static final String CORE_NODE_NAME = "coreNodeName";
private static final String CORE_PROPERTIES = "properties";
private static final String CORE_LOADONSTARTUP = "loadOnStartup";
private static final String CORE_TRANSIENT = "transient";
@ -595,6 +596,10 @@ public class CoreContainer
if (opt != null) {
p.getCloudDescriptor().setRoles(opt);
}
opt = DOMUtil.getAttr(node, CORE_NODE_NAME, null);
if (opt != null && opt.length() > 0) {
p.getCloudDescriptor().setCoreNodeName(opt);
}
}
opt = DOMUtil.getAttr(node, CORE_PROPERTIES, null);
if (opt != null) {

View File

@ -456,6 +456,10 @@ public class CoreAdminHandler extends RequestHandlerBase {
if (opts != null)
cd.setRoles(opts);
opts = params.get(CoreAdminParams.CORE_NODE_NAME);
if (opts != null)
cd.setCoreNodeName(opts);
Integer numShards = params.getInt(ZkStateReader.NUM_SHARDS_PROP);
if (numShards != null)
cd.setNumShards(numShards);
@ -551,7 +555,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
log.info("Unregistering core " + core.getName() + " from cloudstate.");
try {
coreContainer.getZkController().unregister(cname,
core.getCoreDescriptor().getCloudDescriptor());
core.getCoreDescriptor());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@ -761,7 +765,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
SolrException.log(log, "", t);
}
core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, cname);
core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, core.getCoreDescriptor());
} else {
SolrException.log(log, "Cound not find core to call recovery:" + cname);
}

View File

@ -26,6 +26,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.util.RefCounted;
@ -211,7 +212,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
}
@Override
public void doRecovery(CoreContainer cc, String name) {
public void doRecovery(CoreContainer cc, CoreDescriptor cd) {
if (SKIP_AUTO_RECOVERY) {
log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
return;
@ -243,7 +244,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
// if true, we are recovering after startup and shouldn't have (or be receiving) additional updates (except for local tlog recovery)
boolean recoveringAfterStartup = recoveryStrat == null;
recoveryStrat = new RecoveryStrategy(cc, name, this);
recoveryStrat = new RecoveryStrategy(cc, cd, this);
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
recoveryStrat.start();
recoveryRunning = true;

View File

@ -22,6 +22,7 @@ import java.util.concurrent.locks.Lock;
import org.apache.lucene.index.IndexWriter;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.util.RefCounted;
@ -111,7 +112,7 @@ public abstract class SolrCoreState {
public void closeWriter(IndexWriter writer) throws IOException;
}
public abstract void doRecovery(CoreContainer cc, String name);
public abstract void doRecovery(CoreContainer cc, CoreDescriptor cd);
public abstract void cancelRecovery();

View File

@ -193,7 +193,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
String coreName = req.getCore().getName();
String coreNodeName = zkController.getNodeName() + "_" + coreName;
ClusterState cstate = zkController.getClusterState();
numNodes = cstate.getLiveNodes().size();
@ -217,12 +216,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
try {
// Not equivalent to getLeaderProps, which does retries to find a leader.
// Replica leader = slice.getLeader();
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderRetry(
collection, shardId));
String leaderNodeName = leaderProps.getCoreNodeName();
isLeader = coreNodeName.equals(leaderNodeName);
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, shardId);
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(leaderReplica);
String coreNodeName = zkController.getCoreNodeName(req.getCore().getCoreDescriptor());
isLeader = coreNodeName.equals(leaderReplica.getName());
DistribPhase phase =
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
@ -238,7 +236,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// so get the replicas...
forwardToLeader = false;
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
.getReplicaProps(collection, shardId, zkController.getNodeName(),
.getReplicaProps(collection, shardId, coreNodeName,
coreName, null, ZkStateReader.DOWN);
if (replicaProps != null) {
nodes = new ArrayList<Node>(replicaProps.size());
@ -303,21 +301,19 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
String shardId = cloudDesc.getShardId();
try {
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, shardId);
String leaderCoreNodeName = leaderReplica.getName();
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderRetry(
collection, shardId));
String leaderNodeName = leaderProps.getCoreNodeName();
String coreName = req.getCore().getName();
String coreNodeName = zkController.getNodeName() + "_" + coreName;
isLeader = coreNodeName.equals(leaderNodeName);
String coreNodeName = zkController.getCoreNodeName(req.getCore().getCoreDescriptor());
isLeader = coreNodeName.equals(leaderCoreNodeName);
// TODO: what if we are no longer the leader?
forwardToLeader = false;
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
.getReplicaProps(collection, shardId, zkController.getNodeName(),
coreName);
.getReplicaProps(collection, shardId, coreNodeName,
req.getCore().getName());
if (replicaProps != null) {
nodes = new ArrayList<Node>(replicaProps.size());
for (ZkCoreNodeProps props : replicaProps) {
@ -812,10 +808,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// Am I the leader for this slice?
ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader);
String leaderNodeName = coreLeaderProps.getCoreNodeName();
String coreName = req.getCore().getName();
String coreNodeName = zkController.getNodeName() + "_" + coreName;
isLeader = coreNodeName.equals(leaderNodeName);
String leaderCoreNodeName = leader.getName();
String coreNodeName = zkController.getCoreNodeName(req.getCore().getCoreDescriptor());
isLeader = coreNodeName.equals(leaderCoreNodeName);
if (isLeader) {
// don't forward to ourself
@ -1074,11 +1069,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (!req.getParams().getBool(COMMIT_END_POINT, false)) {
params.set(COMMIT_END_POINT, true);
String nodeName = req.getCore().getCoreDescriptor().getCoreContainer()
.getZkController().getNodeName();
String shardZkNodeName = nodeName + "_" + req.getCore().getName();
String coreNodeName = zkController.getCoreNodeName(req.getCore().getCoreDescriptor());
List<Node> nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
.getCloudDescriptor().getCollectionName(), shardZkNodeName);
.getCloudDescriptor().getCollectionName(), coreNodeName);
if (nodes != null) {
cmdDistrib.distribCommit(cmd, nodes, params);
@ -1097,7 +1090,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private List<Node> getCollectionUrls(SolrQueryRequest req, String collection, String shardZkNodeName) {
private List<Node> getCollectionUrls(SolrQueryRequest req, String collection, String coreNodeName) {
ClusterState clusterState = req.getCore().getCoreDescriptor()
.getCoreContainer().getZkController().getClusterState();
List<Node> urls = new ArrayList<Node>();
@ -1113,7 +1106,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
for (Entry<String,Replica> entry : shardMap.entrySet()) {
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !entry.getKey().equals(shardZkNodeName)) {
System.out.println("Key:" + entry.getKey() + " cnn:" + coreNodeName);
if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !entry.getKey().equals(coreNodeName)) {
urls.add(new StdNode(nodeProps));
}
}

View File

@ -31,13 +31,12 @@
<cores adminPath="/admin/cores" defaultCoreName="collection1" host="127.0.0.1" hostPort="${hostPort:8983}"
hostContext="${hostContext:solr}" zkClientTimeout="${solr.zkclienttimeout:30000}" numShards="${numShards:3}" shareSchema="${shareSchema:false}"
distribUpdateConnTimeout="${distribUpdateConnTimeout:15000}" distribUpdateSoTimeout="${distribUpdateSoTimeout:120000}">
<core name="collection1" instanceDir="collection1" shard="${shard:}" collection="${collection:collection1}" config="${solrconfig:solrconfig.xml}" schema="${schema:schema.xml}"/>
<core name="collection1" instanceDir="collection1" shard="${shard:}" collection="${collection:collection1}" config="${solrconfig:solrconfig.xml}" schema="${schema:schema.xml}"
coreNodeName="${coreNodeName:}"/>
<shardHandlerFactory name="shardHandlerFactory" class="HttpShardHandlerFactory">
<int name="socketTimeout">${socketTimeout:120000}</int>
<int name="connTimeout">${connTimeout:15000}</int>
</shardHandlerFactory>
</cores>
</solr>

View File

@ -56,6 +56,7 @@ public class CoreAdminRequest extends SolrRequest
private Integer numShards;
private String shardId;
private String roles;
private String coreNodeName;
public Create() {
action = CoreAdminAction.CREATE;
@ -70,6 +71,7 @@ public class CoreAdminRequest extends SolrRequest
public void setNumShards(int numShards) {this.numShards = numShards;}
public void setShardId(String shardId) {this.shardId = shardId;}
public void setRoles(String roles) {this.roles = roles;}
public void setCoreNodeName(String coreNodeName) {this.coreNodeName = coreNodeName;}
public String getInstanceDir() { return instanceDir; }
public String getSchemaName() { return schemaName; }
@ -79,6 +81,7 @@ public class CoreAdminRequest extends SolrRequest
public String getCollection() { return collection; }
public String getShardId() { return shardId; }
public String getRoles() { return roles; }
public String getCoreNodeName() { return coreNodeName; }
@Override
public SolrParams getParams() {
@ -117,6 +120,9 @@ public class CoreAdminRequest extends SolrRequest
if (roles != null) {
params.set( CoreAdminParams.ROLES, roles);
}
if (coreNodeName != null) {
params.set( CoreAdminParams.CORE_NODE_NAME, coreNodeName);
}
return params;
}

View File

@ -167,7 +167,7 @@ public class ClusterState implements JSONWriter.Writable {
* @param coreNodeName in the form of nodeName_coreName (the name of the replica)
*/
public String getShardId(String coreNodeName) {
// System.out.println("###### getShardId("+coreNodeName+") in " + collectionStates);
System.out.println("###### getShardId("+coreNodeName+") in " + collectionStates);
for (DocCollection coll : collectionStates.values()) {
for (Slice slice : coll.getSlices()) {
if (slice.getReplicasMap().containsKey(coreNodeName)) return slice.getName();

View File

@ -30,7 +30,6 @@ public class Replica extends ZkNodeProps {
super(propMap);
this.name = name;
nodeName = (String)propMap.get(ZkStateReader.NODE_NAME_PROP);
assert nodeName == null || name.startsWith(nodeName);
}
public String getName() {

View File

@ -62,10 +62,6 @@ public class ZkCoreNodeProps {
return nodeProps.toString();
}
public String getCoreNodeName() {
return getNodeName() + "_" + getCoreName();
}
public ZkNodeProps getNodeProps() {
return nodeProps;
}

View File

@ -51,6 +51,7 @@ public class ZkStateReader {
public static final String BASE_URL_PROP = "base_url";
public static final String NODE_NAME_PROP = "node_name";
public static final String CORE_NODE_NAME_PROP = "core_node_name";
public static final String ROLES_PROP = "roles";
public static final String STATE_PROP = "state";
public static final String CORE_NAME_PROP = "core";
@ -438,26 +439,18 @@ public class ZkStateReader {
: "");
}
/**
* Get CoreNodeName for a core. This name is unique across the collection.
* @param nodeName in form: 127.0.0.1:54065_solr
*/
public static String getCoreNodeName(String nodeName, String coreName) {
return nodeName + "_" + coreName;
}
public List<ZkCoreNodeProps> getReplicaProps(String collection,
String shardId, String thisNodeName, String coreName) {
return getReplicaProps(collection, shardId, thisNodeName, coreName, null);
String shardId, String thisCoreNodeName, String coreName) {
return getReplicaProps(collection, shardId, thisCoreNodeName, coreName, null);
}
public List<ZkCoreNodeProps> getReplicaProps(String collection,
String shardId, String thisNodeName, String coreName, String mustMatchStateFilter) {
return getReplicaProps(collection, shardId, thisNodeName, coreName, mustMatchStateFilter, null);
String shardId, String thisCoreNodeName, String coreName, String mustMatchStateFilter) {
return getReplicaProps(collection, shardId, thisCoreNodeName, coreName, mustMatchStateFilter, null);
}
public List<ZkCoreNodeProps> getReplicaProps(String collection,
String shardId, String thisNodeName, String coreName, String mustMatchStateFilter, String mustNotMatchStateFilter) {
String shardId, String thisCoreNodeName, String coreName, String mustMatchStateFilter, String mustNotMatchStateFilter) {
ClusterState clusterState = this.clusterState;
if (clusterState == null) {
return null;
@ -476,11 +469,12 @@ public class ZkStateReader {
Map<String,Replica> shardMap = replicas.getReplicasMap();
List<ZkCoreNodeProps> nodes = new ArrayList<ZkCoreNodeProps>(shardMap.size());
String filterNodeName = thisNodeName + "_" + coreName;
for (Entry<String,Replica> entry : shardMap.entrySet()) {
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
String coreNodeName = nodeProps.getNodeName() + "_" + nodeProps.getCoreName();
if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(filterNodeName)) {
String coreNodeName = entry.getValue().getName();
if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) {
if (mustMatchStateFilter == null || mustMatchStateFilter.equals(nodeProps.getState())) {
if (mustNotMatchStateFilter == null || !mustNotMatchStateFilter.equals(nodeProps.getState())) {
nodes.add(nodeProps);

View File

@ -76,6 +76,8 @@ public interface CoreAdminParams
public static final String ROLES = "roles";
public static final String CORE_NODE_NAME = "coreNodeName";
/** Prefix for core property name=value pair **/
public final static String PROPERTY_PREFIX = "property.";

View File

@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@ -64,7 +65,10 @@ import org.slf4j.LoggerFactory;
public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
// TODO: this shouldn't be static. get the random when you need it to avoid sharing.
public static Random r;
private AtomicInteger nodeCnt = new AtomicInteger(0);
protected boolean useExplicitNodeNames;
@BeforeClass
public static void initialize() {
assumeFalse("SOLR-4147: ibm 64bit has jvm bugs!", Constants.JRE_IS_64BIT && Constants.JAVA_VENDOR.startsWith("IBM"));
@ -353,10 +357,17 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
}
public JettySolrRunner createJetty(File solrHome, String dataDir, String shardList, String solrConfigOverride, String schemaOverride) throws Exception {
return createJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, useExplicitNodeNames);
}
public JettySolrRunner createJetty(File solrHome, String dataDir, String shardList, String solrConfigOverride, String schemaOverride, boolean explicitCoreNodeName) throws Exception {
JettySolrRunner jetty = new JettySolrRunner(solrHome.getAbsolutePath(), context, 0, solrConfigOverride, schemaOverride);
jetty.setShards(shardList);
jetty.setDataDir(dataDir);
if (explicitCoreNodeName) {
jetty.setCoreNodeName(Integer.toString(nodeCnt.incrementAndGet()));
}
jetty.start();
return jetty;

View File

@ -199,6 +199,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
// TODO: for now, turn off stress because it uses regular clients, and we
// need the cloud client because we kill servers
stress = 0;
useExplicitNodeNames = random().nextBoolean();
}
protected void initCloud() throws Exception {
@ -412,18 +414,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
return cnt;
}
public JettySolrRunner createJetty(String dataDir, String shardList,
String solrConfigOverride) throws Exception {
JettySolrRunner jetty = new JettySolrRunner(getSolrHome(), context, 0,
solrConfigOverride, null, false);
jetty.setShards(shardList);
jetty.setDataDir(dataDir);
jetty.start();
return jetty;
}
protected void updateMappingsFromZk(List<JettySolrRunner> jettys,
List<SolrServer> clients) throws Exception {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
@ -444,7 +434,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
int port = new URI(((HttpSolrServer) client).getBaseURL())
.getPort();
if (replica.getName().contains(":" + port + "_")) {
if (replica.getNodeName().contains(":" + port + "_")) {
CloudSolrServerClient csc = new CloudSolrServerClient();
csc.solrClient = client;
csc.port = port;
@ -468,7 +458,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
nextJetty:
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
if (replica.getName().contains(":" + port + "_")) {
if (replica.getNodeName().contains(":" + port + "_")) {
List<CloudJettyRunner> list = shardToJetty.get(slice.getName());
if (list == null) {
list = new ArrayList<CloudJettyRunner>();
@ -501,7 +491,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
List<CloudJettyRunner> jetties = shardToJetty.get(slice.getName());
assertNotNull("Test setup problem: We found no jetties for shard: " + slice.getName()
+ " just:" + shardToJetty.keySet(), jetties);
assertEquals(slice.getReplicas().size(), jetties.size());
assertEquals("slice:" + slice.getName(), slice.getReplicas().size(), jetties.size());
}
}
@ -511,7 +501,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
return client;
}
}
throw new IllegalArgumentException("Client with the give port does not exist:" + port);
throw new IllegalArgumentException("Client with the given port does not exist:" + port);
}
@Override