SOLR-11202: Implement a set-property command for AutoScaling API

This commit is contained in:
Shalin Shekhar Mangar 2017-11-28 16:08:32 +05:30
parent dae529de58
commit 207e546122
11 changed files with 516 additions and 78 deletions

View File

@ -97,6 +97,8 @@ New Features
* SOLR-9743: A new UTILIZENODE command (noble)
* SOLR-11202: Implement a set-property command for AutoScaling API. (ab, shalin)
Bug Fixes
----------------------

View File

@ -45,6 +45,13 @@ public class ActionThrottle {
this.minMsBetweenActions = minMsBetweenActions;
this.timeSource = timeSource;
}
public ActionThrottle(String name, long minMsBetweenActions, long lastActionStartedAt) {
this.name = name;
this.minMsBetweenActions = minMsBetweenActions;
this.lastActionStartedAt = lastActionStartedAt;
this.timeSource = TimeSource.NANO_TIME;
}
public void markAttemptingAction() {
lastActionStartedAt = timeSource.getTime();
@ -75,4 +82,8 @@ public class ActionThrottle {
}
}
}
public Long getLastActionStartedAt() {
return lastActionStartedAt;
}
}

View File

@ -202,6 +202,9 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
case CMD_SET_CLUSTER_POLICY:
currentConfig = handleSetClusterPolicy(req, rsp, op, currentConfig);
break;
case CMD_SET_PROPERTIES:
currentConfig = handleSetProperties(req, rsp, op, currentConfig);
break;
default:
op.addError("Unknown command: " + op.name);
}
@ -228,6 +231,17 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
rsp.getValues().add("result", "success");
}
private AutoScalingConfig handleSetProperties(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op, AutoScalingConfig currentConfig) {
Map<String, Object> map = op.getDataMap() == null ? Collections.emptyMap() : op.getDataMap();
Map<String, Object> configProps = new HashMap<>(currentConfig.getProperties());
configProps.putAll(map);
// remove a key which is set to null
map.forEach((k, v) -> {
if (v == null) configProps.remove(k);
});
return currentConfig.withProperties(configProps);
}
private void handleDiagnostics(SolrQueryResponse rsp, AutoScalingConfig autoScalingConf) throws IOException {
Policy policy = autoScalingConf.getPolicy();
try (CloudSolrClient build = new CloudSolrClient.Builder()

View File

@ -38,7 +38,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@ -65,6 +67,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.autoscaling.ExecutePlanAction.waitForTaskToFinish;
import static org.apache.solr.common.params.AutoScalingParams.ACTION_THROTTLE_PERIOD_SECONDS;
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS;
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_CORE_POOL_SIZE;
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS;
/**
* Responsible for scheduling active triggers, starting and stopping them and
@ -73,8 +79,18 @@ import static org.apache.solr.cloud.autoscaling.ExecutePlanAction.waitForTaskToF
public class ScheduledTriggers implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
static final int DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000;
static final int DEFAULT_COOLDOWN_PERIOD_MS = 5000;
static final int DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS = 5;
static final int DEFAULT_COOLDOWN_PERIOD_SECONDS = 5;
static final int DEFAULT_TRIGGER_CORE_POOL_SIZE = 4;
static final Map<String, Object> DEFAULT_PROPERTIES = new HashMap<>();
static {
DEFAULT_PROPERTIES.put(TRIGGER_SCHEDULE_DELAY_SECONDS, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS);
DEFAULT_PROPERTIES.put(TRIGGER_COOLDOWN_PERIOD_SECONDS, DEFAULT_COOLDOWN_PERIOD_SECONDS);
DEFAULT_PROPERTIES.put(TRIGGER_CORE_POOL_SIZE, DEFAULT_TRIGGER_CORE_POOL_SIZE);
DEFAULT_PROPERTIES.put(ACTION_THROTTLE_PERIOD_SECONDS, DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS);
}
private final Map<String, ScheduledTrigger> scheduledTriggers = new ConcurrentHashMap<>();
@ -96,9 +112,11 @@ public class ScheduledTriggers implements Closeable {
private final AtomicLong cooldownStart = new AtomicLong();
private final AtomicLong cooldownPeriod = new AtomicLong(TimeUnit.MILLISECONDS.toNanos(DEFAULT_COOLDOWN_PERIOD_MS));
private final AtomicLong cooldownPeriod = new AtomicLong(TimeUnit.SECONDS.toNanos(DEFAULT_COOLDOWN_PERIOD_SECONDS));
private final ActionThrottle actionThrottle;
private final AtomicInteger triggerDelay = new AtomicInteger(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS);
private final AtomicReference<ActionThrottle> actionThrottle;
private final SolrCloudManager dataProvider;
@ -113,34 +131,72 @@ public class ScheduledTriggers implements Closeable {
private AutoScalingConfig autoScalingConfig;
public ScheduledTriggers(SolrResourceLoader loader, SolrCloudManager dataProvider) {
// todo make the core pool size configurable
// it is important to use more than one because a time taking trigger can starve other scheduled triggers
// ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand
// how many triggers we have and secondly, that many threads will always be instantiated and kept around idle
// so it is wasteful as well. Hopefully 4 is a good compromise.
scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(4,
scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(DEFAULT_TRIGGER_CORE_POOL_SIZE,
new DefaultSolrThreadFactory("ScheduledTrigger"));
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
// todo make the wait time configurable
actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
actionThrottle = new AtomicReference<>(new ActionThrottle("action", TimeUnit.SECONDS.toMillis(DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS)));
this.dataProvider = dataProvider;
this.stateManager = dataProvider.getDistribStateManager();
this.loader = loader;
queueStats = new Stats();
listeners = new TriggerListeners();
// initialize cooldown timer
// todo: make the cooldownPeriod configurable
cooldownStart.set(System.nanoTime() - cooldownPeriod.get());
}
/**
* Set the current autoscaling config. This is invoked by {@link OverseerTriggerThread} when autoscaling.json is updated,
* and it re-initializes trigger listeners.
* and it re-initializes trigger listeners and other properties used by the framework
* @param autoScalingConfig current autoscaling.json
*/
public void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
Map<String, Object> currentProps = new HashMap<>(DEFAULT_PROPERTIES);
if (this.autoScalingConfig != null) {
currentProps.putAll(this.autoScalingConfig.getProperties());
}
for (Map.Entry<String, Object> entry : currentProps.entrySet()) {
Map<String, Object> newProps = autoScalingConfig.getProperties();
String key = entry.getKey();
if (newProps.containsKey(key) && !entry.getValue().equals(newProps.get(key))) {
log.debug("Changing value of autoscaling property: {} from: {} to: {}", key, entry.getValue(), newProps.get(key));
switch (key) {
case TRIGGER_SCHEDULE_DELAY_SECONDS:
triggerDelay.set(((Number) newProps.get(key)).intValue());
synchronized (this) {
scheduledTriggers.forEach((s, scheduledTrigger) -> {
if (scheduledTrigger.scheduledFuture.cancel(false)) {
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(
scheduledTrigger, 0, triggerDelay.get(), TimeUnit.SECONDS);
} else {
log.debug("Failed to cancel scheduled task: {}", s);
}
});
}
break;
case TRIGGER_COOLDOWN_PERIOD_SECONDS:
cooldownPeriod.set(TimeUnit.SECONDS.toNanos(((Number) newProps.get(key)).longValue()));
break;
case TRIGGER_CORE_POOL_SIZE:
this.scheduledThreadPoolExecutor.setCorePoolSize(((Number) newProps.get(key)).intValue());
break;
case ACTION_THROTTLE_PERIOD_SECONDS:
long minMsBetweenActions = TimeUnit.SECONDS.toMillis(((Number) newProps.get(key)).longValue());
ActionThrottle oldThrottle = this.actionThrottle.get();
ActionThrottle newThrottle = null;
if (oldThrottle.getLastActionStartedAt() != null) {
newThrottle = new ActionThrottle("action",
minMsBetweenActions,
oldThrottle.getLastActionStartedAt());
} else {
newThrottle = new ActionThrottle("action", minMsBetweenActions);
}
this.actionThrottle.set(newThrottle);
break;
}
}
}
this.autoScalingConfig = autoScalingConfig;
listeners.setAutoScalingConfig(autoScalingConfig);
}
@ -232,6 +288,7 @@ public class ScheduledTriggers implements Closeable {
log.debug("-- processing actions for " + event);
try {
// let the action executor thread wait instead of the trigger thread so we use the throttle here
ActionThrottle actionThrottle = this.actionThrottle.get();
actionThrottle.minimumWaitBetweenActions();
actionThrottle.markAttemptingAction();
@ -285,7 +342,7 @@ public class ScheduledTriggers implements Closeable {
}
});
newTrigger.init(); // mark as ready for scheduling
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0, triggerDelay.get(), TimeUnit.SECONDS);
}
private void waitForPendingTasks(AutoScaling.Trigger newTrigger, List<TriggerAction> actions) throws AlreadyClosedException {
@ -454,36 +511,43 @@ public class ScheduledTriggers implements Closeable {
// note this is not fool proof e.g. it does not prevent an action being executed while a trigger
// is still executing. There is additional protection against that scenario in the event listener.
if (!hasPendingActions.get()) {
// replay accumulated events on first run, if any
if (replay) {
TriggerEvent event;
// peek first without removing - we may crash before calling the listener
while ((event = queue.peekEvent()) != null) {
// override REPLAYING=true
event.getProperties().put(TriggerEvent.REPLAYING, true);
if (! trigger.getProcessor().process(event)) {
log.error("Failed to re-play event, discarding: " + event);
// this synchronization is usually never under contention
// but the only reason to have it here is to ensure that when the set-properties API is used
// to change the schedule delay, we can safely cancel the old scheduled task
// and create another one with the new delay without worrying about concurrent
// execution of the same trigger instance
synchronized (ScheduledTrigger.this) {
// replay accumulated events on first run, if any
if (replay) {
TriggerEvent event;
// peek first without removing - we may crash before calling the listener
while ((event = queue.peekEvent()) != null) {
// override REPLAYING=true
event.getProperties().put(TriggerEvent.REPLAYING, true);
if (! trigger.getProcessor().process(event)) {
log.error("Failed to re-play event, discarding: " + event);
}
queue.pollEvent(); // always remove it from queue
}
queue.pollEvent(); // always remove it from queue
// now restore saved state to possibly generate new events from old state on the first run
try {
trigger.restoreState();
} catch (Exception e) {
// log but don't throw - see below
log.error("Error restoring trigger state " + trigger.getName(), e);
}
replay = false;
}
// now restore saved state to possibly generate new events from old state on the first run
try {
trigger.restoreState();
trigger.run();
} catch (Exception e) {
// log but don't throw - see below
log.error("Error restoring trigger state " + trigger.getName(), e);
// log but do not propagate exception because an exception thrown from a scheduled operation
// will suppress future executions
log.error("Unexpected exception from trigger: " + trigger.getName(), e);
} finally {
// checkpoint after each run
trigger.saveState();
}
replay = false;
}
try {
trigger.run();
} catch (Exception e) {
// log but do not propagate exception because an exception thrown from a scheduled operation
// will suppress future executions
log.error("Unexpected exception from trigger: " + trigger.getName(), e);
} finally {
// checkpoint after each run
trigger.saveState();
}
}
}

View File

@ -25,9 +25,9 @@ import org.apache.solr.util.TimeSource;
import org.junit.Test;
public class ActionThrottleTest extends SolrTestCaseJ4 {
static class TestNanoTimeSource extends TimeSource {
private List<Long> returnValues;
private int index = 0;
@ -39,41 +39,41 @@ public class ActionThrottleTest extends SolrTestCaseJ4 {
public long getTime() {
return returnValues.get(index++);
}
}
// use the same time source as ActionThrottle
private static final TimeSource timeSource = TimeSource.NANO_TIME;
@Test
public void testBasics() throws Exception {
ActionThrottle at = new ActionThrottle("test", 1000);
long start = timeSource.getTime();
at.minimumWaitBetweenActions();
// should be no wait
assertTrue(TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS) < 1000);
at.markAttemptingAction();
if (random().nextBoolean()) Thread.sleep(100);
at.minimumWaitBetweenActions();
long elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS);
assertTrue(elaspsedTime + "ms", elaspsedTime >= 995);
start = timeSource.getTime();
at.markAttemptingAction();
at.minimumWaitBetweenActions();
Thread.sleep(random().nextInt(1000));
elaspsedTime = TimeUnit.MILLISECONDS.convert(timeSource.getTime() - start, TimeUnit.NANOSECONDS);
assertTrue(elaspsedTime + "ms", elaspsedTime >= 995);
}
@ -93,4 +93,11 @@ public class ActionThrottleTest extends SolrTestCaseJ4 {
}
public void testCreateNewThrottleWithLastValue() throws Exception {
ActionThrottle throttle = new ActionThrottle("xyz", 1000, new TestNanoTimeSource(Arrays.asList(new Long[]{10L, 20L})));
throttle.markAttemptingAction();
assertEquals((Long)10L, throttle.getLastActionStartedAt());
throttle = new ActionThrottle("new_xyz", 1000, throttle.getLastActionStartedAt());
assertEquals((Long)10L, throttle.getLastActionStartedAt());
}
}

View File

@ -37,6 +37,7 @@ import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
@ -56,8 +57,9 @@ import static org.apache.solr.common.util.Utils.getObjectByPath;
* Test for AutoScalingHandler
*/
public class AutoScalingHandlerTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final static String CONFIGSET_NAME = "conf";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(2)
@ -90,6 +92,19 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
}
}
public static SolrRequest createAutoScalingRequest(SolrRequest.METHOD m, String message) {
return createAutoScalingRequest(m, null, message);
}
static SolrRequest createAutoScalingRequest(SolrRequest.METHOD m, String subPath, String message) {
boolean useV1 = random().nextBoolean();
String path = useV1 ? "/admin/autoscaling" : "/cluster/autoscaling";
path += subPath != null ? subPath : "";
return useV1
? new AutoScalingRequest(m, path, message)
: new V2Request.Builder(path).withMethod(m).withPayload(message).build();
}
@Before
public void beforeTest() throws Exception {
// clear any persisted auto scaling configuration
@ -174,7 +189,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
List<String> changed = (List<String>)response.get("changed");
List<String> changed = (List<String>) response.get("changed");
assertEquals(1, changed.size());
assertTrue(changed.contains("node_added_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
@ -197,7 +212,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
changed = (List<String>)response.get("changed");
changed = (List<String>) response.get("changed");
assertEquals(1, changed.size());
assertTrue(changed.contains("node_added_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
@ -220,7 +235,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
changed = (List<String>)response.get("changed");
changed = (List<String>) response.get("changed");
assertEquals(1, changed.size());
assertTrue(changed.contains("node_lost_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
@ -244,7 +259,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
changed = (List<String>)response.get("changed");
changed = (List<String>) response.get("changed");
assertEquals(1, changed.size());
assertTrue(changed.contains("node_lost_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
@ -799,17 +814,82 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
solrClient.request(CollectionAdminRequest.deleteCollection("COLL1"));
}
public static SolrRequest createAutoScalingRequest(SolrRequest.METHOD m, String message) {
return createAutoScalingRequest(m, null, message);
}
@Test
public void testSetProperties() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
String setPropertiesCommand = "{\n" +
"\t\"set-properties\" : {\n" +
"\t\t\"pqr\" : \"abc\"\n" +
"\t}\n" +
"}";
solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
NamedList<Object> response = solrClient.request(req);
Map properties = (Map) response.get("properties");
assertNotNull(properties);
assertEquals(1, properties.size());
assertEquals("abc", properties.get("pqr"));
static SolrRequest createAutoScalingRequest(SolrRequest.METHOD m, String subPath, String message) {
boolean useV1 = random().nextBoolean();
String path = useV1 ? "/admin/autoscaling" : "/cluster/autoscaling";
path += subPath != null ? subPath : "";
return useV1
? new AutoScalingRequest(m, path, message)
: new V2Request.Builder(path).withMethod(m).withPayload(message).build();
setPropertiesCommand = "{\n" +
"\t\"set-properties\" : {\n" +
"\t\t\"xyz\" : 123\n" +
"\t}\n" +
"}";
solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
response = solrClient.request(req);
properties = (Map) response.get("properties");
assertNotNull(properties);
assertEquals(2, properties.size());
assertEquals("abc", properties.get("pqr"));
assertEquals(123L, properties.get("xyz"));
setPropertiesCommand = "{\n" +
"\t\"set-properties\" : {\n" +
"\t\t\"xyz\" : 456\n" +
"\t}\n" +
"}";
solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
response = solrClient.request(req);
properties = (Map) response.get("properties");
assertNotNull(properties);
assertEquals(2, properties.size());
assertEquals("abc", properties.get("pqr"));
assertEquals(456L, properties.get("xyz"));
setPropertiesCommand = "{\n" +
"\t\"set-properties\" : {\n" +
"\t\t\"xyz\" : null\n" +
"\t}\n" +
"}";
solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
response = solrClient.request(req);
properties = (Map) response.get("properties");
assertNotNull(properties);
assertEquals(1, properties.size());
assertEquals("abc", properties.get("pqr"));
setPropertiesCommand = "{\n" +
"\t\"set-properties\" : {\n" +
"\t\t\"" + AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS + "\" : 5\n" +
"\t\t\"" + AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS + "\" : 10\n" +
"\t\t\"" + AutoScalingParams.TRIGGER_CORE_POOL_SIZE + "\" : 10\n" +
"\t\t\"" + AutoScalingParams.ACTION_THROTTLE_PERIOD_SECONDS + "\" : 5\n" +
"\t}\n" +
"}";
solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
response = solrClient.request(req);
properties = (Map) response.get("properties");
assertNotNull(properties);
assertEquals(5, properties.size());
assertEquals("abc", properties.get("pqr"));
assertEquals(5L, properties.get(AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS));
assertEquals(10L, properties.get(AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS));
assertEquals(10L, properties.get(AutoScalingParams.TRIGGER_CORE_POOL_SIZE));
assertEquals(5L, properties.get(AutoScalingParams.ACTION_THROTTLE_PERIOD_SECONDS));
}
static class AutoScalingRequest extends SolrRequest {

View File

@ -19,6 +19,7 @@ package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -47,10 +48,12 @@ import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.solr.util.TimeSource;
@ -116,6 +119,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Before
public void setupTest() throws Exception {
throttlingDelayMs.set(TimeUnit.SECONDS.toMillis(ScheduledTriggers.DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS));
waitForSeconds = 1 + random().nextInt(3);
actionConstructorCalled = new CountDownLatch(1);
actionInitCalled = new CountDownLatch(1);
@ -251,6 +255,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
static AtomicLong lastActionExecutedAt = new AtomicLong(0);
static AtomicLong throttlingDelayMs = new AtomicLong(TimeUnit.SECONDS.toMillis(ScheduledTriggers.DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS));
static ReentrantLock lock = new ReentrantLock();
public static class ThrottlingTesterAction extends TestTriggerAction {
// nanos are very precise so we need a delta for comparison with ms
@ -268,8 +273,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
try {
if (lastActionExecutedAt.get() != 0) {
log.info("last action at " + lastActionExecutedAt.get() + " time = " + timeSource.getTime());
if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - lastActionExecutedAt.get(), TimeUnit.NANOSECONDS) < ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS) {
log.info("last action at " + lastActionExecutedAt.get() + " time = " + timeSource.getTime() + " expected diff: " + TimeUnit.MILLISECONDS.toNanos(throttlingDelayMs.get() - DELTA_MS));
if (timeSource.getTime() - lastActionExecutedAt.get() < TimeUnit.MILLISECONDS.toNanos(throttlingDelayMs.get() - DELTA_MS)) {
log.info("action executed again before minimum wait time from {}", event.getSource());
fail("TriggerListener was fired before the throttling period");
}
@ -1217,7 +1222,182 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
// the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
// must be larger than cooldown period
assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.MILLISECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_MS));
assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS));
prevTimestamp = ev.timestamp;
long modifiedCooldownPeriodSeconds = 7;
String setPropertiesCommand = "{\n" +
"\t\"set-properties\" : {\n" +
"\t\t\"" + AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS + "\" : " + modifiedCooldownPeriodSeconds + "\n" +
"\t}\n" +
"}";
solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
response = solrClient.request(req);
// reset the trigger and captured events
listenerEvents.clear();
triggerFiredLatch = new CountDownLatch(1);
triggerFired.compareAndSet(true, false);
JettySolrRunner newNode3 = cluster.startJettySolrRunner();
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
// wait for listener to capture the SUCCEEDED stage
Thread.sleep(2000);
// there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
capturedEvents = listenerEvents.get("bar");
assertTrue(capturedEvents.toString(), capturedEvents.size() > 1);
for (int i = 0; i < capturedEvents.size() - 1; i++) {
ev = capturedEvents.get(i);
assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage);
assertTrue(ev.toString(), ev.message.contains("cooldown"));
}
ev = capturedEvents.get(capturedEvents.size() - 1);
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
// the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
// must be larger than the modified cooldown period
assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(modifiedCooldownPeriodSeconds));
}
public void testSetProperties() throws Exception {
JettySolrRunner runner = cluster.getJettySolrRunner(0);
SolrResourceLoader resourceLoader = runner.getCoreContainer().getResourceLoader();
SolrCloudManager solrCloudManager = runner.getCoreContainer().getZkController().getSolrCloudManager();
AtomicLong diff = new AtomicLong(0);
triggerFiredLatch = new CountDownLatch(2); // have the trigger run twice to capture time difference
try (ScheduledTriggers scheduledTriggers = new ScheduledTriggers(resourceLoader, solrCloudManager)) {
AutoScalingConfig config = new AutoScalingConfig(Collections.emptyMap());
scheduledTriggers.setAutoScalingConfig(config);
scheduledTriggers.add(new TriggerBase(TriggerEventType.NODELOST, "x", Collections.emptyMap(), resourceLoader, solrCloudManager) {
@Override
protected Map<String, Object> getState() {
return Collections.singletonMap("x","y");
}
@Override
protected void setState(Map<String, Object> state) {
}
@Override
public void restoreState(AutoScaling.Trigger old) {
}
@Override
public void run() {
if (getTriggerFiredLatch().getCount() == 0) return;
long l = diff.get();
diff.set(timeSource.getTime() - l);
getTriggerFiredLatch().countDown();
}
});
assertTrue(getTriggerFiredLatch().await(4, TimeUnit.SECONDS));
assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS) >= 0);
// change schedule delay
config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS, 4));
scheduledTriggers.setAutoScalingConfig(config);
triggerFiredLatch = new CountDownLatch(2);
assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(10, TimeUnit.SECONDS));
assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(4) >= 0);
// reset with default properties
scheduledTriggers.remove("x"); // remove the old trigger
config = config.withProperties(ScheduledTriggers.DEFAULT_PROPERTIES);
scheduledTriggers.setAutoScalingConfig(config);
// test core thread count
List<AutoScaling.Trigger> triggerList = new ArrayList<>();
final Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
final Set<String> triggerNames = Collections.synchronizedSet(new HashSet<>());
triggerFiredLatch = new CountDownLatch(8);
for (int i = 0; i < 8; i++) {
triggerList.add(new MockTrigger(TriggerEventType.NODELOST, "x" + i, Collections.emptyMap(), resourceLoader, solrCloudManager) {
@Override
public void run() {
try {
// If core pool size is increased then new threads won't be started if existing threads
// aren't busy with tasks. So we make this thread wait longer than necessary
// so that the pool is forced to start threads for other triggers
Thread.sleep(5000);
} catch (InterruptedException e) {
}
if (triggerNames.add(getName())) {
getTriggerFiredLatch().countDown();
threadNames.add(Thread.currentThread().getName());
}
}
});
scheduledTriggers.add(triggerList.get(i));
}
assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
assertEquals("Expected 8 triggers but found: " + triggerNames,8, triggerNames.size());
assertEquals("Expected " + ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE
+ " threads but found: " + threadNames,
ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE, threadNames.size());
// change core pool size
config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_CORE_POOL_SIZE, 6));
scheduledTriggers.setAutoScalingConfig(config);
triggerFiredLatch = new CountDownLatch(8);
threadNames.clear();
triggerNames.clear();
assertTrue(getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
assertEquals("Expected 8 triggers but found: " + triggerNames,8, triggerNames.size());
assertEquals("Expected 6 threads but found: " + threadNames,6, threadNames.size());
// reset
for (int i = 0; i < 8; i++) {
scheduledTriggers.remove(triggerList.get(i).getName());
}
config = config.withProperties(Collections.singletonMap(AutoScalingParams.ACTION_THROTTLE_PERIOD_SECONDS, 6));
scheduledTriggers.setAutoScalingConfig(config);
lastActionExecutedAt.set(0);
throttlingDelayMs.set(TimeUnit.SECONDS.toMillis(6));
triggerFiredLatch = new CountDownLatch(2);
Map<String, Object> props = map("waitFor", 0L, "actions", Collections.singletonList(map("name","throttler", "class", ThrottlingTesterAction.class.getName())));
scheduledTriggers.add(new NodeAddedTrigger("y1", props, resourceLoader, solrCloudManager));
scheduledTriggers.add(new NodeAddedTrigger("y2", props, resourceLoader, solrCloudManager));
JettySolrRunner newNode = cluster.startJettySolrRunner();
assertTrue(getTriggerFiredLatch().await(10, TimeUnit.SECONDS));
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
if (cluster.getJettySolrRunner(i) == newNode) {
cluster.stopJettySolrRunner(i);
break;
}
}
}
}
public static class MockTrigger extends TriggerBase {
public MockTrigger(TriggerEventType eventType, String name, Map<String, Object> properties, SolrResourceLoader loader, SolrCloudManager cloudManager) {
super(eventType, name, properties, loader, cloudManager);
}
@Override
protected Map<String, Object> getState() {
return Collections.emptyMap();
}
@Override
protected void setState(Map<String, Object> state) {
}
@Override
public void restoreState(AutoScaling.Trigger old) {
}
@Override
public void run() {
}
}
public static class TestSearchRateAction extends TriggerActionBase {

View File

@ -507,3 +507,42 @@ The `remove-listener` command can be used to remove an existing listener. It acc
}
}
----
=== Change AutoScaling Properties
The `set-properties` command can be used to change the default properties used by the Autoscaling framework.
The following well known properties can be specified in the payload:
* `triggerScheduleDelaySeconds` (defaults to 1 second): This is the delay in seconds between two executions of a trigger. Every trigger is scheduled using Java's ScheduledThreadPoolExecutor with this delay.
* `triggerCooldownPeriodSeconds` (defaults to 5 seconds): Solr pauses all other triggers for this cool down period after a trigger fires so that the system can stabilize before running triggers again.
* `triggerCorePoolSize` (defaults to 4 threads): The core pool size of the `ScheduledThreadPoolExecutor` used to schedule triggers.
* `actionThrottlePeriodSeconds` (defaults to 5 seconds): This is the minimum throttling delay between executing actions for triggers. It is guaranteed that actions for two trigger events are executed after this delay period.
The command allows setting arbitrary properties in addition to the above well-known properties. Such arbitrary properties can be useful in custom `TriggerAction` instances.
.Change default triggerScheduleDelaySeconds
[source.json]
----
{
"set-properties": {
"triggerScheduleDelaySeconds": 8
}
}
----
The set-properties command replaces older values if present. So using set-properties to set the same value twice will overwrite the old value.
If a property is not specified then it retains the last set value or the default, if no change was made.
A changed value can be unset by using a null value.
.Revert changed value of triggerScheduleDelaySeconds to default value
[source.json]
----
{
"set-properties": {
"triggerScheduleDelaySeconds": null
}
}
----
The changed values of these properties, if any, can be read using the Autoscaling Read API in the `properties` section.

