mirror of https://github.com/apache/lucene.git
SOLR-12031: Refactor Policy framework to make simulated changes affect more than a single node
SOLR-12050: UTILIZENODE does not enforce policy rules
parent 0424d9c06b
commit 23aee00213
CHANGES.txt
@@ -225,6 +225,9 @@ Bug Fixes

 * SOLR-10720: Aggressive removal of a collection breaks cluster status API. (Alexey Serba, shalin)

+* SOLR-12050: UTILIZENODE does not enforce policy rules (hossman, noble)
+
 Optimizations
 ----------------------
@@ -327,6 +330,8 @@ Other Changes

 * SOLR-12028: BadApple and AwaitsFix annotations usage (Erick Erickson, Uwe Schindler)

+* SOLR-12031: Refactor Policy framework to make simulated changes affect more than a single node (noble)
+
 ================== 7.2.1 ==================

 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
UtilizeNodeCmd.java
@@ -85,18 +85,31 @@ public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd {
     }
     executeAll(requests);
     PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(ocmh.overseer.getSolrCloudManager());
     Policy.Session session = sessionWrapper.get();
+    Suggester initialsuggester = session.getSuggester(MOVEREPLICA)
+        .hint(Suggester.Hint.TARGET_NODE, nodeName);
+    Suggester suggester = null;
     for (; ; ) {
-      Suggester suggester = session.getSuggester(MOVEREPLICA)
+      suggester = session.getSuggester(MOVEREPLICA)
           .hint(Suggester.Hint.TARGET_NODE, nodeName);
+      session = suggester.getSession();
       SolrRequest request = suggester.getSuggestion();
+      if (requests.size() > 10) {
+        log.info("too_many_suggestions");
+        PolicyHelper.logState(ocmh.overseer.getSolrCloudManager(), initialsuggester);
+        break;
+      }
       log.info("SUGGESTION: {}", request);
       if (request == null) break;
-      session = suggester.getSession();
       requests.add(new ZkNodeProps(COLLECTION_PROP, request.getParams().get(COLLECTION_PROP),
           CollectionParams.TARGET_NODE, request.getParams().get(CollectionParams.TARGET_NODE),
           REPLICA_PROP, request.getParams().get(REPLICA_PROP),
           ASYNC, request.getParams().get(ASYNC)));
     }
+    log.info("total_suggestions: {}", requests.size());
+    if (requests.size() == 0) {
+      PolicyHelper.logState(ocmh.overseer.getSolrCloudManager(), initialsuggester);
+    }
    sessionWrapper.returnSession(session);
    try {
      executeAll(requests);
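The loop above is the heart of the UTILIZENODE fix: every iteration re-asks the latest session for a suggestion, so one simulated move is visible to the next, and a hard cap of 10 queued requests guards against runaway suggestion chains. A minimal standalone sketch of that control flow, using toy types rather than the Solr API (all names below are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SuggestionLoopSketch {
  interface Suggester {
    String getSuggestion();   // null when no further improvement is possible
    Object getSession();      // updated state after the simulated change
  }

  static List<String> collectSuggestions(Function<Object, Suggester> sessionToSuggester,
                                         Object session) {
    List<String> requests = new ArrayList<>();
    for (; ; ) {
      Suggester suggester = sessionToSuggester.apply(session);
      session = suggester.getSession();   // carry the simulated state forward
      String request = suggester.getSuggestion();
      if (requests.size() > 10) break;    // safety cap, mirrors the patch
      if (request == null) break;         // nothing left to improve
      requests.add(request);
    }
    return requests;
  }
}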
ComputePlanAction.java
@@ -46,7 +46,7 @@ import org.slf4j.LoggerFactory;
 /**
  * This class is responsible for using the configured policy and preferences
  * with the hints provided by the trigger event to compute the required cluster operations.
- *
+ * <p>
  * The cluster operations computed here are put into the {@link ActionContext}'s properties
  * with the key name "operations". The value is a List of SolrRequest objects.
  */

@@ -81,7 +81,8 @@ public class ComputePlanAction extends TriggerActionBase {
       log.trace("-- state: {}", clusterState);
     }
     try {
-      Suggester suggester = getSuggester(session, event, cloudManager);
+      Suggester intialSuggester = getSuggester(session, event, cloudManager);
+      Suggester suggester = intialSuggester;
       int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState);
       int requestedOperations = getRequestedNumOps(event);
       if (requestedOperations > maxOperations) {

@@ -104,8 +105,15 @@ public class ComputePlanAction extends TriggerActionBase {

         // break on first null op
         // unless a specific number of ops was requested
+        // uncomment the following to log too many operations
+        /*if (opCount > 10) {
+          PolicyHelper.logState(cloudManager, intialSuggester);
+        }*/
+
         if (operation == null) {
           if (requestedOperations < 0) {
+            //uncomment the following to log zero operations
+            // PolicyHelper.logState(cloudManager, intialSuggester);
             break;
           } else {
             log.info("Computed plan empty, remained " + (opCount - opLimit) + " requested ops to try.");

@@ -150,7 +158,7 @@ public class ComputePlanAction extends TriggerActionBase {
     AtomicInteger totalRF = new AtomicInteger();
     clusterState.forEachCollection(coll -> totalRF.addAndGet(coll.getReplicationFactor() * coll.getSlices().size()));
     int totalMax = clusterState.getLiveNodes().size() * totalRF.get() * 3;
-    int maxOp = (Integer)autoScalingConfig.getProperties().getOrDefault(AutoScalingParams.MAX_COMPUTE_OPERATIONS, totalMax);
+    int maxOp = (Integer) autoScalingConfig.getProperties().getOrDefault(AutoScalingParams.MAX_COMPUTE_OPERATIONS, totalMax);
     Object o = event.getProperty(AutoScalingParams.MAX_COMPUTE_OPERATIONS, maxOp);
     try {
       return Integer.parseInt(String.valueOf(o));

@@ -161,7 +169,7 @@ public class ComputePlanAction extends TriggerActionBase {
   }

   protected int getRequestedNumOps(TriggerEvent event) {
-    Collection<TriggerEvent.Op> ops = (Collection<TriggerEvent.Op>)event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList());
+    Collection<TriggerEvent.Op> ops = (Collection<TriggerEvent.Op>) event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList());
     if (ops.isEmpty()) {
       return -1;
     } else {
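For reference, the default operation cap computed in getMaxNumOps above works out to live nodes × total replica count × 3. A small worked example with illustrative numbers:

public class MaxOpsExample {
  public static void main(String[] args) {
    // One collection with 3 shards and replicationFactor 2 gives totalRF = 6.
    int liveNodes = 4, shards = 3, replicationFactor = 2;
    int totalRF = shards * replicationFactor;
    int totalMax = liveNodes * totalRF * 3;   // default cap: 4 * 6 * 3 = 72 operations
    System.out.println("default MAX_COMPUTE_OPERATIONS = " + totalMax);
  }
}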
TestUtilizeNode.java
@@ -29,21 +29,22 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.util.LogLevel;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;

 @LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.client.solrj.impl.SolrClientDataProvider=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper=TRACE")
 public class TestUtilizeNode extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

   @BeforeClass
   public static void setupCluster() throws Exception {
-    configureCluster(4)
+    configureCluster(3)
         .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
         .configure();
     NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());

@@ -71,7 +72,6 @@ public class TestUtilizeNode extends SolrCloudTestCase {
   }

   @Test
-  @AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12050")
   public void test() throws Exception {
     cluster.waitForAllNodes(5000);
     int REPLICATION = 2;

@@ -79,7 +79,8 @@ public class TestUtilizeNode extends SolrCloudTestCase {
     CloudSolrClient cloudClient = cluster.getSolrClient();

     log.info("Creating Collection...");
-    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION)
+        .setMaxShardsPerNode(2);
     cloudClient.request(create);

     log.info("Spinning up additional jettyX...");
AddReplicaSuggester.java
@@ -17,18 +17,18 @@

 package org.apache.solr.client.solrj.cloud.autoscaling;

-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;

 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.Pair;

+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+
 class AddReplicaSuggester extends Suggester {

   SolrRequest init() {

@@ -42,42 +42,38 @@ class AddReplicaSuggester extends Suggester {
     if (shards.isEmpty()) {
       throw new RuntimeException("add-replica requires 'collection' and 'shard'");
     }
-    for (Pair<String,String> shard : shards) {
+    for (Pair<String, String> shard : shards) {
       Replica.Type type = Replica.Type.get((String) hints.get(Hint.REPLICATYPE));
-      //iterate through elements and identify the least loaded
+      //iterate through nodes and identify the least loaded
       List<Violation> leastSeriousViolation = null;
-      Integer targetNodeIndex = null;
+      Row bestNode = null;
       for (int i = getMatrix().size() - 1; i >= 0; i--) {
         Row row = getMatrix().get(i);
-        if (!isNodeSuitable(row)) continue;
+        if (!isNodeSuitableForReplicaAddition(row)) continue;
         Row tmpRow = row.addReplica(shard.first(), shard.second(), type);

-        List<Violation> errs = testChangedMatrix(strict, getModifiedMatrix(getMatrix(), tmpRow, i));
+        List<Violation> errs = testChangedMatrix(strict, tmpRow.session.matrix);
         if (!containsNewErrors(errs)) {
           if (isLessSerious(errs, leastSeriousViolation)) {
             leastSeriousViolation = errs;
-            targetNodeIndex = i;
+            bestNode = tmpRow;
           }
         }
       }

-      if (targetNodeIndex != null) {// there are no rule violations
-        getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(shard.first(), shard.second(), type));
+      if (bestNode != null) {// there are no rule violations
+        this.session = bestNode.session;
         return CollectionAdminRequest
             .addReplicaToShard(shard.first(), shard.second())
             .setType(type)
-            .setNode(getMatrix().get(targetNodeIndex).node);
+            .setNode(bestNode.node);
       }
     }

     return null;
   }


   @Override
-  public void writeMap(MapWriter.EntryWriter ew) throws IOException {
-    ew.put("action", CollectionParams.CollectionAction.ADDREPLICA.toString());
-    super.writeMap(ew);
+  public CollectionParams.CollectionAction getAction() {
+    return ADDREPLICA;
   }

 }
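The reworked tryEachNode keeps the same selection idea as before: simulate the addition on every candidate node, remember the candidate whose resulting violations are least serious, and only then commit by adopting that candidate's session. A hedged standalone sketch of that pattern with toy types (violation count stands in for the real seriousness comparison):

import java.util.List;
import java.util.function.Function;

public class LeastSeriousPick {
  record Candidate(String node, List<String> violations) {}

  static Candidate pickLeastSerious(List<String> nodes, Function<String, List<String>> simulate) {
    Candidate best = null;
    for (String node : nodes) {
      List<String> errs = simulate.apply(node);   // violations after the simulated add
      if (best == null || errs.size() < best.violations().size()) {
        best = new Candidate(node, errs);         // fewer violations == less serious (toy metric)
      }
    }
    return best;                                  // null means no suitable node was found
  }
}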
Cell.java
@@ -23,28 +23,32 @@ import java.util.HashMap;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.util.Utils;

 /**Each instance represents an attribute that is being tracked by the framework, such as freedisk, cores etc.
  *
  */
 public class Cell implements MapWriter {
   final int index;
+  final Suggestion.ConditionType type;
   final String name;
   Object val, approxVal;
+  Row row;

   public Cell(int index, String name, Object val) {
     this.index = index;
     this.name = name;
     this.val = val;
   }

-  public Cell(int index, String name, Object val, Object approxVal) {
+  public Cell(int index, String name, Object val, Object approxVal, Suggestion.ConditionType type, Row row) {
     this.index = index;
     this.name = name;
     this.val = val;
     this.approxVal = approxVal;
+    this.type = type;
+    this.row = row;
   }

   @Override
   public void writeMap(EntryWriter ew) throws IOException {
     ew.put(name, val);
   }

+  public Row getRow() {
+    return row;
+  }
+
   @Override
   public String toString() {

@@ -52,7 +56,7 @@ public class Cell implements MapWriter {
   }

   public Cell copy() {
-    return new Cell(index, name, val, approxVal);
+    return new Cell(index, name, val, approxVal, this.type, row);
   }

   public String getName() {
Clause.java
@@ -151,19 +151,20 @@ public class Clause implements MapWriter, Comparable<Clause> {
   class Condition {
     final String name;
     final Object val;
+    final Suggestion.ConditionType varType;
     final Operand op;

     Condition(String name, Object val, Operand op) {
       this.name = name;
       this.val = val;
       this.op = op;
+      varType = Suggestion.getTagType(name);
     }


     boolean isPass(Object inputVal) {
       if (inputVal instanceof ReplicaCount) inputVal = ((ReplicaCount) inputVal).getVal(type);
-      Suggestion.ConditionType validator = Suggestion.getTagType(name);
-      if (validator == Suggestion.ConditionType.LAZY) { // we don't know the type
+      if (varType == Suggestion.ConditionType.LAZY) { // we don't know the type
         return op.match(parseString(val), parseString(inputVal)) == PASS;
       } else {
         return op.match(val, validate(name, inputVal, false)) == PASS;
MoveReplicaSuggester.java
@@ -17,7 +17,6 @@

 package org.apache.solr.client.solrj.cloud.autoscaling;

-import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;

@@ -26,6 +25,8 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.Pair;

+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
+
 public class MoveReplicaSuggester extends Suggester {

   @Override

@@ -38,50 +39,50 @@ public class MoveReplicaSuggester extends Suggester {
   SolrRequest tryEachNode(boolean strict) {
     //iterate through elements and identify the least loaded
     List<Violation> leastSeriousViolation = null;
-    Integer targetNodeIndex = null;
-    Integer sourceNodeIndex = null;
+    Row bestSrcRow = null;
+    Row bestTargetRow = null;
     ReplicaInfo sourceReplicaInfo = null;
     List<Pair<ReplicaInfo, Row>> validReplicas = getValidReplicas(true, true, -1);
     validReplicas.sort(leaderLast);
-    for (Pair<ReplicaInfo, Row> fromReplica : validReplicas) {
+    for (int i1 = 0; i1 < validReplicas.size(); i1++) {
+      Pair<ReplicaInfo, Row> fromReplica = validReplicas.get(i1);
       Row fromRow = fromReplica.second();
-      ReplicaInfo replicaInfo = fromReplica.first();
-      String coll = replicaInfo.getCollection();
-      String shard = replicaInfo.getShard();
-      Pair<Row, ReplicaInfo> pair = fromRow.removeReplica(coll, shard, replicaInfo.getType());
-      Row srcTmpRow = pair.first();
-      if (srcTmpRow == null) {
-        //no such replica available
-        continue;
-      }
-
-      final int i = getMatrix().indexOf(fromRow);
+      ReplicaInfo ri = fromReplica.first();
+      if (ri == null) continue;
+      final int i = session.indexOf(fromRow.node);
       int stopAt = force ? 0 : i;
-      for (int j = getMatrix().size() - 1; j >= stopAt; j--) {
-        if (j == i) continue;
-        Row targetRow = getMatrix().get(j);
-        if (!isNodeSuitable(targetRow)) continue;
-        targetRow = targetRow.addReplica(coll, shard, replicaInfo.getType());
-        List<Violation> errs = testChangedMatrix(strict, getModifiedMatrix(getModifiedMatrix(getMatrix(), srcTmpRow, i), targetRow, j));
-        if (!containsNewErrors(errs) && isLessSerious(errs, leastSeriousViolation) &&
-            (force || Policy.compareRows(srcTmpRow, targetRow, session.getPolicy()) < 1)) {
+      Row targetRow = null;
+      for (int j = session.matrix.size() - 1; j >= stopAt; j--) {
+        targetRow = session.matrix.get(j);
+        if (targetRow.node.equals(fromRow.node)) continue;
+        if (!isNodeSuitableForReplicaAddition(targetRow)) continue;
+        targetRow = targetRow.addReplica(ri.getCollection(), ri.getShard(), ri.getType());//add replica to target first
+        Pair<Row, ReplicaInfo> pair = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node
+        if (pair == null) continue;//should not happen
+        Row srcRowModified = pair.first();//this is the final state of the source row and session
+        List<Violation> errs = testChangedMatrix(strict, srcRowModified.session.matrix);
+        srcRowModified.session.applyRules();// now resort the nodes with the new values
+        Policy.Session tmpSession = srcRowModified.session;
+        if (!containsNewErrors(errs) &&
+            isLessSerious(errs, leastSeriousViolation) &&
+            (force || (tmpSession.indexOf(srcRowModified.node) < tmpSession.indexOf(targetRow.node)))) {
          leastSeriousViolation = errs;
-          targetNodeIndex = j;
-          sourceNodeIndex = i;
-          sourceReplicaInfo = replicaInfo;
+          bestSrcRow = srcRowModified;
+          sourceReplicaInfo = ri;
+          bestTargetRow = targetRow;
        }
      }
    }
-    if (targetNodeIndex != null && sourceNodeIndex != null) {
-      getMatrix().set(sourceNodeIndex, getMatrix().get(sourceNodeIndex).removeReplica(sourceReplicaInfo.getCollection(), sourceReplicaInfo.getShard(), sourceReplicaInfo.getType()).first());
-      getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(sourceReplicaInfo.getCollection(), sourceReplicaInfo.getShard(), sourceReplicaInfo.getType()));
+    if (bestSrcRow != null) {
+      this.session = bestSrcRow.session;
      return new CollectionAdminRequest.MoveReplica(
          sourceReplicaInfo.getCollection(),
          sourceReplicaInfo.getName(),
-          getMatrix().get(targetNodeIndex).node);
+          bestTargetRow.node);
    }
    return null;
  }

   static Comparator<Pair<ReplicaInfo, Row>> leaderLast = (r1, r2) -> {
     if (r1.first().isLeader) return 1;
     if (r2.first().isLeader) return -1;

@@ -90,8 +91,7 @@ public class MoveReplicaSuggester extends Suggester {


   @Override
-  public void writeMap(EntryWriter ew) throws IOException {
-    ew.put("action", CollectionParams.CollectionAction.MOVEREPLICA.toString());
-    super.writeMap(ew);
+  public CollectionParams.CollectionAction getAction() {
+    return MOVEREPLICA;
   }
 }
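The key change in MoveReplicaSuggester is the evaluation order: add the replica to the target first, then remove it from the source through the copied session, test the whole changed matrix, re-sort, and accept the move only if the drained source node now sorts before the grown target. A hedged sketch of that final check with toy types, using core count as a stand-in for the real preference sort:

import java.util.Comparator;
import java.util.List;

public class MoveCheckSketch {
  record Node(String name, int cores) {}

  static boolean moveImproves(List<Node> sorted, String src, String target) {
    Comparator<Node> byLoad = Comparator.comparingInt(Node::cores);
    List<Node> after = sorted.stream()
        .map(n -> n.name().equals(target) ? new Node(n.name(), n.cores() + 1)   // add to target first
                : n.name().equals(src)    ? new Node(n.name(), n.cores() - 1)   // then remove from source
                : n)
        .sorted(byLoad)                                                         // "applyRules": resort nodes
        .toList();
    int srcIdx = indexOf(after, src), targetIdx = indexOf(after, target);
    return srcIdx < targetIdx;   // source must end up less loaded than target
  }

  static int indexOf(List<Node> nodes, String name) {
    for (int i = 0; i < nodes.size(); i++) if (nodes.get(i).name().equals(name)) return i;
    throw new IllegalArgumentException("no such node " + name);
  }
}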
Policy.java
@@ -41,6 +41,7 @@ import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
 import org.slf4j.Logger;

@@ -77,7 +78,7 @@ public class Policy implements MapWriter {
   final Map<String, List<Clause>> policies;
   final List<Clause> clusterPolicy;
   final List<Preference> clusterPreferences;
-  final List<String> params;
+  final List<Pair<String, Suggestion.ConditionType>> params;
   final List<String> perReplicaAttributes;

   public Policy() {

@@ -111,13 +112,16 @@ public class Policy implements MapWriter {
         .collect(collectingAndThen(toList(), Collections::unmodifiableList));

     this.policies = Collections.unmodifiableMap(
-        policiesFromMap((Map<String, List<Map<String, Object>>>)jsonMap.getOrDefault(POLICIES, emptyMap()), newParams));
-    this.params = Collections.unmodifiableList(newParams);
+        policiesFromMap((Map<String, List<Map<String, Object>>>) jsonMap.getOrDefault(POLICIES, emptyMap()), newParams));
+    this.params = Collections.unmodifiableList(newParams.stream()
+        .map(s -> new Pair<>(s, Suggestion.getTagType(s)))
+        .collect(toList()));
     perReplicaAttributes = readPerReplicaAttrs();
   }

   private List<String> readPerReplicaAttrs() {
     return this.params.stream()
-        .map(Suggestion.tagVsPerReplicaVal::get)
+        .map(s -> Suggestion.tagVsPerReplicaVal.get(s.first()))
         .filter(Objects::nonNull)
         .collect(Collectors.toList());
   }

@@ -126,7 +130,11 @@ public class Policy implements MapWriter {
     this.policies = policies != null ? Collections.unmodifiableMap(policies) : Collections.emptyMap();
     this.clusterPolicy = clusterPolicy != null ? Collections.unmodifiableList(clusterPolicy) : Collections.emptyList();
     this.clusterPreferences = clusterPreferences != null ? Collections.unmodifiableList(clusterPreferences) : DEFAULT_PREFERENCES;
-    this.params = Collections.unmodifiableList(buildParams(this.clusterPreferences, this.clusterPolicy, this.policies));
+    this.params = Collections.unmodifiableList(
+        buildParams(this.clusterPreferences, this.clusterPolicy, this.policies).stream()
+            .map(s -> new Pair<>(s, Suggestion.getTagType(s)))
+            .collect(toList())
+    );
     perReplicaAttributes = readPerReplicaAttrs();
   }

@@ -207,9 +215,9 @@ public class Policy implements MapWriter {
   }

   /*This stores the logical state of the system, given a policy and
-   * a cluster state.
-   *
-   */
+  * a cluster state.
+  *
+  */
   public class Session implements MapWriter {
     final List<String> nodes;
     final SolrCloudManager cloudManager;

@@ -228,6 +236,7 @@ public class Policy implements MapWriter {
       this.expandedClauses = expandedClauses;
       this.znodeVersion = znodeVersion;
       this.nodeStateProvider = nodeStateProvider;
+      for (Row row : matrix) row.session = this;
     }


@@ -259,7 +268,7 @@ public class Policy implements MapWriter {
       Collections.sort(expandedClauses);

       matrix = new ArrayList<>(nodes.size());
-      for (String node : nodes) matrix.add(new Row(node, params, perReplicaAttributes,this));
+      for (String node : nodes) matrix.add(new Row(node, params, perReplicaAttributes, this));
       applyRules();
     }

@@ -269,7 +278,6 @@ public class Policy implements MapWriter {
         List<Clause> perCollPolicy = policies.get(p);
         if (perCollPolicy == null) {
           return;
-          // throw new RuntimeException(StrUtils.formatString("Policy for collection {0} is {1} . It does not exist", c, p));
         }
       }
       expandedClauses.addAll(mergePolicies(c, policies.getOrDefault(p, emptyList()), clusterPolicy));

@@ -279,9 +287,14 @@ public class Policy implements MapWriter {
       return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, znodeVersion, nodeStateProvider);
     }

+    public Row getNode(String node) {
+      for (Row row : matrix) if (row.node.equals(node)) return row;
+      return null;
+    }
+
     List<Row> getMatrixCopy() {
       return matrix.stream()
-          .map(Row::copy)
+          .map(row -> row.copy(this))
           .collect(Collectors.toList());
     }

@@ -303,7 +316,6 @@ public class Policy implements MapWriter {
     }

-
     public List<Violation> getViolations() {
       return violations;
     }

@@ -336,6 +348,11 @@ public class Policy implements MapWriter {
     public NodeStateProvider getNodeStateProvider() {
       return nodeStateProvider;
     }
+
+    public int indexOf(String node) {
+      for (int i = 0; i < matrix.size(); i++) if (matrix.get(i).node.equals(node)) return i;
+      throw new RuntimeException("NO such node found " + node);
+    }
   }

   static void setApproxValuesAndSortNodes(List<Preference> clusterPreferences, List<Row> matrix) {

@@ -367,7 +384,7 @@ public class Policy implements MapWriter {
   public enum SortParam {
     freedisk(0, Integer.MAX_VALUE), cores(0, Integer.MAX_VALUE), heapUsage(0, Integer.MAX_VALUE), sysLoadAvg(0, 100);

-    public final int min,max;
+    public final int min, max;

     SortParam(int min, int max) {
       this.min = min;

@@ -416,8 +433,8 @@ public class Policy implements MapWriter {
   }

   public static List<Clause> mergePolicies(String coll,
-                                    List<Clause> collPolicy,
-                                    List<Clause> globalPolicy) {
+                                           List<Clause> collPolicy,
+                                           List<Clause> globalPolicy) {

     List<Clause> merged = insertColl(coll, collPolicy);
     List<Clause> global = insertColl(coll, globalPolicy);

@@ -455,7 +472,7 @@ public class Policy implements MapWriter {
   }

   public List<String> getParams() {
-    return params;
+    return params.stream().map(Pair::first).collect(toList());
   }

   /**
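Policy now resolves each parameter's ConditionType once at construction and carries it alongside the name, instead of looking it up repeatedly during comparisons. A hedged standalone sketch of that precompute step (toy enum in place of Suggestion.ConditionType):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ParamTypeCache {
  enum Type { FREEDISK, CORES, LAZY }

  // Stand-in for Suggestion.getTagType; the mapping below is illustrative.
  static Type tagType(String name) {
    switch (name) {
      case "freedisk": return Type.FREEDISK;
      case "cores": return Type.CORES;
      default: return Type.LAZY;
    }
  }

  static List<Map.Entry<String, Type>> withTypes(List<String> params) {
    return params.stream()
        .map(s -> Map.entry(s, tagType(s)))   // resolve the type once, reuse everywhere
        .collect(Collectors.toList());
  }
}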
PolicyHelper.java
@@ -223,6 +223,27 @@ public class PolicyHelper {
     return suggestionCtx.getSuggestions();
   }


+  /**Use this to dump the state of a system and to generate a testcase
+   */
+  public static void logState(SolrCloudManager cloudManager, Suggester suggester) {
+    if (log.isTraceEnabled()) {
+      log.trace("LOGSTATE: {}",
+          Utils.toJSONString((MapWriter) ew -> {
+            ew.put("liveNodes", cloudManager.getClusterStateProvider().getLiveNodes());
+            ew.put("suggester", suggester);
+            if (suggester.session.nodeStateProvider instanceof MapWriter) {
+              MapWriter nodeStateProvider = (MapWriter) suggester.session.nodeStateProvider;
+              nodeStateProvider.writeMap(ew);
+            }
+            try {
+              ew.put("autoscalingJson", cloudManager.getDistribStateManager().getAutoScalingConfig());
+            } catch (InterruptedException e) {
+            }
+          }));
+    }
+  }
+
   public enum Status {
     NULL,
     //it is just created and not yet used or all operations on it has been competed fully
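logState follows the usual guarded-trace idiom: the JSON snapshot of live nodes, suggester, and autoscaling config is only serialized when TRACE logging is actually enabled. A minimal sketch of that idiom with a toy logger (names are illustrative, not the SLF4J API):

import java.util.function.Supplier;

public class GuardedTraceLog {
  interface Logger {
    boolean isTraceEnabled();
    void trace(String fmt, Object arg);
  }

  static void logSnapshot(Logger log, Supplier<String> expensiveSnapshot) {
    if (log.isTraceEnabled()) {                       // skip serialization entirely otherwise
      log.trace("LOGSTATE: {}", expensiveSnapshot.get());
    }
  }
}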
Preference.java
@@ -44,7 +44,7 @@ public class Preference implements MapWriter {

   public Preference(Map<String, Object> m, int idx) {
     this.idx = idx;
-    this.original = Utils.getDeepCopy(m,3);
+    this.original = Utils.getDeepCopy(m, 3);
     sort = Policy.Sort.get(m);
     name = Policy.SortParam.get(m.get(sort.name()).toString());
     Object p = m.getOrDefault("precision", 0);

@@ -52,9 +52,9 @@ public class Preference implements MapWriter {
     if (precision < 0) {
       throw new RuntimeException("precision must be a positive value ");
     }
-    if(precision< name.min || precision> name.max){
+    if (precision < name.min || precision > name.max) {
       throw new RuntimeException(StrUtils.formatString("invalid precision value {0} , must lie between {1} and {2}",
-          precision, name.min, name.max ) );
+          precision, name.min, name.max));
     }

   }

@@ -70,11 +70,22 @@ public class Preference implements MapWriter {
     Object o2 = useApprox ? r2.cells[idx].approxVal : r2.cells[idx].val;
     int result = 0;
     if (o1 instanceof Long && o2 instanceof Long) result = ((Long) o1).compareTo((Long) o2);
-    else if (o1 instanceof Double && o2 instanceof Double) result = ((Double) o1).compareTo((Double) o2);
-    else if (!o1.getClass().getName().equals(o2.getClass().getName())) {
+    else if (o1 instanceof Double && o2 instanceof Double) {
+      result = compareWithTolerance((Double) o1, (Double) o2, useApprox ? 1 : 1);
+    } else if (!o1.getClass().getName().equals(o2.getClass().getName())) {
       throw new RuntimeException("Unable to compare " + o1 + " of type: " + o1.getClass().getName() + " from " + r1.cells[idx].toString() + " and " + o2 + " of type: " + o2.getClass().getName() + " from " + r2.cells[idx].toString());
     }
-    return result == 0 ? (next == null ? 0 : next.compare(r1, r2, useApprox)) : sort.sortval * result;
+    return result == 0 ?
+        (next == null ? 0 :
+            next.compare(r1, r2, useApprox)) : sort.sortval * result;
   }

+  private int compareWithTolerance(Double o1, Double o2, int percentage) {
+    if (percentage == 0) return o1.compareTo(o2);
+    if (o1.equals(o2)) return 0;
+    double delta = Math.abs(o1 - o2);
+    if ((100 * delta / o1) < percentage) return 0;
+    return o1.compareTo(o2);
+  }
+
   //sets the new value according to precision in val_

@@ -84,10 +95,17 @@ public class Preference implements MapWriter {
       if (!row.isLive) {
         continue;
       }
-      prevVal = row.cells[idx].approxVal =
-          (prevVal == null || Double.compare(Math.abs(((Number) prevVal).doubleValue() - ((Number) row.cells[idx].val).doubleValue()), precision) > 0) ?
-              row.cells[idx].val :
-              prevVal;
+      if (prevVal == null) {//this is the first
+        prevVal = row.cells[idx].approxVal = row.cells[idx].val;
+      } else {
+        double prevD = ((Number) prevVal).doubleValue();
+        double currD = ((Number) row.cells[idx].val).doubleValue();
+        if (Math.abs(prevD - currD) >= precision) {
+          prevVal = row.cells[idx].approxVal = row.cells[idx].val;
+        } else {
+          prevVal = row.cells[idx].approxVal = prevVal;
+        }
+      }
     }
   }
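compareWithTolerance treats two doubles as equal when they differ by less than the given percentage of the first value. A worked example mirroring the method added above:

public class ToleranceExample {
  static int compareWithTolerance(Double o1, Double o2, int percentage) {
    if (percentage == 0) return o1.compareTo(o2);
    if (o1.equals(o2)) return 0;
    double delta = Math.abs(o1 - o2);
    if ((100 * delta / o1) < percentage) return 0;   // within tolerance: treat as tied
    return o1.compareTo(o2);
  }

  public static void main(String[] args) {
    // 100.0 vs 100.5 differ by 0.5, and 100 * 0.5 / 100.0 = 0.5 < 1, so they tie.
    System.out.println(compareWithTolerance(100.0, 100.5, 1));  // 0
    // 100.0 vs 102.0 differ by 2%, outside the 1% tolerance.
    System.out.println(compareWithTolerance(100.0, 102.0, 1));  // -1
  }
}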
ReplicaInfo.java
@@ -32,7 +32,6 @@ import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;


 public class ReplicaInfo implements MapWriter {
-  // private final Replica replica;
   private final String name;
   private String core, collection, shard;
   private Replica.Type type;
Row.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;

 import org.apache.solr.common.IteratorWriter;
 import org.apache.solr.common.MapWriter;

@@ -35,37 +36,44 @@ import org.apache.solr.common.util.Utils;

 import static org.apache.solr.common.params.CoreAdminParams.NODE;


+/**
+ * Each instance represents a node in the cluster
+ */
 public class Row implements MapWriter {
   public final String node;
   final Cell[] cells;
+  //this holds the details of each replica in the node
   public Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas;
   boolean anyValueMissing = false;
   boolean isLive = true;
   Policy.Session session;

-  public Row(String node, List<String> params, List<String> perReplicaAttributes, Policy.Session session) {
+  public Row(String node, List<Pair<String, Suggestion.ConditionType>> params, List<String> perReplicaAttributes, Policy.Session session) {
     this.session = session;
     collectionVsShardVsReplicas = session.nodeStateProvider.getReplicaInfo(node, perReplicaAttributes);
     if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>();
     this.node = node;
     cells = new Cell[params.size()];
     isLive = session.cloudManager.getClusterStateProvider().getLiveNodes().contains(node);
-    Map<String, Object> vals = isLive ? session.nodeStateProvider.getNodeValues(node, params) : Collections.emptyMap();
+    List<String> paramNames = params.stream().map(Pair::first).collect(Collectors.toList());
+    Map<String, Object> vals = isLive ? session.nodeStateProvider.getNodeValues(node, paramNames) : Collections.emptyMap();
     for (int i = 0; i < params.size(); i++) {
-      String s = params.get(i);
-      cells[i] = new Cell(i, s, Clause.validate(s,vals.get(s), false));
-      if (NODE.equals(s)) cells[i].val = node;
+      Pair<String, Suggestion.ConditionType> pair = params.get(i);
+      cells[i] = new Cell(i, pair.first(), Clause.validate(pair.first(), vals.get(pair.first()), false), null, pair.second(), this);
+      if (NODE.equals(pair.first())) cells[i].val = node;
       if (cells[i].val == null) anyValueMissing = true;
     }
   }

   public Row(String node, Cell[] cells, boolean anyValueMissing, Map<String,
-      Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas, boolean isLive) {
+      Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas, boolean isLive, Policy.Session session) {
+    this.session = session;
     this.node = node;
     this.isLive = isLive;
     this.cells = new Cell[cells.length];
     for (int i = 0; i < this.cells.length; i++) {
       this.cells[i] = cells[i].copy();
+      this.cells[i].row = this;
     }
     this.anyValueMissing = anyValueMissing;
     this.collectionVsShardVsReplicas = collectionVsShardVsReplicas;

@@ -79,8 +87,8 @@ public class Row implements MapWriter {
     });
   }

-  Row copy() {
-    return new Row(node, cells, anyValueMissing, Utils.getDeepCopy(collectionVsShardVsReplicas, 3), isLive);
+  Row copy(Policy.Session session) {
+    return new Row(node, cells, anyValueMissing, Utils.getDeepCopy(collectionVsShardVsReplicas, 3), isLive, session);
   }

   Object getVal(String name) {

@@ -101,25 +109,51 @@ public class Row implements MapWriter {
     return node;
   }

-  // this adds a replica to the replica info
+  /**
+   * this simulates adding a replica of a certain coll+shard to node. as a result of adding a replica,
+   * values of certain attributes will be modified, in this node as well as other nodes. Please note that
+   * the state of the current session is kept intact while this operation is being performed
+   *
+   * @param coll  collection name
+   * @param shard shard name
+   * @param type  replica type
+   */
   public Row addReplica(String coll, String shard, Replica.Type type) {
-    Row row = copy();
+    Row row = session.copy().getNode(this.node);
+    if (row == null) throw new RuntimeException("couldn't get a row");
     Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.computeIfAbsent(coll, k -> new HashMap<>());
     List<ReplicaInfo> replicas = c.computeIfAbsent(shard, k -> new ArrayList<>());
     String replicaname = "" + new Random().nextInt(1000) + 1000;
-    replicas.add(new ReplicaInfo(replicaname, replicaname, coll, shard, type, this.node,
-        Collections.singletonMap(ZkStateReader.REPLICA_TYPE, type != null ? type.toString() : Replica.Type.NRT.toString())));
+    ReplicaInfo ri = new ReplicaInfo(replicaname, replicaname, coll, shard, type, this.node,
+        Utils.makeMap(ZkStateReader.REPLICA_TYPE, type != null ? type.toString() : Replica.Type.NRT.toString()));
+    replicas.add(ri);
     for (Cell cell : row.cells) {
-      if (cell.name.equals("cores")) {
-        cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() + 1;
-      }
+      cell.type.projectAddReplica(cell, ri);
     }
     return row;

   }


+  public ReplicaInfo getReplica(String coll, String shard, Replica.Type type) {
+    Map<String, List<ReplicaInfo>> c = collectionVsShardVsReplicas.get(coll);
+    if (c == null) return null;
+    List<ReplicaInfo> r = c.get(shard);
+    if (r == null) return null;
+    int idx = -1;
+    for (int i = 0; i < r.size(); i++) {
+      ReplicaInfo info = r.get(i);
+      if (type == null || info.getType() == type) {
+        idx = i;
+        break;
+      }
+    }
+    if (idx == -1) return null;
+    return r.get(idx);
+  }
+
   // this simulates removing a replica from a node
   public Pair<Row, ReplicaInfo> removeReplica(String coll, String shard, Replica.Type type) {
-    Row row = copy();
+    Row row = session.copy().getNode(this.node);
     Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.get(coll);
     if (c == null) return null;
     List<ReplicaInfo> r = c.get(shard);

@@ -132,14 +166,12 @@ public class Row implements MapWriter {
         break;
       }
     }
-    if(idx == -1) return null;
-
+    if (idx == -1) return null;
+    ReplicaInfo removed = r.remove(idx);
     for (Cell cell : row.cells) {
-      if (cell.name.equals("cores")) {
-        cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() - 1;
-      }
+      cell.type.projectRemoveReplica(cell, removed);
     }
-    return new Pair(row, r.remove(idx));
+    return new Pair(row, removed);

   }
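This is the commit's central refactor: Row.addReplica and Row.removeReplica no longer copy a single row; they copy the entire session, mutate the copied row, and return it, so the returned row's session is a complete, consistent simulated cluster while the original session stays intact. A hedged standalone sketch of that copy-on-write pattern with toy types:

import java.util.ArrayList;
import java.util.List;

public class CopyOnWriteSessionSketch {
  static class Node {
    final String name; int cores; Session session;
    Node(String name, int cores) { this.name = name; this.cores = cores; }
  }

  static class Session {
    final List<Node> matrix = new ArrayList<>();

    Session copy() {
      Session s = new Session();
      for (Node n : matrix) {              // deep-copy every row into the new session
        Node c = new Node(n.name, n.cores);
        c.session = s;
        s.matrix.add(c);
      }
      return s;
    }

    Node getNode(String name) {
      for (Node n : matrix) if (n.name.equals(name)) return n;
      return null;
    }
  }

  static Node simulateAddReplica(Node node) {
    Node copy = node.session.copy().getNode(node.name);  // mutate the copy, not the original
    copy.cores += 1;
    return copy;   // copy.session is the full simulated cluster; the source session is untouched
  }
}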
Suggester.java
@@ -36,6 +36,7 @@ import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
+import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;

@@ -63,7 +64,7 @@ public abstract class Suggester implements MapWriter {
   public Suggester hint(Hint hint, Object value) {
     hint.validator.accept(value);
     if (hint.multiValued) {
-      Collection<?> values = value instanceof Collection ? (Collection)value : Collections.singletonList(value);
+      Collection<?> values = value instanceof Collection ? (Collection) value : Collections.singletonList(value);
       ((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).addAll(values);
     } else {
       hints.put(hint, value == null ? null : String.valueOf(value));

@@ -71,6 +72,10 @@ public abstract class Suggester implements MapWriter {
     return this;
   }

+  public CollectionParams.CollectionAction getAction() {
+    return null;
+  }
+
   /**
    * Normally, only less loaded nodes are used for moving replicas. If this is a violation and a MOVE must be performed,
    * set the flag to true.

@@ -80,7 +85,7 @@ public abstract class Suggester implements MapWriter {
     return this;
   }

-  protected boolean isNodeSuitable(Row row) {
+  protected boolean isNodeSuitableForReplicaAddition(Row row) {
     if (!row.isLive) return false;
     if (!isAllowed(row.node, Hint.TARGET_NODE)) return false;
     if (!isAllowed(row.getVal(ImplicitSnitch.DISK), Hint.MINFREEDISK)) return false;

@@ -115,7 +120,7 @@ public abstract class Suggester implements MapWriter {
     if (srcNodes != null && !srcNodes.isEmpty()) {
       // the source node is dead so live nodes may not have it
       for (String srcNode : srcNodes) {
-        if(session.matrix.stream().noneMatch(row -> row.node.equals(srcNode)))
+        if (session.matrix.stream().noneMatch(row -> row.node.equals(srcNode)))
           session.matrix.add(new Row(srcNode, session.getPolicy().params, session.getPolicy().perReplicaAttributes, session));
       }
     }

@@ -185,7 +190,7 @@ public abstract class Suggester implements MapWriter {
   boolean containsNewErrors(List<Violation> violations) {
     for (Violation v : violations) {
       int idx = originalViolations.indexOf(v);
-      if (idx < 0 || originalViolations.get(idx).isLessSerious(v)) return true;
+      if (idx < 0 /*|| originalViolations.get(idx).isLessSerious(v)*/) return true;
     }
     return false;
   }

@@ -210,14 +215,14 @@ public abstract class Suggester implements MapWriter {
           if (!isAllowed(e.getKey(), Hint.COLL)) continue;
           for (Map.Entry<String, List<ReplicaInfo>> shard : e.getValue().entrySet()) {
             if (!isAllowed(new Pair<>(e.getKey(), shard.getKey()), Hint.COLL_SHARD)) continue;//todo fix
-            if(shard.getValue() == null || shard.getValue().isEmpty()) continue;
+            if (shard.getValue() == null || shard.getValue().isEmpty()) continue;
             replicaList.add(new Pair<>(shard.getValue().get(0), r));
           }
         }
       }

   List<Violation> testChangedMatrix(boolean strict, List<Row> rows) {
-    Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences,rows);
+    Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences, rows);
     List<Violation> errors = new ArrayList<>();
     for (Clause clause : session.expandedClauses) {
       if (strict || clause.strict) {

@@ -230,11 +235,6 @@ public abstract class Suggester implements MapWriter {
     return errors;
   }

-  ArrayList<Row> getModifiedMatrix(List<Row> matrix, Row tmpRow, int i) {
-    ArrayList<Row> copy = new ArrayList<>(matrix);
-    copy.set(i, tmpRow);
-    return copy;
-  }
-
   protected boolean isAllowed(Object v, Hint hint) {
     Object hintVal = hints.get(hint);

@@ -263,7 +263,16 @@ public abstract class Suggester implements MapWriter {
       }
     }

-  }),
+  }) {
+    @Override
+    public Object parse(Object v) {
+      if (v instanceof Map) {
+        Map map = (Map) v;
+        return Pair.parse(map);
+      }
+      return super.parse(v);
+    }
+  },
   SRC_NODE(true),
   TARGET_NODE(true),
   REPLICATYPE(false, o -> {

@@ -277,7 +286,7 @@ public abstract class Suggester implements MapWriter {
   }, hintValVsActual -> {
     Double hintFreediskInGb = (Double) FREEDISK.validate(null, hintValVsActual.first(), false);
     Double actualFreediskInGb = (Double) FREEDISK.validate(null, hintValVsActual.second(), false);
-    if(actualFreediskInGb == null) return false;
+    if (actualFreediskInGb == null) return false;
     return actualFreediskInGb > hintFreediskInGb;
   });

@@ -304,6 +313,17 @@ public abstract class Suggester implements MapWriter {
       this.valueValidator = testval;
     }

+    public static Hint get(String s) {
+      for (Hint hint : values()) {
+        if (hint.name().equals(s)) return hint;
+      }
+      return null;
+    }
+
+    public Object parse(Object v) {
+      return v;
+    }
+
   }

@@ -316,6 +336,7 @@ public abstract class Suggester implements MapWriter {

   @Override
   public void writeMap(EntryWriter ew) throws IOException {
+    ew.put("action", String.valueOf(getAction()));
     ew.put("hints", (MapWriter) ew1 -> hints.forEach((hint, o) -> ew1.putNoEx(hint.toString(), o)));
   }
 }
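Replacing the per-subclass writeMap overrides with getAction() is a small template-method refactor: the base class serializes the action once, and each subclass only narrows getAction(). A hedged toy sketch of the shape (string serialization stands in for MapWriter):

public class TemplateMethodSketch {
  abstract static class BaseSuggester {
    public String getAction() { return null; }           // subclasses narrow this
    public final String toJsonish() {
      return "{\"action\":\"" + getAction() + "\"}";     // one serialization path for all
    }
  }

  static class MoveReplica extends BaseSuggester {
    @Override public String getAction() { return "MOVEREPLICA"; }
  }

  public static void main(String[] args) {
    System.out.println(new MoveReplica().toJsonish());   // {"action":"MOVEREPLICA"}
  }
}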
Suggestion.java
@@ -42,6 +42,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;

 public class Suggestion {
   public static final String coreidxsize = "INDEX.sizeInBytes";

+  static final Map<String, ConditionType> validatetypes = new HashMap<>();

   public static ConditionType getTagType(String name) {

@@ -85,7 +86,7 @@ public class Suggestion {
       SolrRequest op = suggester.getSuggestion();
       if (op != null) {
         session = suggester.getSession();
-        suggestions.add(new Suggester.SuggestionInfo( violation,
+        suggestions.add(new Suggester.SuggestionInfo(violation,
             ((V2RequestSupport) op.setUseV2(true)).getV2Request()));
       }
       return op;

@@ -107,17 +108,20 @@ public class Suggestion {
       .filter(tag -> tag.perReplicaValue != null)
       .collect(Collectors.toMap(tag -> tag.tagName, tag -> tag.perReplicaValue));

+  /**
+   * Type details of each variable in policies
+   */
   public enum ConditionType {

     COLL("collection", String.class, null, null, null),
     SHARD("shard", String.class, null, null, null),
     REPLICA("replica", Long.class, null, 0L, null),
-    PORT(ImplicitSnitch.PORT, Long.class, null, 1L, 65535L) ,
+    PORT(ImplicitSnitch.PORT, Long.class, null, 1L, 65535L),
     IP_1("ip_1", Long.class, null, 0L, 255L),
     IP_2("ip_2", Long.class, null, 0L, 255L),
     IP_3("ip_3", Long.class, null, 0L, 255L),
     IP_4("ip_4", Long.class, null, 0L, 255L),
-    FREEDISK(ImplicitSnitch.DISK, Double.class, null, 0d, Double.MAX_VALUE, coreidxsize) {
+    FREEDISK(ImplicitSnitch.DISK, Double.class, null, 0d, Double.MAX_VALUE, coreidxsize, Boolean.TRUE) {
       @Override
       public Object convertVal(Object val) {
         Number value = (Number) super.validate(ImplicitSnitch.DISK, val, false);

@@ -127,12 +131,19 @@ public class Suggestion {
         return value;
       }

+      @Override
+      public int compareViolation(Violation v1, Violation v2) {
+        return Long.compare(
+            v1.getViolatingReplicas().stream().mapToLong(v -> v.delta == null ? 0 : v.delta).max().orElse(0l),
+            v2.getViolatingReplicas().stream().mapToLong(v3 -> v3.delta == null ? 0 : v3.delta).max().orElse(0l));
+      }
+
       @Override
       public void getSuggestions(SuggestionCtx ctx) {
         if (ctx.violation == null) return;
         if (ctx.violation.replicaCountDelta < 0 && !ctx.violation.getViolatingReplicas().isEmpty()) {

-          Comparator<Row> rowComparator = Comparator.comparing(r -> ((Long) r.getVal(ImplicitSnitch.DISK, 0l)));
+          Comparator<Row> rowComparator = Comparator.comparing(r -> ((Double) r.getVal(ImplicitSnitch.DISK, 0d)));
           List<Row> matchingNodes = ctx.session.matrix.stream().filter(
               row -> ctx.violation.getViolatingReplicas()
                   .stream()

@@ -141,29 +152,59 @@ public class Suggestion {
               .collect(Collectors.toList());


-          for (Row r : matchingNodes) {
+          for (Row node : matchingNodes) {
             //lets try to start moving the smallest cores off of the node
             ArrayList<ReplicaInfo> replicas = new ArrayList<>();
-            r.forEachReplica(replicas::add);
+            node.forEachReplica(replicas::add);
             replicas.sort((r1, r2) -> {
               Long s1 = Clause.parseLong(ConditionType.CORE_IDX.tagName, r1.getVariables().get(ConditionType.CORE_IDX.tagName));
               Long s2 = Clause.parseLong(ConditionType.CORE_IDX.tagName, r2.getVariables().get(ConditionType.CORE_IDX.tagName));
               if (s1 != null && s2 != null) return s1.compareTo(s2);
               return 0;
             });
-            long currentDelta = ctx.violation.getClause().tag.delta(r.getVal(ImplicitSnitch.DISK));
+            long currentDelta = ctx.violation.getClause().tag.delta(node.getVal(ImplicitSnitch.DISK));
             for (ReplicaInfo replica : replicas) {
               if (currentDelta <= 0) break;
               if (replica.getVariables().get(ConditionType.CORE_IDX.tagName) == null) continue;
               Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
                   .hint(Suggester.Hint.COLL_SHARD, new Pair<>(replica.getCollection(), replica.getShard()))
-                  .hint(Suggester.Hint.SRC_NODE, r.node);
+                  .hint(Suggester.Hint.SRC_NODE, node.node)
+                  .forceOperation(true);
               if (ctx.addSuggestion(suggester) == null) break;
-              currentDelta -= Clause.parseLong(ConditionType.CORE_IDX.tagName, replica.getVariables().get(ConditionType.CORE_IDX.tagName));
+              currentDelta -= Clause.parseLong(ConditionType.CORE_IDX.tagName, replica.getVariable(ConditionType.CORE_IDX.tagName));
             }
           }
         }
       }

+      //When a replica is added, freedisk should be decremented
+      @Override
+      public void projectAddReplica(Cell cell, ReplicaInfo ri) {
+        //go through other replicas of this shard and copy the index size value into this
+        for (Row row : cell.getRow().session.matrix) {
+          row.forEachReplica(replicaInfo -> {
+            if (ri != replicaInfo &&
+                ri.getCollection().equals(replicaInfo.getCollection()) &&
+                ri.getShard().equals(replicaInfo.getShard()) &&
+                ri.getVariable(CORE_IDX.tagName) == null &&
+                replicaInfo.getVariable(CORE_IDX.tagName) != null) {
+              ri.getVariables().put(CORE_IDX.tagName, validate(CORE_IDX.tagName, replicaInfo.getVariable(CORE_IDX.tagName), false));
+            }
+          });
+        }
+        Double idxSize = (Double) validate(CORE_IDX.tagName, ri.getVariable(CORE_IDX.tagName), false);
+        if (idxSize == null) return;
+        Double currFreeDisk = cell.val == null ? 0.0d : (Double) cell.val;
+        cell.val = currFreeDisk - idxSize;
+      }
+
+      @Override
+      public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
+        Double idxSize = (Double) validate(CORE_IDX.tagName, ri.getVariable(CORE_IDX.tagName), false);
+        if (idxSize == null) return;
+        Double currFreeDisk = cell.val == null ? 0.0d : (Double) cell.val;
+        cell.val = currFreeDisk + idxSize;
+      }
     },
     CORE_IDX(coreidxsize, Double.class, null, 0d, Double.MAX_VALUE) {
       @Override

@@ -172,7 +213,7 @@ public class Suggestion {
       }
     },
     NODE_ROLE(ImplicitSnitch.NODEROLE, String.class, Collections.singleton("overseer"), null, null),
-    CORES(ImplicitSnitch.CORES, Long.class, null, 0L, Long.MAX_VALUE) {
+    CORES(ImplicitSnitch.CORES, Long.class, null, 0L, Long.MAX_VALUE, null, Boolean.TRUE) {
       @Override
       public void addViolatingReplicas(ViolationCtx ctx) {
         for (Row r : ctx.allRows) {

@@ -194,12 +235,22 @@ public class Suggestion {
           ctx.addSuggestion(suggester);
         }
       }
+
+      @Override
+      public void projectAddReplica(Cell cell, ReplicaInfo ri) {
+        cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() + 1;
+      }
+
+      @Override
+      public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
+        cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() - 1;
+      }
     },
-    SYSLOADAVG(ImplicitSnitch.SYSLOADAVG, Double.class, null, 0d, 100d),
-    HEAPUSAGE(ImplicitSnitch.HEAPUSAGE, Double.class, null, 0d, null),
-    NUMBER("NUMBER", Long.class, null, 0L, Long.MAX_VALUE),
+    SYSLOADAVG(ImplicitSnitch.SYSLOADAVG, Double.class, null, 0d, 100d, null, Boolean.TRUE),
+    HEAPUSAGE(ImplicitSnitch.HEAPUSAGE, Double.class, null, 0d, null, null, Boolean.TRUE),
+    NUMBER("NUMBER", Long.class, null, 0L, Long.MAX_VALUE, null, Boolean.TRUE),

     STRING("STRING", String.class, null, null, null),
     NODE("node", String.class, null, null, null) {
       @Override

@@ -228,7 +279,8 @@ public class Suggestion {
         perNodeSuggestions(ctx);
       }
     },
-    DISKTYPE(ImplicitSnitch.DISKTYPE, String.class, unmodifiableSet(new HashSet(Arrays.asList("ssd", "rotational"))), null, null, null) {
+    DISKTYPE(ImplicitSnitch.DISKTYPE, String.class,
+        unmodifiableSet(new HashSet(Arrays.asList("ssd", "rotational"))), null, null, null, null) {
       @Override
       public void getSuggestions(SuggestionCtx ctx) {
         perNodeSuggestions(ctx);

@@ -239,21 +291,24 @@ public class Suggestion {
     final Set<String> vals;
     final Number min;
     final Number max;
+    final Boolean additive;
     public final String tagName;
     public final String perReplicaValue;

     ConditionType(String tagName, Class type, Set<String> vals, Number min, Number max) {
-      this(tagName, type, vals, min, max, null);
+      this(tagName, type, vals, min, max, null, null);

     }

-    ConditionType(String tagName, Class type, Set<String> vals, Number min, Number max, String perReplicaValue) {
+    ConditionType(String tagName, Class type, Set<String> vals, Number min, Number max, String perReplicaValue,
+                  Boolean additive) {
       this.tagName = tagName;
       this.type = type;
       this.vals = vals;
       this.min = min;
       this.max = max;
       this.perReplicaValue = perReplicaValue;
+      this.additive = additive;
     }

     public void getSuggestions(SuggestionCtx ctx) {

@@ -265,7 +320,7 @@ public class Suggestion {
       row.forEachReplica(replica -> {
         if (ctx.clause.replica.isPass(0) && !ctx.clause.tag.isPass(row)) return;
         if (!ctx.clause.replica.isPass(0) && ctx.clause.tag.isPass(row)) return;
-        if(!ctx.currentViolation.matchShard(replica.getShard())) return;
+        if (!ctx.currentViolation.matchShard(replica.getShard())) return;
         if (!ctx.clause.collection.isPass(ctx.currentViolation.coll) || !ctx.clause.shard.isPass(ctx.currentViolation.shard))
           return;
         ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica).withDelta(ctx.clause.tag.delta(row.getVal(ctx.clause.tag.name))));

@@ -311,6 +366,21 @@ public class Suggestion {
     }

     }

+    /**
+     * Simulate a replica addition to a node in the cluster
+     */
+    public void projectAddReplica(Cell cell, ReplicaInfo ri) {
+    }
+
+    public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
+    }
+
+    public int compareViolation(Violation v1, Violation v2) {
+      if (v2.replicaCountDelta == null || v1.replicaCountDelta == null) return 0;
+      if (Math.abs(v1.replicaCountDelta) == Math.abs(v2.replicaCountDelta)) return 0;
+      return Math.abs(v1.replicaCountDelta) < Math.abs(v2.replicaCountDelta) ? -1 : 1;
+    }
   }

   private static void perNodeSuggestions(SuggestionCtx ctx) {
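The new projectAddReplica/projectRemoveReplica hooks let each ConditionType adjust its own cell when a replica move is simulated; for FREEDISK that means subtracting the replica's index size on the target and adding it back on the source. A worked example with illustrative numbers:

public class FreediskProjectionExample {
  public static void main(String[] args) {
    double idxSizeGb = 5.0;
    double nodeAFreedisk = 20.0, nodeBFreedisk = 50.0;

    // Simulate moving a 5.0 GB replica from node A to node B.
    nodeAFreedisk += idxSizeGb;  // projectRemoveReplica on the source
    nodeBFreedisk -= idxSizeGb;  // projectAddReplica on the target

    System.out.println("A=" + nodeAFreedisk + " B=" + nodeBFreedisk);  // A=25.0 B=45.0
  }
}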
Violation.java
@@ -91,8 +91,7 @@ public class Violation implements MapWriter {
   }
   //if the delta is lower , this violation is less serious
   public boolean isLessSerious(Violation that) {
-    return that.replicaCountDelta != null && replicaCountDelta != null &&
-        Math.abs(replicaCountDelta) < Math.abs(that.replicaCountDelta);
+    return this.getClause().tag.varType.compareViolation(this, that) < 0;
   }

   @Override

@@ -102,7 +101,7 @@ public class Violation implements MapWriter {
       return Objects.equals(this.shard, v.shard) &&
           Objects.equals(this.coll, v.coll) &&
           Objects.equals(this.node, v.node) &&
-          Objects.equals(this.tagKey, v.tagKey)
+          Objects.equals(this.clause, v.clause)
           ;
     }
     return false;
Pair.java
@@ -18,6 +18,7 @@ package org.apache.solr.common.util;

 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Map;
 import java.util.Objects;

 import org.apache.solr.common.MapWriter;

@@ -65,4 +66,8 @@ public class Pair<T1, T2> implements Serializable, MapWriter {
     ew.put("second", second);
   }

+  public static Pair parse(Map m) {
+    return new Pair(m.get("first"), m.get("second"));
+  }
+
 }
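Pair.parse is the inverse of writeMap: it rebuilds a Pair from the {"first": ..., "second": ...} map form that hints such as COLL_SHARD are serialized to. A hedged toy sketch of the same contract (not Solr's Pair class):

import java.util.Map;

public class PairSketch<T1, T2> {
  private final T1 first;
  private final T2 second;

  public PairSketch(T1 first, T2 second) { this.first = first; this.second = second; }
  public T1 first() { return first; }
  public T2 second() { return second; }

  @SuppressWarnings({"rawtypes", "unchecked"})
  public static PairSketch parse(Map m) {
    return new PairSketch(m.get("first"), m.get("second"));  // mirrors the added Pair.parse
  }

  public static void main(String[] args) {
    PairSketch p = PairSketch.parse(Map.of("first", "coll1", "second", "shard1"));
    System.out.println(p.first() + "/" + p.second());  // coll1/shard1
  }
}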
TestPolicy.java
@@ -46,6 +46,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ObjectCache;
 import org.apache.solr.common.util.Pair;

@@ -62,6 +63,26 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;

 public class TestPolicy extends SolrTestCaseJ4 {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private Suggester createSuggester(SolrCloudManager cloudManager, Map jsonObj, Suggester seed) throws IOException, InterruptedException {
+    Policy.Session session = null;
+    if (seed != null) session = seed.session;
+    else {
+      session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
+    }
+
+    Map m = (Map) jsonObj.get("suggester");
+    Suggester result = session.getSuggester(CollectionParams.CollectionAction.get((String) m.get("action")));
+    m = (Map) m.get("hints");
+    m.forEach((k, v) -> {
+      Hint hint = Hint.get(k.toString());
+      result.hint(hint, hint.parse(v));
+    });
+    return result;
+  }
+
+  private SolrCloudManager createCloudManager(Map jsonObj) {
+    return cloudManagerWithData(jsonObj);
+  }

   public static String clusterState = "{'gettingstarted':{" +
       " 'router':{'name':'compositeId'}," +

@@ -129,52 +150,52 @@ public class TestPolicy extends SolrTestCaseJ4 {
   }

   public void testValidate() {
-    expectError("replica", -1, "must be greater than" );
-    expectError("replica","hello", "not a valid number" );
-    assertEquals( 1l, Clause.validate("replica", "1", true));
-    assertEquals("c", Clause.validate("collection", "c", true));
-    assertEquals( "s", Clause.validate("shard", "s",true));
-    assertEquals( "overseer", Clause.validate("nodeRole", "overseer",true));
+    expectError("replica", -1, "must be greater than");
+    expectError("replica", "hello", "not a valid number");
+    assertEquals(1l, Clause.validate("replica", "1", true));
+    assertEquals("c", Clause.validate("collection", "c", true));
+    assertEquals("s", Clause.validate("shard", "s", true));
+    assertEquals("overseer", Clause.validate("nodeRole", "overseer", true));

-    expectError("nodeRole", "wrong","must be one of");
+    expectError("nodeRole", "wrong", "must be one of");

-    expectError("sysLoadAvg", "101","must be less than ");
-    expectError("sysLoadAvg", 101,"must be less than ");
-    expectError("sysLoadAvg", "-1","must be greater than");
-    expectError("sysLoadAvg", -1,"must be greater than");
+    expectError("sysLoadAvg", "101", "must be less than ");
+    expectError("sysLoadAvg", 101, "must be less than ");
+    expectError("sysLoadAvg", "-1", "must be greater than");
+    expectError("sysLoadAvg", -1, "must be greater than");

-    assertEquals(12.46d,Clause.validate("sysLoadAvg", "12.46",true));
-    assertEquals(12.46,Clause.validate("sysLoadAvg", 12.46d,true));
+    assertEquals(12.46d, Clause.validate("sysLoadAvg", "12.46", true));
+    assertEquals(12.46, Clause.validate("sysLoadAvg", 12.46d, true));


-    expectError("ip_1", "300","must be less than ");
-    expectError("ip_1", 300,"must be less than ");
-    expectError("ip_1", "-1","must be greater than");
-    expectError("ip_1", -1,"must be greater than");
+    expectError("ip_1", "300", "must be less than ");
+    expectError("ip_1", 300, "must be less than ");
+    expectError("ip_1", "-1", "must be greater than");
+    expectError("ip_1", -1, "must be greater than");

-    assertEquals(1l,Clause.validate("ip_1", "1",true));
+    assertEquals(1l, Clause.validate("ip_1", "1", true));

-    expectError("heapUsage", "-1","must be greater than");
-    expectError("heapUsage", -1,"must be greater than");
-    assertEquals(69.9d,Clause.validate("heapUsage", "69.9",true));
-    assertEquals(69.9d,Clause.validate("heapUsage", 69.9d,true));
+    expectError("heapUsage", "-1", "must be greater than");
+    expectError("heapUsage", -1, "must be greater than");
+    assertEquals(69.9d, Clause.validate("heapUsage", "69.9", true));
+    assertEquals(69.9d, Clause.validate("heapUsage", 69.9d, true));

-    expectError("port", "70000","must be less than ");
-    expectError("port", 70000,"must be less than ");
-    expectError("port", "0","must be greater than");
-    expectError("port", 0,"must be greater than");
+    expectError("port", "70000", "must be less than ");
+    expectError("port", 70000, "must be less than ");
+    expectError("port", "0", "must be greater than");
+    expectError("port", 0, "must be greater than");

-    expectError("cores", "-1","must be greater than");
+    expectError("cores", "-1", "must be greater than");


   }

-  private static void expectError(String name, Object val, String msg){
+  private static void expectError(String name, Object val, String msg) {
     try {
-      Clause.validate(name, val,true);
-      fail("expected exception containing "+msg);
+      Clause.validate(name, val, true);
+      fail("expected exception containing " + msg);
     } catch (Exception e) {
-      assertTrue("expected exception containing "+msg,e.getMessage().contains(msg));
+      assertTrue("expected exception containing " + msg, e.getMessage().contains(msg));
     }

   }

@@ -282,7 +303
@@ -282,7 +303,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
    Policy.Session session = policy.createSession(cloudManagerWithData(dataproviderdata));
    SolrRequest op = session.getSuggester(MOVEREPLICA).hint(Hint.SRC_NODE, "127.0.0.1:65427_solr").getSuggestion();
    assertNotNull(op);
    assertEquals( "127.0.0.1:65434_solr",op.getParams().get("targetNode") );
    assertEquals("127.0.0.1:65434_solr", op.getParams().get("targetNode"));
  }

  public void testNodeLostMultipleReplica() {
@@ -419,7 +440,10 @@ public class TestPolicy extends SolrTestCaseJ4 {
  }

  private static SolrCloudManager cloudManagerWithData(String data) {
    final Map m = (Map) Utils.fromJSONString(data);
    return cloudManagerWithData((Map) Utils.fromJSONString(data));
  }

  private static SolrCloudManager cloudManagerWithData(Map m) {
    Map replicaInfo = (Map) m.get("replicaInfo");
    replicaInfo.forEach((node, val) -> {
      Map m1 = (Map) val;
@@ -433,15 +457,26 @@ public class TestPolicy extends SolrTestCaseJ4 {
          String name = m3.keySet().iterator().next().toString();
          m3 = (Map) m3.get(name);
          Replica.Type type = Replica.Type.get((String) m3.get("type"));
          l3.set(i, new ReplicaInfo(name,name
          l3.set(i, new ReplicaInfo(name, name
              , coll.toString(), shard.toString(), type, (String) node, m3));
        }
      });

    });

    });
    AutoScalingConfig asc = m.containsKey("autoscalingJson") ? new AutoScalingConfig((Map<String, Object>) m.get("autoscalingJson")) : null;
    return new DelegatingCloudManager(null) {

      @Override
      public DistribStateManager getDistribStateManager() {
        return new DelegatingDistribStateManager(null) {
          @Override
          public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
            return asc;
          }
        };
      }

      @Override
      public ClusterStateProvider getClusterStateProvider() {
        return new DelegatingClusterStateProvider(null) {
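
cloudManagerWithData builds the whole simulated cluster from a single JSON map; judging from the code above and the fixtures later in this file, it consumes the keys liveNodes, replicaInfo, nodeValues and, optionally, autoscalingJson. A minimal fixture sketch with hypothetical values:

    String fixture = "{" +
        " 'liveNodes':['node1:8983_solr']," +
        " 'replicaInfo':{'node1:8983_solr':{'coll1':{'shard1':[{'core_node1':{'type':'NRT'}}]}}}," +
        " 'nodeValues':{'node1:8983_solr':{'cores':1, 'freedisk':100}}," +
        " 'autoscalingJson':{'cluster-preferences':[{'minimize':'cores'}]}" +
        "}";
    SolrCloudManager mgr = cloudManagerWithData(fixture);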
@@ -497,8 +532,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
        .hint(Hint.REPLICATYPE, Replica.Type.PULL);
    SolrRequest op = suggester.getSuggestion();
    assertNotNull(op);
    assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
    assertEquals("PULL type node must be in 'slowdisk' node","node1", op.getParams().get("node"));
    assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
    assertEquals("PULL type node must be in 'slowdisk' node", "node1", op.getParams().get("node"));

    suggester = suggester.getSession()
        .getSuggester(ADDREPLICA)
|
|||
.hint(Hint.REPLICATYPE, Replica.Type.PULL);
|
||||
op = suggester.getSuggestion();
|
||||
assertNotNull(op);
|
||||
assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
|
||||
assertEquals("PULL type node must be in 'slowdisk' node","node1", op.getParams().get("node"));
|
||||
assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
|
||||
assertEquals("PULL type node must be in 'slowdisk' node", "node1", op.getParams().get("node"));
|
||||
|
||||
suggester = suggester.getSession()
|
||||
.getSuggester(ADDREPLICA)
|
||||
|
@@ -515,8 +550,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
        .hint(Hint.REPLICATYPE, Replica.Type.TLOG);
    op = suggester.getSuggestion();
    assertNotNull(op);
    assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
    assertEquals("TLOG type node must be in 'ssd' node","node3", op.getParams().get("node"));
    assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
    assertEquals("TLOG type node must be in 'ssd' node", "node3", op.getParams().get("node"));

    suggester = suggester.getSession()
        .getSuggester(ADDREPLICA)
@@ -524,15 +559,15 @@ public class TestPolicy extends SolrTestCaseJ4 {
        .hint(Hint.REPLICATYPE, Replica.Type.TLOG);
    op = suggester.getSuggestion();
    assertNotNull(op);
    assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
    assertEquals("TLOG type node must be in 'ssd' node","node3", op.getParams().get("node"));
    assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
    assertEquals("TLOG type node must be in 'ssd' node", "node3", op.getParams().get("node"));

    suggester = suggester.getSession()
        .getSuggester(ADDREPLICA)
        .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
        .hint(Hint.REPLICATYPE, Replica.Type.TLOG);
    op = suggester.getSuggestion();
    assertNull("No node should qualify for this" ,op);
    assertNull("No node should qualify for this", op);

  }

@@ -669,7 +704,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
    Map policies = (Map) Utils.fromJSONString("{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 50}" +
        " { 'minimize': 'cores', 'precision': 1}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, 'nodeRole': 'overseer'}" +
@@ -698,16 +733,16 @@ public class TestPolicy extends SolrTestCaseJ4 {
    int countNewColl2Op = 0;
    while ((op = suggester.getSuggestion()) != null) {
      countOp++;
      assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
      String collection = op.getParams().get("collection");
      assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2"));
      if (collection.equals("newColl")) countNewCollOp++;
      else countNewColl2Op++;
      assertEquals("PULL type node must be in 'slowdisk' node", "node1", op.getParams().get("node"));
      suggester = suggester.getSession().getSuggester(ADDREPLICA)
          .hint(Hint.REPLICATYPE, Replica.Type.PULL)
          .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
          .hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard1"));
      assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
      String collection = op.getParams().get("collection");
      assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2"));
      if (collection.equals("newColl")) countNewCollOp++;
      else countNewColl2Op++;
      assertEquals("PULL type node must be in 'slowdisk' node","node1", op.getParams().get("node"));
    }
    assertEquals(2, countOp);
    assertEquals(1, countNewCollOp);
@@ -723,17 +758,17 @@ public class TestPolicy extends SolrTestCaseJ4 {
        .hint(Hint.REPLICATYPE, Replica.Type.TLOG);
    while ((op = suggester.getSuggestion()) != null) {
      countOp++;
      assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
      String collection = op.getParams().get("collection");
      assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2"));
      if (collection.equals("newColl")) countNewCollOp++;
      else countNewColl2Op++;
      assertEquals("TLOG type node must be in 'ssd' node", "node3", op.getParams().get("node"));
      suggester = suggester.getSession()
          .getSuggester(ADDREPLICA)
          .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
          .hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard2"))
          .hint(Hint.REPLICATYPE, Replica.Type.TLOG);
      assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
      String collection = op.getParams().get("collection");
      assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2"));
      if (collection.equals("newColl")) countNewCollOp++;
      else countNewColl2Op++;
      assertEquals("TLOG type node must be in 'ssd' node","node3", op.getParams().get("node"));
    }
    assertEquals(3, countOp);
    assertEquals(1, countNewCollOp);
@@ -741,9 +776,44 @@ public class TestPolicy extends SolrTestCaseJ4 {
  }

  public void testRow() {
    Row row = new Row("nodex", new Cell[]{new Cell(0, "node", "nodex")}, false, new HashMap<>(), true);
    Policy policy = new Policy();
    Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
      @Override
      public NodeStateProvider getNodeStateProvider() {
        return new DelegatingNodeStateProvider(null) {
          @Override
          public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
            Map<String, Map<String, List<ReplicaInfo>>> o = (Map<String, Map<String, List<ReplicaInfo>>>) Utils.fromJSONString("{c1: {s0:[{}]}}");
            Utils.setObjectByPath(o, "c1/s0[0]", new ReplicaInfo("r0", "c1.s0", "c1", "s0", Replica.Type.NRT, "nodex", new HashMap<>()));
            return o;
          }

          @Override
          public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
            return Utils.makeMap("node", "nodex", "cores", 1);
          }
        };
      }

      @Override
      public ClusterStateProvider getClusterStateProvider() {
        return new DelegatingClusterStateProvider(null) {
          @Override
          public String getPolicyNameByCollection(String coll) {
            return null;
          }

          @Override
          public Set<String> getLiveNodes() {
            return Collections.singleton("nodex");
          }
        };
      }
    });

    Row row = session.getNode("nodex");
    Row r1 = row.addReplica("c1", "s1", null);
    Row r2 = r1.addReplica("c1", "s1",null);
    Row r2 = r1.addReplica("c1", "s1", null);
    assertEquals(1, r1.collectionVsShardVsReplicas.get("c1").get("s1").size());
    assertEquals(2, r2.collectionVsShardVsReplicas.get("c1").get("s1").size());
    assertTrue(r2.collectionVsShardVsReplicas.get("c1").get("s1").get(0) instanceof ReplicaInfo);
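
The rewritten testRow obtains its Row from a real Policy.Session instead of constructing one by hand, and the asserts above pin down the copy-on-write behavior the simulation refactor relies on: addReplica leaves the receiver untouched and returns a new Row carrying the extra replica, so a session can stack simulated changes across steps. In sketch form:

    Row r1 = row.addReplica("c1", "s1", null);  // returns a modified copy; row is unchanged
    Row r2 = r1.addReplica("c1", "s1", null);   // chaining accumulates simulated replicas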
@@ -831,7 +901,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
    assertTrue(violations.stream().anyMatch(violation -> (violation.getClause().replica.getOperand() == Operand.LESS_THAN && "node".equals(violation.getClause().tag.getName()))));

    Suggester suggester = session.getSuggester(ADDREPLICA)
        .hint(Hint.COLL_SHARD, new Pair<>("gettingstarted","r1"));
        .hint(Hint.COLL_SHARD, new Pair<>("gettingstarted", "r1"));
    SolrParams operation = suggester.getSuggestion().getParams();
    assertEquals("node2", operation.get("node"));

@@ -974,7 +1044,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
    for (int i = 0; i < 3; i++) {
      Suggester suggester = session.getSuggester(ADDREPLICA);
      SolrRequest op = suggester
          .hint(Hint.COLL_SHARD, new Pair<>("newColl","shard1"))
          .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
          .getSuggestion();
      assertNotNull(op);
      assertEquals("node3", op.getParams().get("node"));
@@ -1085,7 +1155,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
    Suggester suggester = session.getSuggester(MOVEREPLICA)
        .hint(Hint.TARGET_NODE, "127.0.0.1:60099_solr");
    SolrRequest op = suggester.getSuggestion();
    assertNotNull(op);
    assertNotNull("expect a non null operation", op);
  }

  public void testOtherTag() {
@@ -1229,8 +1299,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
    };
  }

  public void testEmptyClusterState(){
    String autoScaleJson = " {'policies':{'c1':[{" +
  public void testEmptyClusterState() {
    String autoScaleJson = " {'policies':{'c1':[{" +
        " 'replica':1," +
        " 'shard':'#EACH'," +
        " 'port':'50096'}]}}";
@@ -1247,7 +1317,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
      return new DelegatingClusterStateProvider(null) {
        @Override
        public Set<String> getLiveNodes() {
          return new HashSet<>(Arrays.asList( "127.0.0.1:50097_solr", "127.0.0.1:50096_solr"));
          return new HashSet<>(Arrays.asList("127.0.0.1:50097_solr", "127.0.0.1:50096_solr"));
        }
      };
    }
@@ -1270,10 +1340,10 @@ public class TestPolicy extends SolrTestCaseJ4 {
      }
    };
    List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations(
        "newColl", new AutoScalingConfig((Map<String, Object>)Utils.fromJSONString(autoScaleJson)),
        "newColl", new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScaleJson)),
        dataProvider, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1, 0, 0, null);

    assertTrue(locations.stream().allMatch(it -> it.node.equals("127.0.0.1:50096_solr")) );
    assertTrue(locations.stream().allMatch(it -> it.node.equals("127.0.0.1:50096_solr")));
  }

  public void testMultiReplicaPlacement() {
@@ -1333,11 +1403,11 @@ public class TestPolicy extends SolrTestCaseJ4 {
    };
    List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations(
        "newColl", new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScaleJson)),
        cloudManager, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3,0,0, null);
    assertTrue(locations.stream().allMatch(it -> ImmutableList.of("node2", "node1", "node3").contains(it.node)) );
        cloudManager, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3, 0, 0, null);
    assertTrue(locations.stream().allMatch(it -> ImmutableList.of("node2", "node1", "node3").contains(it.node)));
  }

  public void testMoveReplicaSuggester(){
  public void testMoveReplicaSuggester() {
    String dataproviderdata = "{" +
        " 'liveNodes':[" +
        " '10.0.0.6:7574_solr'," +
@@ -1402,7 +1472,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
        .hint(Hint.TARGET_NODE, "127.0.0.1:51147_solr");
    SolrRequest op = suggester.getSuggestion();
    log.info("" + op);
    assertNotNull(op);
    assertNotNull("operation expected ", op);
  }

  public void testReplicaCountSuggestions() {
@@ -1440,7 +1510,6 @@ public class TestPolicy extends SolrTestCaseJ4 {
    assertEquals("core_node2", Utils.getObjectByPath(m, true, "operation/command/move-replica/replica"));
  }

  // @Ignore
  public void testFreeDiskSuggestions() {
    String dataproviderdata = "{" +
        " liveNodes:[node1,node2]," +
@@ -1457,8 +1526,6 @@ public class TestPolicy extends SolrTestCaseJ4 {


    String autoScalingjson = " { cluster-policy:[" +
        // " { cores :'<10', node :'#ANY'}," +
        // " { replica :'<2', shard:'#EACH' node:'#ANY'}," +
        " { replica :'0', freedisk:'<1000'}," +
        " { nodeRole : overseer, replica :0}]," +
        " cluster-preferences :[{ minimize : cores, precision : 2 }]}";
@@ -1559,7 +1626,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
    List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(cfg, cloudManagerWithData(dataproviderdata));
    assertEquals(2, suggestions.size());
    for (Suggester.SuggestionInfo suggestion : suggestions) {
      Utils.getObjectByPath(suggestion ,true, "operation/move-replica/targetNode");
      Utils.getObjectByPath(suggestion, true, "operation/move-replica/targetNode");
    }
  }

@@ -1651,18 +1718,18 @@ public class TestPolicy extends SolrTestCaseJ4 {
        " { nodeRole:overseer,replica:0}]}";
    Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
    Policy.Session session = policy.createSession(cloudManagerWithData(dataproviderdata));
    Suggester suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA)
    Suggester suggester = session.getSuggester(CollectionAction.ADDREPLICA)
        .hint(Hint.COLL_SHARD, new Pair<>("coll1", "shard1"))
        .hint(Hint.MINFREEDISK, 150);
    CollectionAdminRequest.AddReplica op = (CollectionAdminRequest.AddReplica) suggester.getSuggestion();

    assertEquals("127.0.0.1:51078_solr" , op.getNode());
    assertEquals("127.0.0.1:51078_solr", op.getNode());

    suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA)
    suggester = session.getSuggester(CollectionAction.ADDREPLICA)
        .hint(Hint.COLL_SHARD, new Pair<>("coll1", "shard1"));
    op = (CollectionAdminRequest.AddReplica) suggester.getSuggestion();

    assertEquals("127.0.0.1:51147_solr" , op.getNode());
    assertEquals("127.0.0.1:51147_solr", op.getNode());
  }

  public void testDiskSpaceReqd() {
@@ -1744,14 +1811,15 @@ public class TestPolicy extends SolrTestCaseJ4 {
        cloudManager, null, Arrays.asList("shard1", "shard2"), 1, 0, 0, null);
    assertTrue(locations.stream().allMatch(it -> "node3".equals(it.node)));
  }
  public void testMoveReplicaLeaderlast(){

    List<Pair<ReplicaInfo, Row>> validReplicas = new ArrayList<>();
  public void testMoveReplicaLeaderlast() {

    List<Pair<ReplicaInfo, Row>> validReplicas = new ArrayList<>();
    Replica replica = new Replica("r1", Utils.makeMap("leader", "true"));
    ReplicaInfo replicaInfo = new ReplicaInfo("c1", "s1", replica, new HashMap<>());
    validReplicas.add(new Pair<>(replicaInfo, null));

    replicaInfo = new ReplicaInfo("r4", "c1_s2_r1","c1", "s2", Replica.Type.NRT, "n1", Collections.singletonMap("leader", "true"));
    replicaInfo = new ReplicaInfo("r4", "c1_s2_r1", "c1", "s2", Replica.Type.NRT, "n1", Collections.singletonMap("leader", "true"));
    validReplicas.add(new Pair<>(replicaInfo, null));


@@ -1772,4 +1840,322 @@ public class TestPolicy extends SolrTestCaseJ4 {

  }

  public void testScheduledTriggerFailure() throws Exception {
    String state = "{" +
        " 'liveNodes': [" +
        " '127.0.0.1:49221_solr'," +
        " '127.0.0.1:49210_solr'" +
        " ]," +
        " 'suggester': {" +
        " 'action': 'MOVEREPLICA'," +
        " 'hints': {}" +
        " }," +
        " 'replicaInfo': {" +
        " '127.0.0.1:49210_solr': {" +
        " 'testScheduledTrigger': {" +
        " 'shard1': [" +
        " {" +
        " 'core_node3': {" +
        " 'base_url': 'http://127.0.0.1:49210/solr'," +
        " 'node_name': '127.0.0.1:49210_solr'," +
        " 'core': 'testScheduledTrigger_shard1_replica_n1'," +
        " 'state': 'active'," +
        " 'type': 'NRT'," +
        " 'INDEX.sizeInBytes': 6.426125764846802E-8," +
        " 'shard': 'shard1'," +
        " 'collection': 'testScheduledTrigger'" +
        " }" +
        " }," +
        " {" +
        " 'core_node6': {" +
        " 'base_url': 'http://127.0.0.1:49210/solr'," +
        " 'node_name': '127.0.0.1:49210_solr'," +
        " 'core': 'testScheduledTrigger_shard1_replica_n4'," +
        " 'state': 'active'," +
        " 'type': 'NRT'," +
        " 'INDEX.sizeInBytes': 6.426125764846802E-8," +
        " 'shard': 'shard1'," +
        " 'collection': 'testScheduledTrigger'" +
        " }" +
        " }" +
        " ]" +
        " }" +
        " }," +
        " '127.0.0.1:49221_solr': {" +
        " 'testScheduledTrigger': {" +
        " 'shard1': [" +
        " {" +
        " 'core_node5': {" +
        " 'core': 'testScheduledTrigger_shard1_replica_n2'," +
        " 'leader': 'true'," +
        " 'INDEX.sizeInBytes': 6.426125764846802E-8," +
        " 'base_url': 'http://127.0.0.1:49221/solr'," +
        " 'node_name': '127.0.0.1:49221_solr'," +
        " 'state': 'active'," +
        " 'type': 'NRT'," +
        " 'shard': 'shard1'," +
        " 'collection': 'testScheduledTrigger'" +
        " }" +
        " }" +
        " ]" +
        " }" +
        " }" +
        " }," +
        " 'nodeValues': {" +
        " '127.0.0.1:49210_solr': {" +
        " 'node': '127.0.0.1:49210_solr'," +
        " 'cores': 2," +
        " 'freedisk': 197.39717864990234" +
        " }," +
        " '127.0.0.1:49221_solr': {" +
        " 'node': '127.0.0.1:49221_solr'," +
        " 'cores': 1," +
        " 'freedisk': 197.39717864990234" +
        " }" +
        " }," +
        " 'autoscalingJson': {" +
        " 'cluster-preferences': [" +
        " {" +
        " 'minimize': 'cores'," +
        " 'precision': 1" +
        " }," +
        " {" +
        " 'maximize': 'freedisk'" +
        " }" +
        " ]," +
        " 'cluster-policy': [" +
        " {" +
        " 'cores': '<3'," +
        " 'node': '#EACH'" +
        " }" +
        " ]" +
        " }" +
        "}";
    Map jsonObj = (Map) Utils.fromJSONString(state);
    SolrCloudManager cloudManager = createCloudManager(jsonObj);
    Suggester suggester = createSuggester(cloudManager, jsonObj, null);
    int count = 0;
    while (count < 10) {
      CollectionAdminRequest.MoveReplica op = (CollectionAdminRequest.MoveReplica) suggester.getSuggestion();
      if (op == null) break;
      count++;
      log.info("OP:{}", op.getParams());
      suggester = createSuggester(cloudManager, jsonObj, suggester);
    }

    assertEquals(0, count);
  }

  public void testUtilizeNodeFailure() throws Exception {
    String state = "{'liveNodes': ['127.0.0.1:50417_solr', '127.0.0.1:50418_solr', '127.0.0.1:50419_solr', '127.0.0.1:50420_solr', '127.0.0.1:50443_solr']," +
        " 'suggester': {" +
        " 'action': 'MOVEREPLICA'," +
        " 'hints': {'TARGET_NODE': ['127.0.0.1:50443_solr']}" +
        " }," +
        " 'replicaInfo': {" +
        " '127.0.0.1:50418_solr': {" +
        " 'utilizenodecoll': {" +
        " 'shard2': [" +
        " {" +
        " 'core_node7': {" +
        " 'core': 'utilizenodecoll_shard2_replica_n4'," +
        " 'leader': 'true'," +
        " 'INDEX.sizeInBytes': 6.426125764846802E-8," +
        " 'base_url': 'http://127.0.0.1:50418/solr'," +
        " 'node_name': '127.0.0.1:50418_solr'," +
        " 'state': 'active'," +
        " 'type': 'NRT'," +
        " 'shard': 'shard2'," +
        " 'collection': 'utilizenodecoll'" +
        " }" +
        " }" +
        " ]" +
        " }" +
        " }," +
        " '127.0.0.1:50417_solr': {" +
        " 'utilizenodecoll': {" +
        " 'shard2': [" +
        " {" +
        " 'core_node8': {" +
        " 'base_url': 'http://127.0.0.1:50417/solr'," +
        " 'node_name': '127.0.0.1:50417_solr'," +
        " 'core': 'utilizenodecoll_shard2_replica_n6'," +
        " 'state': 'active'," +
        " 'type': 'NRT'," +
        " 'INDEX.sizeInBytes': 6.426125764846802E-8," +
        " 'shard': 'shard2'," +
        " 'collection': 'utilizenodecoll'" +
        " }" +
        " }" +
        " ]" +
        " }" +
        " }," +
        " '127.0.0.1:50419_solr': {" +
        " 'utilizenodecoll': {" +
        " 'shard1': [" +
        " {" +
        " 'core_node5': {" +
        " 'base_url': 'http://127.0.0.1:50419/solr'," +
        " 'node_name': '127.0.0.1:50419_solr'," +
        " 'core': 'utilizenodecoll_shard1_replica_n2'," +
        " 'state': 'active'," +
        " 'type': 'NRT'," +
        " 'INDEX.sizeInBytes': 6.426125764846802E-8," +
        " 'shard': 'shard1'," +
        " 'collection': 'utilizenodecoll'" +
        " }" +
        " }" +
        " ]" +
        " }" +
        " }," +
        " '127.0.0.1:50420_solr': {" +
        " 'utilizenodecoll': {" +
        " 'shard1': [" +
        " {" +
        " 'core_node3': {" +
        " 'core': 'utilizenodecoll_shard1_replica_n1'," +
        " 'leader': 'true'," +
        " 'INDEX.sizeInBytes': 6.426125764846802E-8," +
        " 'base_url': 'http://127.0.0.1:50420/solr'," +
        " 'node_name': '127.0.0.1:50420_solr'," +
        " 'state': 'active'," +
        " 'type': 'NRT'," +
        " 'shard': 'shard1'," +
        " 'collection': 'utilizenodecoll'" +
        " }" +
        " }" +
        " ]" +
        " }" +
        " }," +
        " '127.0.0.1:50443_solr': {}" +
        " }," +
        " 'nodeValues': {" +
        " '127.0.0.1:50418_solr': {" +
        " 'cores': 1," +
        " 'freedisk': 187.70782089233398" +
        " }," +
        " '127.0.0.1:50417_solr': {" +
        " 'cores': 1," +
        " 'freedisk': 187.70782089233398" +
        " }," +
        " '127.0.0.1:50419_solr': {" +
        " 'cores': 1," +
        " 'freedisk': 187.70782089233398" +
        " }," +
        " '127.0.0.1:50420_solr': {" +
        " 'cores': 1," +
        " 'freedisk': 187.70782089233398" +
        " }," +
        " '127.0.0.1:50443_solr': {" +
        " 'cores': 0," +
        " 'freedisk': 187.70782089233398" +
        " }" +
        " }," +
        " 'autoscalingJson': {" +
        " 'cluster-preferences': [" +
        " {'minimize': 'cores', 'precision': 1}," +
        " {'maximize': 'freedisk'}" +
        " ]" +
        " }" +
        "}";
    Map jsonObj = (Map) Utils.fromJSONString(state);
    SolrCloudManager cloudManager = createCloudManager(jsonObj);
    Suggester suggester = createSuggester(cloudManager, jsonObj, null);
    int count = 0;
    while (count < 100) {
      CollectionAdminRequest.MoveReplica op = (CollectionAdminRequest.MoveReplica) suggester.getSuggestion();
      if (op == null) break;
      count++;
      log.info("OP:{}", op.getParams());
      suggester = createSuggester(cloudManager, jsonObj, suggester);
    }

    assertEquals("count = "+count ,0,count);
  }

  public void testUtilizeNodeFailure2() throws Exception {
    String state = "{ 'liveNodes':[" +
        " '127.0.0.1:51075_solr'," +
        " '127.0.0.1:51076_solr'," +
        " '127.0.0.1:51077_solr'," +
        " '127.0.0.1:51097_solr']," +
        " 'suggester':{" +
        " 'action':'MOVEREPLICA'," +
        " 'hints':{'TARGET_NODE':['127.0.0.1:51097_solr']}}," +
        " 'replicaInfo':{" +
        " '127.0.0.1:51076_solr':{'utilizenodecoll':{'shard1':[{'core_node5':{" +
        " 'base_url':'https://127.0.0.1:51076/solr'," +
        " 'node_name':'127.0.0.1:51076_solr'," +
        " 'core':'utilizenodecoll_shard1_replica_n2'," +
        " 'state':'active'," +
        " 'type':'NRT'," +
        " 'INDEX.sizeInBytes':6.426125764846802E-8," +
        " 'shard':'shard1'," +
        " 'collection':'utilizenodecoll'}}]}}," +
        " '127.0.0.1:51077_solr':{'utilizenodecoll':{" +
        " 'shard2':[{'core_node8':{" +
        " 'base_url':'https://127.0.0.1:51077/solr'," +
        " 'node_name':'127.0.0.1:51077_solr'," +
        " 'core':'utilizenodecoll_shard2_replica_n6'," +
        " 'state':'active'," +
        " 'type':'NRT'," +
        " 'INDEX.sizeInBytes':6.426125764846802E-8," +
        " 'shard':'shard2'," +
        " 'collection':'utilizenodecoll'}}]," +
        " 'shard1':[{'core_node3':{" +
        " 'core':'utilizenodecoll_shard1_replica_n1'," +
        " 'leader':'true'," +
        " 'INDEX.sizeInBytes':6.426125764846802E-8," +
        " 'base_url':'https://127.0.0.1:51077/solr'," +
        " 'node_name':'127.0.0.1:51077_solr'," +
        " 'state':'active'," +
        " 'type':'NRT'," +
        " 'shard':'shard1'," +
        " 'collection':'utilizenodecoll'}}]}}," +
        " '127.0.0.1:51097_solr':{}," +
        " '127.0.0.1:51075_solr':{'utilizenodecoll':{'shard2':[{'core_node7':{" +
        " 'core':'utilizenodecoll_shard2_replica_n4'," +
        " 'leader':'true'," +
        " 'INDEX.sizeInBytes':6.426125764846802E-8," +
        " 'base_url':'https://127.0.0.1:51075/solr'," +
        " 'node_name':'127.0.0.1:51075_solr'," +
        " 'state':'active'," +
        " 'type':'NRT'," +
        " 'shard':'shard2'," +
        " 'collection':'utilizenodecoll'}}]}}}," +
        " 'nodeValues':{" +
        " '127.0.0.1:51076_solr':{" +
        " 'cores':1," +
        " 'freedisk':188.7262191772461}," +
        " '127.0.0.1:51077_solr':{" +
        " 'cores':2," +
        " 'freedisk':188.7262191772461}," +
        " '127.0.0.1:51097_solr':{" +
        " 'cores':0," +
        " 'freedisk':188.7262191772461}," +
        " '127.0.0.1:51075_solr':{" +
        " 'cores':1," +
        " 'freedisk':188.7262191772461}}," +
        " 'autoscalingJson':{" +
        " 'cluster-preferences':[" +
        " {" +
        " 'minimize':'cores'," +
        " 'precision':1}," +
        " {'maximize':'freedisk'}]" +
        " }}";
    Map jsonObj = (Map) Utils.fromJSONString(state);
    SolrCloudManager cloudManager = createCloudManager(jsonObj);
    Suggester suggester = createSuggester(cloudManager, jsonObj, null);
    int count = 0;
    while (count < 100) {
      CollectionAdminRequest.MoveReplica op = (CollectionAdminRequest.MoveReplica) suggester.getSuggestion();
      if (op == null) break;
      count++;
      log.info("OP:{}", op.getParams());
      suggester = createSuggester(cloudManager, jsonObj, suggester);
    }

    assertEquals("count = "+count ,1,count);
  }


}