SOLR-10419: All collection APIs should use the new Policy framework for replica placement

Shalin Shekhar Mangar 2017-06-06 09:22:38 +05:30
parent 744d1ab974
commit b47572ee87
10 changed files with 102 additions and 14 deletions

View File

@@ -208,6 +208,8 @@ Other Changes
* SOLR-10782: Improve error handling and tests for Snitch and subclasses and general cleanups. (Noble Paul, shalin)
* SOLR-10419: All collection APIs should use the new Policy framework for replica placement. (Noble Paul, shalin)
================== 6.7.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@@ -199,7 +199,7 @@ public class Assign {
Map autoScalingJson = Utils.getJson(cc.getZkController().getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
positions= Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), numberOfNodes,
-policyName, cc.getZkController().getZkStateReader());
+policyName, cc.getZkController().getZkStateReader(), createNodeList);
}
if(positions != null){
@@ -216,7 +216,8 @@ public class Assign {
}
public static Map<ReplicaAssigner.Position, String> getPositionsUsingPolicy(String collName, List<String> shardNames, int numReplicas,
-String policyName, ZkStateReader zkStateReader) throws KeeperException, InterruptedException {
+String policyName, ZkStateReader zkStateReader,
+List<String> nodesList) throws KeeperException, InterruptedException {
try (CloudSolrClient csc = new CloudSolrClient.Builder()
.withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
.build()) {
@@ -224,7 +225,7 @@ public class Assign {
Map<String, Object> autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(collName,
autoScalingJson,
-clientDataProvider, singletonMap(collName, policyName), shardNames, numReplicas);
+clientDataProvider, singletonMap(collName, policyName), shardNames, numReplicas, nodesList);
Map<ReplicaAssigner.Position, String> result = new HashMap<>();
for (Map.Entry<String, List<String>> e : locations.entrySet()) {
List<String> value = e.getValue();
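For context, a minimal caller-side sketch (not part of this commit) of how the extended getPositionsUsingPolicy signature above can be invoked: the new trailing nodesList argument carries the createNodeList, while passing null keeps the old behaviour of considering every live node. The collection name, policy name, and node names are placeholders, a live ZkStateReader is assumed, and package locations are the ones used at the time of this commit.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.solr.cloud.Assign;
import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.zookeeper.KeeperException;

public class PolicyPlacementSketch {
  // Ask the policy engine for replica positions, but only on the two listed nodes.
  static Map<ReplicaAssigner.Position, String> placeOnPreferredNodes(ZkStateReader zkStateReader)
      throws KeeperException, InterruptedException {
    List<String> createNodeList = Arrays.asList("127.0.0.1:8983_solr", "127.0.0.1:7574_solr");
    return Assign.getPositionsUsingPolicy(
        "mycoll",                            // collection being created or expanded
        Collections.singletonList("shard1"), // shards that need replicas
        2,                                   // replicas to place per shard
        "c1",                                // collection-level policy name, may be null
        zkStateReader,                       // assumed to come from the ZkController
        createNodeList);                     // null would mean "any live node"
  }
}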

View File

@@ -162,7 +162,7 @@ public class CreateCollectionCmd implements Cmd {
+ " shards to be created (higher than the allowed number)");
}
-positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
}
ZkStateReader zkStateReader = ocmh.zkStateReader;

View File

@@ -706,6 +706,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
Map<Position, String> identifyNodes(ClusterState clusterState,
List<String> nodeList,
String collectionName,
ZkNodeProps message,
List<String> shardNames,
int numNrtReplicas,
@@ -741,8 +742,8 @@
}
if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
-return Assign.getPositionsUsingPolicy(message.getStr(COLLECTION_PROP, message.getStr(NAME)),
-shardNames, numNrtReplicas, policyName, zkStateReader);
+return Assign.getPositionsUsingPolicy(collectionName,
+shardNames, numNrtReplicas, policyName, zkStateReader, nodeList);
} else {
List<Rule> rules = new ArrayList<>();

View File

@@ -214,7 +214,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
Map<ReplicaAssigner.Position, String> positionVsNodes = ocmh.identifyNodes(clusterState, nodeList,
-message, sliceNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+restoreCollectionName, message, sliceNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
//Create one replica per shard and copy backed up data to it
for (Slice slice : restoreCollection.getSlices()) {

View File

@@ -381,9 +381,9 @@ public class SplitShardCmd implements Cmd {
// TODO: change this to handle sharding a slice into > 2 sub-shards.
Map<ReplicaAssigner.Position, String> nodeMap = ocmh.identifyNodes(clusterState,
new ArrayList<>(clusterState.getLiveNodes()),
collectionName,
new ZkNodeProps(collection.getProperties()),
subSlices, repFactor - 1, 0, 0);

View File

@@ -20,6 +20,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
@@ -31,7 +32,9 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.OverseerTaskProcessor;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
@@ -61,6 +64,72 @@ public class TestPolicyCloud extends SolrCloudTestCase {
"{}".getBytes(StandardCharsets.UTF_8), true);
}
public void testCreateCollectionAddReplica() throws Exception {
JettySolrRunner jetty = cluster.getRandomJetty(random());
int port = jetty.getLocalPort();
String commands = "{set-policy :{c1 : [{replica:2 , shard:'#EACH', port: '" + port + "'}]}}";
cluster.getSolrClient().request(AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, commands));
String collectionName = "testCreateCollectionAddReplica";
CollectionAdminRequest.createCollection(collectionName, 1, 1)
.setPolicy("c1")
.process(cluster.getSolrClient());
getCollectionState(collectionName).forEachReplica((s, replica) -> assertEquals(jetty.getNodeName(), replica.getNodeName()));
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1").process(cluster.getSolrClient());
waitForState("Timed out waiting to see 2 replicas for collection: " + collectionName,
collectionName, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 2);
getCollectionState(collectionName).forEachReplica((s, replica) -> assertEquals(jetty.getNodeName(), replica.getNodeName()));
}
public void testCreateCollectionSplitShard() throws Exception {
JettySolrRunner firstNode = cluster.getRandomJetty(random());
int firstNodePort = firstNode.getLocalPort();
JettySolrRunner secondNode = null;
while (true) {
secondNode = cluster.getRandomJetty(random());
if (secondNode.getLocalPort() != firstNodePort) break;
}
int secondNodePort = secondNode.getLocalPort();
String commands = "{set-policy :{c1 : [{replica:1 , shard:'#EACH', port: '" + firstNodePort + "'}, {replica:1, shard:'#EACH', port:'" + secondNodePort + "'}]}}";
NamedList<Object> response = cluster.getSolrClient().request(AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, commands));
assertEquals("success", response.get("result"));
String collectionName = "testCreateCollectionSplitShard";
CollectionAdminRequest.createCollection(collectionName, 1, 2)
.setPolicy("c1")
.setMaxShardsPerNode(10)
.process(cluster.getSolrClient());
DocCollection docCollection = getCollectionState(collectionName);
List<Replica> list = docCollection.getReplicas(firstNode.getNodeName());
int replicasOnNode1 = list != null ? list.size() : 0;
list = docCollection.getReplicas(secondNode.getNodeName());
int replicasOnNode2 = list != null ? list.size() : 0;
assertEquals("Expected exactly one replica of collection on node with port: " + firstNodePort, 1, replicasOnNode1);
assertEquals("Expected exactly one replica of collection on node with port: " + secondNodePort, 1, replicasOnNode2);
CollectionAdminRequest.splitShard(collectionName).setShardName("shard1").process(cluster.getSolrClient());
waitForState("Timed out waiting to see 6 replicas for collection: " + collectionName,
collectionName, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 6);
docCollection = getCollectionState(collectionName);
list = docCollection.getReplicas(firstNode.getNodeName());
replicasOnNode1 = list != null ? list.size() : 0;
list = docCollection.getReplicas(secondNode.getNodeName());
replicasOnNode2 = list != null ? list.size() : 0;
assertEquals("Expected exactly three replica of collection on node with port: " + firstNodePort, 3, replicasOnNode1);
assertEquals("Expected exactly three replica of collection on node with port: " + secondNodePort, 3, replicasOnNode2);
}
public void testCreateCollectionAddShardUsingPolicy() throws Exception {
JettySolrRunner jetty = cluster.getRandomJetty(random());
int port = jetty.getLocalPort();

View File

@@ -348,7 +348,11 @@ public class Policy implements MapWriter {
}
public Suggester hint(Hint hint, Object value) {
if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE) {
((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).add(value);
} else {
hints.put(hint, value);
}
return this;
}
@@ -461,8 +465,13 @@
protected boolean isAllowed(Object v, Hint hint) {
Object hintVal = hints.get(hint);
if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE) {
Set set = (Set) hintVal;
return set == null || set.contains(v);
} else {
return hintVal == null || Objects.equals(v, hintVal);
}
}
public enum Hint {
COLL, SHARD, SRC_NODE, TARGET_NODE
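Taken together, the two hunks above change the Suggester so that TARGET_NODE and SRC_NODE hints accumulate into a set (instead of each call overwriting the previous value) and isAllowed then treats that set as a whitelist. A standalone sketch of that behaviour, written as plain Java rather than Solr code, with hypothetical node names:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class HintSemanticsSketch {
  enum Hint { COLL, SHARD, SRC_NODE, TARGET_NODE }

  private final Map<Hint, Object> hints = new HashMap<>();

  @SuppressWarnings("unchecked")
  HintSemanticsSketch hint(Hint hint, Object value) {
    if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE) {
      // node hints are additive: repeated calls build up the candidate set
      ((Set<Object>) hints.computeIfAbsent(hint, h -> new HashSet<>())).add(value);
    } else {
      // every other hint keeps only the most recent value
      hints.put(hint, value);
    }
    return this;
  }

  boolean isAllowed(Object v, Hint hint) {
    Object hintVal = hints.get(hint);
    if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE) {
      Set<?> set = (Set<?>) hintVal;
      return set == null || set.contains(v); // no hint given => every node allowed
    }
    return hintVal == null || Objects.equals(v, hintVal);
  }

  public static void main(String[] args) {
    HintSemanticsSketch s = new HintSemanticsSketch()
        .hint(Hint.TARGET_NODE, "node1")
        .hint(Hint.TARGET_NODE, "node2");
    System.out.println(s.isAllowed("node1", Hint.TARGET_NODE)); // true
    System.out.println(s.isAllowed("node3", Hint.TARGET_NODE)); // false
  }
}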

View File

@@ -25,10 +25,10 @@ import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.cloud.autoscaling.Policy.Suggester.Hint;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.Utils;
-import org.apache.solr.cloud.autoscaling.Policy.Suggester.Hint;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
@@ -37,7 +37,8 @@ public class PolicyHelper {
ClusterDataProvider cdp,
Map<String, String> optionalPolicyMapping,
List<String> shardNames,
-int repFactor) {
+int repFactor,
+List<String> nodesList) {
Map<String, List<String>> positionMapping = new HashMap<>();
for (String shardName : shardNames) positionMapping.put(shardName, new ArrayList<>(repFactor));
if (optionalPolicyMapping != null) {
@@ -76,6 +77,11 @@
Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
.hint(Hint.COLL, collName)
.hint(Hint.SHARD, shardName);
if (nodesList != null) {
for (String nodeName : nodesList) {
suggester = suggester.hint(Hint.TARGET_NODE, nodeName);
}
}
SolrRequest op = suggester.getOperation();
if (op == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules "+ Utils.toJSONString(Utils.getDeepCopy(session.expandedClauses, 4, true)));
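Complementing the TestPolicy changes that follow (which pass null for the new argument), a hypothetical call with a non-null node list would look roughly like this. The node and collection names are placeholders, the ClusterDataProvider and parsed autoscaling config are assumed to be supplied by the caller, and the package locations are the ones in use at the time of this commit.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.solr.cloud.autoscaling.ClusterDataProvider;
import org.apache.solr.cloud.autoscaling.PolicyHelper;

public class ReplicaLocationSketch {
  // Place one replica per shard, restricted to node1/node2 via TARGET_NODE hints.
  static Map<String, List<String>> locateOnNodes(Map<String, Object> autoScalingJson,
                                                 ClusterDataProvider cdp) {
    return PolicyHelper.getReplicaLocations(
        "newColl",
        autoScalingJson,
        cdp,
        Collections.singletonMap("newColl", "c1"), // collection -> policy name
        Arrays.asList("shard1", "shard2"),
        1,                                         // replicas per shard
        Arrays.asList("node1", "node2"));          // only these nodes may be chosen
  }
}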

View File

@@ -586,7 +586,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
};
Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(
"newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
-dataProvider, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1);
+dataProvider, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1, null);
assertTrue(locations.get("shard1").containsAll(ImmutableList.of("127.0.0.1:50096_solr")));
assertTrue(locations.get("shard2").containsAll(ImmutableList.of("127.0.0.1:50096_solr")));
}
@@ -643,7 +643,7 @@
};
Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(
"newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
-dataProvider, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3);
+dataProvider, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3, null);
assertTrue(locations.get("shard1").containsAll(ImmutableList.of("node2", "node1", "node3")));
assertTrue(locations.get("shard2").containsAll(ImmutableList.of("node2", "node1", "node3")));