mirror of
https://github.com/apache/lucene.git
synced 2025-02-06 18:18:38 +00:00
SOLR-10358: New suspend-trigger and resume-trigger APIs for autoscaling
This commit is contained in:
parent
c98909bcac
commit
7e4dc2b79d
@ -65,6 +65,8 @@ New Features
|
||||
|
||||
* SOLR-10340: New set-listener and remove-listener API for autoscaling. (shalin)
|
||||
|
||||
* SOLR-10358: New suspend-trigger and resume-trigger APIs for autoscaling. (shalin)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
* SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.
|
||||
|
@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -34,6 +35,7 @@ import org.apache.solr.api.ApiBag;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.SuppressForbidden;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.handler.RequestHandlerBase;
|
||||
@ -98,10 +100,74 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
|
||||
case "remove-listener":
|
||||
handleRemoveListener(req, rsp, op);
|
||||
break;
|
||||
case "suspend-trigger":
|
||||
handleSuspendTrigger(req, rsp, op);
|
||||
break;
|
||||
case "resume-trigger":
|
||||
handleResumeTrigger(req, rsp, op);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleResumeTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
|
||||
String triggerName = op.getStr("name");
|
||||
|
||||
if (triggerName == null || triggerName.trim().length() == 0) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
|
||||
}
|
||||
Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
|
||||
Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
|
||||
if (triggers == null || (!triggers.containsKey(triggerName)) && !"#EACH".equals(triggerName)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
|
||||
}
|
||||
for (Map.Entry<String, Object> entry : triggers.entrySet()) {
|
||||
if ("#EACH".equals(triggerName) || triggerName.equals(entry.getKey())) {
|
||||
Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
|
||||
triggerProps.put("enabled", true);
|
||||
zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
|
||||
}
|
||||
}
|
||||
rsp.getValues().add("result", "success");
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "currentTimeMillis is used to find the resume time for the trigger")
|
||||
private void handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
|
||||
String triggerName = op.getStr("name");
|
||||
|
||||
if (triggerName == null || triggerName.trim().length() == 0) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
|
||||
}
|
||||
|
||||
String timeout = op.getStr("timeout", null);
|
||||
Date resumeTime = null;
|
||||
if (timeout != null) {
|
||||
try {
|
||||
int timeoutSeconds = parseHumanTime(timeout);
|
||||
resumeTime = new Date(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeoutSeconds, TimeUnit.SECONDS));
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'timeout' value for suspend trigger: " + triggerName);
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
|
||||
Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
|
||||
if (triggers == null || (!triggers.containsKey(triggerName)) && !"#EACH".equals(triggerName)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
|
||||
}
|
||||
for (Map.Entry<String, Object> entry : triggers.entrySet()) {
|
||||
if ("#EACH".equals(triggerName) || triggerName.equals(entry.getKey())) {
|
||||
Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
|
||||
triggerProps.put("enabled", false);
|
||||
if (resumeTime != null) {
|
||||
triggerProps.put("resumeAt", resumeTime.getTime());
|
||||
}
|
||||
zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
|
||||
}
|
||||
}
|
||||
rsp.getValues().add("result", "success");
|
||||
}
|
||||
|
||||
private void handleRemoveListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
|
||||
String listenerName = op.getStr("name");
|
||||
|
||||
@ -214,21 +280,11 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
|
||||
|
||||
String waitForStr = op.getStr("waitFor", null);
|
||||
if (waitForStr != null) {
|
||||
char c = waitForStr.charAt(waitForStr.length() - 1);
|
||||
long waitForValue = Long.parseLong(waitForStr.substring(0, waitForStr.length() - 1));
|
||||
int seconds;
|
||||
switch (c) {
|
||||
case 'h':
|
||||
seconds = (int) TimeUnit.HOURS.toSeconds(waitForValue);
|
||||
break;
|
||||
case 'm':
|
||||
seconds = (int) TimeUnit.MINUTES.toSeconds(waitForValue);
|
||||
break;
|
||||
case 's':
|
||||
seconds = (int) waitForValue;
|
||||
break;
|
||||
default:
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'waitFor' value in trigger: " + triggerName);
|
||||
int seconds = 0;
|
||||
try {
|
||||
seconds = parseHumanTime(waitForStr);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'waitFor' value in trigger: " + triggerName);
|
||||
}
|
||||
op.getDataMap().put("waitFor", seconds);
|
||||
}
|
||||
@ -260,6 +316,26 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
|
||||
rsp.getValues().add("result", "success");
|
||||
}
|
||||
|
||||
private int parseHumanTime(String timeStr) {
|
||||
char c = timeStr.charAt(timeStr.length() - 1);
|
||||
long timeValue = Long.parseLong(timeStr.substring(0, timeStr.length() - 1));
|
||||
int seconds;
|
||||
switch (c) {
|
||||
case 'h':
|
||||
seconds = (int) TimeUnit.HOURS.toSeconds(timeValue);
|
||||
break;
|
||||
case 'm':
|
||||
seconds = (int) TimeUnit.MINUTES.toSeconds(timeValue);
|
||||
break;
|
||||
case 's':
|
||||
seconds = (int) timeValue;
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid time value");
|
||||
}
|
||||
return seconds;
|
||||
}
|
||||
|
||||
private void handleRemoveTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
|
||||
String triggerName = op.getStr("name");
|
||||
boolean removeListeners = op.getBoolean("removeListeners", false);
|
||||
|
@ -125,6 +125,36 @@
|
||||
"required": [
|
||||
"name"
|
||||
]
|
||||
},
|
||||
"suspend-trigger": {
|
||||
"description": "Pauses a trigger until an explicit resume is invoked or if the optional timeout expires",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"description": "The name of the trigger to be suspended or '#EACH' to suspend all triggers"
|
||||
},
|
||||
"timeout": {
|
||||
"type": "string",
|
||||
"description": "Optional timeout after which all triggers are resumed automatically"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"name"
|
||||
]
|
||||
},
|
||||
"resume-trigger": {
|
||||
"description": "Resumes a suspended trigger",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"description": "The name of the trigger to be resumed or '#EACH' to resume all triggers"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"name"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -49,6 +49,139 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
|
||||
.configure();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendTrigger() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
// todo nocommit -- add testing for the v2 path
|
||||
// String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
|
||||
String path = "/admin/autoscaling";
|
||||
String setTriggerCommand = "{\n" +
|
||||
"\t\"set-trigger\" : {\n" +
|
||||
"\t\t\"name\" : \"node_lost_trigger\",\n" +
|
||||
"\t\t\"event\" : \"nodeLost\",\n" +
|
||||
"\t\t\"waitFor\" : \"10m\",\n" +
|
||||
"\t\t\"enabled\" : \"true\"\n" +
|
||||
"\t}\n" +
|
||||
"}";
|
||||
SolrRequest req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
setTriggerCommand = "{\n" +
|
||||
"\t\"set-trigger\" : {\n" +
|
||||
"\t\t\"name\" : \"node_added_trigger\",\n" +
|
||||
"\t\t\"event\" : \"nodeAdded\",\n" +
|
||||
"\t\t\"waitFor\" : \"10m\",\n" +
|
||||
"\t\t\"enabled\" : \"true\"\n" +
|
||||
"\t}\n" +
|
||||
"}";
|
||||
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
String suspendTriggerCommand = "{\n" +
|
||||
"\t\"suspend-trigger\" : {\n" +
|
||||
"\t\t\"name\" : \"node_lost_trigger\"\n" +
|
||||
"\t}\n" +
|
||||
"}";
|
||||
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
byte[] data = zkClient().getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, null, null, true);
|
||||
ZkNodeProps loaded = ZkNodeProps.load(data);
|
||||
Map<String, Object> triggers = (Map<String, Object>) loaded.get("triggers");
|
||||
assertNotNull(triggers);
|
||||
assertEquals(2, triggers.size());
|
||||
assertTrue(triggers.containsKey("node_lost_trigger"));
|
||||
assertTrue(triggers.containsKey("node_added_trigger"));
|
||||
Map<String, Object> nodeLostTrigger = (Map<String, Object>) triggers.get("node_lost_trigger");
|
||||
assertEquals(4, nodeLostTrigger.size());
|
||||
assertEquals("false", nodeLostTrigger.get("enabled").toString());
|
||||
Map<String, Object> nodeAddedTrigger = (Map<String, Object>) triggers.get("node_added_trigger");
|
||||
assertEquals(4, nodeAddedTrigger.size());
|
||||
assertEquals("true", nodeAddedTrigger.get("enabled").toString());
|
||||
|
||||
suspendTriggerCommand = "{\n" +
|
||||
"\t\"suspend-trigger\" : {\n" +
|
||||
"\t\t\"name\" : \"#EACH\"\n" +
|
||||
"\t}\n" +
|
||||
"}";
|
||||
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
data = zkClient().getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, null, null, true);
|
||||
loaded = ZkNodeProps.load(data);
|
||||
triggers = (Map<String, Object>) loaded.get("triggers");
|
||||
assertNotNull(triggers);
|
||||
assertEquals(2, triggers.size());
|
||||
nodeLostTrigger = (Map<String, Object>) triggers.get("node_lost_trigger");
|
||||
assertEquals(4, nodeLostTrigger.size());
|
||||
assertEquals("false", nodeLostTrigger.get("enabled").toString());
|
||||
nodeAddedTrigger = (Map<String, Object>) triggers.get("node_added_trigger");
|
||||
assertEquals(4, nodeAddedTrigger.size());
|
||||
assertEquals("false", nodeAddedTrigger.get("enabled").toString());
|
||||
|
||||
String resumeTriggerCommand = "{\n" +
|
||||
"\t\"resume-trigger\" : {\n" +
|
||||
"\t\t\"name\" : \"node_added_trigger\"\n" +
|
||||
"\t}\n" +
|
||||
"}";
|
||||
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
data = zkClient().getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, null, null, true);
|
||||
loaded = ZkNodeProps.load(data);
|
||||
triggers = (Map<String, Object>) loaded.get("triggers");
|
||||
assertNotNull(triggers);
|
||||
assertEquals(2, triggers.size());
|
||||
nodeLostTrigger = (Map<String, Object>) triggers.get("node_lost_trigger");
|
||||
assertEquals(4, nodeLostTrigger.size());
|
||||
assertEquals("false", nodeLostTrigger.get("enabled").toString());
|
||||
nodeAddedTrigger = (Map<String, Object>) triggers.get("node_added_trigger");
|
||||
assertEquals(4, nodeAddedTrigger.size());
|
||||
assertEquals("true", nodeAddedTrigger.get("enabled").toString());
|
||||
|
||||
resumeTriggerCommand = "{\n" +
|
||||
"\t\"resume-trigger\" : {\n" +
|
||||
"\t\t\"name\" : \"#EACH\"\n" +
|
||||
"\t}\n" +
|
||||
"}";
|
||||
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
data = zkClient().getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, null, null, true);
|
||||
loaded = ZkNodeProps.load(data);
|
||||
triggers = (Map<String, Object>) loaded.get("triggers");
|
||||
assertNotNull(triggers);
|
||||
assertEquals(2, triggers.size());
|
||||
nodeLostTrigger = (Map<String, Object>) triggers.get("node_lost_trigger");
|
||||
assertEquals(4, nodeLostTrigger.size());
|
||||
assertEquals("true", nodeLostTrigger.get("enabled").toString());
|
||||
nodeAddedTrigger = (Map<String, Object>) triggers.get("node_added_trigger");
|
||||
assertEquals(4, nodeAddedTrigger.size());
|
||||
assertEquals("true", nodeAddedTrigger.get("enabled").toString());
|
||||
|
||||
suspendTriggerCommand = "{\n" +
|
||||
"\t\"suspend-trigger\" : {\n" +
|
||||
"\t\t\"name\" : \"node_lost_trigger\",\n" +
|
||||
"\t\t\"timeout\" : \"1h\"\n" +
|
||||
"\t}\n" +
|
||||
"}";
|
||||
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
data = zkClient().getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, null, null, true);
|
||||
loaded = ZkNodeProps.load(data);
|
||||
triggers = (Map<String, Object>) loaded.get("triggers");
|
||||
assertNotNull(triggers);
|
||||
assertEquals(2, triggers.size());
|
||||
nodeLostTrigger = (Map<String, Object>) triggers.get("node_lost_trigger");
|
||||
assertEquals(5, nodeLostTrigger.size());
|
||||
assertEquals("false", nodeLostTrigger.get("enabled").toString());
|
||||
assertTrue(nodeLostTrigger.containsKey("resumeAt"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
|
Loading…
x
Reference in New Issue
Block a user