From 23aee00213a2c48bd578bcf01a5ed435b0bdc881 Mon Sep 17 00:00:00 2001 From: noble Date: Fri, 2 Mar 2018 17:00:15 +1100 Subject: [PATCH] SOLR-12031: Refactor Policy framework to make simulated changes affect more than a single node SOLR-12050: UTILIZENODE does not enforce policy rules --- solr/CHANGES.txt | 5 + .../cloud/api/collections/UtilizeNodeCmd.java | 19 +- .../cloud/autoscaling/ComputePlanAction.java | 16 +- .../apache/solr/cloud/TestUtilizeNode.java | 9 +- .../autoscaling/AddReplicaSuggester.java | 30 +- .../client/solrj/cloud/autoscaling/Cell.java | 20 +- .../solrj/cloud/autoscaling/Clause.java | 5 +- .../autoscaling/MoveReplicaSuggester.java | 66 +-- .../solrj/cloud/autoscaling/Policy.java | 49 +- .../solrj/cloud/autoscaling/PolicyHelper.java | 21 + .../solrj/cloud/autoscaling/Preference.java | 38 +- .../solrj/cloud/autoscaling/ReplicaInfo.java | 1 - .../client/solrj/cloud/autoscaling/Row.java | 82 ++- .../solrj/cloud/autoscaling/Suggester.java | 47 +- .../solrj/cloud/autoscaling/Suggestion.java | 104 +++- .../solrj/cloud/autoscaling/Violation.java | 5 +- .../org/apache/solr/common/util/Pair.java | 5 + .../solrj/cloud/autoscaling/TestPolicy.java | 550 +++++++++++++++--- 18 files changed, 834 insertions(+), 238 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 6ddb6b3b10c..7da91f2d4a7 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -225,6 +225,9 @@ Bug Fixes * SOLR-10720: Aggressive removal of a collection breaks cluster status API. (Alexey Serba, shalin) +* SOLR-12050: UTILIZENODE does not enforce policy rules (hossman, noble) + + Optimizations ---------------------- @@ -327,6 +330,8 @@ Other Changes * SOLR-12028: BadApple and AwaitsFix annotations usage (Erick Erickson, Uwe Schindler) +* SOLR-12031: Refactor Policy framework to make simulated changes affect more than a single node (noble) + ================== 7.2.1 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/UtilizeNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/UtilizeNodeCmd.java index 60da61a3de6..818b16f9c24 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/UtilizeNodeCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/UtilizeNodeCmd.java @@ -85,18 +85,31 @@ public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd { } executeAll(requests); PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(ocmh.overseer.getSolrCloudManager()); - Policy.Session session = sessionWrapper.get(); + Policy.Session session = sessionWrapper.get(); + Suggester initialsuggester = session.getSuggester(MOVEREPLICA) + .hint(Suggester.Hint.TARGET_NODE, nodeName); + Suggester suggester = null; for (; ; ) { - Suggester suggester = session.getSuggester(MOVEREPLICA) + suggester = session.getSuggester(MOVEREPLICA) .hint(Suggester.Hint.TARGET_NODE, nodeName); - session = suggester.getSession(); SolrRequest request = suggester.getSuggestion(); + if (requests.size() > 10) { + log.info("too_many_suggestions"); + PolicyHelper.logState(ocmh.overseer.getSolrCloudManager(), initialsuggester); + break; + } + log.info("SUGGESTION: {}", request); if (request == null) break; + session = suggester.getSession(); requests.add(new ZkNodeProps(COLLECTION_PROP, request.getParams().get(COLLECTION_PROP), CollectionParams.TARGET_NODE, request.getParams().get(CollectionParams.TARGET_NODE), REPLICA_PROP, request.getParams().get(REPLICA_PROP), ASYNC, request.getParams().get(ASYNC))); } + log.info("total_suggestions: {}", requests.size()); + if (requests.size() == 0) { + PolicyHelper.logState(ocmh.overseer.getSolrCloudManager(), initialsuggester); + } sessionWrapper.returnSession(session); try { executeAll(requests); diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java index 45b0ddf9f58..91990dba0de 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java @@ -46,7 +46,7 @@ import org.slf4j.LoggerFactory; /** * This class is responsible for using the configured policy and preferences * with the hints provided by the trigger event to compute the required cluster operations. - * + *

* The cluster operations computed here are put into the {@link ActionContext}'s properties * with the key name "operations". The value is a List of SolrRequest objects. */ @@ -81,7 +81,8 @@ public class ComputePlanAction extends TriggerActionBase { log.trace("-- state: {}", clusterState); } try { - Suggester suggester = getSuggester(session, event, cloudManager); + Suggester intialSuggester = getSuggester(session, event, cloudManager); + Suggester suggester = intialSuggester; int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState); int requestedOperations = getRequestedNumOps(event); if (requestedOperations > maxOperations) { @@ -104,8 +105,15 @@ public class ComputePlanAction extends TriggerActionBase { // break on first null op // unless a specific number of ops was requested + // uncomment the following to log too many operations + /*if (opCount > 10) { + PolicyHelper.logState(cloudManager, intialSuggester); + }*/ + if (operation == null) { if (requestedOperations < 0) { + //uncomment the following to log zero operations +// PolicyHelper.logState(cloudManager, intialSuggester); break; } else { log.info("Computed plan empty, remained " + (opCount - opLimit) + " requested ops to try."); @@ -150,7 +158,7 @@ public class ComputePlanAction extends TriggerActionBase { AtomicInteger totalRF = new AtomicInteger(); clusterState.forEachCollection(coll -> totalRF.addAndGet(coll.getReplicationFactor() * coll.getSlices().size())); int totalMax = clusterState.getLiveNodes().size() * totalRF.get() * 3; - int maxOp = (Integer)autoScalingConfig.getProperties().getOrDefault(AutoScalingParams.MAX_COMPUTE_OPERATIONS, totalMax); + int maxOp = (Integer) autoScalingConfig.getProperties().getOrDefault(AutoScalingParams.MAX_COMPUTE_OPERATIONS, totalMax); Object o = event.getProperty(AutoScalingParams.MAX_COMPUTE_OPERATIONS, maxOp); try { return Integer.parseInt(String.valueOf(o)); @@ -161,7 +169,7 @@ public class ComputePlanAction extends TriggerActionBase { } protected int getRequestedNumOps(TriggerEvent event) { - Collection ops = (Collection)event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList()); + Collection ops = (Collection) event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList()); if (ops.isEmpty()) { return -1; } else { diff --git a/solr/core/src/test/org/apache/solr/cloud/TestUtilizeNode.java b/solr/core/src/test/org/apache/solr/cloud/TestUtilizeNode.java index d36c4371a61..bc64b6da6fd 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestUtilizeNode.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestUtilizeNode.java @@ -29,21 +29,22 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.util.NamedList; +import org.apache.solr.util.LogLevel; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest; +@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.client.solrj.impl.SolrClientDataProvider=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper=TRACE") public class TestUtilizeNode extends SolrCloudTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @BeforeClass public static void setupCluster() throws Exception { - configureCluster(4) + configureCluster(3) .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf")) .configure(); NamedList overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus()); @@ -71,7 +72,6 @@ public class TestUtilizeNode extends SolrCloudTestCase { } @Test - @AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12050") public void test() throws Exception { cluster.waitForAllNodes(5000); int REPLICATION = 2; @@ -79,7 +79,8 @@ public class TestUtilizeNode extends SolrCloudTestCase { CloudSolrClient cloudClient = cluster.getSolrClient(); log.info("Creating Collection..."); - CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION); + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION) + .setMaxShardsPerNode(2); cloudClient.request(create); log.info("Spinning up additional jettyX..."); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java index 3f96f3e5b10..918313654e8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java @@ -17,18 +17,18 @@ package org.apache.solr.client.solrj.cloud.autoscaling; -import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Set; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.common.MapWriter; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.util.Pair; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; + class AddReplicaSuggester extends Suggester { SolrRequest init() { @@ -42,42 +42,38 @@ class AddReplicaSuggester extends Suggester { if (shards.isEmpty()) { throw new RuntimeException("add-replica requires 'collection' and 'shard'"); } - for (Pair shard : shards) { + for (Pair shard : shards) { Replica.Type type = Replica.Type.get((String) hints.get(Hint.REPLICATYPE)); - //iterate through elements and identify the least loaded + //iterate through elemenodesnts and identify the least loaded List leastSeriousViolation = null; - Integer targetNodeIndex = null; + Row bestNode = null; for (int i = getMatrix().size() - 1; i >= 0; i--) { Row row = getMatrix().get(i); - if (!isNodeSuitable(row)) continue; + if (!isNodeSuitableForReplicaAddition(row)) continue; Row tmpRow = row.addReplica(shard.first(), shard.second(), type); - - List errs = testChangedMatrix(strict, getModifiedMatrix(getMatrix(), tmpRow, i)); + List errs = testChangedMatrix(strict, tmpRow.session.matrix); if (!containsNewErrors(errs)) { if (isLessSerious(errs, leastSeriousViolation)) { leastSeriousViolation = errs; - targetNodeIndex = i; + bestNode = tmpRow; } } } - if (targetNodeIndex != null) {// there are no rule violations - getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(shard.first(), shard.second(), type)); + if (bestNode != null) {// there are no rule violations + this.session = bestNode.session; return CollectionAdminRequest .addReplicaToShard(shard.first(), shard.second()) .setType(type) - .setNode(getMatrix().get(targetNodeIndex).node); + .setNode(bestNode.node); } } return null; } - @Override - public void writeMap(MapWriter.EntryWriter ew) throws IOException { - ew.put("action", CollectionParams.CollectionAction.ADDREPLICA.toString()); - super.writeMap(ew); + public CollectionParams.CollectionAction getAction() { + return ADDREPLICA; } - } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Cell.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Cell.java index 0ac2b3660f8..0fa2db2a0ee 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Cell.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Cell.java @@ -23,28 +23,32 @@ import java.util.HashMap; import org.apache.solr.common.MapWriter; import org.apache.solr.common.util.Utils; +/**Each instance represents an attribute that is being tracked by the framework such as , freedisk, cores etc + * + */ public class Cell implements MapWriter { final int index; + final Suggestion.ConditionType type; final String name; Object val, approxVal; + Row row; - public Cell(int index, String name, Object val) { - this.index = index; - this.name = name; - this.val = val; - } - - public Cell(int index, String name, Object val, Object approxVal) { + public Cell(int index, String name, Object val, Object approxVal, Suggestion.ConditionType type, Row row) { this.index = index; this.name = name; this.val = val; this.approxVal = approxVal; + this.type = type; + this.row = row; } @Override public void writeMap(EntryWriter ew) throws IOException { ew.put(name, val); } + public Row getRow(){ + return row; + } @Override public String toString() { @@ -52,7 +56,7 @@ public class Cell implements MapWriter { } public Cell copy() { - return new Cell(index, name, val, approxVal); + return new Cell(index, name, val, approxVal, this.type, row); } public String getName() { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java index b62aa56373e..92854fd94ab 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java @@ -151,19 +151,20 @@ public class Clause implements MapWriter, Comparable { class Condition { final String name; final Object val; + final Suggestion.ConditionType varType; final Operand op; Condition(String name, Object val, Operand op) { this.name = name; this.val = val; this.op = op; + varType = Suggestion.getTagType(name); } boolean isPass(Object inputVal) { if (inputVal instanceof ReplicaCount) inputVal = ((ReplicaCount) inputVal).getVal(type); - Suggestion.ConditionType validator = Suggestion.getTagType(name); - if (validator == Suggestion.ConditionType.LAZY) { // we don't know the type + if (varType == Suggestion.ConditionType.LAZY) { // we don't know the type return op.match(parseString(val), parseString(inputVal)) == PASS; } else { return op.match(val, validate(name, inputVal, false)) == PASS; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java index d5918e570ce..c0521cd5d98 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java @@ -17,7 +17,6 @@ package org.apache.solr.client.solrj.cloud.autoscaling; -import java.io.IOException; import java.util.Comparator; import java.util.List; @@ -26,6 +25,8 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.util.Pair; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA; + public class MoveReplicaSuggester extends Suggester { @Override @@ -38,50 +39,50 @@ public class MoveReplicaSuggester extends Suggester { SolrRequest tryEachNode(boolean strict) { //iterate through elements and identify the least loaded List leastSeriousViolation = null; - Integer targetNodeIndex = null; - Integer sourceNodeIndex = null; + Row bestSrcRow = null; + Row bestTargetRow = null; ReplicaInfo sourceReplicaInfo = null; List> validReplicas = getValidReplicas(true, true, -1); validReplicas.sort(leaderLast); - for (Pair fromReplica : validReplicas) { + for (int i1 = 0; i1 < validReplicas.size(); i1++) { + Pair fromReplica = validReplicas.get(i1); Row fromRow = fromReplica.second(); - ReplicaInfo replicaInfo = fromReplica.first(); - String coll = replicaInfo.getCollection(); - String shard = replicaInfo.getShard(); - Pair pair = fromRow.removeReplica(coll, shard, replicaInfo.getType()); - Row srcTmpRow = pair.first(); - if (srcTmpRow == null) { - //no such replica available - continue; - } - - final int i = getMatrix().indexOf(fromRow); + ReplicaInfo ri = fromReplica.first(); + if (ri == null) continue; + final int i = session.indexOf(fromRow.node); int stopAt = force ? 0 : i; - for (int j = getMatrix().size() - 1; j >= stopAt; j--) { - if (j == i) continue; - Row targetRow = getMatrix().get(j); - if (!isNodeSuitable(targetRow)) continue; - targetRow = targetRow.addReplica(coll, shard, replicaInfo.getType()); - List errs = testChangedMatrix(strict, getModifiedMatrix(getModifiedMatrix(getMatrix(), srcTmpRow, i), targetRow, j)); - if (!containsNewErrors(errs) && isLessSerious(errs, leastSeriousViolation) && - (force || Policy.compareRows(srcTmpRow, targetRow, session.getPolicy()) < 1)) { + Row targetRow = null; + for (int j = session.matrix.size() - 1; j >= stopAt; j--) { + targetRow = session.matrix.get(j); + if (targetRow.node.equals(fromRow.node)) continue; + if (!isNodeSuitableForReplicaAddition(targetRow)) continue; + targetRow = targetRow.addReplica(ri.getCollection(), ri.getShard(), ri.getType());//add replica to target first + Pair pair = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node + if (pair == null) continue;//should not happen + Row srcRowModified = pair.first();//this is the final state of the source row and session + List errs = testChangedMatrix(strict, srcRowModified.session.matrix); + srcRowModified.session.applyRules();// now resort the nodes with the new values + Policy.Session tmpSession = srcRowModified.session; + if (!containsNewErrors(errs) && + isLessSerious(errs, leastSeriousViolation) && + (force || (tmpSession.indexOf(srcRowModified.node) < tmpSession.indexOf(targetRow.node)))) { leastSeriousViolation = errs; - targetNodeIndex = j; - sourceNodeIndex = i; - sourceReplicaInfo = replicaInfo; + bestSrcRow = srcRowModified; + sourceReplicaInfo = ri; + bestTargetRow = targetRow; } } } - if (targetNodeIndex != null && sourceNodeIndex != null) { - getMatrix().set(sourceNodeIndex, getMatrix().get(sourceNodeIndex).removeReplica(sourceReplicaInfo.getCollection(), sourceReplicaInfo.getShard(), sourceReplicaInfo.getType()).first()); - getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(sourceReplicaInfo.getCollection(), sourceReplicaInfo.getShard(), sourceReplicaInfo.getType())); + if (bestSrcRow != null) { + this.session = bestSrcRow.session; return new CollectionAdminRequest.MoveReplica( sourceReplicaInfo.getCollection(), sourceReplicaInfo.getName(), - getMatrix().get(targetNodeIndex).node); + bestTargetRow.node); } return null; } + static Comparator> leaderLast = (r1, r2) -> { if (r1.first().isLeader) return 1; if (r2.first().isLeader) return -1; @@ -90,8 +91,7 @@ public class MoveReplicaSuggester extends Suggester { @Override - public void writeMap(EntryWriter ew) throws IOException { - ew.put("action", CollectionParams.CollectionAction.MOVEREPLICA.toString()); - super.writeMap(ew); + public CollectionParams.CollectionAction getAction() { + return MOVEREPLICA; } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java index f2e101659ee..e1f4f9274df 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java @@ -41,6 +41,7 @@ import org.apache.solr.common.MapWriter; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.rule.ImplicitSnitch; import org.apache.solr.common.params.CollectionParams.CollectionAction; +import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.Utils; import org.slf4j.Logger; @@ -77,7 +78,7 @@ public class Policy implements MapWriter { final Map> policies; final List clusterPolicy; final List clusterPreferences; - final List params; + final List> params; final List perReplicaAttributes; public Policy() { @@ -111,13 +112,16 @@ public class Policy implements MapWriter { .collect(collectingAndThen(toList(), Collections::unmodifiableList)); this.policies = Collections.unmodifiableMap( - policiesFromMap((Map>>)jsonMap.getOrDefault(POLICIES, emptyMap()), newParams)); - this.params = Collections.unmodifiableList(newParams); + policiesFromMap((Map>>) jsonMap.getOrDefault(POLICIES, emptyMap()), newParams)); + this.params = Collections.unmodifiableList(newParams.stream() + .map(s -> new Pair<>(s, Suggestion.getTagType(s))) + .collect(toList())); perReplicaAttributes = readPerReplicaAttrs(); } + private List readPerReplicaAttrs() { return this.params.stream() - .map(Suggestion.tagVsPerReplicaVal::get) + .map(s -> Suggestion.tagVsPerReplicaVal.get(s.first())) .filter(Objects::nonNull) .collect(Collectors.toList()); } @@ -126,7 +130,11 @@ public class Policy implements MapWriter { this.policies = policies != null ? Collections.unmodifiableMap(policies) : Collections.emptyMap(); this.clusterPolicy = clusterPolicy != null ? Collections.unmodifiableList(clusterPolicy) : Collections.emptyList(); this.clusterPreferences = clusterPreferences != null ? Collections.unmodifiableList(clusterPreferences) : DEFAULT_PREFERENCES; - this.params = Collections.unmodifiableList(buildParams(this.clusterPreferences, this.clusterPolicy, this.policies)); + this.params = Collections.unmodifiableList( + buildParams(this.clusterPreferences, this.clusterPolicy, this.policies).stream() + .map(s -> new Pair<>(s, Suggestion.getTagType(s))) + .collect(toList()) + ); perReplicaAttributes = readPerReplicaAttrs(); } @@ -207,9 +215,9 @@ public class Policy implements MapWriter { } /*This stores the logical state of the system, given a policy and - * a cluster state. - * - */ + * a cluster state. + * + */ public class Session implements MapWriter { final List nodes; final SolrCloudManager cloudManager; @@ -228,6 +236,7 @@ public class Policy implements MapWriter { this.expandedClauses = expandedClauses; this.znodeVersion = znodeVersion; this.nodeStateProvider = nodeStateProvider; + for (Row row : matrix) row.session = this; } @@ -259,7 +268,7 @@ public class Policy implements MapWriter { Collections.sort(expandedClauses); matrix = new ArrayList<>(nodes.size()); - for (String node : nodes) matrix.add(new Row(node, params, perReplicaAttributes,this)); + for (String node : nodes) matrix.add(new Row(node, params, perReplicaAttributes, this)); applyRules(); } @@ -269,7 +278,6 @@ public class Policy implements MapWriter { List perCollPolicy = policies.get(p); if (perCollPolicy == null) { return; -// throw new RuntimeException(StrUtils.formatString("Policy for collection {0} is {1} . It does not exist", c, p)); } } expandedClauses.addAll(mergePolicies(c, policies.getOrDefault(p, emptyList()), clusterPolicy)); @@ -279,9 +287,14 @@ public class Policy implements MapWriter { return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, znodeVersion, nodeStateProvider); } + public Row getNode(String node) { + for (Row row : matrix) if (row.node.equals(node)) return row; + return null; + } + List getMatrixCopy() { return matrix.stream() - .map(Row::copy) + .map(row -> row.copy(this)) .collect(Collectors.toList()); } @@ -303,7 +316,6 @@ public class Policy implements MapWriter { } - public List getViolations() { return violations; } @@ -336,6 +348,11 @@ public class Policy implements MapWriter { public NodeStateProvider getNodeStateProvider() { return nodeStateProvider; } + + public int indexOf(String node) { + for (int i = 0; i < matrix.size(); i++) if (matrix.get(i).node.equals(node)) return i; + throw new RuntimeException("NO such node found " + node); + } } static void setApproxValuesAndSortNodes(List clusterPreferences, List matrix) { @@ -367,7 +384,7 @@ public class Policy implements MapWriter { public enum SortParam { freedisk(0, Integer.MAX_VALUE), cores(0, Integer.MAX_VALUE), heapUsage(0, Integer.MAX_VALUE), sysLoadAvg(0, 100); - public final int min,max; + public final int min, max; SortParam(int min, int max) { this.min = min; @@ -416,8 +433,8 @@ public class Policy implements MapWriter { } public static List mergePolicies(String coll, - List collPolicy, - List globalPolicy) { + List collPolicy, + List globalPolicy) { List merged = insertColl(coll, collPolicy); List global = insertColl(coll, globalPolicy); @@ -455,7 +472,7 @@ public class Policy implements MapWriter { } public List getParams() { - return params; + return params.stream().map(Pair::first).collect(toList()); } /** diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java index a67a4faf9e7..35073de05de 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java @@ -223,6 +223,27 @@ public class PolicyHelper { return suggestionCtx.getSuggestions(); } + + /**Use this to dump the state of a system and to generate a testcase + */ + public static void logState(SolrCloudManager cloudManager, Suggester suggester) { + if(log.isTraceEnabled()) { + log.trace("LOGSTATE: {}", + Utils.toJSONString((MapWriter) ew -> { + ew.put("liveNodes", cloudManager.getClusterStateProvider().getLiveNodes()); + ew.put("suggester", suggester); + if (suggester.session.nodeStateProvider instanceof MapWriter) { + MapWriter nodeStateProvider = (MapWriter) suggester.session.nodeStateProvider; + nodeStateProvider.writeMap(ew); + } + try { + ew.put("autoscalingJson", cloudManager.getDistribStateManager().getAutoScalingConfig()); + } catch (InterruptedException e) { + } + })); + } + } + public enum Status { NULL, //it is just created and not yet used or all operations on it has been competed fully diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Preference.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Preference.java index 0142107ec97..3e73632bf39 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Preference.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Preference.java @@ -44,7 +44,7 @@ public class Preference implements MapWriter { public Preference(Map m, int idx) { this.idx = idx; - this.original = Utils.getDeepCopy(m,3); + this.original = Utils.getDeepCopy(m, 3); sort = Policy.Sort.get(m); name = Policy.SortParam.get(m.get(sort.name()).toString()); Object p = m.getOrDefault("precision", 0); @@ -52,9 +52,9 @@ public class Preference implements MapWriter { if (precision < 0) { throw new RuntimeException("precision must be a positive value "); } - if(precision< name.min || precision> name.max){ + if (precision < name.min || precision > name.max) { throw new RuntimeException(StrUtils.formatString("invalid precision value {0} , must lie between {1} and {2}", - precision, name.min, name.max ) ); + precision, name.min, name.max)); } } @@ -70,11 +70,22 @@ public class Preference implements MapWriter { Object o2 = useApprox ? r2.cells[idx].approxVal : r2.cells[idx].val; int result = 0; if (o1 instanceof Long && o2 instanceof Long) result = ((Long) o1).compareTo((Long) o2); - else if (o1 instanceof Double && o2 instanceof Double) result = ((Double) o1).compareTo((Double) o2); - else if (!o1.getClass().getName().equals(o2.getClass().getName())) { + else if (o1 instanceof Double && o2 instanceof Double) { + result = compareWithTolerance((Double) o1, (Double) o2, useApprox ? 1 : 1); + } else if (!o1.getClass().getName().equals(o2.getClass().getName())) { throw new RuntimeException("Unable to compare " + o1 + " of type: " + o1.getClass().getName() + " from " + r1.cells[idx].toString() + " and " + o2 + " of type: " + o2.getClass().getName() + " from " + r2.cells[idx].toString()); } - return result == 0 ? (next == null ? 0 : next.compare(r1, r2, useApprox)) : sort.sortval * result; + return result == 0 ? + (next == null ? 0 : + next.compare(r1, r2, useApprox)) : sort.sortval * result; + } + + private int compareWithTolerance(Double o1, Double o2, int percentage) { + if (percentage == 0) return o1.compareTo(o2); + if (o1.equals(o2)) return 0; + double delta = Math.abs(o1 - o2); + if ((100 * delta / o1) < percentage) return 0; + return o1.compareTo(o2); } //sets the new value according to precision in val_ @@ -84,10 +95,17 @@ public class Preference implements MapWriter { if (!row.isLive) { continue; } - prevVal = row.cells[idx].approxVal = - (prevVal == null || Double.compare(Math.abs(((Number) prevVal).doubleValue() - ((Number) row.cells[idx].val).doubleValue()), precision) > 0) ? - row.cells[idx].val : - prevVal; + if (prevVal == null) {//this is the first + prevVal = row.cells[idx].approxVal = row.cells[idx].val; + } else { + double prevD = ((Number) prevVal).doubleValue(); + double currD = ((Number) row.cells[idx].val).doubleValue(); + if (Math.abs(prevD - currD) >= precision) { + prevVal = row.cells[idx].approxVal = row.cells[idx].val; + } else { + prevVal = row.cells[idx].approxVal = prevVal; + } + } } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java index cfcd95637bd..8c1fba3f22b 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java @@ -32,7 +32,6 @@ import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP; public class ReplicaInfo implements MapWriter { -// private final Replica replica; private final String name; private String core, collection, shard; private Replica.Type type; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java index 69f81e64e84..659f315c120 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.solr.common.IteratorWriter; import org.apache.solr.common.MapWriter; @@ -35,37 +36,44 @@ import org.apache.solr.common.util.Utils; import static org.apache.solr.common.params.CoreAdminParams.NODE; - +/** + * Each instance represents a node in the cluster + */ public class Row implements MapWriter { public final String node; final Cell[] cells; + //this holds the details of each replica in the node public Map>> collectionVsShardVsReplicas; boolean anyValueMissing = false; boolean isLive = true; + Policy.Session session; - public Row(String node, List params, List perReplicaAttributes, Policy.Session session) { + public Row(String node, List> params, List perReplicaAttributes, Policy.Session session) { + this.session = session; collectionVsShardVsReplicas = session.nodeStateProvider.getReplicaInfo(node, perReplicaAttributes); if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>(); this.node = node; cells = new Cell[params.size()]; isLive = session.cloudManager.getClusterStateProvider().getLiveNodes().contains(node); - Map vals = isLive ? session.nodeStateProvider.getNodeValues(node, params) : Collections.emptyMap(); + List paramNames = params.stream().map(Pair::first).collect(Collectors.toList()); + Map vals = isLive ? session.nodeStateProvider.getNodeValues(node, paramNames) : Collections.emptyMap(); for (int i = 0; i < params.size(); i++) { - String s = params.get(i); - cells[i] = new Cell(i, s, Clause.validate(s,vals.get(s), false)); - if (NODE.equals(s)) cells[i].val = node; + Pair pair = params.get(i); + cells[i] = new Cell(i, pair.first(), Clause.validate(pair.first(), vals.get(pair.first()), false), null, pair.second(), this); + if (NODE.equals(pair.first())) cells[i].val = node; if (cells[i].val == null) anyValueMissing = true; } } public Row(String node, Cell[] cells, boolean anyValueMissing, Map>> collectionVsShardVsReplicas, boolean isLive) { + Map>> collectionVsShardVsReplicas, boolean isLive, Policy.Session session) { + this.session = session; this.node = node; this.isLive = isLive; this.cells = new Cell[cells.length]; for (int i = 0; i < this.cells.length; i++) { this.cells[i] = cells[i].copy(); - + this.cells[i].row = this; } this.anyValueMissing = anyValueMissing; this.collectionVsShardVsReplicas = collectionVsShardVsReplicas; @@ -79,8 +87,8 @@ public class Row implements MapWriter { }); } - Row copy() { - return new Row(node, cells, anyValueMissing, Utils.getDeepCopy(collectionVsShardVsReplicas, 3), isLive); + Row copy(Policy.Session session) { + return new Row(node, cells, anyValueMissing, Utils.getDeepCopy(collectionVsShardVsReplicas, 3), isLive, session); } Object getVal(String name) { @@ -101,25 +109,51 @@ public class Row implements MapWriter { return node; } - // this adds a replica to the replica info + /** + * this simulates adding a replica of a certain coll+shard to node. as a result of adding a replica , + * values of certain attributes will be modified, in this node as well as other nodes. Please note that + * the state of the current session is kept intact while this operation is being performed + * + * @param coll collection name + * @param shard shard name + * @param type replica type + */ public Row addReplica(String coll, String shard, Replica.Type type) { - Row row = copy(); + Row row = session.copy().getNode(this.node); + if (row == null) throw new RuntimeException("couldn't get a row"); Map> c = row.collectionVsShardVsReplicas.computeIfAbsent(coll, k -> new HashMap<>()); List replicas = c.computeIfAbsent(shard, k -> new ArrayList<>()); String replicaname = "" + new Random().nextInt(1000) + 1000; - replicas.add(new ReplicaInfo(replicaname, replicaname, coll, shard, type, this.node, - Collections.singletonMap(ZkStateReader.REPLICA_TYPE, type != null ? type.toString() : Replica.Type.NRT.toString()))); + ReplicaInfo ri = new ReplicaInfo(replicaname, replicaname, coll, shard, type, this.node, + Utils.makeMap(ZkStateReader.REPLICA_TYPE, type != null ? type.toString() : Replica.Type.NRT.toString())); + replicas.add(ri); for (Cell cell : row.cells) { - if (cell.name.equals("cores")) { - cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() + 1; - } + cell.type.projectAddReplica(cell, ri); } return row; - } + + public ReplicaInfo getReplica(String coll, String shard, Replica.Type type) { + Map> c = collectionVsShardVsReplicas.get(coll); + if (c == null) return null; + List r = c.get(shard); + if (r == null) return null; + int idx = -1; + for (int i = 0; i < r.size(); i++) { + ReplicaInfo info = r.get(i); + if (type == null || info.getType() == type) { + idx = i; + break; + } + } + if (idx == -1) return null; + return r.get(idx); + } + + // this simulates removing a replica from a node public Pair removeReplica(String coll, String shard, Replica.Type type) { - Row row = copy(); + Row row = session.copy().getNode(this.node); Map> c = row.collectionVsShardVsReplicas.get(coll); if (c == null) return null; List r = c.get(shard); @@ -132,14 +166,12 @@ public class Row implements MapWriter { break; } } - if(idx == -1) return null; - + if (idx == -1) return null; + ReplicaInfo removed = r.remove(idx); for (Cell cell : row.cells) { - if (cell.name.equals("cores")) { - cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() - 1; - } + cell.type.projectRemoveReplica(cell, removed); } - return new Pair(row, r.remove(idx)); + return new Pair(row, removed); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java index 57007681ad6..56e1d880bb4 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java @@ -36,6 +36,7 @@ import org.apache.solr.client.solrj.impl.ClusterStateProvider; import org.apache.solr.common.MapWriter; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.rule.ImplicitSnitch; +import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Utils; @@ -63,7 +64,7 @@ public abstract class Suggester implements MapWriter { public Suggester hint(Hint hint, Object value) { hint.validator.accept(value); if (hint.multiValued) { - Collection values = value instanceof Collection ? (Collection)value : Collections.singletonList(value); + Collection values = value instanceof Collection ? (Collection) value : Collections.singletonList(value); ((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).addAll(values); } else { hints.put(hint, value == null ? null : String.valueOf(value)); @@ -71,6 +72,10 @@ public abstract class Suggester implements MapWriter { return this; } + public CollectionParams.CollectionAction getAction() { + return null; + } + /** * Normally, only less loaded nodes are used for moving replicas. If this is a violation and a MOVE must be performed, * set the flag to true. @@ -80,7 +85,7 @@ public abstract class Suggester implements MapWriter { return this; } - protected boolean isNodeSuitable(Row row) { + protected boolean isNodeSuitableForReplicaAddition(Row row) { if (!row.isLive) return false; if (!isAllowed(row.node, Hint.TARGET_NODE)) return false; if (!isAllowed(row.getVal(ImplicitSnitch.DISK), Hint.MINFREEDISK)) return false; @@ -115,7 +120,7 @@ public abstract class Suggester implements MapWriter { if (srcNodes != null && !srcNodes.isEmpty()) { // the source node is dead so live nodes may not have it for (String srcNode : srcNodes) { - if(session.matrix.stream().noneMatch(row -> row.node.equals(srcNode))) + if (session.matrix.stream().noneMatch(row -> row.node.equals(srcNode))) session.matrix.add(new Row(srcNode, session.getPolicy().params, session.getPolicy().perReplicaAttributes, session)); } } @@ -185,7 +190,7 @@ public abstract class Suggester implements MapWriter { boolean containsNewErrors(List violations) { for (Violation v : violations) { int idx = originalViolations.indexOf(v); - if (idx < 0 || originalViolations.get(idx).isLessSerious(v)) return true; + if (idx < 0 /*|| originalViolations.get(idx).isLessSerious(v)*/) return true; } return false; } @@ -210,14 +215,14 @@ public abstract class Suggester implements MapWriter { if (!isAllowed(e.getKey(), Hint.COLL)) continue; for (Map.Entry> shard : e.getValue().entrySet()) { if (!isAllowed(new Pair<>(e.getKey(), shard.getKey()), Hint.COLL_SHARD)) continue;//todo fix - if(shard.getValue() == null || shard.getValue().isEmpty()) continue; + if (shard.getValue() == null || shard.getValue().isEmpty()) continue; replicaList.add(new Pair<>(shard.getValue().get(0), r)); } } } List testChangedMatrix(boolean strict, List rows) { - Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences,rows); + Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences, rows); List errors = new ArrayList<>(); for (Clause clause : session.expandedClauses) { if (strict || clause.strict) { @@ -230,11 +235,6 @@ public abstract class Suggester implements MapWriter { return errors; } - ArrayList getModifiedMatrix(List matrix, Row tmpRow, int i) { - ArrayList copy = new ArrayList<>(matrix); - copy.set(i, tmpRow); - return copy; - } protected boolean isAllowed(Object v, Hint hint) { Object hintVal = hints.get(hint); @@ -263,7 +263,16 @@ public abstract class Suggester implements MapWriter { } } - }), + }) { + @Override + public Object parse(Object v) { + if (v instanceof Map) { + Map map = (Map) v; + return Pair.parse(map); + } + return super.parse(v); + } + }, SRC_NODE(true), TARGET_NODE(true), REPLICATYPE(false, o -> { @@ -277,7 +286,7 @@ public abstract class Suggester implements MapWriter { }, hintValVsActual -> { Double hintFreediskInGb = (Double) FREEDISK.validate(null, hintValVsActual.first(), false); Double actualFreediskInGb = (Double) FREEDISK.validate(null, hintValVsActual.second(), false); - if(actualFreediskInGb == null) return false; + if (actualFreediskInGb == null) return false; return actualFreediskInGb > hintFreediskInGb; }); @@ -304,6 +313,17 @@ public abstract class Suggester implements MapWriter { this.valueValidator = testval; } + public static Hint get(String s) { + for (Hint hint : values()) { + if (hint.name().equals(s)) return hint; + } + return null; + } + + public Object parse(Object v) { + return v; + } + } @@ -316,6 +336,7 @@ public abstract class Suggester implements MapWriter { @Override public void writeMap(EntryWriter ew) throws IOException { + ew.put("action", String.valueOf(getAction())); ew.put("hints", (MapWriter) ew1 -> hints.forEach((hint, o) -> ew1.putNoEx(hint.toString(), o))); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java index b29fb382424..0c9013ed1ef 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java @@ -42,6 +42,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.MO public class Suggestion { public static final String coreidxsize = "INDEX.sizeInBytes"; + static final Map validatetypes = new HashMap<>(); public static ConditionType getTagType(String name) { @@ -85,7 +86,7 @@ public class Suggestion { SolrRequest op = suggester.getSuggestion(); if (op != null) { session = suggester.getSession(); - suggestions.add(new Suggester.SuggestionInfo( violation, + suggestions.add(new Suggester.SuggestionInfo(violation, ((V2RequestSupport) op.setUseV2(true)).getV2Request())); } return op; @@ -107,17 +108,20 @@ public class Suggestion { .filter(tag -> tag.perReplicaValue != null) .collect(Collectors.toMap(tag -> tag.tagName, tag -> tag.perReplicaValue)); + /** + * Type details of each variable in policies + */ public enum ConditionType { COLL("collection", String.class, null, null, null), SHARD("shard", String.class, null, null, null), REPLICA("replica", Long.class, null, 0L, null), - PORT(ImplicitSnitch.PORT, Long.class, null, 1L, 65535L) , + PORT(ImplicitSnitch.PORT, Long.class, null, 1L, 65535L), IP_1("ip_1", Long.class, null, 0L, 255L), IP_2("ip_2", Long.class, null, 0L, 255L), IP_3("ip_3", Long.class, null, 0L, 255L), IP_4("ip_4", Long.class, null, 0L, 255L), - FREEDISK(ImplicitSnitch.DISK, Double.class, null, 0d, Double.MAX_VALUE, coreidxsize) { + FREEDISK(ImplicitSnitch.DISK, Double.class, null, 0d, Double.MAX_VALUE, coreidxsize, Boolean.TRUE) { @Override public Object convertVal(Object val) { Number value = (Number) super.validate(ImplicitSnitch.DISK, val, false); @@ -127,12 +131,19 @@ public class Suggestion { return value; } + @Override + public int compareViolation(Violation v1, Violation v2) { + return Long.compare( + v1.getViolatingReplicas().stream().mapToLong(v -> v.delta == null? 0 :v.delta).max().orElse(0l), + v2.getViolatingReplicas().stream().mapToLong(v3 -> v3.delta == null? 0 : v3.delta).max().orElse(0l)); + } + @Override public void getSuggestions(SuggestionCtx ctx) { if (ctx.violation == null) return; if (ctx.violation.replicaCountDelta < 0 && !ctx.violation.getViolatingReplicas().isEmpty()) { - Comparator rowComparator = Comparator.comparing(r -> ((Long) r.getVal(ImplicitSnitch.DISK, 0l))); + Comparator rowComparator = Comparator.comparing(r -> ((Double) r.getVal(ImplicitSnitch.DISK, 0d))); List matchingNodes = ctx.session.matrix.stream().filter( row -> ctx.violation.getViolatingReplicas() .stream() @@ -141,29 +152,59 @@ public class Suggestion { .collect(Collectors.toList()); - for (Row r : matchingNodes) { + for (Row node : matchingNodes) { //lets try to start moving the smallest cores off of the node ArrayList replicas = new ArrayList<>(); - r.forEachReplica(replicas::add); + node.forEachReplica(replicas::add); replicas.sort((r1, r2) -> { Long s1 = Clause.parseLong(ConditionType.CORE_IDX.tagName, r1.getVariables().get(ConditionType.CORE_IDX.tagName)); Long s2 = Clause.parseLong(ConditionType.CORE_IDX.tagName, r2.getVariables().get(ConditionType.CORE_IDX.tagName)); if (s1 != null && s2 != null) return s1.compareTo(s2); return 0; }); - long currentDelta = ctx.violation.getClause().tag.delta(r.getVal(ImplicitSnitch.DISK)); + long currentDelta = ctx.violation.getClause().tag.delta(node.getVal(ImplicitSnitch.DISK)); for (ReplicaInfo replica : replicas) { if (currentDelta <= 0) break; if (replica.getVariables().get(ConditionType.CORE_IDX.tagName) == null) continue; Suggester suggester = ctx.session.getSuggester(MOVEREPLICA) .hint(Suggester.Hint.COLL_SHARD, new Pair<>(replica.getCollection(), replica.getShard())) - .hint(Suggester.Hint.SRC_NODE, r.node); + .hint(Suggester.Hint.SRC_NODE, node.node) + .forceOperation(true); if (ctx.addSuggestion(suggester) == null) break; - currentDelta -= Clause.parseLong(ConditionType.CORE_IDX.tagName, replica.getVariables().get(ConditionType.CORE_IDX.tagName)); + currentDelta -= Clause.parseLong(ConditionType.CORE_IDX.tagName, replica.getVariable(ConditionType.CORE_IDX.tagName)); } } } } + + //When a replica is added, freedisk should be incremented + @Override + public void projectAddReplica(Cell cell, ReplicaInfo ri) { + //go through other replicas of this shard and copy the index size value into this + for (Row row : cell.getRow().session.matrix) { + row.forEachReplica(replicaInfo -> { + if (ri != replicaInfo && + ri.getCollection().equals(replicaInfo.getCollection()) && + ri.getShard().equals(replicaInfo.getShard()) && + ri.getVariable(CORE_IDX.tagName) == null && + replicaInfo.getVariable(CORE_IDX.tagName) != null) { + ri.getVariables().put(CORE_IDX.tagName, validate(CORE_IDX.tagName, replicaInfo.getVariable(CORE_IDX.tagName), false)); + } + }); + } + Double idxSize = (Double) validate(CORE_IDX.tagName, ri.getVariable(CORE_IDX.tagName), false); + if (idxSize == null) return; + Double currFreeDisk = cell.val == null ? 0.0d : (Double) cell.val; + cell.val = currFreeDisk - idxSize; + } + + @Override + public void projectRemoveReplica(Cell cell, ReplicaInfo ri) { + Double idxSize = (Double) validate(CORE_IDX.tagName, ri.getVariable(CORE_IDX.tagName), false); + if (idxSize == null) return; + Double currFreeDisk = cell.val == null ? 0.0d : (Double) cell.val; + cell.val = currFreeDisk + idxSize; + } }, CORE_IDX(coreidxsize, Double.class, null, 0d, Double.MAX_VALUE) { @Override @@ -172,7 +213,7 @@ public class Suggestion { } }, NODE_ROLE(ImplicitSnitch.NODEROLE, String.class, Collections.singleton("overseer"), null, null), - CORES(ImplicitSnitch.CORES, Long.class, null, 0L, Long.MAX_VALUE) { + CORES(ImplicitSnitch.CORES, Long.class, null, 0L, Long.MAX_VALUE, null, Boolean.TRUE) { @Override public void addViolatingReplicas(ViolationCtx ctx) { for (Row r : ctx.allRows) { @@ -194,12 +235,22 @@ public class Suggestion { ctx.addSuggestion(suggester); } } + } + @Override + public void projectAddReplica(Cell cell, ReplicaInfo ri) { + cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() + 1; + } + + @Override + public void projectRemoveReplica(Cell cell, ReplicaInfo ri) { + cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() - 1; } }, - SYSLOADAVG(ImplicitSnitch.SYSLOADAVG, Double.class, null, 0d, 100d), - HEAPUSAGE(ImplicitSnitch.HEAPUSAGE, Double.class, null, 0d, null), - NUMBER("NUMBER", Long.class, null, 0L, Long.MAX_VALUE), + SYSLOADAVG(ImplicitSnitch.SYSLOADAVG, Double.class, null, 0d, 100d, null, Boolean.TRUE), + HEAPUSAGE(ImplicitSnitch.HEAPUSAGE, Double.class, null, 0d, null, null, Boolean.TRUE), + NUMBER("NUMBER", Long.class, null, 0L, Long.MAX_VALUE, null, Boolean.TRUE), + STRING("STRING", String.class, null, null, null), NODE("node", String.class, null, null, null) { @Override @@ -228,7 +279,8 @@ public class Suggestion { perNodeSuggestions(ctx); } }, - DISKTYPE(ImplicitSnitch.DISKTYPE, String.class, unmodifiableSet(new HashSet(Arrays.asList("ssd", "rotational"))), null, null, null) { + DISKTYPE(ImplicitSnitch.DISKTYPE, String.class, + unmodifiableSet(new HashSet(Arrays.asList("ssd", "rotational"))), null, null, null, null) { @Override public void getSuggestions(SuggestionCtx ctx) { perNodeSuggestions(ctx); @@ -239,21 +291,24 @@ public class Suggestion { final Set vals; final Number min; final Number max; + final Boolean additive; public final String tagName; public final String perReplicaValue; ConditionType(String tagName, Class type, Set vals, Number min, Number max) { - this(tagName, type, vals, min, max, null); + this(tagName, type, vals, min, max, null, null); } - ConditionType(String tagName, Class type, Set vals, Number min, Number max, String perReplicaValue) { + ConditionType(String tagName, Class type, Set vals, Number min, Number max, String perReplicaValue, + Boolean additive) { this.tagName = tagName; this.type = type; this.vals = vals; this.min = min; this.max = max; this.perReplicaValue = perReplicaValue; + this.additive = additive; } public void getSuggestions(SuggestionCtx ctx) { @@ -265,7 +320,7 @@ public class Suggestion { row.forEachReplica(replica -> { if (ctx.clause.replica.isPass(0) && !ctx.clause.tag.isPass(row)) return; if (!ctx.clause.replica.isPass(0) && ctx.clause.tag.isPass(row)) return; - if(!ctx.currentViolation.matchShard(replica.getShard())) return; + if (!ctx.currentViolation.matchShard(replica.getShard())) return; if (!ctx.clause.collection.isPass(ctx.currentViolation.coll) || !ctx.clause.shard.isPass(ctx.currentViolation.shard)) return; ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica).withDelta(ctx.clause.tag.delta(row.getVal(ctx.clause.tag.name)))); @@ -311,6 +366,21 @@ public class Suggestion { } } + + /** + * Simulate a replica addition to a node in the cluster + */ + public void projectAddReplica(Cell cell, ReplicaInfo ri) { + } + + public void projectRemoveReplica(Cell cell, ReplicaInfo ri) { + } + + public int compareViolation(Violation v1, Violation v2) { + if (v2.replicaCountDelta == null || v1.replicaCountDelta == null) return 0; + if (Math.abs(v1.replicaCountDelta) == Math.abs(v2.replicaCountDelta)) return 0; + return Math.abs(v1.replicaCountDelta) < Math.abs(v2.replicaCountDelta) ? -1 : 1; + } } private static void perNodeSuggestions(SuggestionCtx ctx) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java index 105dea04d6f..bb5aa6fc7b1 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java @@ -91,8 +91,7 @@ public class Violation implements MapWriter { } //if the delta is lower , this violation is less serious public boolean isLessSerious(Violation that) { - return that.replicaCountDelta != null && replicaCountDelta != null && - Math.abs(replicaCountDelta) < Math.abs(that.replicaCountDelta); + return this.getClause().tag.varType.compareViolation(this,that) <0 ; } @Override @@ -102,7 +101,7 @@ public class Violation implements MapWriter { return Objects.equals(this.shard, v.shard) && Objects.equals(this.coll, v.coll) && Objects.equals(this.node, v.node) && - Objects.equals(this.tagKey, v.tagKey) + Objects.equals(this.clause, v.clause) ; } return false; diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Pair.java b/solr/solrj/src/java/org/apache/solr/common/util/Pair.java index b51edd04d21..74f5b2d6ff7 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/Pair.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/Pair.java @@ -18,6 +18,7 @@ package org.apache.solr.common.util; import java.io.IOException; import java.io.Serializable; +import java.util.Map; import java.util.Objects; import org.apache.solr.common.MapWriter; @@ -65,4 +66,8 @@ public class Pair implements Serializable, MapWriter { ew.put("second", second); } + public static Pair parse(Map m) { + return new Pair(m.get("first"), m.get("second")); + } + } \ No newline at end of file diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java index 2e509bb9632..56f3f92b74e 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java @@ -46,6 +46,7 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.ObjectCache; import org.apache.solr.common.util.Pair; @@ -62,6 +63,26 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.MO public class TestPolicy extends SolrTestCaseJ4 { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private Suggester createSuggester(SolrCloudManager cloudManager, Map jsonObj, Suggester seed) throws IOException, InterruptedException { + Policy.Session session = null; + if (seed != null) session = seed.session; + else { + session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager); + } + + Map m = (Map) jsonObj.get("suggester"); + Suggester result = session.getSuggester(CollectionParams.CollectionAction.get((String) m.get("action"))); + m = (Map) m.get("hints"); + m.forEach((k, v) -> { + Hint hint = Hint.get(k.toString()); + result.hint(hint, hint.parse(v)); + }); + return result; + } + + private SolrCloudManager createCloudManager(Map jsonObj) { + return cloudManagerWithData(jsonObj); + } public static String clusterState = "{'gettingstarted':{" + " 'router':{'name':'compositeId'}," + @@ -129,52 +150,52 @@ public class TestPolicy extends SolrTestCaseJ4 { } public void testValidate() { - expectError("replica", -1, "must be greater than" ); - expectError("replica","hello", "not a valid number" ); - assertEquals( 1l, Clause.validate("replica", "1", true)); - assertEquals("c", Clause.validate("collection", "c", true)); - assertEquals( "s", Clause.validate("shard", "s",true)); - assertEquals( "overseer", Clause.validate("nodeRole", "overseer",true)); + expectError("replica", -1, "must be greater than"); + expectError("replica", "hello", "not a valid number"); + assertEquals(1l, Clause.validate("replica", "1", true)); + assertEquals("c", Clause.validate("collection", "c", true)); + assertEquals("s", Clause.validate("shard", "s", true)); + assertEquals("overseer", Clause.validate("nodeRole", "overseer", true)); - expectError("nodeRole", "wrong","must be one of"); + expectError("nodeRole", "wrong", "must be one of"); - expectError("sysLoadAvg", "101","must be less than "); - expectError("sysLoadAvg", 101,"must be less than "); - expectError("sysLoadAvg", "-1","must be greater than"); - expectError("sysLoadAvg", -1,"must be greater than"); + expectError("sysLoadAvg", "101", "must be less than "); + expectError("sysLoadAvg", 101, "must be less than "); + expectError("sysLoadAvg", "-1", "must be greater than"); + expectError("sysLoadAvg", -1, "must be greater than"); - assertEquals(12.46d,Clause.validate("sysLoadAvg", "12.46",true)); - assertEquals(12.46,Clause.validate("sysLoadAvg", 12.46d,true)); + assertEquals(12.46d, Clause.validate("sysLoadAvg", "12.46", true)); + assertEquals(12.46, Clause.validate("sysLoadAvg", 12.46d, true)); - expectError("ip_1", "300","must be less than "); - expectError("ip_1", 300,"must be less than "); - expectError("ip_1", "-1","must be greater than"); - expectError("ip_1", -1,"must be greater than"); + expectError("ip_1", "300", "must be less than "); + expectError("ip_1", 300, "must be less than "); + expectError("ip_1", "-1", "must be greater than"); + expectError("ip_1", -1, "must be greater than"); - assertEquals(1l,Clause.validate("ip_1", "1",true)); + assertEquals(1l, Clause.validate("ip_1", "1", true)); - expectError("heapUsage", "-1","must be greater than"); - expectError("heapUsage", -1,"must be greater than"); - assertEquals(69.9d,Clause.validate("heapUsage", "69.9",true)); - assertEquals(69.9d,Clause.validate("heapUsage", 69.9d,true)); + expectError("heapUsage", "-1", "must be greater than"); + expectError("heapUsage", -1, "must be greater than"); + assertEquals(69.9d, Clause.validate("heapUsage", "69.9", true)); + assertEquals(69.9d, Clause.validate("heapUsage", 69.9d, true)); - expectError("port", "70000","must be less than "); - expectError("port", 70000,"must be less than "); - expectError("port", "0","must be greater than"); - expectError("port", 0,"must be greater than"); + expectError("port", "70000", "must be less than "); + expectError("port", 70000, "must be less than "); + expectError("port", "0", "must be greater than"); + expectError("port", 0, "must be greater than"); - expectError("cores", "-1","must be greater than"); + expectError("cores", "-1", "must be greater than"); } - private static void expectError(String name, Object val, String msg){ + private static void expectError(String name, Object val, String msg) { try { - Clause.validate(name, val,true); - fail("expected exception containing "+msg); + Clause.validate(name, val, true); + fail("expected exception containing " + msg); } catch (Exception e) { - assertTrue("expected exception containing "+msg,e.getMessage().contains(msg)); + assertTrue("expected exception containing " + msg, e.getMessage().contains(msg)); } } @@ -282,7 +303,7 @@ public class TestPolicy extends SolrTestCaseJ4 { Policy.Session session = policy.createSession(cloudManagerWithData(dataproviderdata)); SolrRequest op = session.getSuggester(MOVEREPLICA).hint(Hint.SRC_NODE, "127.0.0.1:65427_solr").getSuggestion(); assertNotNull(op); - assertEquals( "127.0.0.1:65434_solr",op.getParams().get("targetNode") ); + assertEquals("127.0.0.1:65434_solr", op.getParams().get("targetNode")); } public void testNodeLostMultipleReplica() { @@ -419,7 +440,10 @@ public class TestPolicy extends SolrTestCaseJ4 { } private static SolrCloudManager cloudManagerWithData(String data) { - final Map m = (Map) Utils.fromJSONString(data); + return cloudManagerWithData((Map) Utils.fromJSONString(data)); + } + + private static SolrCloudManager cloudManagerWithData(Map m) { Map replicaInfo = (Map) m.get("replicaInfo"); replicaInfo.forEach((node, val) -> { Map m1 = (Map) val; @@ -433,15 +457,26 @@ public class TestPolicy extends SolrTestCaseJ4 { String name = m3.keySet().iterator().next().toString(); m3 = (Map) m3.get(name); Replica.Type type = Replica.Type.get((String) m3.get("type")); - l3.set(i, new ReplicaInfo(name,name + l3.set(i, new ReplicaInfo(name, name , coll.toString(), shard.toString(), type, (String) node, m3)); } }); }); - }); + AutoScalingConfig asc = m.containsKey("autoscalingJson") ? new AutoScalingConfig((Map) m.get("autoscalingJson")) : null; return new DelegatingCloudManager(null) { + + @Override + public DistribStateManager getDistribStateManager() { + return new DelegatingDistribStateManager(null) { + @Override + public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException { + return asc; + } + }; + } + @Override public ClusterStateProvider getClusterStateProvider() { return new DelegatingClusterStateProvider(null) { @@ -497,8 +532,8 @@ public class TestPolicy extends SolrTestCaseJ4 { .hint(Hint.REPLICATYPE, Replica.Type.PULL); SolrRequest op = suggester.getSuggestion(); assertNotNull(op); - assertEquals(Replica.Type.PULL.name(), op.getParams().get("type")); - assertEquals("PULL type node must be in 'slowdisk' node","node1", op.getParams().get("node")); + assertEquals(Replica.Type.PULL.name(), op.getParams().get("type")); + assertEquals("PULL type node must be in 'slowdisk' node", "node1", op.getParams().get("node")); suggester = suggester.getSession() .getSuggester(ADDREPLICA) @@ -506,8 +541,8 @@ public class TestPolicy extends SolrTestCaseJ4 { .hint(Hint.REPLICATYPE, Replica.Type.PULL); op = suggester.getSuggestion(); assertNotNull(op); - assertEquals(Replica.Type.PULL.name(), op.getParams().get("type")); - assertEquals("PULL type node must be in 'slowdisk' node","node1", op.getParams().get("node")); + assertEquals(Replica.Type.PULL.name(), op.getParams().get("type")); + assertEquals("PULL type node must be in 'slowdisk' node", "node1", op.getParams().get("node")); suggester = suggester.getSession() .getSuggester(ADDREPLICA) @@ -515,8 +550,8 @@ public class TestPolicy extends SolrTestCaseJ4 { .hint(Hint.REPLICATYPE, Replica.Type.TLOG); op = suggester.getSuggestion(); assertNotNull(op); - assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type")); - assertEquals("TLOG type node must be in 'ssd' node","node3", op.getParams().get("node")); + assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type")); + assertEquals("TLOG type node must be in 'ssd' node", "node3", op.getParams().get("node")); suggester = suggester.getSession() .getSuggester(ADDREPLICA) @@ -524,15 +559,15 @@ public class TestPolicy extends SolrTestCaseJ4 { .hint(Hint.REPLICATYPE, Replica.Type.TLOG); op = suggester.getSuggestion(); assertNotNull(op); - assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type")); - assertEquals("TLOG type node must be in 'ssd' node","node3", op.getParams().get("node")); + assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type")); + assertEquals("TLOG type node must be in 'ssd' node", "node3", op.getParams().get("node")); suggester = suggester.getSession() .getSuggester(ADDREPLICA) .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2")) .hint(Hint.REPLICATYPE, Replica.Type.TLOG); op = suggester.getSuggestion(); - assertNull("No node should qualify for this" ,op); + assertNull("No node should qualify for this", op); } @@ -669,7 +704,7 @@ public class TestPolicy extends SolrTestCaseJ4 { Map policies = (Map) Utils.fromJSONString("{" + " 'cluster-preferences': [" + " { 'maximize': 'freedisk', 'precision': 50}," + - " { 'minimize': 'cores', 'precision': 50}" + + " { 'minimize': 'cores', 'precision': 1}" + " ]," + " 'cluster-policy': [" + " { 'replica': 0, 'nodeRole': 'overseer'}" + @@ -698,16 +733,16 @@ public class TestPolicy extends SolrTestCaseJ4 { int countNewColl2Op = 0; while ((op = suggester.getSuggestion()) != null) { countOp++; + assertEquals(Replica.Type.PULL.name(), op.getParams().get("type")); + String collection = op.getParams().get("collection"); + assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2")); + if (collection.equals("newColl")) countNewCollOp++; + else countNewColl2Op++; + assertEquals("PULL type node must be in 'slowdisk' node", "node1", op.getParams().get("node")); suggester = suggester.getSession().getSuggester(ADDREPLICA) .hint(Hint.REPLICATYPE, Replica.Type.PULL) .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1")) .hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard1")); - assertEquals(Replica.Type.PULL.name(), op.getParams().get("type")); - String collection = op.getParams().get("collection"); - assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2")); - if (collection.equals("newColl")) countNewCollOp++; - else countNewColl2Op++; - assertEquals("PULL type node must be in 'slowdisk' node","node1", op.getParams().get("node")); } assertEquals(2, countOp); assertEquals(1, countNewCollOp); @@ -723,17 +758,17 @@ public class TestPolicy extends SolrTestCaseJ4 { .hint(Hint.REPLICATYPE, Replica.Type.TLOG); while ((op = suggester.getSuggestion()) != null) { countOp++; + assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type")); + String collection = op.getParams().get("collection"); + assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2")); + if (collection.equals("newColl")) countNewCollOp++; + else countNewColl2Op++; + assertEquals("TLOG type node must be in 'ssd' node", "node3", op.getParams().get("node")); suggester = suggester.getSession() .getSuggester(ADDREPLICA) .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2")) .hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard2")) .hint(Hint.REPLICATYPE, Replica.Type.TLOG); - assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type")); - String collection = op.getParams().get("collection"); - assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2")); - if (collection.equals("newColl")) countNewCollOp++; - else countNewColl2Op++; - assertEquals("TLOG type node must be in 'ssd' node","node3", op.getParams().get("node")); } assertEquals(3, countOp); assertEquals(1, countNewCollOp); @@ -741,9 +776,44 @@ public class TestPolicy extends SolrTestCaseJ4 { } public void testRow() { - Row row = new Row("nodex", new Cell[]{new Cell(0, "node", "nodex")}, false, new HashMap<>(), true); + Policy policy = new Policy(); + Policy.Session session = policy.createSession(new DelegatingCloudManager(null) { + @Override + public NodeStateProvider getNodeStateProvider() { + return new DelegatingNodeStateProvider(null) { + @Override + public Map>> getReplicaInfo(String node, Collection keys) { + Map>> o = (Map>>) Utils.fromJSONString("{c1: {s0:[{}]}}"); + Utils.setObjectByPath(o, "c1/s0[0]", new ReplicaInfo("r0", "c1.s0", "c1", "s0", Replica.Type.NRT, "nodex", new HashMap<>())); + return o; + } + + @Override + public Map getNodeValues(String node, Collection tags) { + return Utils.makeMap("node", "nodex", "cores", 1); + } + }; + } + + @Override + public ClusterStateProvider getClusterStateProvider() { + return new DelegatingClusterStateProvider(null) { + @Override + public String getPolicyNameByCollection(String coll) { + return null; + } + + @Override + public Set getLiveNodes() { + return Collections.singleton("nodex"); + } + }; + } + }); + + Row row = session.getNode("nodex"); Row r1 = row.addReplica("c1", "s1", null); - Row r2 = r1.addReplica("c1", "s1",null); + Row r2 = r1.addReplica("c1", "s1", null); assertEquals(1, r1.collectionVsShardVsReplicas.get("c1").get("s1").size()); assertEquals(2, r2.collectionVsShardVsReplicas.get("c1").get("s1").size()); assertTrue(r2.collectionVsShardVsReplicas.get("c1").get("s1").get(0) instanceof ReplicaInfo); @@ -831,7 +901,7 @@ public class TestPolicy extends SolrTestCaseJ4 { assertTrue(violations.stream().anyMatch(violation -> (violation.getClause().replica.getOperand() == Operand.LESS_THAN && "node".equals(violation.getClause().tag.getName())))); Suggester suggester = session.getSuggester(ADDREPLICA) - .hint(Hint.COLL_SHARD, new Pair<>("gettingstarted","r1")); + .hint(Hint.COLL_SHARD, new Pair<>("gettingstarted", "r1")); SolrParams operation = suggester.getSuggestion().getParams(); assertEquals("node2", operation.get("node")); @@ -974,7 +1044,7 @@ public class TestPolicy extends SolrTestCaseJ4 { for (int i = 0; i < 3; i++) { Suggester suggester = session.getSuggester(ADDREPLICA); SolrRequest op = suggester - .hint(Hint.COLL_SHARD, new Pair<>("newColl","shard1")) + .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1")) .getSuggestion(); assertNotNull(op); assertEquals("node3", op.getParams().get("node")); @@ -1085,7 +1155,7 @@ public class TestPolicy extends SolrTestCaseJ4 { Suggester suggester = session.getSuggester(MOVEREPLICA) .hint(Hint.TARGET_NODE, "127.0.0.1:60099_solr"); SolrRequest op = suggester.getSuggestion(); - assertNotNull(op); + assertNotNull("expect a non null operation", op); } public void testOtherTag() { @@ -1229,8 +1299,8 @@ public class TestPolicy extends SolrTestCaseJ4 { }; } - public void testEmptyClusterState(){ - String autoScaleJson = " {'policies':{'c1':[{" + + public void testEmptyClusterState() { + String autoScaleJson = " {'policies':{'c1':[{" + " 'replica':1," + " 'shard':'#EACH'," + " 'port':'50096'}]}}"; @@ -1247,7 +1317,7 @@ public class TestPolicy extends SolrTestCaseJ4 { return new DelegatingClusterStateProvider(null) { @Override public Set getLiveNodes() { - return new HashSet<>(Arrays.asList( "127.0.0.1:50097_solr", "127.0.0.1:50096_solr")); + return new HashSet<>(Arrays.asList("127.0.0.1:50097_solr", "127.0.0.1:50096_solr")); } }; } @@ -1270,10 +1340,10 @@ public class TestPolicy extends SolrTestCaseJ4 { } }; List locations = PolicyHelper.getReplicaLocations( - "newColl", new AutoScalingConfig((Map)Utils.fromJSONString(autoScaleJson)), + "newColl", new AutoScalingConfig((Map) Utils.fromJSONString(autoScaleJson)), dataProvider, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1, 0, 0, null); - assertTrue(locations.stream().allMatch(it -> it.node.equals("127.0.0.1:50096_solr")) ); + assertTrue(locations.stream().allMatch(it -> it.node.equals("127.0.0.1:50096_solr"))); } public void testMultiReplicaPlacement() { @@ -1333,11 +1403,11 @@ public class TestPolicy extends SolrTestCaseJ4 { }; List locations = PolicyHelper.getReplicaLocations( "newColl", new AutoScalingConfig((Map) Utils.fromJSONString(autoScaleJson)), - cloudManager, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3,0,0, null); - assertTrue(locations.stream().allMatch(it -> ImmutableList.of("node2", "node1", "node3").contains(it.node)) ); + cloudManager, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3, 0, 0, null); + assertTrue(locations.stream().allMatch(it -> ImmutableList.of("node2", "node1", "node3").contains(it.node))); } - public void testMoveReplicaSuggester(){ + public void testMoveReplicaSuggester() { String dataproviderdata = "{" + " 'liveNodes':[" + " '10.0.0.6:7574_solr'," + @@ -1402,7 +1472,7 @@ public class TestPolicy extends SolrTestCaseJ4 { .hint(Hint.TARGET_NODE, "127.0.0.1:51147_solr"); SolrRequest op = suggester.getSuggestion(); log.info("" + op); - assertNotNull(op); + assertNotNull("operation expected ", op); } public void testReplicaCountSuggestions() { @@ -1440,7 +1510,6 @@ public class TestPolicy extends SolrTestCaseJ4 { assertEquals("core_node2", Utils.getObjectByPath(m, true, "operation/command/move-replica/replica")); } - // @Ignore public void testFreeDiskSuggestions() { String dataproviderdata = "{" + " liveNodes:[node1,node2]," + @@ -1457,8 +1526,6 @@ public class TestPolicy extends SolrTestCaseJ4 { String autoScalingjson = " { cluster-policy:[" + -// " { cores :'<10', node :'#ANY'}," + -// " { replica :'<2', shard:'#EACH' node:'#ANY'}," + " { replica :'0', freedisk:'<1000'}," + " { nodeRole : overseer, replica :0}]," + " cluster-preferences :[{ minimize : cores, precision : 2 }]}"; @@ -1559,7 +1626,7 @@ public class TestPolicy extends SolrTestCaseJ4 { List suggestions = PolicyHelper.getSuggestions(cfg, cloudManagerWithData(dataproviderdata)); assertEquals(2, suggestions.size()); for (Suggester.SuggestionInfo suggestion : suggestions) { - Utils.getObjectByPath(suggestion ,true, "operation/move-replica/targetNode"); + Utils.getObjectByPath(suggestion, true, "operation/move-replica/targetNode"); } } @@ -1651,18 +1718,18 @@ public class TestPolicy extends SolrTestCaseJ4 { " { nodeRole:overseer,replica:0}]}"; Policy policy = new Policy((Map) Utils.fromJSONString(autoScalingjson)); Policy.Session session = policy.createSession(cloudManagerWithData(dataproviderdata)); - Suggester suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA) + Suggester suggester = session.getSuggester(CollectionAction.ADDREPLICA) .hint(Hint.COLL_SHARD, new Pair<>("coll1", "shard1")) .hint(Hint.MINFREEDISK, 150); CollectionAdminRequest.AddReplica op = (CollectionAdminRequest.AddReplica) suggester.getSuggestion(); - assertEquals("127.0.0.1:51078_solr" , op.getNode()); + assertEquals("127.0.0.1:51078_solr", op.getNode()); - suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA) + suggester = session.getSuggester(CollectionAction.ADDREPLICA) .hint(Hint.COLL_SHARD, new Pair<>("coll1", "shard1")); op = (CollectionAdminRequest.AddReplica) suggester.getSuggestion(); - assertEquals("127.0.0.1:51147_solr" , op.getNode()); + assertEquals("127.0.0.1:51147_solr", op.getNode()); } public void testDiskSpaceReqd() { @@ -1744,14 +1811,15 @@ public class TestPolicy extends SolrTestCaseJ4 { cloudManager, null, Arrays.asList("shard1", "shard2"), 1, 0, 0, null); assertTrue(locations.stream().allMatch(it -> "node3".equals(it.node))); } - public void testMoveReplicaLeaderlast(){ - List> validReplicas = new ArrayList<>(); + public void testMoveReplicaLeaderlast() { + + List> validReplicas = new ArrayList<>(); Replica replica = new Replica("r1", Utils.makeMap("leader", "true")); ReplicaInfo replicaInfo = new ReplicaInfo("c1", "s1", replica, new HashMap<>()); validReplicas.add(new Pair<>(replicaInfo, null)); - replicaInfo = new ReplicaInfo("r4", "c1_s2_r1","c1", "s2", Replica.Type.NRT, "n1", Collections.singletonMap("leader", "true")); + replicaInfo = new ReplicaInfo("r4", "c1_s2_r1", "c1", "s2", Replica.Type.NRT, "n1", Collections.singletonMap("leader", "true")); validReplicas.add(new Pair<>(replicaInfo, null)); @@ -1772,4 +1840,322 @@ public class TestPolicy extends SolrTestCaseJ4 { } + public void testScheduledTriggerFailure() throws Exception { + String state = "{" + + " 'liveNodes': [" + + " '127.0.0.1:49221_solr'," + + " '127.0.0.1:49210_solr'" + + " ]," + + " 'suggester': {" + + " 'action': 'MOVEREPLICA'," + + " 'hints': {}" + + " }," + + " 'replicaInfo': {" + + " '127.0.0.1:49210_solr': {" + + " 'testScheduledTrigger': {" + + " 'shard1': [" + + " {" + + " 'core_node3': {" + + " 'base_url': 'http://127.0.0.1:49210/solr'," + + " 'node_name': '127.0.0.1:49210_solr'," + + " 'core': 'testScheduledTrigger_shard1_replica_n1'," + + " 'state': 'active'," + + " 'type': 'NRT'," + + " 'INDEX.sizeInBytes': 6.426125764846802E-8," + + " 'shard': 'shard1'," + + " 'collection': 'testScheduledTrigger'" + + " }" + + " }," + + " {" + + " 'core_node6': {" + + " 'base_url': 'http://127.0.0.1:49210/solr'," + + " 'node_name': '127.0.0.1:49210_solr'," + + " 'core': 'testScheduledTrigger_shard1_replica_n4'," + + " 'state': 'active'," + + " 'type': 'NRT'," + + " 'INDEX.sizeInBytes': 6.426125764846802E-8," + + " 'shard': 'shard1'," + + " 'collection': 'testScheduledTrigger'" + + " }" + + " }" + + " ]" + + " }" + + " }," + + " '127.0.0.1:49221_solr': {" + + " 'testScheduledTrigger': {" + + " 'shard1': [" + + " {" + + " 'core_node5': {" + + " 'core': 'testScheduledTrigger_shard1_replica_n2'," + + " 'leader': 'true'," + + " 'INDEX.sizeInBytes': 6.426125764846802E-8," + + " 'base_url': 'http://127.0.0.1:49221/solr'," + + " 'node_name': '127.0.0.1:49221_solr'," + + " 'state': 'active'," + + " 'type': 'NRT'," + + " 'shard': 'shard1'," + + " 'collection': 'testScheduledTrigger'" + + " }" + + " }" + + " ]" + + " }" + + " }" + + " }," + + " 'nodeValues': {" + + " '127.0.0.1:49210_solr': {" + + " 'node': '127.0.0.1:49210_solr'," + + " 'cores': 2," + + " 'freedisk': 197.39717864990234" + + " }," + + " '127.0.0.1:49221_solr': {" + + " 'node': '127.0.0.1:49221_solr'," + + " 'cores': 1," + + " 'freedisk': 197.39717864990234" + + " }" + + " }," + + " 'autoscalingJson': {" + + " 'cluster-preferences': [" + + " {" + + " 'minimize': 'cores'," + + " 'precision': 1" + + " }," + + " {" + + " 'maximize': 'freedisk'" + + " }" + + " ]," + + " 'cluster-policy': [" + + " {" + + " 'cores': '<3'," + + " 'node': '#EACH'" + + " }" + + " ]" + + " }" + + "}"; + Map jsonObj = (Map) Utils.fromJSONString(state); + SolrCloudManager cloudManager = createCloudManager(jsonObj); + Suggester suggester = createSuggester(cloudManager, jsonObj, null); + int count = 0; + while (count < 10) { + CollectionAdminRequest.MoveReplica op = (CollectionAdminRequest.MoveReplica) suggester.getSuggestion(); + if (op == null) break; + count++; + log.info("OP:{}", op.getParams()); + suggester = createSuggester(cloudManager, jsonObj, suggester); + } + + assertEquals(0, count); + } + + public void testUtilizeNodeFailure() throws Exception { + String state = "{'liveNodes': ['127.0.0.1:50417_solr', '127.0.0.1:50418_solr', '127.0.0.1:50419_solr', '127.0.0.1:50420_solr', '127.0.0.1:50443_solr']," + + " 'suggester': {" + + " 'action': 'MOVEREPLICA'," + + " 'hints': {'TARGET_NODE': ['127.0.0.1:50443_solr']}" + + " }," + + " 'replicaInfo': {" + + " '127.0.0.1:50418_solr': {" + + " 'utilizenodecoll': {" + + " 'shard2': [" + + " {" + + " 'core_node7': {" + + " 'core': 'utilizenodecoll_shard2_replica_n4'," + + " 'leader': 'true'," + + " 'INDEX.sizeInBytes': 6.426125764846802E-8," + + " 'base_url': 'http://127.0.0.1:50418/solr'," + + " 'node_name': '127.0.0.1:50418_solr'," + + " 'state': 'active'," + + " 'type': 'NRT'," + + " 'shard': 'shard2'," + + " 'collection': 'utilizenodecoll'" + + " }" + + " }" + + " ]" + + " }" + + " }," + + " '127.0.0.1:50417_solr': {" + + " 'utilizenodecoll': {" + + " 'shard2': [" + + " {" + + " 'core_node8': {" + + " 'base_url': 'http://127.0.0.1:50417/solr'," + + " 'node_name': '127.0.0.1:50417_solr'," + + " 'core': 'utilizenodecoll_shard2_replica_n6'," + + " 'state': 'active'," + + " 'type': 'NRT'," + + " 'INDEX.sizeInBytes': 6.426125764846802E-8," + + " 'shard': 'shard2'," + + " 'collection': 'utilizenodecoll'" + + " }" + + " }" + + " ]" + + " }" + + " }," + + " '127.0.0.1:50419_solr': {" + + " 'utilizenodecoll': {" + + " 'shard1': [" + + " {" + + " 'core_node5': {" + + " 'base_url': 'http://127.0.0.1:50419/solr'," + + " 'node_name': '127.0.0.1:50419_solr'," + + " 'core': 'utilizenodecoll_shard1_replica_n2'," + + " 'state': 'active'," + + " 'type': 'NRT'," + + " 'INDEX.sizeInBytes': 6.426125764846802E-8," + + " 'shard': 'shard1'," + + " 'collection': 'utilizenodecoll'" + + " }" + + " }" + + " ]" + + " }" + + " }," + + " '127.0.0.1:50420_solr': {" + + " 'utilizenodecoll': {" + + " 'shard1': [" + + " {" + + " 'core_node3': {" + + " 'core': 'utilizenodecoll_shard1_replica_n1'," + + " 'leader': 'true'," + + " 'INDEX.sizeInBytes': 6.426125764846802E-8," + + " 'base_url': 'http://127.0.0.1:50420/solr'," + + " 'node_name': '127.0.0.1:50420_solr'," + + " 'state': 'active'," + + " 'type': 'NRT'," + + " 'shard': 'shard1'," + + " 'collection': 'utilizenodecoll'" + + " }" + + " }" + + " ]" + + " }" + + " }," + + " '127.0.0.1:50443_solr': {}" + + " }," + + " 'nodeValues': {" + + " '127.0.0.1:50418_solr': {" + + " 'cores': 1," + + " 'freedisk': 187.70782089233398" + + " }," + + " '127.0.0.1:50417_solr': {" + + " 'cores': 1," + + " 'freedisk': 187.70782089233398" + + " }," + + " '127.0.0.1:50419_solr': {" + + " 'cores': 1," + + " 'freedisk': 187.70782089233398" + + " }," + + " '127.0.0.1:50420_solr': {" + + " 'cores': 1," + + " 'freedisk': 187.70782089233398" + + " }," + + " '127.0.0.1:50443_solr': {" + + " 'cores': 0," + + " 'freedisk': 187.70782089233398" + + " }" + + " }," + + " 'autoscalingJson': {" + + " 'cluster-preferences': [" + + " {'minimize': 'cores', 'precision': 1}," + + " {'maximize': 'freedisk'}" + + " ]" + + " }" + + "}"; + Map jsonObj = (Map) Utils.fromJSONString(state); + SolrCloudManager cloudManager = createCloudManager(jsonObj); + Suggester suggester = createSuggester(cloudManager, jsonObj, null); + int count = 0; + while (count < 100) { + CollectionAdminRequest.MoveReplica op = (CollectionAdminRequest.MoveReplica) suggester.getSuggestion(); + if (op == null) break; + count++; + log.info("OP:{}", op.getParams()); + suggester = createSuggester(cloudManager, jsonObj, suggester); + } + + assertEquals("count = "+count ,0,count); + } +public void testUtilizeNodeFailure2() throws Exception { + String state = "{ 'liveNodes':[" + + " '127.0.0.1:51075_solr'," + + " '127.0.0.1:51076_solr'," + + " '127.0.0.1:51077_solr'," + + " '127.0.0.1:51097_solr']," + + " 'suggester':{" + + " 'action':'MOVEREPLICA'," + + " 'hints':{'TARGET_NODE':['127.0.0.1:51097_solr']}}," + + " 'replicaInfo':{" + + " '127.0.0.1:51076_solr':{'utilizenodecoll':{'shard1':[{'core_node5':{" + + " 'base_url':'https://127.0.0.1:51076/solr'," + + " 'node_name':'127.0.0.1:51076_solr'," + + " 'core':'utilizenodecoll_shard1_replica_n2'," + + " 'state':'active'," + + " 'type':'NRT'," + + " 'INDEX.sizeInBytes':6.426125764846802E-8," + + " 'shard':'shard1'," + + " 'collection':'utilizenodecoll'}}]}}," + + " '127.0.0.1:51077_solr':{'utilizenodecoll':{" + + " 'shard2':[{'core_node8':{" + + " 'base_url':'https://127.0.0.1:51077/solr'," + + " 'node_name':'127.0.0.1:51077_solr'," + + " 'core':'utilizenodecoll_shard2_replica_n6'," + + " 'state':'active'," + + " 'type':'NRT'," + + " 'INDEX.sizeInBytes':6.426125764846802E-8," + + " 'shard':'shard2'," + + " 'collection':'utilizenodecoll'}}]," + + " 'shard1':[{'core_node3':{" + + " 'core':'utilizenodecoll_shard1_replica_n1'," + + " 'leader':'true'," + + " 'INDEX.sizeInBytes':6.426125764846802E-8," + + " 'base_url':'https://127.0.0.1:51077/solr'," + + " 'node_name':'127.0.0.1:51077_solr'," + + " 'state':'active'," + + " 'type':'NRT'," + + " 'shard':'shard1'," + + " 'collection':'utilizenodecoll'}}]}}," + + " '127.0.0.1:51097_solr':{}," + + " '127.0.0.1:51075_solr':{'utilizenodecoll':{'shard2':[{'core_node7':{" + + " 'core':'utilizenodecoll_shard2_replica_n4'," + + " 'leader':'true'," + + " 'INDEX.sizeInBytes':6.426125764846802E-8," + + " 'base_url':'https://127.0.0.1:51075/solr'," + + " 'node_name':'127.0.0.1:51075_solr'," + + " 'state':'active'," + + " 'type':'NRT'," + + " 'shard':'shard2'," + + " 'collection':'utilizenodecoll'}}]}}}," + + " 'nodeValues':{" + + " '127.0.0.1:51076_solr':{" + + " 'cores':1," + + " 'freedisk':188.7262191772461}," + + " '127.0.0.1:51077_solr':{" + + " 'cores':2," + + " 'freedisk':188.7262191772461}," + + " '127.0.0.1:51097_solr':{" + + " 'cores':0," + + " 'freedisk':188.7262191772461}," + + " '127.0.0.1:51075_solr':{" + + " 'cores':1," + + " 'freedisk':188.7262191772461}}," + + " 'autoscalingJson':{" + + " 'cluster-preferences':[" + + " {" + + " 'minimize':'cores'," + + " 'precision':1}," + + " {'maximize':'freedisk'}]" + + " }}"; + Map jsonObj = (Map) Utils.fromJSONString(state); + SolrCloudManager cloudManager = createCloudManager(jsonObj); + Suggester suggester = createSuggester(cloudManager, jsonObj, null); + int count = 0; + while (count < 100) { + CollectionAdminRequest.MoveReplica op = (CollectionAdminRequest.MoveReplica) suggester.getSuggestion(); + if (op == null) break; + count++; + log.info("OP:{}", op.getParams()); + suggester = createSuggester(cloudManager, jsonObj, suggester); + } + + assertEquals("count = "+count ,1,count); + } + + }