From 34a85014d5a09e07ddd48d735a8a96a57e92a44d Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Tue, 4 Sep 2018 17:05:26 +0530 Subject: [PATCH] SOLR-12715: NodeAddedTrigger should support adding replicas to new nodes by setting preferredOperation=addreplica This commit adds support for preferredOperation configuration parameter which defaults to movereplica. Changes ComputePlanAction to add all (collection,shard) pair as hints to AddReplicaSuggester when addreplica is selected as the preferred operation. --- solr/CHANGES.txt | 3 + .../AutoAddReplicasPlanAction.java | 2 +- .../cloud/autoscaling/ComputePlanAction.java | 44 ++++++- .../cloud/autoscaling/NodeAddedTrigger.java | 30 ++++- .../autoscaling/ComputePlanActionTest.java | 124 +++++++++++++++++- .../MetricTriggerIntegrationTest.java | 8 +- .../src/solrcloud-autoscaling-triggers.adoc | 36 ++++- .../solrj/request/CollectionAdminRequest.java | 4 + 8 files changed, 232 insertions(+), 19 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 1ba77adb78f..d8ca50bf687 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -195,6 +195,9 @@ New Features * SOLR-12629: The predict evaluator should work with the polyfit function (Joel Bernstein) +* SOLR-12715: NodeAddedTrigger should support adding replicas to new nodes by setting preferredOperation=addreplica. + (shalin) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java index 4189aa4329e..fdd34742799 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java @@ -33,7 +33,7 @@ import org.apache.solr.common.cloud.ZkStateReader; public class AutoAddReplicasPlanAction extends ComputePlanAction { @Override - protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) { + protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) throws IOException { // for backward compatibility ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider(); String autoAddReplicas = stateProvider.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null); diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java index 923a27a2a87..3fd0d3496da 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java @@ -17,6 +17,7 @@ package org.apache.solr.cloud.autoscaling; +import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; @@ -37,9 +38,11 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; import org.apache.solr.client.solrj.cloud.autoscaling.UnsupportedSuggester; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.params.AutoScalingParams; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.StrUtils; import org.apache.solr.core.SolrResourceLoader; import org.slf4j.Logger; @@ -89,8 +92,7 @@ public class ComputePlanAction extends TriggerActionBase { log.trace("-- state: {}", clusterState); } try { - Suggester initialSuggester = getSuggester(session, event, context, cloudManager); - Suggester suggester = initialSuggester; + Suggester suggester = getSuggester(session, event, context, cloudManager); int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState); int requestedOperations = getRequestedNumOps(event); if (requestedOperations > maxOperations) { @@ -197,12 +199,11 @@ public class ComputePlanAction extends TriggerActionBase { private static final String START = "__start__"; - protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) { + protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) throws IOException { Suggester suggester; switch (event.getEventType()) { case NODEADDED: - suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA) - .hint(Suggester.Hint.TARGET_NODE, event.getProperty(TriggerEvent.NODE_NAMES)); + suggester = getNodeAddedSuggester(cloudManager, session, event); break; case NODELOST: suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA) @@ -239,4 +240,37 @@ public class ComputePlanAction extends TriggerActionBase { } return suggester; } + + private Suggester getNodeAddedSuggester(SolrCloudManager cloudManager, Policy.Session session, TriggerEvent event) throws IOException { + String preferredOp = (String) event.getProperty(AutoScalingParams.PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower()); + CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp); + + Suggester suggester = session.getSuggester(action) + .hint(Suggester.Hint.TARGET_NODE, event.getProperty(TriggerEvent.NODE_NAMES)); + switch (action) { + case ADDREPLICA: + // add all collection/shard pairs and let policy engine figure out which one + // to place on the target node + // todo in future we can prune ineligible collection/shard pairs + ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState(); + Set> collShards = new HashSet<>(); + clusterState.getCollectionStates().forEach((collectionName, collectionRef) -> { + DocCollection docCollection = collectionRef.get(); + if (docCollection != null) { + docCollection.getActiveSlices().stream() + .map(slice -> new Pair<>(collectionName, slice.getName())) + .forEach(collShards::add); + } + }); + suggester.hint(Suggester.Hint.COLL_SHARD, collShards); + break; + case MOVEREPLICA: + case NONE: + break; + default: + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Unsupported preferredOperation=" + preferredOp + " for node added event"); + } + return suggester; + } } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java index ca28dc4efb8..62029448c97 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java @@ -24,17 +24,23 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.core.SolrResourceLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP; + /** * Trigger for the {@link TriggerEventType#NODEADDED} event */ @@ -45,8 +51,11 @@ public class NodeAddedTrigger extends TriggerBase { private Map nodeNameVsTimeAdded = new HashMap<>(); + private String preferredOp; + public NodeAddedTrigger(String name) { super(TriggerEventType.NODEADDED, name); + TriggerUtils.validProperties(validProperties, PREFERRED_OP); } @Override @@ -71,7 +80,23 @@ public class NodeAddedTrigger extends TriggerBase { } catch (Exception e) { log.warn("Exception retrieving nodeLost markers", e); } + } + @Override + public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map properties) throws TriggerValidationException { + super.configure(loader, cloudManager, properties); + preferredOp = (String) properties.getOrDefault(PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower()); + preferredOp = preferredOp.toLowerCase(Locale.ROOT); + // verify + CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp); + switch (action) { + case ADDREPLICA: + case MOVEREPLICA: + case NONE: + break; + default: + throw new TriggerValidationException("Unsupported preferredOperation=" + preferredOp + " specified for node added trigger"); + } } @Override @@ -158,7 +183,7 @@ public class NodeAddedTrigger extends TriggerBase { if (processor != null) { log.debug("NodeAddedTrigger {} firing registered processor for nodes: {} added at times {}, now={}", name, nodeNames, times, cloudManager.getTimeSource().getTimeNs()); - if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames))) { + if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames, preferredOp))) { // remove from tracking set only if the fire was accepted nodeNames.forEach(n -> { nodeNameVsTimeAdded.remove(n); @@ -195,11 +220,12 @@ public class NodeAddedTrigger extends TriggerBase { public static class NodeAddedEvent extends TriggerEvent { - public NodeAddedEvent(TriggerEventType eventType, String source, List times, List nodeNames) { + public NodeAddedEvent(TriggerEventType eventType, String source, List times, List nodeNames, String preferredOp) { // use the oldest time as the time of the event super(eventType, source, times.get(0), null); properties.put(NODE_NAMES, nodeNames); properties.put(EVENT_TIMES, times); + properties.put(PREFERRED_OP, preferredOp); } } } diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java index 8791388f60f..cd56f429bc2 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java @@ -21,8 +21,10 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -31,8 +33,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.util.LuceneTestCase; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.cloud.NodeStateProvider; -import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrClient; @@ -44,9 +46,11 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.rule.ImplicitSnitch; +import org.apache.solr.common.params.AutoScalingParams; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Utils; import org.apache.solr.core.SolrResourceLoader; import org.apache.solr.util.LogLevel; @@ -135,9 +139,15 @@ public class ComputePlanActionTest extends SolrCloudTestCase { deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH); deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH); + reset(); + } + + private void reset() { fired.set(false); triggerFiredLatch = new CountDownLatch(1); actionContextPropsRef.set(null); + eventRef.set(null); + AssertingTriggerAction.expectedNode = null; } private void deleteChildrenRecursively(String path) throws Exception { @@ -243,8 +253,6 @@ public class ComputePlanActionTest extends SolrCloudTestCase { @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018 public void testNodeWithMultipleReplicasLost() throws Exception { - AssertingTriggerAction.expectedNode = null; - // start 3 more nodes cluster.startJettySolrRunner(); cluster.startJettySolrRunner(); @@ -318,7 +326,6 @@ public class ComputePlanActionTest extends SolrCloudTestCase { @Test public void testNodeAdded() throws Exception { - AssertingTriggerAction.expectedNode = null; CloudSolrClient solrClient = cluster.getSolrClient(); String setTriggerCommand = "{" + "'set-trigger' : {" + @@ -421,8 +428,6 @@ public class ComputePlanActionTest extends SolrCloudTestCase { //2018-06-18 (commented) @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018 public void testSelectedCollections() throws Exception { log.info("Found number of jetties: {}", cluster.getJettySolrRunners().size()); - AssertingTriggerAction.expectedNode = null; - // start 3 more nodes cluster.startJettySolrRunner(); cluster.startJettySolrRunner(); @@ -500,4 +505,111 @@ public class ComputePlanActionTest extends SolrCloudTestCase { assertEquals("Expected MOVEREPLICA action after adding node", MOVEREPLICA, CollectionParams.CollectionAction.get(params.get("action"))); assertFalse("not expected testSelected3", "testSelected3".equals(params.get("collection"))); } + + @Test + public void testNodeAddedTriggerWithAddReplicaPreferredOp_1Shard() throws Exception { + String collectionNamePrefix = "testNodeAddedTriggerWithAddReplicaPreferredOp_1Shard"; + int numShards = 1; + int numCollections = 5; + + nodeAddedTriggerWithAddReplicaPreferredOp(collectionNamePrefix, numShards, numCollections); + } + + @Test + public void testNodeAddedTriggerWithAddReplicaPreferredOp_2Shard() throws Exception { + String collectionNamePrefix = "testNodeAddedTriggerWithAddReplicaPreferredOp_2Shard"; + int numShards = 2; + int numCollections = 5; + + nodeAddedTriggerWithAddReplicaPreferredOp(collectionNamePrefix, numShards, numCollections); + } + + private void nodeAddedTriggerWithAddReplicaPreferredOp(String collectionNamePrefix, int numShards, int numCollections) throws Exception { + CloudSolrClient solrClient = cluster.getSolrClient(); + String setTriggerCommand = "{" + + "'set-trigger' : {" + + "'name' : 'node_added_trigger'," + + "'event' : 'nodeAdded'," + + "'waitFor' : '1s'," + + "'enabled' : true," + + "'" + AutoScalingParams.PREFERRED_OP + "':'addreplica'," + + "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," + + "{'name':'test','class':'" + AssertingTriggerAction.class.getName() + "'}]" + + "}}"; + SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand); + NamedList response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + // the default policy limits 1 replica per node, we need more right now + String setClusterPolicyCommand = "{" + + " 'set-cluster-policy': [" + + " {'cores':'<" + (1 + numCollections * numShards) + "', 'node':'#ANY'}," + + " {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," + + " {'nodeRole':'overseer', 'replica':0}" + + " ]" + + "}"; + req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand); + response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionNamePrefix + "_0", + "conf", numShards, 1); + create.process(solrClient); + + waitForState("Timed out waiting for replicas of new collection to be active", + collectionNamePrefix + "_0", (liveNodes, collectionState) -> + collectionState.getReplicas().stream().allMatch(replica -> replica.isActive(liveNodes))); + + JettySolrRunner newNode = cluster.startJettySolrRunner(); + assertTrue(triggerFiredLatch.await(30, TimeUnit.SECONDS)); + assertTrue(fired.get()); + Map actionContext = actionContextPropsRef.get(); + List operations = (List) actionContext.get("operations"); + assertNotNull(operations); + assertEquals(numShards, operations.size()); + Set affectedShards = new HashSet<>(2); + for (Object operation : operations) { + assertTrue(operation instanceof CollectionAdminRequest.AddReplica); + CollectionAdminRequest.AddReplica addReplica = (CollectionAdminRequest.AddReplica) operation; + assertEquals(newNode.getNodeName(), addReplica.getNode()); + assertEquals(collectionNamePrefix + "_0", addReplica.getCollection()); + affectedShards.add(addReplica.getShard()); + } + assertEquals(numShards, affectedShards.size()); + + for (int i = 1; i < numCollections; i++) { + create = CollectionAdminRequest.createCollection(collectionNamePrefix + "_" + i, + "conf", numShards, 2); + create.process(solrClient); + + waitForState("Timed out waiting for replicas of new collection to be active", + collectionNamePrefix + "_" + i, (liveNodes, collectionState) -> + collectionState.getReplicas().stream().allMatch(replica -> replica.isActive(liveNodes))); + } + + reset(); + + newNode = cluster.startJettySolrRunner(); + assertTrue(triggerFiredLatch.await(30, TimeUnit.SECONDS)); + assertTrue(fired.get()); + actionContext = actionContextPropsRef.get(); + operations = (List) actionContext.get("operations"); + assertNotNull(operations); + assertEquals(numCollections * numShards, operations.size()); + Set affectedCollections = new HashSet<>(numCollections); + affectedShards = new HashSet<>(numShards); + Set> affectedCollShards = new HashSet<>(numCollections * numShards); + for (Object operation : operations) { + assertTrue(operation instanceof CollectionAdminRequest.AddReplica); + CollectionAdminRequest.AddReplica addReplica = (CollectionAdminRequest.AddReplica) operation; + assertEquals(newNode.getNodeName(), addReplica.getNode()); + affectedCollections.add(addReplica.getCollection()); + affectedShards.add(addReplica.getShard()); + affectedCollShards.add(new Pair<>(addReplica.getCollection(), addReplica.getShard())); + } + assertEquals(numCollections, affectedCollections.size()); + assertEquals(numShards, affectedShards.size()); + assertEquals(numCollections * numShards, affectedCollShards.size()); + } } diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java index fb4a605261b..7131357b80f 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java @@ -179,11 +179,11 @@ public class MetricTriggerIntegrationTest extends SolrCloudTestCase { "'event' : 'metric'," + "'waitFor' : '" + waitForSeconds + "s'," + "'enabled' : true," + - "'metric': '" + tag + "'" + + "'metric': '" + tag + "'," + "'above' : 100.0," + - "'collection': '" + collectionName + "'" + - "'shard':'" + shardId + "'" + - "'preferredOperation':'addreplica'" + + "'collection': '" + collectionName + "'," + + "'shard':'" + shardId + "'," + + "'preferredOperation':'addreplica'," + "'actions' : [" + "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," + "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," + diff --git a/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc b/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc index 9ac1a34f008..e5906c055f9 100644 --- a/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc +++ b/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc @@ -31,7 +31,7 @@ Triggers execute on the node that runs `Overseer`. They are scheduled to run per == Event Types Currently the following event types (and corresponding trigger implementations) are defined: -* `nodeAdded`: generated when a new node joins the cluster +* `nodeAdded`: generated when a node joins the cluster * `nodeLost`: generated when a node leaves the cluster * `metric`: generated when the configured metric crosses a configured lower or upper threshold value * `indexSize`: generated when a shard size (defined as index size in bytes or number of documents) @@ -57,6 +57,40 @@ generated, which may significantly differ due to the rate limits set by `waitFor `properties`:: (map, optional) Any additional properties. Currently includes e.g., `nodeNames` property that indicates the nodes that were lost or added. +== Node Added Trigger + +The `NodeAddedTrigger` generates `nodeAdded` events when a node joins the cluster. It can be used to either move replicas +from other nodes to the new node or to add new replicas. + +Apart from the parameters described at <<#trigger-configuration, Trigger Configuration>>, this trigger supports the following configuration: + +`preferredOperation`:: (string, optional, defaults to `MOVEREPLICA`) The operation to be performed in response to an event generated by this trigger. By default, replicas will be moved from other nodes to the added node. The only other supported value is `ADDREPLICA` which adds more replicas of the existing collections on the new node. + +.Example: Node Added Trigger to move replicas to new node +[source,json] +---- +{ + "set-trigger": { + "name": "node_added_trigger", + "event": "nodeAdded", + "waitFor": "5s" + } +} +---- + +.Example: Node Added Trigger to add replicas on new node +[source,json] +---- +{ + "set-trigger": { + "name": "node_added_trigger", + "event": "nodeAdded", + "waitFor": "5s", + "preferredOperation": "addreplica" + } +} +---- + == Auto Add Replicas Trigger When a collection has the parameter `autoAddReplicas` set to true then a trigger configuration named `.auto_add_replicas` is automatically created to watch for nodes going away. This trigger produces `nodeLost` events, diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index 50cd65df5a9..9667f3777dc 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -1708,6 +1708,10 @@ public abstract class CollectionAdminRequest return this; } + public String getShard() { + return shard; + } + @Override public SolrParams getParams() { ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());