SOLR-10340: New set-listener and remove-listener API

This commit is contained in:
Shalin Shekhar Mangar 2017-03-24 14:07:56 +05:30
parent e4b3df41a9
commit 0fb9d1bd1b
5 changed files with 374 additions and 28 deletions

View File

@ -63,6 +63,8 @@ New Features
* SOLR-10339: New set-trigger and remove-trigger APIs for autoscaling. (shalin)
* SOLR-10340: New set-listener and remove-listener API for autoscaling. (shalin)
Bug Fixes
----------------------
* SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.

View File

@ -17,6 +17,11 @@
package org.apache.solr.cloud.autoscaling;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
public class AutoScaling {
public enum EventType {
@ -29,4 +34,42 @@ public class AutoScaling {
INDEXRATE
}
public enum TriggerStage {
STARTED,
ABORTED,
SUCCEEDED,
FAILED,
BEFORE_ACTION,
AFTER_ACTION
}
public static interface TriggerListener {
public void triggerFired(Trigger trigger, Event event);
}
public static class HttpCallbackListener implements TriggerListener {
@Override
public void triggerFired(Trigger trigger, Event event) {
}
}
public static interface Trigger {
public String getName();
public EventType getEventType();
public boolean isEnabled();
public Map<String, Object> getProperties();
}
public static interface Event {
public String getSource();
public Date getTime();
public EventType getType();
}
}

View File

@ -20,10 +20,13 @@ package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.solr.api.Api;
@ -88,10 +91,114 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
break;
case "remove-trigger":
handleRemoveTrigger(req, rsp, op);
break;
case "set-listener":
handleSetListener(req, rsp, op);
break;
case "remove-listener":
handleRemoveListener(req, rsp, op);
break;
}
}
}
private void handleRemoveListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String listenerName = op.getStr("name");
if (listenerName == null || listenerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
}
Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
Map<String, Object> listeners = (Map<String, Object>) autoScalingConf.get("listeners");
if (listeners == null || !listeners.containsKey(listenerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No listener exists with name: " + listenerName);
}
zkSetListener(container.getZkController().getZkStateReader(), listenerName, null);
rsp.getValues().add("result", "success");
}
private void handleSetListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String listenerName = op.getStr("name");
String triggerName = op.getStr("trigger");
List<String> stageNames = op.getStrs("stage", Collections.emptyList());
String listenerClass = op.getStr("class");
List<String> beforeActions = op.getStrs("beforeAction", Collections.emptyList());
List<String> afterActions = op.getStrs("afterAction", Collections.emptyList());
if (listenerName == null || listenerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
}
Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
if (triggers == null || !triggers.containsKey(triggerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "A trigger with the name " + triggerName + " does not exist");
}
Map<String, Object> triggerProps = (Map<String, Object>) triggers.get(triggerName);
if (stageNames.isEmpty() && beforeActions.isEmpty() && afterActions.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Either 'stage' or 'beforeAction' or 'afterAction' must be specified");
}
for (String stage : stageNames) {
try {
AutoScaling.TriggerStage.valueOf(stage);
} catch (IllegalArgumentException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid stage name: " + stage);
}
}
if (listenerClass == null || listenerClass.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The 'class' of the listener cannot be null or empty");
}
// validate that we can load the listener class
// todo nocommit -- what about MemClassLoader?
try {
container.getResourceLoader().findClass(listenerClass, AutoScaling.TriggerListener.class);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Listener not found: " + listenerClass, e);
}
List<Map<String, String>> actions = (List<Map<String, String>>) triggerProps.get("actions");
Set<String> actionNames = new HashSet<>();
actionNames.addAll(beforeActions);
actionNames.addAll(afterActions);
for (Map<String, String> action : actions) {
actionNames.remove(action.get("name"));
}
if (!actionNames.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger '" + triggerName + "' does not have actions named: " + actionNames);
}
// todo - handle races between competing set-trigger and set-listener invocations
zkSetListener(container.getZkController().getZkStateReader(), listenerName, op.getValuesExcluding("name"));
rsp.getValues().add("result", "success");
}
private void zkSetListener(ZkStateReader reader, String listenerName, Map<String, Object> listenerProperties) throws KeeperException, InterruptedException {
while (true) {
Stat stat = new Stat();
ZkNodeProps loaded = null;
byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
loaded = ZkNodeProps.load(data);
Map<String, Object> listeners = (Map<String, Object>) loaded.get("listeners");
if (listeners == null) listeners = new HashMap<>(1);
if (listenerProperties != null) {
listeners.put(listenerName, listenerProperties);
} else {
listeners.remove(listenerName);
}
loaded = loaded.plus("listeners", listeners);
try {
reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
} catch (KeeperException.BadVersionException bve) {
// somebody else has changed the configuration so we must retry
continue;
}
break;
}
}
private void handleSetTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String triggerName = op.getStr("name");
@ -144,7 +251,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
try {
container.getResourceLoader().findClass(klass, TriggerAction.class);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error trying to find Action: " + klass, e);
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action not found: " + klass, e);
}
}
@ -154,6 +261,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
private void handleRemoveTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String triggerName = op.getStr("name");
boolean removeListeners = op.getBoolean("removeListeners", false);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
@ -163,6 +271,26 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
if (triggers == null || !triggers.containsKey(triggerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
}
Map<String, Map<String, Object>> listeners = (Map<String, Map<String, Object>>) autoScalingConf.get("listeners");
Set<String> activeListeners = new HashSet<>();
if (listeners != null) {
for (Map.Entry<String, Map<String, Object>> entry : listeners.entrySet()) {
Map<String, Object> listenerProps = entry.getValue();
if (triggerName.equals(listenerProps.get("trigger")) && !removeListeners) {
activeListeners.add(entry.getKey());
}
}
}
if (removeListeners) {
for (String activeListener : activeListeners) {
zkSetListener(container.getZkController().getZkStateReader(), activeListener, null);
}
} else if (!activeListeners.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"No listeners should exist for trigger: " + triggerName + ". Found listeners: " + activeListeners);
}
zkSetTrigger(container.getZkController().getZkStateReader(), triggerName, null);
rsp.getValues().add("result", "success");
}

