mirror of https://github.com/apache/lucene.git
SOLR-10994: CREATE & CREATESHARD to support replica types when using policy
This commit is contained in:
parent
0093015c54
commit
b21b0dcc22
|
@ -17,6 +17,7 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -27,6 +28,7 @@ import java.util.Locale;
|
|||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -52,8 +54,9 @@ import org.apache.solr.common.util.StrUtils;
|
|||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY;
|
||||
|
@ -66,6 +69,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_P
|
|||
|
||||
|
||||
public class Assign {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static Pattern COUNT = Pattern.compile("core_node(\\d+)");
|
||||
|
||||
public static String assignNode(DocCollection collection) {
|
||||
|
@ -190,11 +195,11 @@ public class Assign {
|
|||
String policyName = message.getStr(POLICY);
|
||||
Map autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
|
||||
|
||||
if (rulesMap == null && policyName == null) {
|
||||
if (rulesMap == null && policyName == null && autoScalingJson.get(Policy.CLUSTER_POLICY) == null) {
|
||||
log.debug("Identify nodes using default");
|
||||
int i = 0;
|
||||
List<ReplicaPosition> result = new ArrayList<>();
|
||||
for (String aShard : shardNames) {
|
||||
|
||||
for (String aShard : shardNames)
|
||||
for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
|
||||
Replica.Type.TLOG, numTlogReplicas,
|
||||
Replica.Type.PULL, numPullReplicas
|
||||
|
@ -204,20 +209,21 @@ public class Assign {
|
|||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return result;
|
||||
} else {
|
||||
if (numTlogReplicas + numPullReplicas != 0) {
|
||||
if (numTlogReplicas + numPullReplicas != 0 && rulesMap != null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies");
|
||||
}
|
||||
}
|
||||
|
||||
if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
|
||||
if (message.getStr(CREATE_NODE_SET) == null)
|
||||
nodeList = Collections.emptyList();// unless explicitly specified do not pass node list to Policy
|
||||
return getPositionsUsingPolicy(collectionName,
|
||||
shardNames, numNrtReplicas, policyName, zkStateReader, nodeList);
|
||||
shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, zkStateReader, nodeList);
|
||||
} else {
|
||||
log.debug("Identify nodes using rules framework");
|
||||
List<Rule> rules = new ArrayList<>();
|
||||
for (Object map : rulesMap) rules.add(new Rule((Map) map));
|
||||
Map<String, Integer> sharVsReplicaCount = new HashMap<>();
|
||||
|
@ -257,7 +263,7 @@ public class Assign {
|
|||
// Gets a list of candidate nodes to put the required replica(s) on. Throws errors if not enough replicas
|
||||
// could be created on live nodes given maxShardsPerNode, Replication factor (if from createShard) etc.
|
||||
public static List<ReplicaCount> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
|
||||
String shard, int numberOfNodes,
|
||||
String shard, int nrtReplicas,
|
||||
Object createNodeSet, CoreContainer cc) throws KeeperException, InterruptedException {
|
||||
DocCollection coll = clusterState.getCollection(collectionName);
|
||||
Integer maxShardsPerNode = coll.getInt(MAX_SHARDS_PER_NODE, 1);
|
||||
|
@ -279,22 +285,22 @@ public class Assign {
|
|||
availableSlots += (maxShardsPerNode - ent.getValue().thisCollectionNodes);
|
||||
}
|
||||
}
|
||||
if (availableSlots < numberOfNodes) {
|
||||
if (availableSlots < nrtReplicas) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
String.format(Locale.ROOT, "Cannot create %d new replicas for collection %s given the current number of live nodes and a maxShardsPerNode of %d",
|
||||
numberOfNodes, collectionName, maxShardsPerNode));
|
||||
nrtReplicas, collectionName, maxShardsPerNode));
|
||||
}
|
||||
}
|
||||
|
||||
List l = (List) coll.get(DocCollection.RULE);
|
||||
List<ReplicaPosition> replicaPositions = null;
|
||||
if (l != null) {
|
||||
replicaPositions = getNodesViaRules(clusterState, shard, numberOfNodes, cc, coll, createNodeList, l);
|
||||
replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cc, coll, createNodeList, l);
|
||||
}
|
||||
String policyName = coll.getStr(POLICY);
|
||||
Map autoScalingJson = Utils.getJson(cc.getZkController().getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
|
||||
if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
|
||||
replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), numberOfNodes,
|
||||
replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, 0, 0,
|
||||
policyName, cc.getZkController().getZkStateReader(), createNodeList);
|
||||
}
|
||||
|
||||
|
@ -312,7 +318,10 @@ public class Assign {
|
|||
|
||||
}
|
||||
|
||||
public static List<ReplicaPosition> getPositionsUsingPolicy(String collName, List<String> shardNames, int numReplicas,
|
||||
public static List<ReplicaPosition> getPositionsUsingPolicy(String collName, List<String> shardNames,
|
||||
int nrtReplicas,
|
||||
int tlogReplicas,
|
||||
int pullReplicas,
|
||||
String policyName, ZkStateReader zkStateReader,
|
||||
List<String> nodesList) throws KeeperException, InterruptedException {
|
||||
try (CloudSolrClient csc = new CloudSolrClient.Builder()
|
||||
|
@ -320,17 +329,17 @@ public class Assign {
|
|||
.build()) {
|
||||
SolrClientDataProvider clientDataProvider = new SolrClientDataProvider(csc);
|
||||
Map<String, Object> autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
|
||||
Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(collName,
|
||||
Map<String, String> kvMap = Collections.singletonMap(collName, policyName);
|
||||
return PolicyHelper.getReplicaLocations(
|
||||
collName,
|
||||
autoScalingJson,
|
||||
clientDataProvider, singletonMap(collName, policyName), shardNames, numReplicas, nodesList);
|
||||
List<ReplicaPosition> result = new ArrayList<>();
|
||||
for (Map.Entry<String, List<String>> e : locations.entrySet()) {
|
||||
List<String> value = e.getValue();
|
||||
for (int i = 0; i < value.size(); i++) {
|
||||
result.add(new ReplicaPosition(e.getKey(), i, Replica.Type.NRT, value.get(i)));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
clientDataProvider,
|
||||
kvMap,
|
||||
shardNames,
|
||||
nrtReplicas,
|
||||
tlogReplicas,
|
||||
pullReplicas,
|
||||
nodesList);
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing CloudSolrClient",e);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
|
@ -131,6 +133,69 @@ public class TestPolicyCloud extends SolrCloudTestCase {
|
|||
assertEquals("Expected exactly three replica of collection on node with port: " + secondNodePort, 3, replicasOnNode2);
|
||||
}
|
||||
|
||||
public void testCreateCollectionAddShardWithReplicaTypeUsingPolicy() throws Exception {
|
||||
JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);
|
||||
String nrtNodeName = jetty.getNodeName();
|
||||
int nrtPort = jetty.getLocalPort();
|
||||
|
||||
jetty = cluster.getJettySolrRunners().get(1);
|
||||
String pullNodeName = jetty.getNodeName();
|
||||
int pullPort = jetty.getLocalPort();
|
||||
|
||||
jetty = cluster.getJettySolrRunners().get(2);
|
||||
String tlogNodeName = jetty.getNodeName();
|
||||
int tlogPort = jetty.getLocalPort();
|
||||
log.info("NRT {} PULL {} , TLOG {} ", nrtNodeName, pullNodeName, tlogNodeName);
|
||||
|
||||
String commands = "{set-cluster-policy :[" +
|
||||
"{replica:0 , shard:'#EACH', type: NRT, port: '!" + nrtPort + "'}" +
|
||||
"{replica:0 , shard:'#EACH', type: PULL, port: '!" + pullPort + "'}" +
|
||||
"{replica:0 , shard:'#EACH', type: TLOG, port: '!" + tlogPort + "'}" +
|
||||
"]}";
|
||||
|
||||
|
||||
cluster.getSolrClient().request(AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, commands));
|
||||
Map<String, Object> json = Utils.getJson(cluster.getZkClient(), ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true);
|
||||
assertEquals("full json:" + Utils.toJSONString(json), "!" + nrtPort,
|
||||
Utils.getObjectByPath(json, true, "cluster-policy[0]/port"));
|
||||
assertEquals("full json:" + Utils.toJSONString(json), "!" + pullPort,
|
||||
Utils.getObjectByPath(json, true, "cluster-policy[1]/port"));
|
||||
assertEquals("full json:" + Utils.toJSONString(json), "!" + tlogPort,
|
||||
Utils.getObjectByPath(json, true, "cluster-policy[2]/port"));
|
||||
|
||||
CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "s1", 1, 1, 1)
|
||||
.setMaxShardsPerNode(5)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
DocCollection coll = getCollectionState("policiesTest");
|
||||
|
||||
|
||||
BiConsumer<String, Replica> verifyReplicas = (s, replica) -> {
|
||||
switch (replica.getType()) {
|
||||
case NRT: {
|
||||
assertTrue("NRT replica should be in " + nrtNodeName, replica.getNodeName().equals(nrtNodeName));
|
||||
break;
|
||||
}
|
||||
case TLOG: {
|
||||
assertTrue("TLOG replica should be in " + tlogNodeName, replica.getNodeName().equals(tlogNodeName));
|
||||
break;
|
||||
}
|
||||
case PULL: {
|
||||
assertTrue("PULL replica should be in " + pullNodeName, replica.getNodeName().equals(pullNodeName));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
coll.forEachReplica(verifyReplicas);
|
||||
|
||||
CollectionAdminRequest.createShard("policiesTest", "s3").
|
||||
process(cluster.getSolrClient());
|
||||
coll = getCollectionState("policiesTest");
|
||||
assertEquals(3, coll.getSlice("s3").getReplicas().size());
|
||||
coll.forEachReplica(verifyReplicas);
|
||||
}
|
||||
|
||||
public void testCreateCollectionAddShardUsingPolicy() throws Exception {
|
||||
JettySolrRunner jetty = cluster.getRandomJetty(random());
|
||||
int port = jetty.getLocalPort();
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.solr.client.solrj.cloud.autoscaling;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -27,20 +28,24 @@ import java.util.Map;
|
|||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester.Hint;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ReplicaPosition;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.NODE;
|
||||
|
||||
public class PolicyHelper {
|
||||
public static Map<String, List<String>> getReplicaLocations(String collName, Map<String, Object> autoScalingJson,
|
||||
ClusterDataProvider cdp,
|
||||
Map<String, String> optionalPolicyMapping,
|
||||
List<String> shardNames,
|
||||
int repFactor,
|
||||
List<String> nodesList) {
|
||||
Map<String, List<String>> positionMapping = new HashMap<>();
|
||||
for (String shardName : shardNames) positionMapping.put(shardName, new ArrayList<>(repFactor));
|
||||
public static List<ReplicaPosition> getReplicaLocations(String collName, Map<String, Object> autoScalingJson,
|
||||
ClusterDataProvider cdp,
|
||||
Map<String, String> optionalPolicyMapping,
|
||||
List<String> shardNames,
|
||||
int nrtReplicas,
|
||||
int tlogReplicas,
|
||||
int pullReplicas,
|
||||
List<String> nodesList) {
|
||||
List<ReplicaPosition> positions = new ArrayList<>();
|
||||
if (optionalPolicyMapping != null) {
|
||||
final ClusterDataProvider delegate = cdp;
|
||||
cdp = new ClusterDataProvider() {
|
||||
|
@ -66,31 +71,37 @@ public class PolicyHelper {
|
|||
delegate.getPolicyNameByCollection(coll);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
Policy policy = new Policy(autoScalingJson);
|
||||
Policy.Session session = policy.createSession(cdp);
|
||||
Map<Replica.Type, Integer> typeVsCount = new EnumMap<>(Replica.Type.class);
|
||||
typeVsCount.put(Replica.Type.NRT, nrtReplicas);
|
||||
typeVsCount.put(Replica.Type.TLOG, tlogReplicas);
|
||||
typeVsCount.put(Replica.Type.PULL, pullReplicas);
|
||||
for (String shardName : shardNames) {
|
||||
for (int i = 0; i < repFactor; i++) {
|
||||
Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
|
||||
.hint(Hint.COLL, collName)
|
||||
.hint(Hint.SHARD, shardName);
|
||||
if (nodesList != null) {
|
||||
for (String nodeName : nodesList) {
|
||||
suggester = suggester.hint(Hint.TARGET_NODE, nodeName);
|
||||
int idx = 0;
|
||||
for (Map.Entry<Replica.Type, Integer> e : typeVsCount.entrySet()) {
|
||||
for (int i = 0; i < e.getValue(); i++) {
|
||||
Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
|
||||
.hint(Hint.COLL, collName)
|
||||
.hint(Hint.REPLICATYPE, e.getKey())
|
||||
.hint(Hint.SHARD, shardName);
|
||||
if (nodesList != null) {
|
||||
for (String nodeName : nodesList) {
|
||||
suggester = suggester.hint(Hint.TARGET_NODE, nodeName);
|
||||
}
|
||||
}
|
||||
SolrRequest op = suggester.getOperation();
|
||||
if (op == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules " + Utils.toJSONString(Utils.getDeepCopy(session.expandedClauses, 4, true)));
|
||||
}
|
||||
session = suggester.getSession();
|
||||
positions.add(new ReplicaPosition(shardName, ++idx, e.getKey(), op.getParams().get(NODE)));
|
||||
}
|
||||
SolrRequest op = suggester.getOperation();
|
||||
if (op == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules "+ Utils.toJSONString(Utils.getDeepCopy(session.expandedClauses, 4, true)));
|
||||
}
|
||||
session = suggester.getSession();
|
||||
positionMapping.get(shardName).add(op.getParams().get(CoreAdminParams.NODE));
|
||||
}
|
||||
}
|
||||
|
||||
return positionMapping;
|
||||
return positions;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.HashMap;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
|
@ -36,6 +37,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Clause.Violation;
|
|||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester.Hint;
|
||||
import org.apache.solr.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ReplicaPosition;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
|
@ -903,11 +905,11 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
return Arrays.asList( "127.0.0.1:50097_solr", "127.0.0.1:50096_solr");
|
||||
}
|
||||
};
|
||||
Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(
|
||||
List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations(
|
||||
"newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
|
||||
dataProvider, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1, null);
|
||||
assertTrue(locations.get("shard1").containsAll(ImmutableList.of("127.0.0.1:50096_solr")));
|
||||
assertTrue(locations.get("shard2").containsAll(ImmutableList.of("127.0.0.1:50096_solr")));
|
||||
dataProvider, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1, 0, 0, null);
|
||||
|
||||
assertTrue(locations.stream().allMatch(it -> it.node.equals("127.0.0.1:50096_solr")) );
|
||||
}
|
||||
|
||||
public void testMultiReplicaPlacement() {
|
||||
|
@ -960,13 +962,10 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
return Arrays.asList("node1", "node2", "node3", "node4");
|
||||
}
|
||||
};
|
||||
Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(
|
||||
List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations(
|
||||
"newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
|
||||
dataProvider, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3, null);
|
||||
assertTrue(locations.get("shard1").containsAll(ImmutableList.of("node2", "node1", "node3")));
|
||||
assertTrue(locations.get("shard2").containsAll(ImmutableList.of("node2", "node1", "node3")));
|
||||
|
||||
|
||||
dataProvider, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3,0,0, null);
|
||||
assertTrue(locations.stream().allMatch(it -> ImmutableList.of("node2", "node1", "node3").contains(it.node)) );
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue