mirror of https://github.com/apache/lucene.git
SOLR-10643: Throttling strategy for triggers and policy executions
This commit is contained in:
parent
6a3f22ffd4
commit
269fdf4613
|
@ -129,6 +129,8 @@ Other Changes
|
||||||
Add support for selecting specific properties from any compound metric using 'property' parameter to
|
Add support for selecting specific properties from any compound metric using 'property' parameter to
|
||||||
/admin/metrics handler. (ab)
|
/admin/metrics handler. (ab)
|
||||||
|
|
||||||
|
* SOLR-10643: Throttling strategy for triggers and policy executions. (shalin)
|
||||||
|
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
================== 6.6.0 ==================
|
================== 6.6.0 ==================
|
||||||
|
|
|
@ -58,13 +58,19 @@ public class AutoScaling {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static interface TriggerListener<E extends TriggerEvent<? extends Trigger>> {
|
public static interface TriggerListener<E extends TriggerEvent<? extends Trigger>> {
|
||||||
public void triggerFired(E event);
|
/**
|
||||||
|
* This method is executed when a trigger is ready to fire.
|
||||||
|
*
|
||||||
|
* @param event a subclass of {@link TriggerEvent}
|
||||||
|
* @return true if the listener was ready to perform actions on the event, false otherwise.
|
||||||
|
*/
|
||||||
|
public boolean triggerFired(E event);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class HttpCallbackListener implements TriggerListener {
|
public static class HttpCallbackListener implements TriggerListener {
|
||||||
@Override
|
@Override
|
||||||
public void triggerFired(TriggerEvent event) {
|
public boolean triggerFired(TriggerEvent event) {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,6 +86,11 @@ public class AutoScaling {
|
||||||
* is encouraged that implementations be immutable with the exception of the associated listener
|
* is encouraged that implementations be immutable with the exception of the associated listener
|
||||||
* which can be get/set by a different thread than the one executing the trigger. Therefore, implementations
|
* which can be get/set by a different thread than the one executing the trigger. Therefore, implementations
|
||||||
* should use appropriate synchronization around the listener.
|
* should use appropriate synchronization around the listener.
|
||||||
|
* <p>
|
||||||
|
* When a trigger is ready to fire, it calls the {@link TriggerListener#triggerFired(TriggerEvent)} event
|
||||||
|
* with the proper trigger event object. If that method returns false then it should be interpreted to mean
|
||||||
|
* that Solr is not ready to process this trigger event and therefore we should retain the state and fire
|
||||||
|
* at the next invocation of the run() method.
|
||||||
*
|
*
|
||||||
* @param <E> the {@link TriggerEvent} which is handled by this Trigger
|
* @param <E> the {@link TriggerEvent} which is handled by this Trigger
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -202,9 +202,13 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
|
||||||
AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
|
AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
log.info("NodeAddedTrigger {} firing registered listener for node: {} added at {} nanotime, now: {} nanotime", name, nodeName, timeAdded, now);
|
log.info("NodeAddedTrigger {} firing registered listener for node: {} added at {} nanotime, now: {} nanotime", name, nodeName, timeAdded, now);
|
||||||
listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName));
|
if (listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName))) {
|
||||||
|
// remove from tracking set only if the fire was accepted
|
||||||
|
trackingKeySet.remove(nodeName);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
trackingKeySet.remove(nodeName);
|
||||||
}
|
}
|
||||||
trackingKeySet.remove(nodeName);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -197,9 +197,12 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
|
||||||
AutoScaling.TriggerListener<NodeLostEvent> listener = listenerRef.get();
|
AutoScaling.TriggerListener<NodeLostEvent> listener = listenerRef.get();
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
log.info("NodeLostTrigger firing registered listener");
|
log.info("NodeLostTrigger firing registered listener");
|
||||||
listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName));
|
if (listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName))) {
|
||||||
|
trackingKeySet.remove(nodeName);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
trackingKeySet.remove(nodeName);
|
||||||
}
|
}
|
||||||
trackingKeySet.remove(nodeName);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,8 +31,10 @@ import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.lucene.store.AlreadyClosedException;
|
import org.apache.lucene.store.AlreadyClosedException;
|
||||||
|
import org.apache.solr.cloud.ActionThrottle;
|
||||||
import org.apache.solr.common.util.ExecutorUtil;
|
import org.apache.solr.common.util.ExecutorUtil;
|
||||||
import org.apache.solr.common.util.IOUtils;
|
import org.apache.solr.common.util.IOUtils;
|
||||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||||
|
@ -45,7 +47,8 @@ import org.slf4j.LoggerFactory;
|
||||||
*/
|
*/
|
||||||
public class ScheduledTriggers implements Closeable {
|
public class ScheduledTriggers implements Closeable {
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
static final int SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
|
static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
|
||||||
|
static final int DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000;
|
||||||
|
|
||||||
private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
|
private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
|
||||||
|
|
||||||
|
@ -55,12 +58,18 @@ public class ScheduledTriggers implements Closeable {
|
||||||
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
|
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Single threaded executor to run the actions upon a trigger event
|
* Single threaded executor to run the actions upon a trigger event. We rely on this being a single
|
||||||
|
* threaded executor to ensure that trigger fires do not step on each other as well as to ensure
|
||||||
|
* that we do not run scheduled trigger threads while an action has been submitted to this executor
|
||||||
*/
|
*/
|
||||||
private final ExecutorService actionExecutor;
|
private final ExecutorService actionExecutor;
|
||||||
|
|
||||||
private boolean isClosed = false;
|
private boolean isClosed = false;
|
||||||
|
|
||||||
|
private final AtomicBoolean hasPendingActions = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private final ActionThrottle actionThrottle;
|
||||||
|
|
||||||
public ScheduledTriggers() {
|
public ScheduledTriggers() {
|
||||||
// todo make the core pool size configurable
|
// todo make the core pool size configurable
|
||||||
// it is important to use more than one because a time taking trigger can starve other scheduled triggers
|
// it is important to use more than one because a time taking trigger can starve other scheduled triggers
|
||||||
|
@ -72,6 +81,8 @@ public class ScheduledTriggers implements Closeable {
|
||||||
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
|
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
|
||||||
scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
||||||
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
|
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
|
||||||
|
// todo make the wait time configurable
|
||||||
|
actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -101,23 +112,36 @@ public class ScheduledTriggers implements Closeable {
|
||||||
AutoScaling.Trigger source = event.getSource();
|
AutoScaling.Trigger source = event.getSource();
|
||||||
if (source.isClosed()) {
|
if (source.isClosed()) {
|
||||||
log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed");
|
log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed");
|
||||||
return;
|
// we do not want to lose this event just because the trigger were closed, perhaps a replacement will need it
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
List<TriggerAction> actions = source.getActions();
|
if (hasPendingActions.compareAndSet(false, true)) {
|
||||||
if (actions != null) {
|
List<TriggerAction> actions = source.getActions();
|
||||||
actionExecutor.submit(() -> {
|
if (actions != null) {
|
||||||
for (TriggerAction action : actions) {
|
actionExecutor.submit(() -> {
|
||||||
try {
|
assert hasPendingActions.get();
|
||||||
action.process(event);
|
// let the action executor thread wait instead of the trigger thread so we use the throttle here
|
||||||
} catch (Exception e) {
|
actionThrottle.minimumWaitBetweenActions();
|
||||||
log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
|
actionThrottle.markAttemptingAction();
|
||||||
throw e;
|
for (TriggerAction action : actions) {
|
||||||
|
try {
|
||||||
|
action.process(event);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
hasPendingActions.set(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
});
|
}
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
// there is an action in the queue and we don't want to enqueue another until it is complete
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(newTrigger, 0, SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
|
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -155,7 +179,7 @@ public class ScheduledTriggers implements Closeable {
|
||||||
ExecutorUtil.shutdownAndAwaitTermination(actionExecutor);
|
ExecutorUtil.shutdownAndAwaitTermination(actionExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ScheduledTrigger implements Closeable {
|
private class ScheduledTrigger implements Runnable, Closeable {
|
||||||
AutoScaling.Trigger trigger;
|
AutoScaling.Trigger trigger;
|
||||||
ScheduledFuture<?> scheduledFuture;
|
ScheduledFuture<?> scheduledFuture;
|
||||||
|
|
||||||
|
@ -163,6 +187,22 @@ public class ScheduledTriggers implements Closeable {
|
||||||
this.trigger = trigger;
|
this.trigger = trigger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// fire a trigger only if an action is not pending
|
||||||
|
// note this is not fool proof e.g. it does not prevent an action being executed while a trigger
|
||||||
|
// is still executing. There is additional protection against that scenario in the event listener.
|
||||||
|
if (!hasPendingActions.get()) {
|
||||||
|
try {
|
||||||
|
trigger.run();
|
||||||
|
} catch (Exception e) {
|
||||||
|
// log but do not propagate exception because an exception thrown from a scheduled operation
|
||||||
|
// will suppress future executions
|
||||||
|
log.error("Unexpected execution from trigger: " + trigger.getName(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (scheduledFuture != null) {
|
if (scheduledFuture != null) {
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
@ -35,6 +36,12 @@ import org.junit.Test;
|
||||||
* Test for {@link NodeAddedTrigger}
|
* Test for {@link NodeAddedTrigger}
|
||||||
*/
|
*/
|
||||||
public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
|
private AutoScaling.TriggerListener<NodeAddedTrigger.NodeAddedEvent> noFirstRunListener = event -> {
|
||||||
|
fail("Did not expect the listener to fire on first run!");
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setupCluster() throws Exception {
|
public static void setupCluster() throws Exception {
|
||||||
configureCluster(1)
|
configureCluster(1)
|
||||||
|
@ -49,7 +56,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
Map<String, Object> props = createTriggerProps(waitForSeconds);
|
Map<String, Object> props = createTriggerProps(waitForSeconds);
|
||||||
|
|
||||||
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
|
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
|
||||||
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
trigger.setListener(noFirstRunListener);
|
||||||
trigger.run();
|
trigger.run();
|
||||||
|
|
||||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
@ -64,6 +71,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
} else {
|
} else {
|
||||||
fail("NodeAddedTrigger was fired more than once!");
|
fail("NodeAddedTrigger was fired more than once!");
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
});
|
});
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
do {
|
do {
|
||||||
|
@ -84,7 +92,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
|
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
|
||||||
final long waitTime = 2;
|
final long waitTime = 2;
|
||||||
props.put("waitFor", waitTime);
|
props.put("waitFor", waitTime);
|
||||||
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
trigger.setListener(noFirstRunListener);
|
||||||
trigger.run();
|
trigger.run();
|
||||||
|
|
||||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
@ -97,6 +105,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
} else {
|
} else {
|
||||||
fail("NodeAddedTrigger was fired more than once!");
|
fail("NodeAddedTrigger was fired more than once!");
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
});
|
});
|
||||||
trigger.run(); // first run should detect the new node
|
trigger.run(); // first run should detect the new node
|
||||||
newNode.stop(); // stop the new jetty
|
newNode.stop(); // stop the new jetty
|
||||||
|
@ -114,6 +123,38 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListenerAcceptance() throws Exception {
|
||||||
|
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||||
|
Map<String, Object> props = createTriggerProps(0);
|
||||||
|
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
|
||||||
|
trigger.setListener(noFirstRunListener);
|
||||||
|
trigger.run(); // starts tracking live nodes
|
||||||
|
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
AtomicInteger callCount = new AtomicInteger(0);
|
||||||
|
AtomicBoolean fired = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
trigger.setListener(event -> {
|
||||||
|
if (callCount.incrementAndGet() < 2) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
fired.compareAndSet(false, true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
trigger.run(); // first run should detect the new node and fire immediately but listener isn't ready
|
||||||
|
assertEquals(1, callCount.get());
|
||||||
|
assertFalse(fired.get());
|
||||||
|
trigger.run(); // second run should again fire
|
||||||
|
assertEquals(2, callCount.get());
|
||||||
|
assertTrue(fired.get());
|
||||||
|
trigger.run(); // should not fire
|
||||||
|
assertEquals(2, callCount.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRestoreState() throws Exception {
|
public void testRestoreState() throws Exception {
|
||||||
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||||
|
@ -125,7 +166,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container);
|
NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container);
|
||||||
final long waitTime = 2;
|
final long waitTime = 2;
|
||||||
props.put("waitFor", waitTime);
|
props.put("waitFor", waitTime);
|
||||||
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
trigger.setListener(noFirstRunListener);
|
||||||
trigger.run();
|
trigger.run();
|
||||||
|
|
||||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
@ -153,6 +194,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
} else {
|
} else {
|
||||||
fail("NodeAddedTrigger was fired more than once!");
|
fail("NodeAddedTrigger was fired more than once!");
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
});
|
});
|
||||||
newTrigger.restoreState(trigger); // restore state from the old trigger
|
newTrigger.restoreState(trigger); // restore state from the old trigger
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
@ -36,6 +37,11 @@ import org.junit.Test;
|
||||||
*/
|
*/
|
||||||
public class NodeLostTriggerTest extends SolrCloudTestCase {
|
public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
|
private AutoScaling.TriggerListener<NodeLostTrigger.NodeLostEvent> noFirstRunListener = event -> {
|
||||||
|
fail("Did not expect the listener to fire on first run!");
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setupCluster() throws Exception {
|
public static void setupCluster() throws Exception {
|
||||||
configureCluster(5)
|
configureCluster(5)
|
||||||
|
@ -50,7 +56,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
Map<String, Object> props = createTriggerProps(waitForSeconds);
|
Map<String, Object> props = createTriggerProps(waitForSeconds);
|
||||||
|
|
||||||
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
|
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
|
||||||
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
trigger.setListener(noFirstRunListener);
|
||||||
trigger.run();
|
trigger.run();
|
||||||
String lostNodeName = cluster.getJettySolrRunner(1).getNodeName();
|
String lostNodeName = cluster.getJettySolrRunner(1).getNodeName();
|
||||||
cluster.stopJettySolrRunner(1);
|
cluster.stopJettySolrRunner(1);
|
||||||
|
@ -66,6 +72,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
} else {
|
} else {
|
||||||
fail("NodeLostListener was fired more than once!");
|
fail("NodeLostListener was fired more than once!");
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
});
|
});
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
do {
|
do {
|
||||||
|
@ -87,7 +94,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
|
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
|
||||||
final long waitTime = 2;
|
final long waitTime = 2;
|
||||||
props.put("waitFor", waitTime);
|
props.put("waitFor", waitTime);
|
||||||
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
trigger.setListener(noFirstRunListener);
|
||||||
trigger.run();
|
trigger.run();
|
||||||
|
|
||||||
JettySolrRunner lostNode = cluster.getJettySolrRunner(1);
|
JettySolrRunner lostNode = cluster.getJettySolrRunner(1);
|
||||||
|
@ -101,6 +108,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
} else {
|
} else {
|
||||||
fail("NodeLostListener was fired more than once!");
|
fail("NodeLostListener was fired more than once!");
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
});
|
});
|
||||||
trigger.run(); // first run should detect the lost node
|
trigger.run(); // first run should detect the lost node
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
|
@ -128,6 +136,51 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListenerAcceptance() throws Exception {
|
||||||
|
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||||
|
Map<String, Object> props = createTriggerProps(0);
|
||||||
|
try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container)) {
|
||||||
|
trigger.setListener(noFirstRunListener);
|
||||||
|
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
cluster.waitForAllNodes(5);
|
||||||
|
|
||||||
|
trigger.run(); // starts tracking live nodes
|
||||||
|
|
||||||
|
// stop the newly created node
|
||||||
|
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
|
||||||
|
for (int i = 0; i < jettySolrRunners.size(); i++) {
|
||||||
|
JettySolrRunner jettySolrRunner = jettySolrRunners.get(i);
|
||||||
|
if (newNode == jettySolrRunner) {
|
||||||
|
cluster.stopJettySolrRunner(i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
AtomicInteger callCount = new AtomicInteger(0);
|
||||||
|
AtomicBoolean fired = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
trigger.setListener(event -> {
|
||||||
|
if (callCount.incrementAndGet() < 2) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
fired.compareAndSet(false, true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
trigger.run(); // first run should detect the lost node and fire immediately but listener isn't ready
|
||||||
|
assertEquals(1, callCount.get());
|
||||||
|
assertFalse(fired.get());
|
||||||
|
trigger.run(); // second run should again fire
|
||||||
|
assertEquals(2, callCount.get());
|
||||||
|
assertTrue(fired.get());
|
||||||
|
trigger.run(); // should not fire
|
||||||
|
assertEquals(2, callCount.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRestoreState() throws Exception {
|
public void testRestoreState() throws Exception {
|
||||||
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||||
|
@ -141,7 +194,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
// and assert that the new trigger still fires
|
// and assert that the new trigger still fires
|
||||||
|
|
||||||
NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container);
|
NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container);
|
||||||
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
trigger.setListener(noFirstRunListener);
|
||||||
trigger.run();
|
trigger.run();
|
||||||
newNode.stop();
|
newNode.stop();
|
||||||
trigger.run(); // this run should detect the lost node
|
trigger.run(); // this run should detect the lost node
|
||||||
|
@ -168,6 +221,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
} else {
|
} else {
|
||||||
fail("NodeLostListener was fired more than once!");
|
fail("NodeLostListener was fired more than once!");
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
});
|
});
|
||||||
newTrigger.restoreState(trigger); // restore state from the old trigger
|
newTrigger.restoreState(trigger); // restore state from the old trigger
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
|
|
|
@ -24,7 +24,9 @@ import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
|
@ -45,7 +47,7 @@ import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.SCHEDULED_TRIGGER_DELAY_SECONDS;
|
import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
|
||||||
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -85,6 +87,141 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
this.path = "/admin/autoscaling";
|
this.path = "/admin/autoscaling";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTriggerThrottling() throws Exception {
|
||||||
|
// for this test we want to create two triggers so we must assert that the actions were created twice
|
||||||
|
TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
|
||||||
|
// similarly we want both triggers to fire
|
||||||
|
triggerFiredLatch = new CountDownLatch(2);
|
||||||
|
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
|
||||||
|
// first trigger
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_added_trigger1'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '0s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
// second trigger
|
||||||
|
setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_added_trigger2'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '0s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
// wait until the two instances of action are created
|
||||||
|
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
|
||||||
|
fail("Two TriggerAction instances should have been created by now");
|
||||||
|
}
|
||||||
|
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
|
||||||
|
if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
|
||||||
|
fail("Both triggers should have fired by now");
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset shared state
|
||||||
|
lastActionExecutedAt.set(0);
|
||||||
|
TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
|
||||||
|
triggerFiredLatch = new CountDownLatch(2);
|
||||||
|
|
||||||
|
setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_lost_trigger1'," +
|
||||||
|
"'event' : 'nodeLost'," +
|
||||||
|
"'waitFor' : '0s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_lost_trigger2'," +
|
||||||
|
"'event' : 'nodeLost'," +
|
||||||
|
"'waitFor' : '0s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
// wait until the two instances of action are created
|
||||||
|
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
|
||||||
|
fail("Two TriggerAction instances should have been created by now");
|
||||||
|
}
|
||||||
|
|
||||||
|
// stop the node we had started earlier
|
||||||
|
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
|
||||||
|
for (int i = 0; i < jettySolrRunners.size(); i++) {
|
||||||
|
JettySolrRunner jettySolrRunner = jettySolrRunners.get(i);
|
||||||
|
if (jettySolrRunner == newNode) {
|
||||||
|
cluster.stopJettySolrRunner(i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
|
||||||
|
fail("Both triggers should have fired by now");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static AtomicLong lastActionExecutedAt = new AtomicLong(0);
|
||||||
|
static ReentrantLock lock = new ReentrantLock();
|
||||||
|
public static class ThrottingTesterAction extends TestTriggerAction {
|
||||||
|
// nanos are very precise so we need a delta for comparison with ms
|
||||||
|
private static final long DELTA_MS = 2;
|
||||||
|
|
||||||
|
// sanity check that an action instance is only invoked once
|
||||||
|
private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(AutoScaling.TriggerEvent event) {
|
||||||
|
boolean locked = lock.tryLock();
|
||||||
|
if (!locked) {
|
||||||
|
log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
if (lastActionExecutedAt.get() != 0) {
|
||||||
|
log.info("last action at " + lastActionExecutedAt.get() + " nano time = " + System.nanoTime());
|
||||||
|
if (System.nanoTime() - lastActionExecutedAt.get() < TimeUnit.NANOSECONDS.convert(ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS, TimeUnit.MILLISECONDS)) {
|
||||||
|
log.info("action executed again before minimum wait time from {}", event.getSource().getName());
|
||||||
|
fail("TriggerListener was fired before the throttling period");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (onlyOnce.compareAndSet(false, true)) {
|
||||||
|
log.info("action executed from {}", event.getSource().getName());
|
||||||
|
lastActionExecutedAt.set(System.nanoTime());
|
||||||
|
triggerFiredLatch.countDown();
|
||||||
|
} else {
|
||||||
|
log.info("action executed more than once from {}", event.getSource().getName());
|
||||||
|
fail("Trigger should not have fired more than once!");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (locked) {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNodeLostTriggerRestoreState() throws Exception {
|
public void testNodeLostTriggerRestoreState() throws Exception {
|
||||||
// for this test we want to update the trigger so we must assert that the actions were created twice
|
// for this test we want to update the trigger so we must assert that the actions were created twice
|
||||||
|
@ -124,7 +261,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
cluster.stopJettySolrRunner(index);
|
cluster.stopJettySolrRunner(index);
|
||||||
|
|
||||||
// ensure that the old trigger sees the stopped node, todo find a better way to do this
|
// ensure that the old trigger sees the stopped node, todo find a better way to do this
|
||||||
Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
|
Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
|
||||||
|
|
||||||
waitForSeconds = 0;
|
waitForSeconds = 0;
|
||||||
setTriggerCommand = "{" +
|
setTriggerCommand = "{" +
|
||||||
|
@ -182,7 +319,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
|
||||||
// ensure that the old trigger sees the new node, todo find a better way to do this
|
// ensure that the old trigger sees the new node, todo find a better way to do this
|
||||||
Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
|
Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
|
||||||
|
|
||||||
waitForSeconds = 0;
|
waitForSeconds = 0;
|
||||||
setTriggerCommand = "{" +
|
setTriggerCommand = "{" +
|
||||||
|
|
Loading…
Reference in New Issue