View File

@ -11,36 +11,36 @@
},
"commands": {
"set-trigger": {
"type":"object",
"type": "object",
"description": "The set-trigger command allows you to add and update triggers on various system metrics",
"properties": {
"name": {
"type": "string",
"description": "The name of the trigger"
},
"event" : {
"type" : "string",
"description" : "The event type on which to set a trigger"
"event": {
"type": "string",
"description": "The event type on which to set a trigger"
},
"waitFor" : {
"type" : "string",
"description" : "The amount of time to wait after the trigger condition is satisfied before trigger is activated"
"waitFor": {
"type": "string",
"description": "The amount of time to wait after the trigger condition is satisfied before trigger is activated"
},
"lowerBound" : {
"type" : "number",
"description" : "The lower bound of the condition below which the trigger is activated"
"lowerBound": {
"type": "number",
"description": "The lower bound of the condition below which the trigger is activated"
},
"upperBound" : {
"type" : "number",
"description" : "The upper bound of the condition below which the trigger is activated"
"upperBound": {
"type": "number",
"description": "The upper bound of the condition below which the trigger is activated"
},
"enabled" : {
"type" : "boolean",
"description" : "The state of the trigger"
"enabled": {
"type": "boolean",
"description": "The state of the trigger"
},
"actions" : {
"type" : "array",
"description" : "The actions to be performed in sequence when the trigger is activated",
"actions": {
"type": "array",
"description": "The actions to be performed in sequence when the trigger is activated",
"items": {
"type": "object"
}
@ -53,13 +53,78 @@
},
"remove-trigger": {
"description": "Remove a trigger",
"type":"object",
"properties" : {
"name" : {
"type" : "string",
"description" : "The name of the trigger to be removed"
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the trigger to be removed"
},
"removeListeners": {
"type": "boolean",
"description": "If true, all listeners of this triggers are deleted together with the trigger"
}
}
},
"required": [
"name"
]
},
"set-listener": {
"description": "The set-listener command lets you add a listener to a trigger",
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the listener"
},
"trigger": {
"type": "string",
"description": "The name of the trigger to listen to"
},
"stage": {
"type": "array",
"description": "The stage of the trigger for which to listen"
"items": {
"type" : "string"
}
},
"beforeAction": {
"type": "array",
"description": "The name of the action before which the listener should be notified"
"items": {
"type" : "string"
}
},
"afterAction": {
"type": "array",
"description": "The name of the action after which the listener should be notified"
"items": {
"type" : "string"
}
},
"class": {
"type": "string",
"description": "The listener class to be notified once the given stage of the given trigger is activated"
}
},
"required": [
"name",
"trigger",
"class"
],
"additionalProperties": true
},
"remove-listener": {
"description": "Remove a listener",
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the listener to be removed"
}
},
"required": [
"name"
]
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@ -53,7 +54,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
// todo nocommit -- add testing for the v2 path
// String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
String path = "/admin/autoscaling";
String addTriggerCommand = "{\n" +
String setTriggerCommand = "{\n" +
"\t\"set-trigger\" : {\n" +
"\t\t\"name\" : \"node_lost_trigger\",\n" +
"\t\t\"event\" : \"nodeLost\",\n" +
@ -76,7 +77,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
"\t\t]\n" +
"\t}\n" +
"}";
SolrRequest req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, addTriggerCommand);
SolrRequest req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
@ -91,12 +92,100 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
assertEquals(4, nodeLostTrigger.size());
assertEquals("600", nodeLostTrigger.get("waitFor").toString());
setTriggerCommand = "{\n" +
"\t\"set-trigger\" : {\n" +
"\t\t\"name\" : \"node_lost_trigger\",\n" +
"\t\t\"event\" : \"nodeLost\",\n" +
"\t\t\"waitFor\" : \"20m\",\n" +
"\t\t\"enabled\" : \"false\",\n" +
"\t\t\"actions\" : [\n" +
"\t\t\t{\n" +
"\t\t\t\t\"name\" : \"compute_plan\",\n" +
"\t\t\t\t\"class\" : \"solr.ComputePlanAction\"\n" +
"\t\t\t},\n" +
"\t\t\t{\n" +
"\t\t\t\t\"name\" : \"execute_plan\",\n" +
"\t\t\t\t\"class\" : \"solr.ExecutePlanAction\"\n" +
"\t\t\t},\n" +
"\t\t\t{\n" +
"\t\t\t\t\"name\" : \"log_plan\",\n" +
"\t\t\t\t\"class\" : \"solr.LogPlanAction\",\n" +
"\t\t\t\t\"collection\" : \".system\"\n" +
"\t\t\t}\n" +
"\t\t]\n" +
"\t}\n" +
"}";
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
data = zkClient().getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
triggers = (Map<String, Object>) loaded.get("triggers");
assertNotNull(triggers);
assertEquals(1, triggers.size());
assertTrue(triggers.containsKey("node_lost_trigger"));
nodeLostTrigger = (Map<String, Object>) triggers.get("node_lost_trigger");
assertEquals(4, nodeLostTrigger.size());
assertEquals("1200", nodeLostTrigger.get("waitFor").toString());
assertEquals("false", nodeLostTrigger.get("enabled").toString());
String setListenerCommand = "{\n" +
"\t\"set-listener\" : \n" +
"\t\t{\n" +
"\t\t\t\"name\" : \"xyz\",\n" +
"\t\t\t\"trigger\" : \"node_lost_trigger\",\n" +
"\t\t\t\"stage\" : [\"STARTED\",\"ABORTED\",\"SUCCEEDED\"],\n" +
"\t\t\t\"beforeAction\" : \"execute_plan\",\n" +
"\t\t\t\"class\" : \"org.apache.solr.cloud.autoscaling.AutoScaling$HttpCallbackListener\",\n" +
"\t\t\t\"url\" : \"http://xyz.com/on_node_lost?node={$LOST_NODE_NAME}\"\n" +
"\t\t}\n" +
"}";
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
data = zkClient().getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
Map<String, Object> listeners = (Map<String, Object>) loaded.get("listeners");
assertNotNull(listeners);
assertEquals(1, listeners.size());
assertTrue(listeners.containsKey("xyz"));
Map<String, Object> xyzListener = (Map<String, Object>) listeners.get("xyz");
assertEquals(5, xyzListener.size());
assertEquals("org.apache.solr.cloud.autoscaling.AutoScaling$HttpCallbackListener", xyzListener.get("class").toString());
String removeTriggerCommand = "{\n" +
"\t\"remove-trigger\" : {\n" +
"\t\t\"name\" : \"node_lost_trigger\"\n" +
"\t}\n" +
"}";
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, removeTriggerCommand);
try {
response = solrClient.request(req);
fail("Trying to remove trigger which has listeners registered should have failed");
} catch (HttpSolrClient.RemoteSolrException e) {
// expected
}
String removeListenerCommand = "{\n" +
"\t\"remove-listener\" : {\n" +
"\t\t\"name\" : \"xyz\"\n" +
"\t}\n" +
"}";
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, removeListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
data = zkClient().getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
listeners = (Map<String, Object>) loaded.get("listeners");
assertNotNull(listeners);
assertEquals(0, listeners.size());
removeTriggerCommand = "{\n" +
"\t\"remove-trigger\" : {\n" +
"\t\t\"name\" : \"node_lost_trigger\"\n" +
"\t}\n" +
"}";
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, removeTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
data = zkClient().getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, null, null, true);
@ -104,6 +193,25 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
triggers = (Map<String, Object>) loaded.get("triggers");
assertNotNull(triggers);
assertEquals(0, triggers.size());
setListenerCommand = "{\n" +
"\t\"set-listener\" : \n" +
"\t\t{\n" +
"\t\t\t\"name\" : \"xyz\",\n" +
"\t\t\t\"trigger\" : \"node_lost_trigger\",\n" +
"\t\t\t\"stage\" : [\"STARTED\",\"ABORTED\",\"SUCCEEDED\"],\n" +
"\t\t\t\"beforeAction\" : \"execute_plan\",\n" +
"\t\t\t\"class\" : \"org.apache.solr.cloud.autoscaling.AutoScaling$HttpCallbackListener\",\n" +
"\t\t\t\"url\" : \"http://xyz.com/on_node_lost?node={$LOST_NODE_NAME}\"\n" +
"\t\t}\n" +
"}";
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, removeListenerCommand);
try {
response = solrClient.request(req);
fail("Adding a listener on a non-existent trigger should have failed");
} catch (HttpSolrClient.RemoteSolrException e) {
// expected
}
}
static class AutoScalingRequest extends SolrRequest {