mirror of https://github.com/apache/lucene.git
SOLR-10278: test added for client data provider
parent c3627ad425
commit f31546f6e6
@@ -37,14 +37,19 @@ import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.ClientDataProvider;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.recipe.Policy;
import org.apache.solr.cloud.rule.Rule;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;

@@ -79,6 +84,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Arrays.asList;
import static org.apache.solr.common.cloud.DocCollection.SNITCH;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;

@@ -90,6 +96,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;

@@ -699,9 +706,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
List<String> nodeList,
ZkNodeProps message,
List<String> shardNames,
int repFactor) throws IOException {
int repFactor) throws IOException, KeeperException, InterruptedException {
List<Map> rulesMap = (List) message.get("rule");
if (rulesMap == null) {
String policyName = message.getStr("policy");
if (rulesMap == null && policyName == null) {
int i = 0;
Map<Position, String> result = new HashMap<>();
for (String aShard : shardNames) {

@@ -713,23 +721,46 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
return result;
}

List<Rule> rules = new ArrayList<>();
for (Object map : rulesMap) rules.add(new Rule((Map) map));
if (policyName != null) {
String collName = message.getStr(CommonParams.NAME, "coll_" + System.nanoTime());
try(CloudSolrClient csc = new CloudSolrClient.Builder()
.withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
.build()) {
ClientDataProvider clientDataProvider = new ClientDataProvider(csc);
Map<String, List<String>> locations = Policy.getReplicaLocations(collName,
zkStateReader.getZkClient().getJson(SOLR_AUTOSCALING_CONF_PATH, true),
policyName, clientDataProvider, shardNames, repFactor);
Map<Position, String> result = new HashMap<>();
for (Map.Entry<String, List<String>> e : locations.entrySet()) {
List<String> value = e.getValue();
for ( int i = 0; i < value.size(); i++) {
result.put(new Position(e.getKey(),i), value.get(i));
}
}
return result;
}

Map<String, Integer> sharVsReplicaCount = new HashMap<>();
} else {
List<Rule> rules = new ArrayList<>();
for (Object map : rulesMap) rules.add(new Rule((Map) map));

for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor);
ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
sharVsReplicaCount,
(List<Map>) message.get(SNITCH),
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
nodeList,
overseer.getZkController().getCoreContainer(),
clusterState);
Map<String, Integer> sharVsReplicaCount = new HashMap<>();

return replicaAssigner.getNodeMappings();
for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor);
ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
sharVsReplicaCount,
(List<Map>) message.get(SNITCH),
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
nodeList,
overseer.getZkController().getCoreContainer(),
clusterState);

return replicaAssigner.getNodeMappings();
}
}



Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
Map<String, Replica> result = new HashMap<>();
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);

@@ -1013,7 +1044,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
lockSession = lockTree.getSession();
}
return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)),
Arrays.asList(
asList(
getTaskKey(message),
message.getStr(ZkStateReader.SHARD_ID_PROP),
message.getStr(ZkStateReader.REPLICA_PROP))
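
For orientation, here is a minimal sketch, not part of the patch, of how the new policy branch above fits together end to end. The method name computePolicyPositions and the local variable names are illustrative only; ClientDataProvider, ZkClientClusterStateProvider, Policy.getReplicaLocations, SolrZkClient.getJson and ReplicaAssigner.Position are the APIs touched by this commit.

    // Sketch: ask the named policy for one node per (shard, replica index).
    Map<Position, String> computePolicyPositions(ZkStateReader zkStateReader, String collName,
                                                 String policyName, List<String> shardNames,
                                                 int repFactor) throws Exception {
      try (CloudSolrClient csc = new CloudSolrClient.Builder()
          .withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
          .build()) {
        ClientDataProvider clientDataProvider = new ClientDataProvider(csc);
        // autoscaling.json in ZooKeeper holds the named policies; getJson is added in this commit
        Map<String, Object> autoScalingJson =
            zkStateReader.getZkClient().getJson(SOLR_AUTOSCALING_CONF_PATH, true);
        Map<String, List<String>> locations = Policy.getReplicaLocations(
            collName, autoScalingJson, policyName, clientDataProvider, shardNames, repFactor);
        Map<Position, String> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : locations.entrySet()) {
          List<String> nodes = e.getValue();
          for (int i = 0; i < nodes.size(); i++) {
            result.put(new Position(e.getKey(), i), nodes.get(i));
          }
        }
        return result;
      }
    }
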
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.policy;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Map;

import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.ClientDataProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.util.Utils;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@LuceneTestCase.Slow
public class TestPolicyCloud extends SolrCloudTestCase {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(5)
.addConfig("conf", configset("cloud-minimal"))
.configure();
}

@org.junit.Rule
public ExpectedException expectedException = ExpectedException.none();

@After
public void removeCollections() throws Exception {
cluster.deleteAllCollections();
}


public void testDataProvider() throws IOException, SolrServerException {
CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "shard1", 2)
.process(cluster.getSolrClient());
DocCollection rulesCollection = getCollectionState("policiesTest");
ClientDataProvider provider = new ClientDataProvider(cluster.getSolrClient());

Map<String, Object> val = provider.getNodeValues(rulesCollection.getReplicas().get(0).getNodeName(), Arrays.asList("freedisk", "cores"));
assertTrue(((Number) val.get("cores")).intValue() > 0);
assertTrue("freedisk value is "+((Number) val.get("freedisk")).intValue() , ((Number) val.get("freedisk")).intValue() > 0);
System.out.println(Utils.toJSONString(val));
}

/*public void testMultiReplicaPlacement() {
String autoScaleJson ="";


Map<String,Map> nodeValues = (Map<String, Map>) Utils.fromJSONString( "{" +
"node1:{cores:12, freedisk: 334, heap:10480}," +
"node2:{cores:4, freedisk: 749, heap:6873}," +
"node3:{cores:7, freedisk: 262, heap:7834}," +
"node4:{cores:8, freedisk: 375, heap:16900, nodeRole:overseer}" +
"}");

ClusterDataProvider dataProvider = new ClusterDataProvider() {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
return null;
}

@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return null;
}

@Override
public Collection<String> getNodes() {
return null;
}
};
Map<String, List<String>> locations = Policy.getReplicaLocations("newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
"policy1", dataProvider, Arrays.asList("shard1", "shard2"), 3);


}*/


}
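
The commented-out testMultiReplicaPlacement above stubs every ClusterDataProvider method with null, which getReplicaLocations cannot work with. Below is a hedged sketch of an in-memory provider that would make the stub usable, reusing the nodeValues map from the commented block; the autoscaling JSON and extra imports (Collection, List, LinkedHashMap, Policy) would still have to be supplied by the test author.

    // Sketch only: back the provider with the nodeValues map instead of returning null.
    ClusterDataProvider dataProvider = new ClusterDataProvider() {
      @Override
      public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
        Map<String, Object> result = new LinkedHashMap<>();
        keys.forEach(k -> result.put(k, nodeValues.get(node).get(k)));
        return result;
      }

      @Override
      public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
        return new HashMap<>(); // a new collection: no replicas exist anywhere yet
      }

      @Override
      public Collection<String> getNodes() {
        return nodeValues.keySet();
      }
    };
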
@@ -16,18 +16,14 @@
*/
package org.apache.solr.cloud.rule;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.ClientDataProvider;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.GenericSolrRequest;

@@ -35,7 +31,6 @@ import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.Utils;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;

@@ -68,18 +63,6 @@ public class RulesTest extends SolrCloudTestCase {
cluster.deleteAllCollections();
}

public void testDataProvider() throws IOException, SolrServerException {
CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "shard1", 2)
.process(cluster.getSolrClient());
DocCollection rulesCollection = getCollectionState("policiesTest");
ClientDataProvider provider = new ClientDataProvider(cluster.getSolrClient());

Map<String, Object> val = provider.getNodeValues(rulesCollection.getReplicas().get(0).getNodeName(), Arrays.asList("freedisk", "cores"));
assertTrue(((Number)val.get("cores")).intValue() > 0 );
assertTrue(((Number)val.get("freedisk")).intValue() > 0 );
System.out.println(Utils.toJSONString(val));
}

@Test
public void doIntegrationTest() throws Exception {
final long minGB = (random().nextBoolean() ? 1 : 0);
@@ -45,7 +45,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.recipe.Policy.ClusterDataProvider;
import org.apache.solr.recipe.ClusterDataProvider;
import org.apache.solr.recipe.Policy.ReplicaInfo;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -39,10 +39,16 @@ public class ZkClientClusterStateProvider implements CloudSolrClient.ClusterStat


ZkStateReader zkStateReader;
private boolean closeZkStateReader = true;
String zkHost;
int zkConnectTimeout = 10000;
int zkClientTimeout = 10000;

public ZkClientClusterStateProvider(ZkStateReader zkStateReader) {
this.zkStateReader = zkStateReader;
this.closeZkStateReader = false;
}

public ZkClientClusterStateProvider(Collection<String> zkHosts, String chroot) {
zkHost = buildZkHostString(zkHosts,chroot);
}

@@ -144,7 +150,7 @@ public class ZkClientClusterStateProvider implements CloudSolrClient.ClusterStat

@Override
public void close() throws IOException {
if (zkStateReader != null) {
if (zkStateReader != null && closeZkStateReader) {
synchronized (this) {
if (zkStateReader != null)
zkStateReader.close();
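
The new ZkStateReader constructor plus the closeZkStateReader flag exist so that a caller which already owns a ZkStateReader (as the Overseer code earlier in this commit does) can wrap it without the provider tearing it down on close(). A small hedged sketch of that usage; the variable names are illustrative:

    // Sketch: the provider is built over a ZkStateReader the caller already manages.
    ZkClientClusterStateProvider stateProvider = new ZkClientClusterStateProvider(zkStateReader);
    try (CloudSolrClient client = new CloudSolrClient.Builder()
        .withClusterStateProvider(stateProvider)
        .build()) {
      // ... use client ...
    }
    // Closing the client closes the provider, but closeZkStateReader == false
    // means the shared ZkStateReader stays open for its real owner.
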
@@ -31,6 +31,7 @@ import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Pattern;

@@ -43,6 +44,7 @@ import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;

@@ -361,6 +363,14 @@ public class SolrZkClient implements Closeable {
}
}

public Map<String, Object> getJson(String path, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
byte[] bytes = getData(path, null, null, retryOnConnLoss);
if (bytes != null && bytes.length > 0) {
return (Map<String, Object>) Utils.fromJSON(bytes);
}
return null;
}

/**
* Returns node's state
*/
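
A hedged usage sketch for the new SolrZkClient.getJson helper; the path constant comes from ZkStateReader, and the caller still has to handle KeeperException and InterruptedException from the underlying getData call:

    // Sketch: read autoscaling.json from ZooKeeper as a Map; null means the node held no data.
    Map<String, Object> autoScaling =
        zkClient.getJson(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true);
    if (autoScaling != null) {
      Map policies = (Map) autoScaling.get("policies");
      // e.g. look up a named policy before calling Policy.getReplicaLocations
    }
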
@@ -115,12 +115,18 @@ public class Utils {
}

public static Map<String, Object> makeMap(Object... keyVals) {
return makeMap(false, keyVals);
}

public static Map<String, Object> makeMap(boolean skipNulls, Object... keyVals) {
if ((keyVals.length & 0x01) != 0) {
throw new IllegalArgumentException("arguments should be key,value");
}
Map<String, Object> propMap = new LinkedHashMap<>(keyVals.length >> 1);
for (int i = 0; i < keyVals.length; i += 2) {
propMap.put(keyVals[i].toString(), keyVals[i + 1]);
Object keyVal = keyVals[i + 1];
if (skipNulls && keyVal == null) continue;
propMap.put(keyVals[i].toString(), keyVal);
}
return propMap;
}

@@ -155,6 +161,7 @@ public class Utils {
}

public static Object getObjectByPath(Map root, boolean onlyPrimitive, List<String> hierarchy) {
if(root == null) return null;
Map obj = root;
for (int i = 0; i < hierarchy.size(); i++) {
int idx = -1;
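
A small hedged example of the two makeMap variants after this change, assuming the skipNulls flag is honored as shown above; the keys and values are illustrative only:

    // The existing no-flag variant delegates to makeMap(false, ...) and keeps nulls.
    Map<String, Object> keepNulls = Utils.makeMap("name", "coll1", "policy", null);
    // keepNulls -> {name=coll1, policy=null}
    Map<String, Object> dropNulls = Utils.makeMap(true, "name", "coll1", "policy", null);
    // dropNulls -> {name=coll1}
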
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;


@@ -69,7 +70,7 @@ public class Clause implements MapWriter {
tag = parse(s, singletonMap(s, o));
}

class Condition {
static class Condition {
final String name;
final Object val;
final Operand op;

@@ -92,9 +93,17 @@ public class Clause implements MapWriter {
return op.match(val, row.getVal(name)) == PASS;
}

@Override
public boolean equals(Object that) {
if (that instanceof Condition) {
Condition c = (Condition) that;
return Objects.equals(c.name, name) && Objects.equals(c.val, val) && c.op == op;
}
return false;
}
}

Condition parse(String s, Map m) {
static Condition parse(String s, Map m) {
Object expectedVal = null;
Object val = m.get(s);
try {
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.solr.recipe;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;


public interface ClusterDataProvider extends Closeable {
Map<String, Object> getNodeValues(String node, Collection<String> keys);

/**
* Get the details of each replica in a node. It attempts to fetch as many details about
* the replica as mentioned in the keys list. It is not necessary to give all details
* <p>
* the format is {collection:shard :[{replicadetails}]}
*/
Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys);

Collection<String> getNodes();

@Override
default void close() throws IOException {
}
}
@@ -27,16 +27,23 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.Utils;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.util.Utils.getDeepCopy;

public class Policy {
public static final String EACH = "#EACH";

@@ -173,11 +180,11 @@ public class Policy {


enum SortParam {
freedisk, cores, heap, cpu;
replica, freedisk, cores, heap, cpu;

static SortParam get(String m) {
for (SortParam p : values()) if (p.name().equals(m)) return p;
throw new RuntimeException("Sort must be on one of these " + Arrays.asList(values()));
throw new RuntimeException( "Invalid sort "+ m+ " Sort must be on one of these " + Arrays.asList(values()));
}

}

@@ -217,21 +224,7 @@ public class Policy {
}


public interface ClusterDataProvider {
Map<String, Object> getNodeValues(String node, Collection<String> keys);

/**
* Get the details of each replica in a node. It attempts to fetch as much details about
* the replica as mentioned in the keys list. It is not necessary to give al details
* <p>
* the format is {collection:shard :[{replicadetails}]}
*/
Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys);

Collection<String> getNodes();
}

static abstract class Suggester {
public static abstract class Suggester {
String coll;
String shard;
Policy.Session session;

@@ -255,6 +248,78 @@ public class Policy {
return operation;
}

public Session getSession() {
return session;
}
}

public static Map<String, List<String>> getReplicaLocations(String collName, Map<String, Object> autoScalingJson,
String policyName, ClusterDataProvider cdp,
List<String> shardNames,
int repFactor) {
Map<String, List<String>> positionMapping = new HashMap<>();
for (String shardName : shardNames) positionMapping.put(shardName, new ArrayList<>(repFactor));
Map policyJson = (Map) Utils.getObjectByPath(autoScalingJson, false, asList("policies", policyName));
if (policyJson == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "no such policy exists " + policyName);
}
Map defaultPolicy = (Map) Utils.getObjectByPath(autoScalingJson, false, asList("policies", "default"));

Policy policy = new Policy(Policy.mergePolicies(collName, policyJson, defaultPolicy));
Policy.Session session = policy.createSession(cdp);
for (String shardName : shardNames) {
for (int i = 0; i < repFactor; i++) {
Policy.Suggester suggester = session.getSuggester(ADDREPLICA, collName, shardName);
Map op = suggester.getOperation();
if (op == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules");
}
session = suggester.getSession();
positionMapping.get(shardName).add((String) op.get(CoreAdminParams.NODE));
}
}

return positionMapping;
}

public static Map<String, Object> mergePolicies(String coll,
Map<String, Object> collPolicy,
Map<String, Object> defaultPolicy) {
Collection<Map<String, Object>> conditions = getDeepCopy(getListOfMap("conditions", collPolicy), 4, true);
insertColl(coll, conditions);
List<Clause> parsedConditions = conditions.stream().map(Clause::new).collect(toList());
Collection<Map<String, Object>> preferences = getDeepCopy(getListOfMap("preferences", collPolicy), 4, true);
List<Preference> parsedPreferences = preferences.stream().map(Preference::new).collect(toList());
if (defaultPolicy != null) {
Collection<Map<String, Object>> defaultConditions = getDeepCopy(getListOfMap("conditions", defaultPolicy), 4, true);
insertColl(coll,defaultConditions);
defaultConditions.forEach(e -> {
Clause clause = new Clause(e);
for (Clause c : parsedConditions) {
if (c.collection.equals(clause.collection) &&
c.tag.name.equals(clause.tag.name) ) return;
}
conditions.add(e);
});
Collection<Map<String,Object>> defaultPreferences = getDeepCopy(getListOfMap("preferences", defaultPolicy), 4, true);
defaultPreferences.forEach(e -> {
Preference preference = new Preference(e);
for (Preference p : parsedPreferences) {
if(p.name == preference.name) return;
}
preferences.add(e);

});
}
return Utils.makeMap("conditions", conditions, "preferences", preferences );

}

private static Collection<Map<String, Object>> insertColl(String coll, Collection<Map<String, Object>> conditions) {
conditions.forEach(e -> {
if (!e.containsKey("collection")) e.put("collection", coll);
});
return conditions;
}

private static final Map<CollectionAction, Supplier<Suggester>> ops = new HashMap<>();
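
A hedged sketch of the Suggester loop that getReplicaLocations runs internally, useful when driving the now-public Suggester and getSession() API directly; cdp stands for any ClusterDataProvider implementation, and the collection and shard names are examples:

    // Sketch: each accepted suggestion yields an updated Session that seeds the next request.
    Policy.Session session = policy.createSession(cdp);
    Policy.Suggester suggester = session.getSuggester(ADDREPLICA, "coll1", "shard1");
    Map op = suggester.getOperation();          // null when no node can satisfy the policy
    if (op != null) {
      session = suggester.getSession();         // carries forward the state, including the new replica
      String node = (String) op.get(CoreAdminParams.NODE);
    }
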
@@ -41,7 +41,7 @@ class Row implements MapWriter {
List<Clause> violations = new ArrayList<>();
boolean anyValueMissing = false;

Row(String node, List<String> params, Policy.ClusterDataProvider snitch) {
Row(String node, List<String> params, ClusterDataProvider snitch) {
replicaInfo = snitch.getReplicaInfo(node, params);
if (replicaInfo == null) replicaInfo = Collections.emptyMap();
this.node = node;
@@ -32,6 +32,7 @@ import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ValidatingJsonMap;

import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.util.Utils.getObjectByPath;

public class TestPolicy extends SolrTestCaseJ4 {


@@ -51,15 +52,49 @@ public class TestPolicy extends SolrTestCaseJ4 {
assertFalse(c.tag.isPass("overseer"));
}

public void testRow(){
Row row = new Row("nodex", new Cell[]{new Cell(0,"node", "nodex")}, false, new HashMap<>(), new ArrayList<>());
public void testRow() {
Row row = new Row("nodex", new Cell[]{new Cell(0, "node", "nodex")}, false, new HashMap<>(), new ArrayList<>());
Row r1 = row.addReplica("c1", "s1");
Row r2 = r1.addReplica("c1", "s1");
assertEquals(1,r1.replicaInfo.get("c1").get("s1").size());
assertEquals(2,r2.replicaInfo.get("c1").get("s1").size());
assertEquals(1, r1.replicaInfo.get("c1").get("s1").size());
assertEquals(2, r2.replicaInfo.get("c1").get("s1").size());
assertTrue(r2.replicaInfo.get("c1").get("s1").get(0) instanceof Policy.ReplicaInfo);
assertTrue(r2.replicaInfo.get("c1").get("s1").get(1) instanceof Policy.ReplicaInfo);
}

public void testMerge() {
Map map = (Map) Utils.fromJSONString("{" +
" 'policies': {" +
" 'default': {" +
" 'conditions': [" +
" { replica:'#ANY' , 'nodeRole': '!overseer'}," +
" { 'replica': '<2', 'shard': '#EACH', node:'#ANY'}" +
" ]," +
" 'preferences':[" +
" {'maximize': 'freedisk', 'precision':50}," +
" {'minimize': 'replica', 'precision':50}]" +
" }," +
" 'policy1': {" +
" 'conditions': [" +
" { 'replica': '1', 'sysprop.fs': 'ssd', 'shard': '#EACH'}," +
" { 'replica': '<2', 'shard': '#ANY' , node: '#ANY'}," +
" { 'replica': '<2', 'shard':'#EACH', 'rack': 'rack1' }" +
" ], preferences: [{maximize:freedisk, precision:75}]} } }");
map = (Map) map.get("policies");
map = Policy.mergePolicies("mycoll", (Map<String,Object>)map.get("policy1"),(Map<String,Object>)map.get("default"));
assertEquals(((List)map.get("conditions")).size(), 4 );
assertEquals(((List) map.get("preferences")).size(), 2);
assertEquals(String.valueOf(getObjectByPath(map, true, "conditions[0]/replica")),"1");
assertEquals(String.valueOf(getObjectByPath(map, true, "conditions[1]/replica")),"<2");
assertEquals(String.valueOf(getObjectByPath(map, true, "conditions[1]/shard")),"#ANY");
assertEquals(String.valueOf(getObjectByPath(map, true, "conditions[2]/rack")),"rack1");
assertEquals(String.valueOf(getObjectByPath(map, true, "conditions[3]/nodeRole")),"!overseer");

assertEquals(String.valueOf(getObjectByPath(map, true, "preferences[0]/maximize")),"freedisk");
assertEquals(String.valueOf(getObjectByPath(map, true, "preferences[0]/precision")),"75");
assertEquals(String.valueOf(getObjectByPath(map, true, "preferences[1]/precision")),"50");
}

public void testRules() throws IOException {
String rules = "{" +
"conditions:[{nodeRole:'!overseer', strict:false},{replica:'<1',node:node3}," +

@@ -70,53 +105,12 @@ public class TestPolicy extends SolrTestCaseJ4 {
"{minimize:heap, precision:1000}]}";


Map<String,Map> nodeValues = (Map<String, Map>) Utils.fromJSONString( "{" +
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
"node1:{cores:12, freedisk: 334, heap:10480}," +
"node2:{cores:4, freedisk: 749, heap:6873}," +
"node3:{cores:7, freedisk: 262, heap:7834}," +
"node4:{cores:8, freedisk: 375, heap:16900, nodeRole:overseer}" +
"}");
String clusterState = "{'gettingstarted':{" +
" 'router':{'name':'compositeId'}," +
" 'shards':{" +
" 'shard1':{" +
" 'range':'80000000-ffffffff'," +
" 'replicas':{" +
" 'r1':{" +
" 'core':r1," +
" 'base_url':'http://10.0.0.4:8983/solr'," +
" 'node_name':'node1'," +
" 'state':'active'," +
" 'leader':'true'}," +
" 'r2':{" +
" 'core':r2," +
" 'base_url':'http://10.0.0.4:7574/solr'," +
" 'node_name':'node2'," +
" 'state':'active'}}}," +
" 'shard2':{" +
" 'range':'0-7fffffff'," +
" 'replicas':{" +
" 'r3':{" +
" 'core':r3," +
" 'base_url':'http://10.0.0.4:8983/solr'," +
" 'node_name':'node1'," +
" 'state':'active'," +
" 'leader':'true'}," +
" 'r4':{" +
" 'core':r4," +
" 'base_url':'http://10.0.0.4:8987/solr'," +
" 'node_name':'node4'," +
" 'state':'active'}," +
" 'r6':{" +
" 'core':r6," +
" 'base_url':'http://10.0.0.4:8989/solr'," +
" 'node_name':'node3'," +
" 'state':'active'}," +
" 'r5':{" +
" 'core':r5," +
" 'base_url':'http://10.0.0.4:7574/solr'," +
" 'node_name':'node1'," +
" 'state':'active'}}}}}}";


ValidatingJsonMap m = ValidatingJsonMap

@@ -125,10 +119,10 @@ public class TestPolicy extends SolrTestCaseJ4 {

Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
Policy.Session session;
Policy.ClusterDataProvider snitch = new Policy.ClusterDataProvider() {
ClusterDataProvider snitch = new ClusterDataProvider() {
@Override
public Map<String,Object> getNodeValues(String node, Collection<String> keys) {
Map<String,Object> result = new LinkedHashMap<>();
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
Map<String, Object> result = new LinkedHashMap<>();
keys.stream().forEach(s -> result.put(s, nodeValues.get(node).get(s)));
return result;
}

@@ -140,39 +134,19 @@ public class TestPolicy extends SolrTestCaseJ4 {

@Override
public Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
Map<String, Map<String, List<Policy.ReplicaInfo>>> result = new LinkedHashMap<>();

m.forEach((collName, o) -> {
ValidatingJsonMap coll = (ValidatingJsonMap) o;
coll.getMap("shards").forEach((shard, o1) -> {
ValidatingJsonMap sh = (ValidatingJsonMap) o1;
sh.getMap("replicas").forEach((replicaName, o2) -> {
ValidatingJsonMap r = (ValidatingJsonMap) o2;
String node_name = (String) r.get("node_name");
if (!node_name.equals(node)) return;
Map<String, List<Policy.ReplicaInfo>> shardVsReplicaStats = result.get(collName);
if (shardVsReplicaStats == null) result.put(collName, shardVsReplicaStats = new HashMap<>());
List<Policy.ReplicaInfo> replicaInfos = shardVsReplicaStats.get(shard);
if (replicaInfos == null) shardVsReplicaStats.put(shard, replicaInfos = new ArrayList<>());
replicaInfos.add(new Policy.ReplicaInfo(replicaName, new HashMap<>()));
});
});
});

return result;
return getReplicaDetails(node, m);
}


};

session = policy.createSession( snitch);
session = policy.createSession(snitch);

session.applyRules();
List<Row> l = session.getSorted();
assertEquals("node1",l.get(0).node);
assertEquals("node3",l.get(1).node);
assertEquals("node4",l.get(2).node);
assertEquals("node2",l.get(3).node);
assertEquals("node1", l.get(0).node);
assertEquals("node3", l.get(1).node);
assertEquals("node4", l.get(2).node);
assertEquals("node2", l.get(3).node);


System.out.printf(Utils.toJSONString(Utils.getDeepCopy(session.toMap(new LinkedHashMap<>()), 8)));

@@ -198,7 +172,70 @@ public class TestPolicy extends SolrTestCaseJ4 {
System.out.println(Utils.toJSONString(operation));


}

public static String clusterState = "{'gettingstarted':{" +
" 'router':{'name':'compositeId'}," +
" 'shards':{" +
" 'shard1':{" +
" 'range':'80000000-ffffffff'," +
" 'replicas':{" +
" 'r1':{" +
" 'core':r1," +
" 'base_url':'http://10.0.0.4:8983/solr'," +
" 'node_name':'node1'," +
" 'state':'active'," +
" 'leader':'true'}," +
" 'r2':{" +
" 'core':r2," +
" 'base_url':'http://10.0.0.4:7574/solr'," +
" 'node_name':'node2'," +
" 'state':'active'}}}," +
" 'shard2':{" +
" 'range':'0-7fffffff'," +
" 'replicas':{" +
" 'r3':{" +
" 'core':r3," +
" 'base_url':'http://10.0.0.4:8983/solr'," +
" 'node_name':'node1'," +
" 'state':'active'," +
" 'leader':'true'}," +
" 'r4':{" +
" 'core':r4," +
" 'base_url':'http://10.0.0.4:8987/solr'," +
" 'node_name':'node4'," +
" 'state':'active'}," +
" 'r6':{" +
" 'core':r6," +
" 'base_url':'http://10.0.0.4:8989/solr'," +
" 'node_name':'node3'," +
" 'state':'active'}," +
" 'r5':{" +
" 'core':r5," +
" 'base_url':'http://10.0.0.4:7574/solr'," +
" 'node_name':'node1'," +
" 'state':'active'}}}}}}";

public static Map<String, Map<String, List<Policy.ReplicaInfo>>> getReplicaDetails(String node, ValidatingJsonMap m) {
Map<String, Map<String, List<Policy.ReplicaInfo>>> result = new LinkedHashMap<>();

m.forEach((collName, o) -> {
ValidatingJsonMap coll = (ValidatingJsonMap) o;
coll.getMap("shards").forEach((shard, o1) -> {
ValidatingJsonMap sh = (ValidatingJsonMap) o1;
sh.getMap("replicas").forEach((replicaName, o2) -> {
ValidatingJsonMap r = (ValidatingJsonMap) o2;
String node_name = (String) r.get("node_name");
if (!node_name.equals(node)) return;
Map<String, List<Policy.ReplicaInfo>> shardVsReplicaStats = result.get(collName);
if (shardVsReplicaStats == null) result.put(collName, shardVsReplicaStats = new HashMap<>());
List<Policy.ReplicaInfo> replicaInfos = shardVsReplicaStats.get(shard);
if (replicaInfos == null) shardVsReplicaStats.put(shard, replicaInfos = new ArrayList<>());
replicaInfos.add(new Policy.ReplicaInfo(replicaName, new HashMap<>()));
});
});
});
return result;
}