SOLR-10419: added testcase for create collection using policy

This commit is contained in:
Noble Paul 2017-05-31 22:10:54 +09:30
parent 217a5002e1
commit bf8057dc89
9 changed files with 178 additions and 80 deletions

View File

@ -16,51 +16,6 @@
*/
package org.apache.solr.cloud;
import static org.apache.solr.common.cloud.DocCollection.SNITCH;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESNAPSHOT;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.RESTORE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.util.Utils.makeMap;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@ -78,14 +33,20 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.autoscaling.Policy;
import org.apache.solr.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
@ -104,6 +65,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
@ -124,7 +86,23 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import static java.util.Collections.singletonMap;
import static org.apache.solr.common.cloud.DocCollection.SNITCH;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.util.Utils.makeMap;
/**
* A {@link OverseerMessageHandler} that handles Collections API related
@ -736,9 +714,13 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
List<String> shardNames,
int numNrtReplicas,
int numTlogReplicas,
int numPullReplicas) throws IOException {
int numPullReplicas) throws IOException, KeeperException, InterruptedException {
List<Map> rulesMap = (List) message.get("rule");
if (rulesMap == null) {
String policyName = message.getStr("policy");
Map autoSalingJson = zkStateReader.getZkClient().getJson(SOLR_AUTOSCALING_CONF_PATH, true);
autoSalingJson = autoSalingJson == null ? Collections.EMPTY_MAP : autoSalingJson;
if (rulesMap == null && policyName == null) {
int i = 0;
Map<Position, String> result = new HashMap<>();
for (String aShard : shardNames) {
@ -758,26 +740,47 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
return result;
} else {
if (numTlogReplicas + numPullReplicas != 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules");
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies");
}
}
List<Rule> rules = new ArrayList<>();
for (Object map : rulesMap) rules.add(new Rule((Map) map));
if (policyName != null || autoSalingJson.get(Policy.CLUSTER_POLICY) != null) {
String collName = message.getStr(COLLECTION_PROP, message.getStr(NAME));
try (CloudSolrClient csc = new CloudSolrClient.Builder()
.withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
.build()) {
SolrClientDataProvider clientDataProvider = new SolrClientDataProvider(csc);
Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(collName,
zkStateReader.getZkClient().getJson(SOLR_AUTOSCALING_CONF_PATH, true),
clientDataProvider, singletonMap(collName, policyName), shardNames, numNrtReplicas);
Map<Position, String> result = new HashMap<>();
for (Map.Entry<String, List<String>> e : locations.entrySet()) {
List<String> value = e.getValue();
for (int i = 0; i < value.size(); i++) {
result.put(new Position(e.getKey(), i, Replica.Type.NRT), value.get(i));
}
}
return result;
}
Map<String, Integer> sharVsReplicaCount = new HashMap<>();
} else {
List<Rule> rules = new ArrayList<>();
for (Object map : rulesMap) rules.add(new Rule((Map) map));
for (String shard : shardNames) sharVsReplicaCount.put(shard, numNrtReplicas);
ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
sharVsReplicaCount,
(List<Map>) message.get(SNITCH),
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
nodeList,
overseer.getZkController().getCoreContainer(),
clusterState);
Map<String, Integer> sharVsReplicaCount = new HashMap<>();
return replicaAssigner.getNodeMappings();
for (String shard : shardNames) sharVsReplicaCount.put(shard, numNrtReplicas);
ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
sharVsReplicaCount,
(List<Map>) message.get(SNITCH),
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
nodeList,
overseer.getZkController().getCoreContainer(),
clusterState);
return replicaAssigner.getNodeMappings();
}
}
Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {

View File

@ -47,6 +47,7 @@ import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.autoscaling.Policy;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.Rule;
@ -109,6 +110,7 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOW
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.REQUESTID;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
import static org.apache.solr.cloud.autoscaling.Policy.POLICY;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
import static org.apache.solr.common.cloud.DocCollection.RULE;
@ -402,7 +404,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
SNITCH,
PULL_REPLICAS,
TLOG_REPLICAS,
NRT_REPLICAS);
NRT_REPLICAS,
POLICY);
if (props.get(STATE_FORMAT) == null) {
props.put(STATE_FORMAT, "2");

View File

@ -22,23 +22,29 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -297,6 +303,24 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
}
}
public void testCreateCollectionPolicy() throws Exception {
JettySolrRunner jetty = cluster.getRandomJetty(random());
int port = jetty.getLocalPort();
String commands = "{set-policy :{c1 : [{replica:1 , shard:'#EACH', port: 'REPLACEPORT'}]}}".replace("REPLACEPORT",String.valueOf(port));
Utils.fromJSONString(commands);
cluster.getSolrClient().request(createAutoScalingRequest(SolrRequest.METHOD.POST, commands));
Map<String, Object> json = cluster.getZkClient().getJson(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true);
assertEquals("full json:"+ Utils.toJSONString(json) , "#EACH",
Utils.getObjectByPath(json, true, "/policies/c1[0]/shard"));
CollectionAdminRequest.createCollection("policiesTest",2, 1)
.setPolicy("c1")
.process(cluster.getSolrClient());
DocCollection coll = getCollectionState("policiesTest");
coll.forEachReplica((s, replica) -> assertEquals(jetty.getNodeName(), replica.getNodeName()));
}
static SolrRequest createAutoScalingRequest(SolrRequest.METHOD m, String message) {
return createAutoScalingRequest(m, null, message);
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.solr.client.solrj.request;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
@ -48,6 +46,8 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import static org.apache.solr.cloud.autoscaling.Policy.POLICY;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_PARAM;
import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM;
@ -391,6 +391,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
protected String configName = null;
protected String createNodeSet = null;
protected String routerName;
protected String policy;
protected String shards;
protected String routerField;
protected Integer numShards;
@ -561,9 +562,14 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
}
if(rule != null) params.set("rule", rule);
if(snitch != null) params.set("snitch", snitch);
params.setNonNull(POLICY, policy);
return params;
}
public Create setPolicy(String policy) {
this.policy = policy;
return this;
}
}
/**

View File

@ -126,6 +126,11 @@ public class Clause implements MapWriter, Comparable<Clause> {
}
}
void addTags(List<String> params) {
if (globalTag != null && !params.contains(globalTag.name)) params.add(globalTag.name);
if (tag != null && !params.contains(tag.name)) params.add(tag.name);
}
static class Condition {
final String name;
final Object val;

View File

@ -30,6 +30,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -56,6 +58,7 @@ import static java.util.stream.Collectors.toList;
*
*/
public class Policy implements MapWriter {
public static final String POLICY = "policy";
public static final String EACH = "#EACH";
public static final String ANY = "#ANY";
public static final String CLUSTER_POLICY = "cluster-policy";
@ -79,13 +82,28 @@ public class Policy implements MapWriter {
if (clusterPreferences.isEmpty()) {
clusterPreferences.add(new Preference((Map<String, Object>) Utils.fromJSONString("{minimize : cores, precision:1}")));
}
for (Preference preference : clusterPreferences) {
if (params.contains(preference.name.name())) {
throw new RuntimeException(preference.name + " is repeated");
}
params.add(preference.name.toString());
preference.idx = params.size() - 1;
}
clusterPolicy = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_POLICY, emptyList())).stream()
.map(Clause::new)
.filter(clause -> {
clause.addTags(params);
return true;
})
.collect(Collectors.toList());
((Map<String, List<Map<String, Object>>>) jsonMap.getOrDefault("policies", emptyMap())).forEach((s, l1) ->
this.policies.put(s, l1.stream()
.map(Clause::new)
.filter(clause -> {
clause.addTags(params);
return true;
})
.sorted()
.collect(toList())));
@ -96,13 +114,14 @@ public class Policy implements MapWriter {
}
});
for (Preference preference : clusterPreferences) {
if (params.contains(preference.name.name())) {
throw new RuntimeException(preference.name + " is repeated");
clusterPolicy.stream().forEach(new Consumer<Clause>() {
@Override
public void accept(Clause clause) {
clause.addTags(params);
}
params.add(preference.name.toString());
preference.idx = params.size() - 1;
}
});
}
public List<Clause> getClusterPolicy() {

View File

@ -21,10 +21,12 @@ package org.apache.solr.cloud.autoscaling;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.Utils;
@ -87,12 +89,4 @@ public class PolicyHelper {
return positionMapping;
}
public List<Map> addNode(Map<String, Object> autoScalingJson, String node, ClusterDataProvider cdp) {
//todo
return null;
}
}