View File

@ -51,6 +51,7 @@ public class AutoScalingConfig implements MapWriter {
private Policy policy;
private Map<String, TriggerConfig> triggers;
private Map<String, TriggerListenerConfig> listeners;
private Map<String, Object> properties;
private final int zkVersion;
@ -324,11 +325,12 @@ public class AutoScalingConfig implements MapWriter {
}
private AutoScalingConfig(Policy policy, Map<String, TriggerConfig> triggerConfigs, Map<String,
TriggerListenerConfig> listenerConfigs, int zkVersion) {
TriggerListenerConfig> listenerConfigs, Map<String, Object> properties, int zkVersion) {
this.policy = policy;
this.triggers = triggerConfigs != null ? Collections.unmodifiableMap(triggerConfigs) : null;
this.listeners = listenerConfigs != null ? Collections.unmodifiableMap(listenerConfigs) : null;
this.jsonMap = null;
this.properties = properties != null ? Collections.unmodifiableMap(properties) : null;
this.zkVersion = zkVersion;
this.empty = policy == null &&
(triggerConfigs == null || triggerConfigs.isEmpty()) &&
@ -422,13 +424,38 @@ public class AutoScalingConfig implements MapWriter {
return listeners;
}
public Map<String, Object> getProperties() {
if (properties == null) {
if (jsonMap != null) {
Map<String, Object> map = (Map<String, Object>) jsonMap.get("properties");
if (map == null) {
this.properties = Collections.emptyMap();
} else {
this.properties = new HashMap<>(map);
}
} else {
this.properties = Collections.emptyMap();
}
}
return properties;
}
/**
* Create a copy of the config with replaced properties.
* @param properties the new properties map
* @return modified copy of the configuration
*/
public AutoScalingConfig withProperties(Map<String, Object> properties) {
return new AutoScalingConfig(policy, getTriggerConfigs(), getTriggerListenerConfigs(), properties, zkVersion);
}
/**
* Create a copy of the config with replaced policy.
* @param policy new policy
* @return modified copy of the configuration
*/
public AutoScalingConfig withPolicy(Policy policy) {
return new AutoScalingConfig(policy, getTriggerConfigs(), getTriggerListenerConfigs(), zkVersion);
return new AutoScalingConfig(policy, getTriggerConfigs(), getTriggerListenerConfigs(), getProperties(), zkVersion);
}
/**
@ -437,7 +464,7 @@ public class AutoScalingConfig implements MapWriter {
* @return modified copy of the configuration
*/
public AutoScalingConfig withTriggerConfigs(Map<String, TriggerConfig> configs) {
return new AutoScalingConfig(getPolicy(), configs, getTriggerListenerConfigs(), zkVersion);
return new AutoScalingConfig(getPolicy(), configs, getTriggerListenerConfigs(), getProperties(), zkVersion);
}
/**
@ -468,7 +495,7 @@ public class AutoScalingConfig implements MapWriter {
* @return modified copy of the configuration
*/
public AutoScalingConfig withTriggerListenerConfigs(Map<String, TriggerListenerConfig> configs) {
return new AutoScalingConfig(getPolicy(), getTriggerConfigs(), configs, zkVersion);
return new AutoScalingConfig(getPolicy(), getTriggerConfigs(), configs, getProperties(), zkVersion);
}
/**
@ -508,6 +535,7 @@ public class AutoScalingConfig implements MapWriter {
ew.put("triggers", getTriggerConfigs());
ew.put("listeners", getTriggerListenerConfigs());
ew.put("properties", getProperties());
}
public String toString() {
@ -523,7 +551,8 @@ public class AutoScalingConfig implements MapWriter {
if (!getPolicy().equals(that.getPolicy())) return false;
if (!getTriggerConfigs().equals(that.getTriggerConfigs())) return false;
return getTriggerListenerConfigs().equals(that.getTriggerListenerConfigs());
if (!getTriggerListenerConfigs().equals(that.getTriggerListenerConfigs())) return false;
return getProperties().equals(that.getProperties());
}
private static List<Object> getList(String key, Map<String, Object> properties) {

View File

@ -58,4 +58,11 @@ public interface AutoScalingParams {
String CMD_REMOVE_POLICY = "remove-policy";
String CMD_SET_CLUSTER_PREFERENCES = "set-cluster-preferences";
String CMD_SET_CLUSTER_POLICY = "set-cluster-policy";
String CMD_SET_PROPERTIES = "set-properties";
// properties
String TRIGGER_SCHEDULE_DELAY_SECONDS = "triggerScheduleDelaySeconds";
String TRIGGER_COOLDOWN_PERIOD_SECONDS = "triggerCooldownPeriodSeconds";
String TRIGGER_CORE_POOL_SIZE = "triggerCorePoolSize";
String ACTION_THROTTLE_PERIOD_SECONDS = "actionThrottlePeriodSeconds";
}

View File

@ -189,6 +189,11 @@
"required": [
"name"
]
},
"set-properties": {
"type": "object",
"description": "The set-properties command allows you to add and update properties used by autoscaling framework itself",
"additionalProperties": true
}
}
}