mirror of https://github.com/apache/lucene.git
SOLR-8728: ReplicaAssigner throws NPE when a partial list of nodes are only participating in replica
placement. splitshard should preassign nodes using rules, if rules are present
This commit is contained in:
parent e44eebf39d
commit 0cd24c5d08
@@ -228,6 +228,9 @@ Bug Fixes

 * SOLR-8375: ReplicaAssigner rejects valid nodes (Kelvin Tan, noble)

+* SOLR-8728: ReplicaAssigner throws NPE when a partial list of nodes are only participating in replica
+  placement. splitshard should preassign nodes using rules, if rules are present (noble, Shai Erera)
+
 Optimizations
 ----------------------

 * SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been

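The "rules" mentioned in the new CHANGES entry are Solr's rule-based replica placement expressions. The test added at the bottom of this commit creates a collection with the rule shard:*,replica:<2,node:*, generally read as "fewer than two replicas of any shard on any one node". A minimal SolrJ sketch of supplying such a rule at collection-creation time, trimmed from that test and assuming a running SolrCloud cluster plus a CloudSolrClient:

    // Trimmed from the ShardSplitTest change at the bottom of this commit; assumes a running
    // SolrCloud cluster reachable through the given CloudSolrClient.
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;
    import org.apache.solr.client.solrj.response.CollectionAdminResponse;

    public class RuleBasedCreateSketch {
      public static void createWithRule(CloudSolrClient cloudClient) throws Exception {
        CollectionAdminRequest.Create create = new CollectionAdminRequest.Create()
            .setCollectionName("shardSplitWithRule")
            .setNumShards(1)
            .setReplicationFactor(2)
            .setRule("shard:*,replica:<2,node:*"); // placement rule a later splitshard must also respect
        CollectionAdminResponse response = create.process(cloudClient);
        if (response.getStatus() != 0) {
          throw new IllegalStateException("create failed: " + response.getErrorMessages());
        }
      }
    }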
@@ -1308,13 +1308,18 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler

       // TODO: change this to handle sharding a slice into > 2 sub-shards.

+      Map<Position, String> nodeMap = identifyNodes(clusterState,
+          new ArrayList<>(clusterState.getLiveNodes()),
+          new ZkNodeProps(collection.getProperties()),
+          subSlices, repFactor - 1);
+
       List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
-      for (int i = 1; i <= subSlices.size(); i++) {
-        Collections.shuffle(nodeList, RANDOM);
-        String sliceName = subSlices.get(i - 1);
-        for (int j = 2; j <= repFactor; j++) {
-          String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
-          String shardName = collectionName + "_" + sliceName + "_replica" + (j);
+      for (Map.Entry<Position, String> entry : nodeMap.entrySet()) {
+        String sliceName = entry.getKey().shard;
+        String subShardNodeName = entry.getValue();
+        String shardName = collectionName + "_" + sliceName + "_replica" + (entry.getKey().index);

           log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
               + collectionName + " on " + subShardNodeName);

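Previously splitshard shuffled the live-node list and placed sub-shard replicas round-robin; with this change it asks identifyNodes(...) for a rule-aware Position-to-node map up front. A stand-alone sketch of how such a map drives sub-shard replica naming (the Position class below is a hypothetical stand-in holding only the shard and index fields the diff reads, not Solr's actual class):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SubShardNamingSketch {
      // Stand-in for the Position keys returned by identifyNodes(...); the real class lives in
      // Solr's rule engine, but the diff only dereferences its "shard" and "index" fields.
      static class Position {
        final String shard;
        final int index;
        Position(String shard, int index) { this.shard = shard; this.index = index; }
      }

      public static void main(String[] args) {
        Map<Position, String> nodeMap = new LinkedHashMap<>();   // rule-aware assignments
        nodeMap.put(new Position("shard1_0", 1), "127.0.0.1:8983_solr");
        nodeMap.put(new Position("shard1_1", 1), "127.0.0.1:7574_solr");

        String collectionName = "collection1";
        for (Map.Entry<Position, String> entry : nodeMap.entrySet()) {
          String sliceName = entry.getKey().shard;
          String coreName = collectionName + "_" + sliceName + "_replica" + entry.getKey().index;
          System.out.println("would create " + coreName + " on " + entry.getValue());
        }
      }
    }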
@@ -1349,7 +1354,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler

           replicas.add(propMap);
         }
-      }

       // we must set the slice state into recovery before actually creating the replica cores
       // this ensures that the logic inside Overseer to update sub-shard state to 'active'

@@ -2078,8 +2082,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
                                              ZkNodeProps message,
                                              List<String> shardNames,
                                              int repFactor) throws IOException {
-    List<Map> maps = (List) message.get("rule");
-    if (maps == null) {
+    List<Map> rulesMap = (List) message.get("rule");
+    if (rulesMap == null) {
       int i = 0;
       Map<Position, String> result = new HashMap<>();
       for (String aShard : shardNames) {

@@ -2092,7 +2096,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     }

     List<Rule> rules = new ArrayList<>();
-    for (Object map : maps) rules.add(new Rule((Map) map));
+    for (Object map : rulesMap) rules.add(new Rule((Map) map));

     Map<String, Integer> sharVsReplicaCount = new HashMap<>();

@@ -2159,8 +2163,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     ShardHandler shardHandler = shardHandlerFactory.getShardHandler();

     // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
-    node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
-        overseer.getZkController().getCoreContainer()).get(0).nodeName;
+    if(node == null) {
+      node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
+          overseer.getZkController().getCoreContainer()).get(0).nodeName;
+    }
     log.info("Node not provided, Identified {} for creating new replica", node);

     if (!clusterState.liveNodesContain(node)) {

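The addReplica change above only consults the node assigner when the request did not name a node. A hypothetical illustration of that guard, where pickNode() stands in for getNodesForNewReplicas(...), which needs a live cluster to run:

    // Hypothetical illustration of the "only pick a node when none was requested" guard.
    public class NodeGuardSketch {
      static String pickNode() {
        return "127.0.0.1:8983_solr";
      }

      static String resolveNode(String requestedNode) {
        String node = requestedNode;
        if (node == null) {
          node = pickNode();   // before this change, the lookup ran even when a node was supplied
        }
        return node;
      }

      public static void main(String[] args) {
        System.out.println(resolveNode(null));                   // assigner-chosen node
        System.out.println(resolveNode("127.0.0.1:7574_solr"));  // caller's node used as-is
      }
    }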
@@ -55,7 +55,7 @@ public class ReplicaAssigner {
   Map<String, Integer> shardVsReplicaCount;
   Map<String, Map<String, Object>> nodeVsTags;
   Map<String, HashMap<String, Integer>> shardVsNodes;
-  List<String> liveNodes;
+  List<String> participatingLiveNodes;
   Set<String> tagNames = new HashSet<>();
   private Map<String, AtomicInteger> nodeVsCores = new HashMap<>();

@@ -93,12 +93,12 @@ public class ReplicaAssigner {
                          Map<String, Integer> shardVsReplicaCount,
                          List snitches,
                          Map<String, Map<String, Integer>> shardVsNodes,
-                         List<String> liveNodes,
+                         List<String> participatingLiveNodes,
                          CoreContainer cc, ClusterState clusterState) {
     this.rules = rules;
     for (Rule rule : rules) tagNames.add(rule.tag.name);
     this.shardVsReplicaCount = shardVsReplicaCount;
-    this.liveNodes = new ArrayList<>(liveNodes);
+    this.participatingLiveNodes = new ArrayList<>(participatingLiveNodes);
     this.nodeVsTags = getTagsForNodes(cc, snitches);
     this.shardVsNodes = getDeepCopy(shardVsNodes, 2);
     validateTags(nodeVsTags);

@@ -209,7 +209,7 @@ public class ReplicaAssigner {
     Map<Position, String> result = new LinkedHashMap<>();
     int startPosition = 0;
     Map<String, Map<String, Integer>> copyOfCurrentState = getDeepCopy(shardVsNodes, 2);
-    List<String> sortedLiveNodes = new ArrayList<>(this.liveNodes);
+    List<String> sortedLiveNodes = new ArrayList<>(this.participatingLiveNodes);
     Collections.sort(sortedLiveNodes, new Comparator<String>() {
       @Override
       public int compare(String n1, String n2) {

@@ -397,7 +397,7 @@ public class ReplicaAssigner {
     }


-    for (String node : liveNodes) {
+    for (String node : participatingLiveNodes) {
       //now use the Snitch to get the tags
       for (SnitchInfoImpl info : snitches.values()) {
         if (!info.myTags.isEmpty()) {

@@ -419,7 +419,7 @@ public class ReplicaAssigner {
       String node = e.getKey();
       if (context.exception != null) {
         failedNodes.put(node, context);
-        liveNodes.remove(node);
+        participatingLiveNodes.remove(node);
         log.warn("Not all tags were obtained from node " + node);
         context.exception = new SolrException(SolrException.ErrorCode.SERVER_ERROR,
             "Not all tags were obtained from node " + node);

@@ -436,7 +436,7 @@ public class ReplicaAssigner {
       }
     }

-    if (liveNodes.isEmpty()) {
+    if (participatingLiveNodes.isEmpty()) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not get all tags for any nodes");

     }

@@ -163,7 +163,9 @@ public class Rule {
         Map<String,Integer> nodesInThisShard = shardVsNodeSet.get(shardCondition.val.equals(WILD_WILD_CARD) ? entry.getKey() : shardName);
         if (nodesInThisShard != null) {
           for (Map.Entry<String,Integer> aNode : nodesInThisShard.entrySet()) {
-            Object obj = nodeVsTags.get(aNode.getKey()).get(tag.name);
+            Map<String, Object> tagValues = nodeVsTags.get(aNode.getKey());
+            if(tagValues == null) continue;
+            Object obj = tagValues.get(tag.name);
             if (tagCondition.canMatch(obj, phase)) countMatchingThisTagValue += aNode.getValue();
           }
         }

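This Rule.java guard is the heart of the SOLR-8728 fix: when only part of the live nodes participate in placement, nodeVsTags has no entry for the rest, and the old one-liner dereferenced that missing entry. A self-contained sketch of the failure mode and the guard (node names and tag values are invented):

    import java.util.HashMap;
    import java.util.Map;

    public class TagLookupNpeSketch {
      public static void main(String[] args) {
        Map<String, Map<String, Object>> nodeVsTags = new HashMap<>();
        Map<String, Object> node1Tags = new HashMap<>();
        node1Tags.put("cores", 3);
        nodeVsTags.put("node1:8983_solr", node1Tags);   // participating node: tags were collected
        // node2:8983_solr hosts replicas but did not participate, so it has no entry at all

        // Old code: nodeVsTags.get("node2:8983_solr").get("cores") -> NullPointerException
        Map<String, Object> tagValues = nodeVsTags.get("node2:8983_solr");   // null
        if (tagValues == null) {
          System.out.println("skipping node with no tag data, as the fixed loop now does");
          return;
        }
        Object cores = tagValues.get("cores");
        System.out.println("cores = " + cores);
      }
    }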
@@ -24,7 +24,9 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.cloud.ClusterState;

@@ -86,7 +88,32 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     // and the new sub-shards don't have any.
     waitForRecoveriesToFinish(true);
     //waitForThingsToLevelOut(15);
+  }
+
+  @Test
+  public void testSplitShardWithRule() throws Exception {
+    waitForThingsToLevelOut(15);
+
+    if (usually()) {
+      log.info("Using legacyCloud=false for cluster");
+      CollectionsAPIDistributedZkTest.setClusterProp(cloudClient, "legacyCloud", "false");
+    }
+
+    log.info("Starting testSplitShardWithRule");
+    String collectionName = "shardSplitWithRule";
+    CollectionAdminRequest.Create createRequest = new CollectionAdminRequest.Create()
+        .setCollectionName(collectionName)
+        .setNumShards(1)
+        .setReplicationFactor(2)
+        .setRule("shard:*,replica:<2,node:*");
+    CollectionAdminResponse response = createRequest.process(cloudClient);
+    assertEquals(0, response.getStatus());
+
+    CollectionAdminRequest.SplitShard splitShardRequest = new CollectionAdminRequest.SplitShard()
+        .setCollectionName(collectionName)
+        .setShardName("shard1");
+    response = splitShardRequest.process(cloudClient);
+    assertEquals(String.valueOf(response.getErrorMessages()), 0, response.getStatus());
   }

   private void incompleteOrOverlappingCustomRangeTest() throws Exception {

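The new testSplitShardWithRule ties the pieces of this commit together: a collection created under a placement rule is split, and the split is expected to succeed because the sub-shard replicas are now preassigned through the same rule engine (identifyNodes) rather than scattered across shuffled live nodes, and because the Rule tag lookup no longer NPEs when only some nodes participate in placement.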