View File

@ -76,6 +76,10 @@ public class ModifiableSolrParams extends SolrParams
}
public ModifiableSolrParams setNonNull(String name, Object val) {
if (val != null) set(name, String.valueOf(val));
return this;
}
//----------------------------------------------------------------
//----------------------------------------------------------------

View File

@ -486,6 +486,47 @@ public class TestPolicy extends SolrTestCaseJ4 {
};
}
public void testEmptyClusterState(){
String autoScaleJson = " {'policies':{'c1':[{" +
" 'replica':1," +
" 'shard':'#EACH'," +
" 'port':'50096'}]}}";
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
" '127.0.0.1:50097_solr':{" +
" 'cores':0," +
" 'port':'50097'}," +
" '127.0.0.1:50096_solr':{" +
" 'cores':0," +
" 'port':'50096'}}");
ClusterDataProvider dataProvider = new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
Map<String, Object> result = new LinkedHashMap<>();
keys.stream().forEach(s -> result.put(s, nodeValues.get(node).get(s)));
return result;
}
@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return getReplicaDetails(node, clusterState);
}
@Override
public String getPolicyNameByCollection(String coll) {
return null;
}
@Override
public Collection<String> getNodes() {
return Arrays.asList( "127.0.0.1:50097_solr", "127.0.0.1:50096_solr");
}
};
Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(
"newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
dataProvider, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1);
assertTrue(locations.get("shard1").containsAll(ImmutableList.of("127.0.0.1:50096_solr")));
assertTrue(locations.get("shard2").containsAll(ImmutableList.of("127.0.0.1:50096_solr")));
}
public void testMultiReplicaPlacement() {
String autoScaleJson = "{" +
@ -502,7 +543,6 @@ public class TestPolicy extends SolrTestCaseJ4 {
" 'policy1': [" +
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
" { 'replica': '<2', 'shard': '#EACH', 'rack': 'rack1'}" +
// " { 'replica': '1', 'sysprop.fs': 'ssd', 'shard': '#EACH'}" +
" ]" +
" }" +
"}";