mirror of https://github.com/apache/lucene.git
SOLR-10602: Triggers should be able to restore state from old instances when taking over
This commit is contained in:
parent
9bb492e038
commit
dc9ab49967
|
@ -96,6 +96,8 @@ Bug Fixes
|
||||||
* SOLR-9837: Fix 55% performance regression of FieldCache uninvert time of
|
* SOLR-9837: Fix 55% performance regression of FieldCache uninvert time of
|
||||||
numeric fields. (yonik)
|
numeric fields. (yonik)
|
||||||
|
|
||||||
|
* SOLR-10602: Triggers should be able to restore state from old instances when taking over. (shalin)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -101,6 +101,8 @@ public class AutoScaling {
|
||||||
public TriggerListener<E> getListener();
|
public TriggerListener<E> getListener();
|
||||||
|
|
||||||
public boolean isClosed();
|
public boolean isClosed();
|
||||||
|
|
||||||
|
/**
 * Restores this trigger's tracking state from a previous instance that it is taking over from.
 * The implementations in this change assert that {@code old} is already closed and carries the
 * same name, and throw a {@code SolrException} when {@code old} is of a different trigger type.
 *
 * @param old the trigger instance being replaced
 */
public void restoreState(Trigger<E> old);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TriggerFactory implements Closeable {
|
public static class TriggerFactory implements Closeable {
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.core.CoreContainer;
|
import org.apache.solr.core.CoreContainer;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -78,6 +79,7 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
|
||||||
this.enabled = (boolean) properties.getOrDefault("enabled", true);
|
this.enabled = (boolean) properties.getOrDefault("enabled", true);
|
||||||
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
|
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
|
||||||
this.eventType = AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
|
this.eventType = AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
|
||||||
|
log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -143,6 +145,20 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void restoreState(AutoScaling.Trigger<NodeAddedEvent> old) {
|
||||||
|
assert old.isClosed();
|
||||||
|
if (old instanceof NodeAddedTrigger) {
|
||||||
|
NodeAddedTrigger that = (NodeAddedTrigger) old;
|
||||||
|
assert this.name.equals(that.name);
|
||||||
|
this.lastLiveNodes = new HashSet<>(that.lastLiveNodes);
|
||||||
|
this.nodeNameVsTimeAdded = new HashMap<>(that.nodeNameVsTimeAdded);
|
||||||
|
} else {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
|
||||||
|
"Unable to restore state from an unknown type of trigger");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
@ -152,7 +168,7 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
|
||||||
throw new RuntimeException("Trigger has been closed");
|
throw new RuntimeException("Trigger has been closed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.debug("Running NodeAddedTrigger");
|
log.debug("Running NodeAddedTrigger {}", name);
|
||||||
|
|
||||||
ZkStateReader reader = container.getZkController().getZkStateReader();
|
ZkStateReader reader = container.getZkController().getZkStateReader();
|
||||||
Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
|
Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
|
||||||
|
@ -171,19 +187,21 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
|
||||||
Set<String> copyOfNew = new HashSet<>(newLiveNodes);
|
Set<String> copyOfNew = new HashSet<>(newLiveNodes);
|
||||||
copyOfNew.removeAll(lastLiveNodes);
|
copyOfNew.removeAll(lastLiveNodes);
|
||||||
copyOfNew.forEach(n -> {
|
copyOfNew.forEach(n -> {
|
||||||
log.info("Tracking new node: {}", n);
|
long nanoTime = System.nanoTime();
|
||||||
nodeNameVsTimeAdded.put(n, System.nanoTime());
|
nodeNameVsTimeAdded.put(n, nanoTime);
|
||||||
|
log.info("Tracking new node: {} at {} nanotime", n, nanoTime);
|
||||||
});
|
});
|
||||||
|
|
||||||
// has enough time expired to trigger events for a node?
|
// has enough time expired to trigger events for a node?
|
||||||
for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
|
for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
|
||||||
String nodeName = entry.getKey();
|
String nodeName = entry.getKey();
|
||||||
Long timeAdded = entry.getValue();
|
Long timeAdded = entry.getValue();
|
||||||
if (TimeUnit.SECONDS.convert(System.nanoTime() - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
|
long now = System.nanoTime();
|
||||||
|
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
|
||||||
// fire!
|
// fire!
|
||||||
AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
|
AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
log.info("NodeAddedTrigger firing registered listener");
|
log.info("NodeAddedTrigger {} firing registered listener for node: {} added at {} nanotime, now: {} nanotime", name, nodeName, timeAdded, now);
|
||||||
listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName));
|
listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName));
|
||||||
}
|
}
|
||||||
trackingKeySet.remove(nodeName);
|
trackingKeySet.remove(nodeName);
|
||||||
|
|
|
@ -32,9 +32,9 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.core.CoreContainer;
|
import org.apache.solr.core.CoreContainer;
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -146,6 +146,20 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void restoreState(AutoScaling.Trigger<NodeLostEvent> old) {
|
||||||
|
assert old.isClosed();
|
||||||
|
if (old instanceof NodeLostTrigger) {
|
||||||
|
NodeLostTrigger that = (NodeLostTrigger) old;
|
||||||
|
assert this.name.equals(that.name);
|
||||||
|
this.lastLiveNodes = new HashSet<>(that.lastLiveNodes);
|
||||||
|
this.nodeNameVsTimeRemoved = new HashMap<>(that.nodeNameVsTimeRemoved);
|
||||||
|
} else {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
|
||||||
|
"Unable to restore state from an unknown type of trigger");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
@ -155,7 +169,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
|
||||||
throw new RuntimeException("Trigger has been closed");
|
throw new RuntimeException("Trigger has been closed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.debug("Running NodeLostTrigger");
|
log.debug("Running NodeLostTrigger: {}", name);
|
||||||
|
|
||||||
ZkStateReader reader = container.getZkController().getZkStateReader();
|
ZkStateReader reader = container.getZkController().getZkStateReader();
|
||||||
Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
|
Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
|
||||||
|
|
|
@ -179,12 +179,19 @@ public class OverseerTriggerThread implements Runnable, Closeable {
|
||||||
}
|
}
|
||||||
final Stat stat = new Stat();
|
final Stat stat = new Stat();
|
||||||
final byte[] data = zkClient.getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, this, stat, true);
|
final byte[] data = zkClient.getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, this, stat, true);
|
||||||
|
log.debug("{} watcher fired with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, stat.getVersion());
|
||||||
if (znodeVersion >= stat.getVersion()) {
|
if (znodeVersion >= stat.getVersion()) {
|
||||||
// protect against reordered watcher fires by ensuring that we only move forward
|
// protect against reordered watcher fires by ensuring that we only move forward
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
znodeVersion = stat.getVersion();
|
znodeVersion = stat.getVersion();
|
||||||
Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, data);
|
Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, data);
|
||||||
|
|
||||||
|
// remove all active triggers that have been removed from ZK
|
||||||
|
Set<String> trackingKeySet = activeTriggers.keySet();
|
||||||
|
trackingKeySet.retainAll(triggerMap.keySet());
|
||||||
|
|
||||||
|
// now let's add or remove triggers which have been enabled or disabled respectively
|
||||||
for (Map.Entry<String, AutoScaling.Trigger> entry : triggerMap.entrySet()) {
|
for (Map.Entry<String, AutoScaling.Trigger> entry : triggerMap.entrySet()) {
|
||||||
String triggerName = entry.getKey();
|
String triggerName = entry.getKey();
|
||||||
AutoScaling.Trigger trigger = entry.getValue();
|
AutoScaling.Trigger trigger = entry.getValue();
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
|
||||||
*/
|
*/
|
||||||
public class ScheduledTriggers implements Closeable {
|
public class ScheduledTriggers implements Closeable {
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
// Fixed delay, in seconds, handed to scheduleWithFixedDelay when a trigger is scheduled.
// Package-private so tests can derive their sleep intervals from it.
static final int SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
|
||||||
|
|
||||||
private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
|
private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
|
||||||
|
|
||||||
|
@ -67,7 +68,7 @@ public class ScheduledTriggers implements Closeable {
|
||||||
// how many triggers we have and secondly, that many threads will always be instantiated and kept around idle
|
// how many triggers we have and secondly, that many threads will always be instantiated and kept around idle
|
||||||
// so it is wasteful as well. Hopefully 4 is a good compromise.
|
// so it is wasteful as well. Hopefully 4 is a good compromise.
|
||||||
scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(4,
|
scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(4,
|
||||||
new DefaultSolrThreadFactory("ScheduledTrigger-"));
|
new DefaultSolrThreadFactory("ScheduledTrigger"));
|
||||||
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
|
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
|
||||||
scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
||||||
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
|
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
|
||||||
|
@ -93,6 +94,7 @@ public class ScheduledTriggers implements Closeable {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
IOUtils.closeQuietly(old);
|
IOUtils.closeQuietly(old);
|
||||||
|
newTrigger.restoreState(old.trigger);
|
||||||
scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
|
scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
|
||||||
}
|
}
|
||||||
newTrigger.setListener(event -> {
|
newTrigger.setListener(event -> {
|
||||||
|
@ -115,7 +117,7 @@ public class ScheduledTriggers implements Closeable {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(newTrigger, 0, 1, TimeUnit.SECONDS);
|
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(newTrigger, 0, SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -43,27 +43,10 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() throws Exception {
|
public void testTrigger() throws Exception {
|
||||||
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||||
Map<String, Object> props = new HashMap<>();
|
|
||||||
props.put("event", "nodeLost");
|
|
||||||
long waitForSeconds = 1 + random().nextInt(5);
|
long waitForSeconds = 1 + random().nextInt(5);
|
||||||
props.put("waitFor", waitForSeconds);
|
Map<String, Object> props = createTriggerProps(waitForSeconds);
|
||||||
props.put("enabled", true);
|
|
||||||
List<Map<String, String>> actions = new ArrayList<>(3);
|
|
||||||
Map<String, String> map = new HashMap<>(2);
|
|
||||||
map.put("name", "compute_plan");
|
|
||||||
map.put("class", "solr.ComputePlanAction");
|
|
||||||
actions.add(map);
|
|
||||||
map = new HashMap<>(2);
|
|
||||||
map.put("name", "execute_plan");
|
|
||||||
map.put("class", "solr.ExecutePlanAction");
|
|
||||||
actions.add(map);
|
|
||||||
map = new HashMap<>(2);
|
|
||||||
map.put("name", "log_plan");
|
|
||||||
map.put("class", "solr.LogPlanAction");
|
|
||||||
actions.add(map);
|
|
||||||
props.put("actions", actions);
|
|
||||||
|
|
||||||
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
|
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
|
||||||
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
||||||
|
@ -130,4 +113,84 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
|
||||||
assertFalse(fired.get());
|
assertFalse(fired.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestoreState() throws Exception {
|
||||||
|
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||||
|
long waitForSeconds = 1 + random().nextInt(5);
|
||||||
|
Map<String, Object> props = createTriggerProps(waitForSeconds);
|
||||||
|
|
||||||
|
// add a new node but update the trigger before the waitFor period expires
|
||||||
|
// and assert that the new trigger still fires
|
||||||
|
NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container);
|
||||||
|
final long waitTime = 2;
|
||||||
|
props.put("waitFor", waitTime);
|
||||||
|
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
||||||
|
trigger.run();
|
||||||
|
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
trigger.run(); // this run should detect the new node
|
||||||
|
trigger.close(); // close the old trigger
|
||||||
|
|
||||||
|
try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("some_different_name", props, container)) {
|
||||||
|
try {
|
||||||
|
newTrigger.restoreState(trigger);
|
||||||
|
fail("Trigger should only be able to restore state from an old trigger of the same name");
|
||||||
|
} catch (AssertionError e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
|
||||||
|
AtomicBoolean fired = new AtomicBoolean(false);
|
||||||
|
AtomicReference<NodeAddedTrigger.NodeAddedEvent> eventRef = new AtomicReference<>();
|
||||||
|
newTrigger.setListener(event -> {
|
||||||
|
if (fired.compareAndSet(false, true)) {
|
||||||
|
eventRef.set(event);
|
||||||
|
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS)) {
|
||||||
|
fail("NodeAddedListener was fired before the configured waitFor period");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fail("NodeAddedTrigger was fired more than once!");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
newTrigger.restoreState(trigger); // restore state from the old trigger
|
||||||
|
int counter = 0;
|
||||||
|
do {
|
||||||
|
newTrigger.run();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
if (counter++ > 10) {
|
||||||
|
fail("Newly added node was not discovered by trigger even after 10 seconds");
|
||||||
|
}
|
||||||
|
} while (!fired.get());
|
||||||
|
|
||||||
|
// ensure the event was fired
|
||||||
|
assertTrue(fired.get());
|
||||||
|
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
|
||||||
|
assertNotNull(nodeAddedEvent);
|
||||||
|
assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> createTriggerProps(long waitForSeconds) {
|
||||||
|
Map<String, Object> props = new HashMap<>();
|
||||||
|
props.put("event", "nodeLost");
|
||||||
|
props.put("waitFor", waitForSeconds);
|
||||||
|
props.put("enabled", true);
|
||||||
|
List<Map<String, String>> actions = new ArrayList<>(3);
|
||||||
|
Map<String, String> map = new HashMap<>(2);
|
||||||
|
map.put("name", "compute_plan");
|
||||||
|
map.put("class", "solr.ComputePlanAction");
|
||||||
|
actions.add(map);
|
||||||
|
map = new HashMap<>(2);
|
||||||
|
map.put("name", "execute_plan");
|
||||||
|
map.put("class", "solr.ExecutePlanAction");
|
||||||
|
actions.add(map);
|
||||||
|
map = new HashMap<>(2);
|
||||||
|
map.put("name", "log_plan");
|
||||||
|
map.put("class", "solr.LogPlanAction");
|
||||||
|
actions.add(map);
|
||||||
|
props.put("actions", actions);
|
||||||
|
return props;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,27 +44,10 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() throws Exception {
|
public void testTrigger() throws Exception {
|
||||||
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||||
Map<String, Object> props = new HashMap<>();
|
|
||||||
props.put("event", "nodeLost");
|
|
||||||
long waitForSeconds = 1 + random().nextInt(5);
|
long waitForSeconds = 1 + random().nextInt(5);
|
||||||
props.put("waitFor", waitForSeconds);
|
Map<String, Object> props = createTriggerProps(waitForSeconds);
|
||||||
props.put("enabled", true);
|
|
||||||
List<Map<String, String>> actions = new ArrayList<>(3);
|
|
||||||
Map<String, String> map = new HashMap<>(2);
|
|
||||||
map.put("name", "compute_plan");
|
|
||||||
map.put("class", "solr.ComputePlanAction");
|
|
||||||
actions.add(map);
|
|
||||||
map = new HashMap<>(2);
|
|
||||||
map.put("name", "execute_plan");
|
|
||||||
map.put("class", "solr.ExecutePlanAction");
|
|
||||||
actions.add(map);
|
|
||||||
map = new HashMap<>(2);
|
|
||||||
map.put("name", "log_plan");
|
|
||||||
map.put("class", "solr.LogPlanAction");
|
|
||||||
actions.add(map);
|
|
||||||
props.put("actions", actions);
|
|
||||||
|
|
||||||
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
|
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
|
||||||
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
||||||
|
@ -119,7 +102,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
fail("NodeLostListener was fired more than once!");
|
fail("NodeLostListener was fired more than once!");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
trigger.run(); // first run should detect the new node
|
trigger.run(); // first run should detect the lost node
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
do {
|
do {
|
||||||
if (container.getZkController().getZkStateReader().getClusterState().getLiveNodes().size() == 3) {
|
if (container.getZkController().getZkStateReader().getClusterState().getLiveNodes().size() == 3) {
|
||||||
|
@ -144,4 +127,83 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
|
||||||
assertFalse(fired.get());
|
assertFalse(fired.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestoreState() throws Exception {
|
||||||
|
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
|
||||||
|
long waitForSeconds = 1 + random().nextInt(5);
|
||||||
|
Map<String, Object> props = createTriggerProps(waitForSeconds);
|
||||||
|
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
String lostNodeName = newNode.getNodeName();
|
||||||
|
|
||||||
|
// remove a node but update the trigger before the waitFor period expires
|
||||||
|
// and assert that the new trigger still fires
|
||||||
|
|
||||||
|
NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container);
|
||||||
|
trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
|
||||||
|
trigger.run();
|
||||||
|
newNode.stop();
|
||||||
|
trigger.run(); // this run should detect the lost node
|
||||||
|
trigger.close(); // close the old trigger
|
||||||
|
|
||||||
|
try (NodeLostTrigger newTrigger = new NodeLostTrigger("some_different_name", props, container)) {
|
||||||
|
try {
|
||||||
|
newTrigger.restoreState(trigger);
|
||||||
|
fail("Trigger should only be able to restore state from an old trigger of the same name");
|
||||||
|
} catch (AssertionError e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
|
||||||
|
AtomicBoolean fired = new AtomicBoolean(false);
|
||||||
|
AtomicReference<NodeLostTrigger.NodeLostEvent> eventRef = new AtomicReference<>();
|
||||||
|
newTrigger.setListener(event -> {
|
||||||
|
if (fired.compareAndSet(false, true)) {
|
||||||
|
eventRef.set(event);
|
||||||
|
if (System.nanoTime() - event.getEventNanoTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
|
||||||
|
fail("NodeLostListener was fired before the configured waitFor period");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fail("NodeLostListener was fired more than once!");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
newTrigger.restoreState(trigger); // restore state from the old trigger
|
||||||
|
int counter = 0;
|
||||||
|
do {
|
||||||
|
newTrigger.run();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
if (counter++ > 10) {
|
||||||
|
fail("Lost node was not discovered by trigger even after 10 seconds");
|
||||||
|
}
|
||||||
|
} while (!fired.get());
|
||||||
|
|
||||||
|
NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
|
||||||
|
assertNotNull(nodeLostEvent);
|
||||||
|
assertEquals("", lostNodeName, nodeLostEvent.getNodeName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> createTriggerProps(long waitForSeconds) {
|
||||||
|
Map<String, Object> props = new HashMap<>();
|
||||||
|
props.put("event", "nodeLost");
|
||||||
|
props.put("waitFor", waitForSeconds);
|
||||||
|
props.put("enabled", true);
|
||||||
|
List<Map<String, String>> actions = new ArrayList<>(3);
|
||||||
|
Map<String, String> map = new HashMap<>(2);
|
||||||
|
map.put("name", "compute_plan");
|
||||||
|
map.put("class", "solr.ComputePlanAction");
|
||||||
|
actions.add(map);
|
||||||
|
map = new HashMap<>(2);
|
||||||
|
map.put("name", "execute_plan");
|
||||||
|
map.put("class", "solr.ExecutePlanAction");
|
||||||
|
actions.add(map);
|
||||||
|
map = new HashMap<>(2);
|
||||||
|
map.put("name", "log_plan");
|
||||||
|
map.put("class", "solr.LogPlanAction");
|
||||||
|
actions.add(map);
|
||||||
|
props.put("actions", actions);
|
||||||
|
return props;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -26,18 +27,27 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.util.LogLevel;
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.apache.solr.util.TimeOut;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.SCHEDULED_TRIGGER_DELAY_SECONDS;
|
||||||
|
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An end-to-end integration test for triggers
|
* An end-to-end integration test for triggers
|
||||||
*/
|
*/
|
||||||
|
@ -51,28 +61,159 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
private static AtomicBoolean triggerFired;
|
private static AtomicBoolean triggerFired;
|
||||||
private static AtomicReference<AutoScaling.TriggerEvent> eventRef;
|
private static AtomicReference<AutoScaling.TriggerEvent> eventRef;
|
||||||
|
|
||||||
|
private String path;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setupCluster() throws Exception {
|
public static void setupCluster() throws Exception {
|
||||||
configureCluster(2)
|
configureCluster(2)
|
||||||
.addConfig("conf", configset("cloud-minimal"))
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
.configure();
|
.configure();
|
||||||
waitForSeconds = 1 + random().nextInt(3);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setupTest() {
|
public void setupTest() throws KeeperException, InterruptedException, IOException, SolrServerException {
|
||||||
|
waitForSeconds = 1 + random().nextInt(3);
|
||||||
actionCreated = new CountDownLatch(1);
|
actionCreated = new CountDownLatch(1);
|
||||||
triggerFiredLatch = new CountDownLatch(1);
|
triggerFiredLatch = new CountDownLatch(1);
|
||||||
triggerFired = new AtomicBoolean(false);
|
triggerFired = new AtomicBoolean(false);
|
||||||
eventRef = new AtomicReference<>();
|
eventRef = new AtomicReference<>();
|
||||||
|
// clear any persisted auto scaling configuration
|
||||||
|
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
|
||||||
|
log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
|
||||||
|
// todo nocommit -- add testing for the v2 path
|
||||||
|
// String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
|
||||||
|
this.path = "/admin/autoscaling";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeLostTriggerRestoreState() throws Exception {
|
||||||
|
// for this test we want to update the trigger so we must assert that the actions were created twice
|
||||||
|
TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
|
||||||
|
|
||||||
|
// start a new node
|
||||||
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
String nodeName = newNode.getNodeName();
|
||||||
|
|
||||||
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
waitForSeconds = 5;
|
||||||
|
String setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_lost_restore_trigger'," +
|
||||||
|
"'event' : 'nodeLost'," +
|
||||||
|
"'waitFor' : '5s'," + // should be enough for us to update the trigger
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||||
|
NamedList<Object> response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS);
|
||||||
|
while (actionCreated.getCount() == 0 && !timeOut.hasTimedOut()) {
|
||||||
|
Thread.sleep(200);
|
||||||
|
}
|
||||||
|
assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds",actionCreated.getCount() > 0);
|
||||||
|
|
||||||
|
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
|
||||||
|
int index = -1;
|
||||||
|
for (int i = 0; i < jettySolrRunners.size(); i++) {
|
||||||
|
JettySolrRunner runner = jettySolrRunners.get(i);
|
||||||
|
if (runner == newNode) index = i;
|
||||||
|
}
|
||||||
|
assertFalse(index == -1);
|
||||||
|
cluster.stopJettySolrRunner(index);
|
||||||
|
|
||||||
|
// ensure that the old trigger sees the stopped node, todo find a better way to do this
|
||||||
|
Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
waitForSeconds = 0;
|
||||||
|
setTriggerCommand = "{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_lost_restore_trigger'," +
|
||||||
|
"'event' : 'nodeLost'," +
|
||||||
|
"'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}";
|
||||||
|
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
|
||||||
|
response = solrClient.request(req);
|
||||||
|
assertEquals(response.get("result").toString(), "success");
|
||||||
|
|
||||||
|
// wait until the second instance of action is created
|
||||||
|
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
|
||||||
|
fail("Two TriggerAction instances should have been created by now");
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
|
||||||
|
assertTrue("The trigger did not fire at all", await);
|
||||||
|
assertTrue(triggerFired.get());
|
||||||
|
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
|
||||||
|
assertNotNull(nodeLostEvent);
|
||||||
|
assertEquals("The node added trigger was fired but for a different node",
|
||||||
|
nodeName, nodeLostEvent.getNodeName());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Checks that a {@code nodeAdded} trigger which replaces an older instance of the same name
 * still fires for a node that was added while the older instance was active.
 *
 * <p>Sequence of the test:
 * <ol>
 *   <li>register {@code node_added_restore_trigger} with a 5 second {@code waitFor};</li>
 *   <li>wait until the first {@link TestTriggerAction} instance has been created;</li>
 *   <li>start a new node and sleep long enough for a scheduled trigger run to happen;</li>
 *   <li>re-register the trigger with a 0 second {@code waitFor} so that it replaces the old one;</li>
 *   <li>wait for the second action instance, then for the trigger to fire, and verify that the
 *       event refers to the node started in step 3.</li>
 * </ol>
 *
 * @throws Exception if a request fails or the test thread is interrupted while waiting
 */
@Test
public void testNodeAddedTriggerRestoreState() throws Exception {
  // The trigger is set twice in this test, so two action instances are expected in total.
  final int expectedActionInstances = 2;
  TriggerIntegrationTest.actionCreated = new CountDownLatch(expectedActionInstances);

  CloudSolrClient solrClient = cluster.getSolrClient();
  // NOTE(review): waitForSeconds is presumably read by the test listener/action to validate
  // the waitFor period -- confirm against the rest of the class.
  waitForSeconds = 5;
  String setTriggerCommand = "{" +
      "'set-trigger' : {" +
      "'name' : 'node_added_restore_trigger'," +
      "'event' : 'nodeAdded'," +
      "'waitFor' : '5s'," + // should be enough for us to update the trigger
      "'enabled' : true," +
      "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
      "}}";
  SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
  NamedList<Object> response = solrClient.request(req);
  assertEquals("success", response.get("result").toString());

  // Wait (at most 2 seconds) for the first action instance. The latch starts at
  // expectedActionInstances, so "nothing created yet" means the count is still at that
  // initial value; comparing against 0 would end the loop immediately.
  TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS);
  while (actionCreated.getCount() == expectedActionInstances && !timeOut.hasTimedOut()) {
    Thread.sleep(200);
  }
  assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds",
      actionCreated.getCount() < expectedActionInstances);

  // start a new node
  JettySolrRunner newNode = cluster.startJettySolrRunner();

  // ensure that the old trigger sees the new node, todo find a better way to do this
  Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));

  waitForSeconds = 0;
  setTriggerCommand = "{" +
      "'set-trigger' : {" +
      "'name' : 'node_added_restore_trigger'," +
      "'event' : 'nodeAdded'," +
      "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
      "'enabled' : true," +
      "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
      "}}";
  req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
  response = solrClient.request(req);
  assertEquals("success", response.get("result").toString());

  // wait until the second instance of action is created
  if (!actionCreated.await(3, TimeUnit.SECONDS)) {
    fail("Two TriggerAction instances should have been created by now");
  }

  boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
  assertTrue("The trigger did not fire at all", await);
  assertTrue(triggerFired.get());
  NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
  assertNotNull(nodeAddedEvent);
  // The replacement trigger must report the node that was added before it took over.
  assertEquals("The node added trigger was fired but for a different node",
      newNode.getNodeName(), nodeAddedEvent.getNodeName());
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNodeAddedTrigger() throws Exception {
|
public void testNodeAddedTrigger() throws Exception {
|
||||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
// todo nocommit -- add testing for the v2 path
|
|
||||||
// String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
|
|
||||||
String path = "/admin/autoscaling";
|
|
||||||
String setTriggerCommand = "{" +
|
String setTriggerCommand = "{" +
|
||||||
"'set-trigger' : {" +
|
"'set-trigger' : {" +
|
||||||
"'name' : 'node_added_trigger'," +
|
"'name' : 'node_added_trigger'," +
|
||||||
|
@ -102,9 +243,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
@Test
|
@Test
|
||||||
public void testNodeLostTrigger() throws Exception {
|
public void testNodeLostTrigger() throws Exception {
|
||||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
// todo nocommit -- add testing for the v2 path
|
|
||||||
// String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
|
|
||||||
String path = "/admin/autoscaling";
|
|
||||||
String setTriggerCommand = "{" +
|
String setTriggerCommand = "{" +
|
||||||
"'set-trigger' : {" +
|
"'set-trigger' : {" +
|
||||||
"'name' : 'node_lost_trigger'," +
|
"'name' : 'node_lost_trigger'," +
|
||||||
|
|
Loading…
Reference in New Issue