SOLR-11464: SOLR-11493: Minor refactorings to DistributedUpdateProcessor

This commit is contained in:
David Smiley 2017-10-18 20:18:54 -04:00
parent a37c4b5ff1
commit 18c8091da5
2 changed files with 114 additions and 128 deletions

View File

@ -77,6 +77,8 @@ Other Changes
method is now called after the SolrCore or CoreContainer has been set for the instance.
(Christine Poerschke)
* SOLR-11464, SOLR-11493: Minor refactorings to DistributedUpdateProcessor. (Gus Heck, David Smiley)
================== 7.1.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -351,17 +351,17 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
private List<Node> setupRequest(String id, SolrInputDocument doc, String route) {
List<Node> nodes = null;
// if we are in zk mode...
if (zkEnabled) {
if (!zkEnabled) {
return null;
}
assert TestInjection.injectUpdateRandomPause();
if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
forwardToLeader = false;
return nodes;
return null;
}
ClusterState cstate = zkController.getClusterState();
@ -392,17 +392,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
forwardToLeader = false;
return nodes;
return null;
}
}
String shardId = slice.getName();
try {
// Not equivalent to getLeaderProps, which does retries to find a leader.
// Not equivalent to getLeaderProps, which retries to find a leader.
// Replica leader = slice.getLeader();
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, shardId);
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
isLeader = leaderReplica.getName().equals(
req.getCore().getCoreDescriptor().getCloudDescriptor()
.getCoreNodeName());
@ -410,12 +409,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (!isLeader) {
isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
if (isSubShardLeader) {
String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
slice = coll.getSlice(myShardId);
shardId = myShardId;
leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
List<ZkCoreNodeProps> myReplicas = zkController.getZkStateReader()
.getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
shardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
}
}
@ -428,17 +423,17 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) {
// we are coming from the leader, just go local - add no urls
forwardToLeader = false;
return null;
} else if (isLeader || isSubShardLeader) {
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
.getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
if (replicaProps != null) {
if (nodes == null) {
nodes = new ArrayList<>(replicaProps.size());
if (replicaProps == null) {
return null;
}
// check for test param that lets us miss replicas
String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
Set<String> skipListSet = null;
@ -448,6 +443,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
log.info("test.distrib.skip.servers was found and contains:" + skipListSet);
}
List<Node> nodes = new ArrayList<>(replicaProps.size());
for (ZkCoreNodeProps props : replicaProps) {
if (skipList != null) {
boolean skip = skipListSet.contains(props.getCoreUrl());
@ -459,25 +455,21 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
nodes.add(new StdNode(props, collection, shardId));
}
}
}
return nodes;
} else {
// I need to forward onto the leader...
nodes = new ArrayList<>(1);
nodes.add(new RetryNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId));
forwardToLeader = true;
return Collections.singletonList(
new RetryNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
}
}
return nodes;
}
private boolean couldIbeSubShardLeader(DocCollection coll) {
// Could I be the leader of a shard in "construction/recovery" state?
String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
@ -768,9 +760,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
params.set(DISTRIB_FROM_PARENT, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
for (Node subShardLeader : subShardLeaders) {
cmdDistrib.distribAdd(cmd, Collections.singletonList(subShardLeader), params, true);
}
cmdDistrib.distribAdd(cmd, subShardLeaders, params, true);
}
final List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument());
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
@ -778,13 +768,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
for (Node nodesByRoutingRule : nodesByRoutingRules) {
cmdDistrib.distribAdd(cmd, Collections.singletonList(nodesByRoutingRule), params, true);
}
cmdDistrib.distribAdd(cmd, nodesByRoutingRules, params, true);
}
}
@ -1443,9 +1429,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
zkController.getBaseUrl(), req.getCore().getName()));
params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
for (Node nodesByRoutingRule : nodesByRoutingRules) {
cmdDistrib.distribDelete(cmd, Collections.singletonList(nodesByRoutingRule), params, true);
}
cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true);
}
}