SOLR-10606: Correctly handle #EACH trigger suspend / resume. Report names of

actually modified trigger names.
This commit is contained in:
Andrzej Bialecki 2017-05-30 16:21:56 +02:00
parent cae6b6ef13
commit ee2be2024e
3 changed files with 179 additions and 63 deletions

View File

@ -58,6 +58,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
import static org.apache.solr.common.params.CommonParams.JSON;
import static org.apache.solr.common.params.AutoScalingParams.*;
/**
* Handler for /cluster/autoscaling
@ -74,16 +75,16 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
public AutoScalingHandler(CoreContainer container) {
this.container = container;
Map<String, String> map = new HashMap<>(2);
map.put("name", "compute_plan");
map.put("class", "solr.ComputePlanAction");
map.put(NAME, "compute_plan");
map.put(CLASS, "solr.ComputePlanAction");
DEFAULT_ACTIONS.add(map);
map = new HashMap<>(2);
map.put("name", "execute_plan");
map.put("class", "solr.ExecutePlanAction");
map.put(NAME, "execute_plan");
map.put(CLASS, "solr.ExecutePlanAction");
DEFAULT_ACTIONS.add(map);
map = new HashMap<>(2);
map.put("name", "log_plan");
map.put("class", "solr.LogPlanAction");
map.put(NAME, "log_plan");
map.put(CLASS, "solr.LogPlanAction");
DEFAULT_ACTIONS.add(map);
}
@ -107,7 +108,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
Map<String, Object> map = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
if (parts.size() == 2) {
rsp.getValues().addAll(map);
} else if (parts.size() == 3 && "diagnostics".equals(parts.get(2))) {
} else if (parts.size() == 3 && DIAGNOSTICS.equals(parts.get(2))) {
handleDiagnostics(rsp, map);
}
} else {
@ -121,34 +122,34 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
for (CommandOperation op : ops) {
switch (op.name) {
case "set-trigger":
case CMD_SET_TRIGGER:
handleSetTrigger(req, rsp, op);
break;
case "remove-trigger":
case CMD_REMOVE_TRIGGER:
handleRemoveTrigger(req, rsp, op);
break;
case "set-listener":
case CMD_SET_LISTENER:
handleSetListener(req, rsp, op);
break;
case "remove-listener":
case CMD_REMOVE_LISTENER:
handleRemoveListener(req, rsp, op);
break;
case "suspend-trigger":
case CMD_SUSPEND_TRIGGER:
handleSuspendTrigger(req, rsp, op);
break;
case "resume-trigger":
case CMD_RESUME_TRIGGER:
handleResumeTrigger(req, rsp, op);
break;
case "set-policy":
case CMD_SET_POLICY:
handleSetPolicies(req, rsp, op);
break;
case "remove-policy":
case CMD_REMOVE_POLICY:
handleRemovePolicy(req, rsp, op);
break;
case "set-cluster-preferences":
case CMD_SET_CLUSTER_PREFERENCES:
handleSetClusterPreferences(req, rsp, op);
break;
case "set-cluster-policy":
case CMD_SET_CLUSTER_POLICY:
handleSetClusterPolicy(req, rsp, op);
break;
default:
@ -248,34 +249,48 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
private void handleResumeTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String triggerName = op.getStr("name");
String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
}
Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
if (triggers == null || (!triggers.containsKey(triggerName)) && !"#EACH".equals(triggerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
}
for (Map.Entry<String, Object> entry : triggers.entrySet()) {
if ("#EACH".equals(triggerName) || triggerName.equals(entry.getKey())) {
Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
triggerProps.put("enabled", true);
zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
Set<String> changed = new HashSet<>();
if (triggers == null) {
if (Policy.EACH.equals(triggerName)) {
// no harm no foul
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
}
} else {
if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
}
for (Map.Entry<String, Object> entry : triggers.entrySet()) {
if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
Boolean enabled = (Boolean)triggerProps.get(ENABLED);
if (enabled != null && !enabled) {
triggerProps.put(ENABLED, true);
zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
changed.add(entry.getKey());
}
}
}
}
rsp.getValues().add("changed", changed);
rsp.getValues().add("result", "success");
}
private void handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String triggerName = op.getStr("name");
String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
}
String timeout = op.getStr("timeout", null);
String timeout = op.getStr(TIMEOUT, null);
Date resumeTime = null;
if (timeout != null) {
try {
@ -289,24 +304,39 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
if (triggers == null || (!triggers.containsKey(triggerName)) && !"#EACH".equals(triggerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
}
for (Map.Entry<String, Object> entry : triggers.entrySet()) {
if ("#EACH".equals(triggerName) || triggerName.equals(entry.getKey())) {
Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
triggerProps.put("enabled", false);
if (resumeTime != null) {
triggerProps.put("resumeAt", resumeTime.getTime());
Set<String> changed = new HashSet<>();
if (triggers == null) {
if (Policy.EACH.equals(triggerName)) {
// no harm no foul
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
}
} else {
if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
}
for (Map.Entry<String, Object> entry : triggers.entrySet()) {
if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
Boolean enabled = (Boolean)triggerProps.get(ENABLED);
if (enabled == null || enabled) {
triggerProps.put(ENABLED, false);
if (resumeTime != null) {
triggerProps.put(RESUME_AT, resumeTime.getTime());
}
zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
changed.add(entry.getKey());
}
}
zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
}
}
rsp.getValues().add("changed", changed);
rsp.getValues().add("result", "success");
}
private void handleRemoveListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String listenerName = op.getStr("name");
String listenerName = op.getStr(NAME);
if (listenerName == null || listenerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
@ -321,12 +351,12 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
private void handleSetListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String listenerName = op.getStr("name");
String triggerName = op.getStr("trigger");
List<String> stageNames = op.getStrs("stage", Collections.emptyList());
String listenerClass = op.getStr("class");
List<String> beforeActions = op.getStrs("beforeAction", Collections.emptyList());
List<String> afterActions = op.getStrs("afterAction", Collections.emptyList());
String listenerName = op.getStr(NAME);
String triggerName = op.getStr(TRIGGER);
List<String> stageNames = op.getStrs(STAGE, Collections.emptyList());
String listenerClass = op.getStr(CLASS);
List<String> beforeActions = op.getStrs(BEFORE_ACTION, Collections.emptyList());
List<String> afterActions = op.getStrs(AFTER_ACTION, Collections.emptyList());
if (listenerName == null || listenerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
@ -367,7 +397,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
actionNames.addAll(beforeActions);
actionNames.addAll(afterActions);
for (Map<String, String> action : actions) {
actionNames.remove(action.get("name"));
actionNames.remove(action.get(NAME));
}
if (!actionNames.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger '" + triggerName + "' does not have actions named: " + actionNames);
@ -403,19 +433,19 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
private void handleSetTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String triggerName = op.getStr("name");
String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
}
String eventTypeStr = op.getStr("event");
String eventTypeStr = op.getStr(EVENT);
if (eventTypeStr == null || eventTypeStr.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The event type cannot be null or empty in trigger: " + triggerName);
}
AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(eventTypeStr.trim().toUpperCase(Locale.ROOT));
String waitForStr = op.getStr("waitFor", null);
String waitForStr = op.getStr(WAIT_FOR, null);
if (waitForStr != null) {
int seconds = 0;
try {
@ -423,25 +453,25 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
} catch (IllegalArgumentException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'waitFor' value in trigger: " + triggerName);
}
op.getDataMap().put("waitFor", seconds);
op.getDataMap().put(WAIT_FOR, seconds);
}
Integer lowerBound = op.getInt("lowerBound", null);
Integer upperBound = op.getInt("upperBound", null);
Integer lowerBound = op.getInt(LOWER_BOUND, null);
Integer upperBound = op.getInt(UPPER_BOUND, null);
List<Map<String, String>> actions = (List<Map<String, String>>) op.getVal("actions");
List<Map<String, String>> actions = (List<Map<String, String>>) op.getVal(ACTIONS);
if (actions == null) {
actions = DEFAULT_ACTIONS;
op.getDataMap().put("actions", actions);
op.getDataMap().put(ACTIONS, actions);
}
// validate that we can load all the actions
// todo nocommit -- what about MemClassLoader?
for (Map<String, String> action : actions) {
if (!action.containsKey("name") || !action.containsKey("class")) {
if (!action.containsKey(NAME) || !action.containsKey(CLASS)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No 'name' or 'class' specified for action: " + action);
}
String klass = action.get("class");
String klass = action.get(CLASS);
try {
container.getResourceLoader().findClass(klass, TriggerAction.class);
} catch (Exception e) {
@ -474,8 +504,8 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
private void handleRemoveTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
String triggerName = op.getStr("name");
boolean removeListeners = op.getBoolean("removeListeners", false);
String triggerName = op.getStr(NAME);
boolean removeListeners = op.getBoolean(REMOVE_LISTENERS, false);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
@ -491,7 +521,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
if (listeners != null) {
for (Map.Entry<String, Map<String, Object>> entry : listeners.entrySet()) {
Map<String, Object> listenerProps = entry.getValue();
if (triggerName.equals(listenerProps.get("trigger")) && !removeListeners) {
if (triggerName.equals(listenerProps.get(TRIGGER)) && !removeListeners) {
activeListeners.add(entry.getKey());
}
}

View File

@ -75,14 +75,34 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
@Test
public void testSuspendTrigger() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
String suspendEachCommand = "{\n" +
"\t\"suspend-trigger\" : {\n" +
"\t\t\"name\" : \"" + Policy.EACH + "\"\n" +
"\t}\n" +
"}";
String resumeEachCommand = "{\n" +
"\t\"resume-trigger\" : {\n" +
"\t\t\"name\" : \"" + Policy.EACH + "\"\n" +
"\t}\n" +
"}";
// these should be no-ops because there are no triggers, and it should succeed
SolrRequest req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendEachCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
assertEquals(response.get("changed").toString(), "[]");
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeEachCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
assertEquals(response.get("changed").toString(), "[]");
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_lost_trigger'," +
"'event' : 'nodeLost'," +
"'waitFor' : '10m'," +
"'enabled' : true}}";
SolrRequest req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
setTriggerCommand = "{" +
@ -105,6 +125,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
assertEquals(response.get("changed").toString(), "[node_lost_trigger]");
byte[] data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
ZkNodeProps loaded = ZkNodeProps.load(data);
@ -122,12 +143,15 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
suspendTriggerCommand = "{" +
"'suspend-trigger' : {" +
"'name' : '#EACH'" +
"'name' : '" + Policy.EACH + "'" +
"}" +
"}";
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
List<String> changed = (List<String>)response.get("changed");
assertEquals(1, changed.size());
assertTrue(changed.contains("node_added_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
triggers = (Map<String, Object>) loaded.get("triggers");
@ -148,6 +172,9 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
changed = (List<String>)response.get("changed");
assertEquals(1, changed.size());
assertTrue(changed.contains("node_added_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
triggers = (Map<String, Object>) loaded.get("triggers");
@ -162,12 +189,15 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
resumeTriggerCommand = "{" +
"'resume-trigger' : {" +
"'name' : '#EACH'" +
"'name' : '" + Policy.EACH + "'" +
"}" +
"}";
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
changed = (List<String>)response.get("changed");
assertEquals(1, changed.size());
assertTrue(changed.contains("node_lost_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
triggers = (Map<String, Object>) loaded.get("triggers");
@ -189,6 +219,9 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
changed = (List<String>)response.get("changed");
assertEquals(1, changed.size());
assertTrue(changed.contains("node_lost_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
triggers = (Map<String, Object>) loaded.get("triggers");

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.common.params;
/**
* Requests parameters for autoscaling.
*/
public interface AutoScalingParams {
// parameters
String DIAGNOSTICS = "diagnostics";
String NAME = "name";
String TRIGGER = "trigger";
String EVENT = "event";
String ACTIONS = "actions";
String WAIT_FOR = "waitFor";
String LOWER_BOUND = "lowerBound";
String UPPER_BOUND = "upperBound";
String STAGE = "stage";
String CLASS = "class";
String ENABLED = "enabled";
String RESUME_AT = "resumeAt";
String BEFORE_ACTION = "beforeAction";
String AFTER_ACTION = "afterAction";
String TIMEOUT = "timeout";
String REMOVE_LISTENERS = "removeListeners";
// commands
String CMD_SET_TRIGGER = "set-trigger";
String CMD_REMOVE_TRIGGER = "remove-trigger";
String CMD_SET_LISTENER = "set-listener";
String CMD_REMOVE_LISTENER = "remove-listener";
String CMD_SUSPEND_TRIGGER = "suspend-trigger";
String CMD_RESUME_TRIGGER = "resume-trigger";
String CMD_SET_POLICY = "set-policy";
String CMD_REMOVE_POLICY = "remove-policy";
String CMD_SET_CLUSTER_PREFERENCES = "set-cluster-preferences";
String CMD_SET_CLUSTER_POLICY = "set-cluster-policy";
}