SOLR-12715: NodeAddedTrigger should support adding replicas to new nodes by setting preferredOperation=addreplica

This commit adds support for a preferredOperation configuration parameter, which defaults to movereplica. It changes ComputePlanAction to add all (collection, shard) pairs as hints to the AddReplicaSuggester when addreplica is selected as the preferred operation.
Shalin Shekhar Mangar, 2018-09-04 17:05:26 +05:30
parent 982ee3931b
commit 34a85014d5
8 changed files with 232 additions and 19 deletions
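For context, the smallest trigger configuration that exercises the new parameter looks like the following (this mirrors the ref-guide example added by this commit; the trigger name is arbitrary):

[source,json]
----
{
  "set-trigger": {
    "name": "node_added_trigger",
    "event": "nodeAdded",
    "waitFor": "5s",
    "preferredOperation": "addreplica"
  }
}
----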

File: solr/CHANGES.txt

@@ -195,6 +195,9 @@ New Features
* SOLR-12629: The predict evaluator should work with the polyfit function (Joel Bernstein)
* SOLR-12715: NodeAddedTrigger should support adding replicas to new nodes by setting preferredOperation=addreplica.
(shalin)
Bug Fixes
----------------------

File: AutoAddReplicasPlanAction.java

@@ -33,7 +33,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
public class AutoAddReplicasPlanAction extends ComputePlanAction {
@Override
- protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) {
+ protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) throws IOException {
// for backward compatibility
ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider();
String autoAddReplicas = stateProvider.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);

File: ComputePlanAction.java

@@ -17,6 +17,7 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
@@ -37,9 +38,11 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.UnsupportedSuggester;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
@@ -89,8 +92,7 @@ public class ComputePlanAction extends TriggerActionBase {
log.trace("-- state: {}", clusterState);
}
try {
- Suggester initialSuggester = getSuggester(session, event, context, cloudManager);
- Suggester suggester = initialSuggester;
+ Suggester suggester = getSuggester(session, event, context, cloudManager);
int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState);
int requestedOperations = getRequestedNumOps(event);
if (requestedOperations > maxOperations) {
@@ -197,12 +199,11 @@ public class ComputePlanAction extends TriggerActionBase {
private static final String START = "__start__";
- protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) {
+ protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) throws IOException {
Suggester suggester;
switch (event.getEventType()) {
case NODEADDED:
- suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
-     .hint(Suggester.Hint.TARGET_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
+ suggester = getNodeAddedSuggester(cloudManager, session, event);
break;
case NODELOST:
suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
@@ -239,4 +240,37 @@
}
return suggester;
}
private Suggester getNodeAddedSuggester(SolrCloudManager cloudManager, Policy.Session session, TriggerEvent event) throws IOException {
String preferredOp = (String) event.getProperty(AutoScalingParams.PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp);
Suggester suggester = session.getSuggester(action)
.hint(Suggester.Hint.TARGET_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
switch (action) {
case ADDREPLICA:
// add all collection/shard pairs and let policy engine figure out which one
// to place on the target node
// todo in future we can prune ineligible collection/shard pairs
ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
Set<Pair<String, String>> collShards = new HashSet<>();
clusterState.getCollectionStates().forEach((collectionName, collectionRef) -> {
DocCollection docCollection = collectionRef.get();
if (docCollection != null) {
docCollection.getActiveSlices().stream()
.map(slice -> new Pair<>(collectionName, slice.getName()))
.forEach(collShards::add);
}
});
suggester.hint(Suggester.Hint.COLL_SHARD, collShards);
break;
case MOVEREPLICA:
case NONE:
break;
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unsupported preferredOperation=" + preferredOp + " for node added event");
}
return suggester;
}
}
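For illustration only, here is a standalone sketch of the hint-building step in getNodeAddedSuggester above, using plain Java collections instead of Solr's ClusterState/DocCollection types; the collection and shard names are hypothetical:

[source,java]
----
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CollShardHintSketch {
  public static void main(String[] args) {
    // Hypothetical cluster state: collection name -> names of its active shards.
    Map<String, List<String>> activeSlices = new HashMap<>();
    activeSlices.put("products", Arrays.asList("shard1", "shard2"));
    activeSlices.put("orders", Collections.singletonList("shard1"));

    // Build one (collection, shard) pair per active shard: the same set that
    // getNodeAddedSuggester passes as COLL_SHARD hints. The policy engine then
    // decides which of these pairs actually get a replica on the added node.
    Set<Map.Entry<String, String>> collShards = new HashSet<>();
    activeSlices.forEach((collection, shards) ->
        shards.forEach(shard ->
            collShards.add(new AbstractMap.SimpleImmutableEntry<>(collection, shard))));

    collShards.forEach(p -> System.out.println(p.getKey() + "/" + p.getValue()));
  }
}
----

The design choice worth noting: rather than deciding placement itself, ComputePlanAction hands every active (collection, shard) pair to the suggester and lets the policy engine pick where new replicas go; the TODO above notes that ineligible pairs could be pruned in the future.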

File: NodeAddedTrigger.java

@@ -24,17 +24,23 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
/**
* Trigger for the {@link TriggerEventType#NODEADDED} event
*/
@@ -45,8 +51,11 @@ public class NodeAddedTrigger extends TriggerBase {
private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
private String preferredOp;
public NodeAddedTrigger(String name) {
super(TriggerEventType.NODEADDED, name);
TriggerUtils.validProperties(validProperties, PREFERRED_OP);
}
@Override
@@ -71,7 +80,23 @@ public class NodeAddedTrigger extends TriggerBase {
} catch (Exception e) {
log.warn("Exception retrieving nodeLost markers", e);
}
}
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
super.configure(loader, cloudManager, properties);
preferredOp = (String) properties.getOrDefault(PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
preferredOp = preferredOp.toLowerCase(Locale.ROOT);
// verify
CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp);
switch (action) {
case ADDREPLICA:
case MOVEREPLICA:
case NONE:
break;
default:
throw new TriggerValidationException("Unsupported preferredOperation=" + preferredOp + " specified for node added trigger");
}
}
@Override
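With this validation in place, a trigger config naming any other collection action is rejected at configure time. For example, this hypothetical config (deletenode is a real collection action, but not a supported preferred operation for this trigger):

[source,json]
----
{
  "set-trigger": {
    "name": "node_added_trigger",
    "event": "nodeAdded",
    "preferredOperation": "deletenode"
  }
}
----

would make the set-trigger command fail with a TriggerValidationException carrying the message "Unsupported preferredOperation=deletenode specified for node added trigger".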
@@ -158,7 +183,7 @@ public class NodeAddedTrigger extends TriggerBase {
if (processor != null) {
log.debug("NodeAddedTrigger {} firing registered processor for nodes: {} added at times {}, now={}", name,
nodeNames, times, cloudManager.getTimeSource().getTimeNs());
- if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames))) {
+ if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames, preferredOp))) {
// remove from tracking set only if the fire was accepted
nodeNames.forEach(n -> {
nodeNameVsTimeAdded.remove(n);
@@ -195,11 +220,12 @@ public class NodeAddedTrigger extends TriggerBase {
public static class NodeAddedEvent extends TriggerEvent {
- public NodeAddedEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames) {
+ public NodeAddedEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames, String preferredOp) {
// use the oldest time as the time of the event
super(eventType, source, times.get(0), null);
properties.put(NODE_NAMES, nodeNames);
properties.put(EVENT_TIMES, times);
properties.put(PREFERRED_OP, preferredOp);
}
}
}

File: ComputePlanActionTest.java

@@ -21,8 +21,10 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,8 +33,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
- import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+ import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -44,9 +46,11 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.LogLevel;
@@ -135,9 +139,15 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
reset();
}
private void reset() {
fired.set(false);
triggerFiredLatch = new CountDownLatch(1);
actionContextPropsRef.set(null);
eventRef.set(null);
AssertingTriggerAction.expectedNode = null;
}
private void deleteChildrenRecursively(String path) throws Exception {
@@ -243,8 +253,6 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
@LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
public void testNodeWithMultipleReplicasLost() throws Exception {
- AssertingTriggerAction.expectedNode = null;
// start 3 more nodes
cluster.startJettySolrRunner();
cluster.startJettySolrRunner();
@@ -318,7 +326,6 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
@Test
public void testNodeAdded() throws Exception {
- AssertingTriggerAction.expectedNode = null;
CloudSolrClient solrClient = cluster.getSolrClient();
String setTriggerCommand = "{" +
"'set-trigger' : {" +
@@ -421,8 +428,6 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
//2018-06-18 (commented) @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018
public void testSelectedCollections() throws Exception {
log.info("Found number of jetties: {}", cluster.getJettySolrRunners().size());
- AssertingTriggerAction.expectedNode = null;
// start 3 more nodes
cluster.startJettySolrRunner();
cluster.startJettySolrRunner();
@@ -500,4 +505,111 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
assertEquals("Expected MOVEREPLICA action after adding node", MOVEREPLICA, CollectionParams.CollectionAction.get(params.get("action")));
assertFalse("not expected testSelected3", "testSelected3".equals(params.get("collection")));
}
@Test
public void testNodeAddedTriggerWithAddReplicaPreferredOp_1Shard() throws Exception {
String collectionNamePrefix = "testNodeAddedTriggerWithAddReplicaPreferredOp_1Shard";
int numShards = 1;
int numCollections = 5;
nodeAddedTriggerWithAddReplicaPreferredOp(collectionNamePrefix, numShards, numCollections);
}
@Test
public void testNodeAddedTriggerWithAddReplicaPreferredOp_2Shard() throws Exception {
String collectionNamePrefix = "testNodeAddedTriggerWithAddReplicaPreferredOp_2Shard";
int numShards = 2;
int numCollections = 5;
nodeAddedTriggerWithAddReplicaPreferredOp(collectionNamePrefix, numShards, numCollections);
}
private void nodeAddedTriggerWithAddReplicaPreferredOp(String collectionNamePrefix, int numShards, int numCollections) throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger'," +
"'event' : 'nodeAdded'," +
"'waitFor' : '1s'," +
"'enabled' : true," +
"'" + AutoScalingParams.PREFERRED_OP + "':'addreplica'," +
"'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
"{'name':'test','class':'" + AssertingTriggerAction.class.getName() + "'}]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
// the default policy limits 1 replica per node, we need more right now
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<" + (1 + numCollections * numShards) + "', 'node':'#ANY'}," +
" {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
" {'nodeRole':'overseer', 'replica':0}" +
" ]" +
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionNamePrefix + "_0",
"conf", numShards, 1);
create.process(solrClient);
waitForState("Timed out waiting for replicas of new collection to be active",
collectionNamePrefix + "_0", (liveNodes, collectionState) ->
collectionState.getReplicas().stream().allMatch(replica -> replica.isActive(liveNodes)));
JettySolrRunner newNode = cluster.startJettySolrRunner();
assertTrue(triggerFiredLatch.await(30, TimeUnit.SECONDS));
assertTrue(fired.get());
Map actionContext = actionContextPropsRef.get();
List operations = (List) actionContext.get("operations");
assertNotNull(operations);
assertEquals(numShards, operations.size());
Set<String> affectedShards = new HashSet<>(2);
for (Object operation : operations) {
assertTrue(operation instanceof CollectionAdminRequest.AddReplica);
CollectionAdminRequest.AddReplica addReplica = (CollectionAdminRequest.AddReplica) operation;
assertEquals(newNode.getNodeName(), addReplica.getNode());
assertEquals(collectionNamePrefix + "_0", addReplica.getCollection());
affectedShards.add(addReplica.getShard());
}
assertEquals(numShards, affectedShards.size());
for (int i = 1; i < numCollections; i++) {
create = CollectionAdminRequest.createCollection(collectionNamePrefix + "_" + i,
"conf", numShards, 2);
create.process(solrClient);
waitForState("Timed out waiting for replicas of new collection to be active",
collectionNamePrefix + "_" + i, (liveNodes, collectionState) ->
collectionState.getReplicas().stream().allMatch(replica -> replica.isActive(liveNodes)));
}
reset();
newNode = cluster.startJettySolrRunner();
assertTrue(triggerFiredLatch.await(30, TimeUnit.SECONDS));
assertTrue(fired.get());
actionContext = actionContextPropsRef.get();
operations = (List) actionContext.get("operations");
assertNotNull(operations);
assertEquals(numCollections * numShards, operations.size());
Set<String> affectedCollections = new HashSet<>(numCollections);
affectedShards = new HashSet<>(numShards);
Set<Pair<String, String>> affectedCollShards = new HashSet<>(numCollections * numShards);
for (Object operation : operations) {
assertTrue(operation instanceof CollectionAdminRequest.AddReplica);
CollectionAdminRequest.AddReplica addReplica = (CollectionAdminRequest.AddReplica) operation;
assertEquals(newNode.getNodeName(), addReplica.getNode());
affectedCollections.add(addReplica.getCollection());
affectedShards.add(addReplica.getShard());
affectedCollShards.add(new Pair<>(addReplica.getCollection(), addReplica.getShard()));
}
assertEquals(numCollections, affectedCollections.size());
assertEquals(numShards, affectedShards.size());
assertEquals(numCollections * numShards, affectedCollShards.size());
}
}

File: MetricTriggerIntegrationTest.java

@@ -179,11 +179,11 @@ public class MetricTriggerIntegrationTest extends SolrCloudTestCase {
"'event' : 'metric'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
"'metric': '" + tag + "'" +
"'metric': '" + tag + "'," +
"'above' : 100.0," +
"'collection': '" + collectionName + "'" +
"'shard':'" + shardId + "'" +
"'preferredOperation':'addreplica'" +
"'collection': '" + collectionName + "'," +
"'shard':'" + shardId + "'," +
"'preferredOperation':'addreplica'," +
"'actions' : [" +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +

File: solrcloud-autoscaling-triggers.adoc (Solr Ref Guide)

@@ -31,7 +31,7 @@ Triggers execute on the node that runs `Overseer`. They are scheduled to run per
== Event Types
Currently the following event types (and corresponding trigger implementations) are defined:
- * `nodeAdded`: generated when a new node joins the cluster
+ * `nodeAdded`: generated when a node joins the cluster
* `nodeLost`: generated when a node leaves the cluster
* `metric`: generated when the configured metric crosses a configured lower or upper threshold value
* `indexSize`: generated when a shard size (defined as index size in bytes or number of documents)
@@ -57,6 +57,40 @@ generated, which may significantly differ due to the rate limits set by `waitFor`
`properties`:: (map, optional) Any additional properties. Currently includes, e.g., a `nodeNames` property that
indicates the nodes that were lost or added.
== Node Added Trigger
The `NodeAddedTrigger` generates `nodeAdded` events when a node joins the cluster. It can be used to either move replicas
from other nodes to the new node or add new replicas to it.
Apart from the parameters described at <<#trigger-configuration, Trigger Configuration>>, this trigger supports the following configuration:
`preferredOperation`:: (string, optional, defaults to `MOVEREPLICA`) The operation to be performed in response to an event generated by this trigger. By default, replicas will be moved from other nodes to the added node. The only other supported value is `ADDREPLICA`, which adds more replicas of the existing collections on the new node.
.Example: Node Added Trigger to move replicas to new node
[source,json]
----
{
"set-trigger": {
"name": "node_added_trigger",
"event": "nodeAdded",
"waitFor": "5s"
}
}
----
.Example: Node Added Trigger to add replicas on new node
[source,json]
----
{
"set-trigger": {
"name": "node_added_trigger",
"event": "nodeAdded",
"waitFor": "5s",
"preferredOperation": "addreplica"
}
}
----
== Auto Add Replicas Trigger
When a collection has the parameter `autoAddReplicas` set to true, a trigger configuration named `.auto_add_replicas` is automatically created to watch for nodes going away. This trigger produces `nodeLost` events,

File: CollectionAdminRequest.java

@@ -1708,6 +1708,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
return this;
}
public String getShard() {
return shard;
}
@Override
public SolrParams getParams() {
ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());