mirror of https://github.com/apache/lucene.git
SOLR-13118: Fix various nodeAdded/nodeLost trigger (integration) tests related to restoriung state
This includes some cleanup and refactoring of unrelated test methods in the same classes to use new helper methods
This commit is contained in:
parent
6e745bd250
commit
5a513fab83
|
@ -511,6 +511,19 @@ public class ScheduledTriggers implements Closeable {
|
||||||
return Collections.unmodifiableSet(new HashSet<>(scheduledTriggerWrappers.keySet())); // shallow copy
|
return Collections.unmodifiableSet(new HashSet<>(scheduledTriggerWrappers.keySet())); // shallow copy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For use in white/grey box testing: The Trigger returned may be inspected,
|
||||||
|
* but should not be modified in any way.
|
||||||
|
*
|
||||||
|
* @param name the name of an existing trigger
|
||||||
|
* @return the current scheduled trigger with that name, or null if none exists
|
||||||
|
* @lucene.internal
|
||||||
|
*/
|
||||||
|
public synchronized AutoScaling.Trigger getTrigger(String name) {
|
||||||
|
TriggerWrapper w = scheduledTriggerWrappers.get(name);
|
||||||
|
return (null == w) ? null : w.trigger;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
|
|
@ -222,9 +222,20 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
|
||||||
*/
|
*/
|
||||||
protected abstract void setState(Map<String,Object> state);
|
protected abstract void setState(Map<String,Object> state);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an immutable deep copy of this trigger's state, suitible for saving.
|
||||||
|
* This method is public only for tests that wish to do grey-box introspection
|
||||||
|
*
|
||||||
|
* @see #getState
|
||||||
|
* @lucene.internal
|
||||||
|
*/
|
||||||
|
public Map<String,Object> deepCopyState() {
|
||||||
|
return Utils.getDeepCopy(getState(), 10, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void saveState() {
|
public void saveState() {
|
||||||
Map<String,Object> state = Utils.getDeepCopy(getState(), 10, false, true);
|
Map<String,Object> state = deepCopyState();
|
||||||
if (lastState != null && lastState.equals(state)) {
|
if (lastState != null && lastState.equals(state)) {
|
||||||
// skip saving if identical
|
// skip saving if identical
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -200,6 +200,7 @@ public class CloudTestUtils {
|
||||||
*
|
*
|
||||||
* @param cloudManager current instance of {@link SolrCloudManager}
|
* @param cloudManager current instance of {@link SolrCloudManager}
|
||||||
* @param triggerName the name of the trigger we need to see sheduled in order to return successfully
|
* @param triggerName the name of the trigger we need to see sheduled in order to return successfully
|
||||||
|
* @see #suspendTrigger
|
||||||
*/
|
*/
|
||||||
public static long waitForTriggerToBeScheduled(final SolrCloudManager cloudManager,
|
public static long waitForTriggerToBeScheduled(final SolrCloudManager cloudManager,
|
||||||
final String triggerName)
|
final String triggerName)
|
||||||
|
@ -230,17 +231,33 @@ public class CloudTestUtils {
|
||||||
*
|
*
|
||||||
* @param cloudManager current instance of {@link SolrCloudManager}
|
* @param cloudManager current instance of {@link SolrCloudManager}
|
||||||
* @param triggerName the name of the trigger to suspend. This must already be scheduled.
|
* @param triggerName the name of the trigger to suspend. This must already be scheduled.
|
||||||
|
* @see #assertAutoScalingRequest
|
||||||
|
* @see #waitForTriggerToBeScheduled
|
||||||
*/
|
*/
|
||||||
public static void suspendTrigger(final SolrCloudManager cloudManager,
|
public static void suspendTrigger(final SolrCloudManager cloudManager,
|
||||||
final String triggerName) throws IOException {
|
final String triggerName) throws IOException {
|
||||||
|
assertAutoScalingRequest(cloudManager, "{'suspend-trigger' : {'name' : '"+triggerName+"'} }");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates & executes an autoscaling request against the current cluster, asserting that
|
||||||
|
* the result is a success.
|
||||||
|
*
|
||||||
|
* @param cloudManager current instance of {@link SolrCloudManager}
|
||||||
|
* @param json The request to POST to the AutoScaling Handler
|
||||||
|
* @see AutoScalingRequest#create
|
||||||
|
*/
|
||||||
|
public static void assertAutoScalingRequest(final SolrCloudManager cloudManager,
|
||||||
|
final String json) throws IOException {
|
||||||
|
// TODO: a lot of code that directly uses AutoScalingRequest.create should use this method
|
||||||
|
|
||||||
final SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST,
|
final SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, json);
|
||||||
"{'suspend-trigger' : {'name' : '"+triggerName+"'} }");
|
|
||||||
final SolrResponse rsp = cloudManager.request(req);
|
final SolrResponse rsp = cloudManager.request(req);
|
||||||
final String result = rsp.getResponse().get("result").toString();
|
final String result = rsp.getResponse().get("result").toString();
|
||||||
Assert.assertEquals("Unexpected 'result' in response: " + rsp,
|
Assert.assertEquals("Unexpected result from auto-scaling command: " + json + " -> " + rsp,
|
||||||
"success", result);
|
"success", result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class for sending (JSON) autoscaling requests that can randomize between V1 and V2 requests
|
* Helper class for sending (JSON) autoscaling requests that can randomize between V1 and V2 requests
|
||||||
|
|
|
@ -18,57 +18,51 @@
|
||||||
package org.apache.solr.cloud.autoscaling;
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
|
||||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
|
||||||
import org.apache.solr.cloud.CloudTestUtils;
|
import org.apache.solr.cloud.CloudTestUtils;
|
||||||
import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest;
|
|
||||||
import org.apache.solr.cloud.Overseer;
|
import org.apache.solr.cloud.Overseer;
|
||||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.util.NamedList;
|
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.util.LogLevel;
|
import org.apache.solr.util.LogLevel;
|
||||||
import org.apache.solr.util.TimeOut;
|
import org.apache.solr.util.TimeOut;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
|
|
||||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||||
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
||||||
|
|
||||||
|
// TODO: this class shares duplicated code with NodeLostTriggerIntegrationTest ... merge?
|
||||||
|
|
||||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||||
public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
|
public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
private static CountDownLatch actionConstructorCalled;
|
private static volatile CountDownLatch actionConstructorCalled;
|
||||||
private static CountDownLatch actionInitCalled;
|
private static volatile CountDownLatch actionInitCalled;
|
||||||
private static CountDownLatch triggerFiredLatch;
|
private static volatile CountDownLatch triggerFiredLatch;
|
||||||
private static int waitForSeconds = 1;
|
private static volatile int waitForSeconds = 1;
|
||||||
private static AtomicBoolean triggerFired;
|
private static volatile AtomicBoolean triggerFired;
|
||||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
private static volatile Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||||
private static SolrCloudManager cloudManager;
|
private static volatile SolrCloudManager cloudManager;
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setupCluster() throws Exception {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void after() throws Exception {
|
public void after() throws Exception {
|
||||||
shutdownCluster();
|
shutdownCluster();
|
||||||
|
@ -84,23 +78,18 @@ public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
.addConfig("conf", configset("cloud-minimal"))
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
.configure();
|
.configure();
|
||||||
|
|
||||||
|
final Overseer overseer = cluster.getOpenOverseer();
|
||||||
|
assertNotNull(overseer);
|
||||||
|
cloudManager = overseer.getSolrCloudManager();
|
||||||
|
assertNotNull(cloudManager);
|
||||||
|
|
||||||
// disable .scheduled_maintenance (once it exists)
|
// disable .scheduled_maintenance (once it exists)
|
||||||
CloudTestUtils.waitForTriggerToBeScheduled(cluster.getOpenOverseer().getSolrCloudManager(), ".scheduled_maintenance");
|
CloudTestUtils.waitForTriggerToBeScheduled(cloudManager, ".scheduled_maintenance");
|
||||||
CloudTestUtils.suspendTrigger(cluster.getOpenOverseer().getSolrCloudManager(), ".scheduled_maintenance");
|
CloudTestUtils.suspendTrigger(cloudManager, ".scheduled_maintenance");
|
||||||
|
|
||||||
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
|
||||||
String overseerLeader = (String) overSeerStatus.get("leader");
|
|
||||||
int overseerLeaderIndex = 0;
|
|
||||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
|
||||||
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
|
||||||
if (jetty.getNodeName().equals(overseerLeader)) {
|
|
||||||
overseerLeaderIndex = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
|
|
||||||
ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
|
|
||||||
// aggressively remove all active scheduled triggers
|
// aggressively remove all active scheduled triggers
|
||||||
|
final ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
|
||||||
|
// TODO: is this really safe? is it possible overseer is still in process of adding some to schedule?
|
||||||
scheduledTriggers.removeAll();
|
scheduledTriggers.removeAll();
|
||||||
|
|
||||||
// clear any persisted auto scaling configuration
|
// clear any persisted auto scaling configuration
|
||||||
|
@ -116,7 +105,6 @@ public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
triggerFired = new AtomicBoolean(false);
|
triggerFired = new AtomicBoolean(false);
|
||||||
events.clear();
|
events.clear();
|
||||||
|
|
||||||
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
|
|
||||||
// clear any events or markers
|
// clear any events or markers
|
||||||
// todo: consider the impact of such cleanup on regular cluster restarts
|
// todo: consider the impact of such cleanup on regular cluster restarts
|
||||||
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
|
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
|
||||||
|
@ -131,78 +119,108 @@ public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNodeAddedTriggerRestoreState() throws Exception {
|
public void testNodeAddedTriggerRestoreState() throws Exception {
|
||||||
// for this test we want to update the trigger so we must assert that the actions were created twice
|
|
||||||
actionInitCalled = new CountDownLatch(2);
|
final String triggerName = "node_added_restore_trigger";
|
||||||
|
|
||||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
// should be enough to ensure trigger doesn't fire any actions until we replace the trigger
|
||||||
waitForSeconds = 5;
|
waitForSeconds = 500000;
|
||||||
String setTriggerCommand = "{" +
|
CloudTestUtils.assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
(cloudManager,
|
||||||
"'name' : 'node_added_restore_trigger'," +
|
"{" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'set-trigger' : {" +
|
||||||
"'waitFor' : '5s'," + // should be enough for us to update the trigger
|
"'name' : '"+triggerName+"'," +
|
||||||
"'enabled' : true," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'waitFor' : '"+waitForSeconds+"s'," +
|
||||||
"}}";
|
"'enabled' : true," +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
NamedList<Object> response = solrClient.request(req);
|
"}}");
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
||||||
TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
|
actionInitCalled.await(60, TimeUnit.SECONDS));
|
||||||
while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
|
|
||||||
Thread.sleep(200);
|
|
||||||
}
|
|
||||||
assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
|
|
||||||
|
|
||||||
// start a new node
|
// start a new node
|
||||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
final JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
|
final String nodeName = newNode.getNodeName();
|
||||||
|
|
||||||
|
// poll the internal state of the trigger until it run()s at least once and updates
|
||||||
|
// it's internal state to know the node we added is live
|
||||||
|
//
|
||||||
|
// (this should run roughly once a second)
|
||||||
|
(new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource()))
|
||||||
|
.waitFor("initial trigger never ran to detect new live node", () ->
|
||||||
|
(((Collection<String>) getTriggerState(triggerName).get("lastLiveNodes"))
|
||||||
|
.contains(nodeName)));
|
||||||
|
|
||||||
cluster.waitForAllNodes(30);
|
// since we know the nodeAdded event has been detected, we can recored the current timestamp
|
||||||
|
// (relative to the cluster's time source) and later assert that (restored state) correctly
|
||||||
|
// tracked that the event happened prior to "now"
|
||||||
|
final long maxEventTimeNs = cloudManager.getTimeSource().getTimeNs();
|
||||||
|
|
||||||
|
//
|
||||||
|
// now replace the trigger with a new instance to test that the state gets copied over correctly
|
||||||
|
//
|
||||||
|
|
||||||
|
// reset the actionInitCalled counter so we can confirm the second instances is inited
|
||||||
|
actionInitCalled = new CountDownLatch(1);
|
||||||
|
// use a low waitTime to ensure it processes the event quickly.
|
||||||
|
// (this updated property also ensures the set-trigger won't be treated as a No-Op)
|
||||||
|
waitForSeconds = 0 + random().nextInt(3);
|
||||||
|
CloudTestUtils.assertAutoScalingRequest
|
||||||
|
(cloudManager,
|
||||||
|
"{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : '"+triggerName+"'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '"+waitForSeconds+"s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}");
|
||||||
|
|
||||||
|
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
||||||
|
actionInitCalled.await(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// ensure that the old trigger sees the new node, todo find a better way to do this
|
// the trigger actions should now (eventually) record that the node was added
|
||||||
Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
|
assertTrue("Second instance of our trigger never fired the action to process the event",
|
||||||
|
triggerFiredLatch.await(30, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
assertEquals("Wrong number of events recorded: " + events.toString(),
|
||||||
|
1, events.size());
|
||||||
|
|
||||||
|
final TriggerEvent event = events.iterator().next();
|
||||||
|
assertNotNull("null event???", event);
|
||||||
|
assertTrue("Event should have been a nodeAdded event: " + event.getClass(),
|
||||||
|
event instanceof NodeAddedTrigger.NodeAddedEvent);
|
||||||
|
|
||||||
waitForSeconds = 0;
|
assertNotNull("event is missing NODE_NAMES: " + event, event.getProperty(TriggerEvent.NODE_NAMES));
|
||||||
setTriggerCommand = "{" +
|
assertEquals("event has incorrect NODE_NAMES: " + event,
|
||||||
"'set-trigger' : {" +
|
Collections.singletonList(nodeName),
|
||||||
"'name' : 'node_added_restore_trigger'," +
|
event.getProperty(TriggerEvent.NODE_NAMES));
|
||||||
"'event' : 'nodeAdded'," +
|
|
||||||
"'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
|
assertTrue("event TS is too late, should be before (max) expected TS @ "
|
||||||
"'enabled' : true," +
|
+ maxEventTimeNs + ": " + event,
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
event.getEventTime() < maxEventTimeNs);
|
||||||
"}}";
|
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
assertNotNull("event is missing EVENT_TIMES: " + event, event.getProperty(TriggerEvent.EVENT_TIMES));
|
||||||
response = solrClient.request(req);
|
assertEquals("event has unexpeted number of EVENT_TIMES: " + event,
|
||||||
assertEquals(response.get("result").toString(), "success");
|
1, ((Collection)event.getProperty(TriggerEvent.EVENT_TIMES)).size());
|
||||||
|
assertEquals("event's TS doesn't match EVENT_TIMES: " + event,
|
||||||
// wait until the second instance of action is created
|
event.getEventTime(),
|
||||||
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
((Collection)event.getProperty(TriggerEvent.EVENT_TIMES)).iterator().next());
|
||||||
fail("Two TriggerAction instances should have been created by now");
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
|
|
||||||
assertTrue("The trigger did not fire at all", await);
|
|
||||||
assertTrue(triggerFired.get());
|
|
||||||
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
|
|
||||||
assertNotNull(nodeAddedEvent);
|
|
||||||
List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
|
|
||||||
assertTrue(nodeNames.contains(newNode.getNodeName()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNodeAddedTrigger() throws Exception {
|
public void testNodeAddedTrigger() throws Exception {
|
||||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
String setTriggerCommand = "{" +
|
CloudTestUtils.assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
(cloudManager,
|
||||||
"'name' : 'node_added_trigger'," +
|
"{" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'set-trigger' : {" +
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
"'name' : 'node_added_trigger'," +
|
||||||
"'enabled' : true," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
"}}";
|
"'enabled' : true," +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
NamedList<Object> response = solrClient.request(req);
|
"}}");
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
||||||
fail("The TriggerAction should have been created by now");
|
fail("The TriggerAction should have been created by now");
|
||||||
|
@ -223,17 +241,16 @@ public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
actionInitCalled = new CountDownLatch(1);
|
actionInitCalled = new CountDownLatch(1);
|
||||||
|
|
||||||
// update the trigger with exactly the same data
|
// update the trigger with exactly the same data
|
||||||
setTriggerCommand = "{" +
|
CloudTestUtils.assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
(cloudManager,
|
||||||
"'name' : 'node_added_trigger'," +
|
"{" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'set-trigger' : {" +
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
"'name' : 'node_added_trigger'," +
|
||||||
"'enabled' : true," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
"}}";
|
"'enabled' : true," +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
response = solrClient.request(req);
|
"}}");
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
// this should be a no-op so the action should have been created but init should not be called
|
// this should be a no-op so the action should have been created but init should not be called
|
||||||
if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
|
if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
|
||||||
|
@ -277,4 +294,17 @@ public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
super.init();
|
super.init();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method for getting a copy of the current (internal) trigger state of a scheduled trigger.
|
||||||
|
*/
|
||||||
|
private Map<String, Object> getTriggerState(final String name) {
|
||||||
|
final Overseer overseer = cluster.getOpenOverseer();
|
||||||
|
final ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
|
||||||
|
final AutoScaling.Trigger t = scheduledTriggers.getTrigger(name);
|
||||||
|
assertNotNull(name + " is not a currently scheduled trigger", t);
|
||||||
|
assertTrue(name + " is not a TriggerBase w/state: " + t.getClass(),
|
||||||
|
t instanceof TriggerBase);
|
||||||
|
return ((TriggerBase)t).deepCopyState();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,20 +18,21 @@
|
||||||
package org.apache.solr.cloud.autoscaling;
|
package org.apache.solr.cloud.autoscaling;
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
|
||||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
import org.apache.solr.cloud.CloudTestUtils;
|
import org.apache.solr.cloud.CloudTestUtils;
|
||||||
import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest;
|
|
||||||
import org.apache.solr.cloud.Overseer;
|
import org.apache.solr.cloud.Overseer;
|
||||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||||
|
@ -43,31 +44,26 @@ import org.apache.solr.util.TimeOut;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
|
|
||||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||||
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
||||||
|
|
||||||
|
// TODO: this class shares duplicated code with NodeAddedTriggerIntegrationTest ... merge?
|
||||||
|
|
||||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||||
public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
|
public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
private static CountDownLatch actionConstructorCalled;
|
private static volatile CountDownLatch actionConstructorCalled;
|
||||||
private static CountDownLatch actionInitCalled;
|
private static volatile CountDownLatch actionInitCalled;
|
||||||
private static CountDownLatch triggerFiredLatch;
|
private static volatile CountDownLatch triggerFiredLatch;
|
||||||
private static int waitForSeconds = 1;
|
private static volatile int waitForSeconds = 1;
|
||||||
private static AtomicBoolean triggerFired;
|
private static volatile AtomicBoolean triggerFired;
|
||||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
private static volatile Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||||
private static SolrCloudManager cloudManager;
|
private static volatile SolrCloudManager cloudManager;
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setupCluster() throws Exception {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private static CountDownLatch getTriggerFiredLatch() {
|
private static CountDownLatch getTriggerFiredLatch() {
|
||||||
return triggerFiredLatch;
|
return triggerFiredLatch;
|
||||||
|
@ -80,33 +76,26 @@ public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
.addConfig("conf", configset("cloud-minimal"))
|
.addConfig("conf", configset("cloud-minimal"))
|
||||||
.configure();
|
.configure();
|
||||||
|
|
||||||
|
final Overseer overseer = cluster.getOpenOverseer();
|
||||||
|
assertNotNull(overseer);
|
||||||
|
cloudManager = overseer.getSolrCloudManager();
|
||||||
|
assertNotNull(cloudManager);
|
||||||
|
|
||||||
// disable .scheduled_maintenance (once it exists)
|
// disable .scheduled_maintenance (once it exists)
|
||||||
CloudTestUtils.waitForTriggerToBeScheduled(cluster.getOpenOverseer().getSolrCloudManager(), ".scheduled_maintenance");
|
CloudTestUtils.waitForTriggerToBeScheduled(cloudManager, ".scheduled_maintenance");
|
||||||
CloudTestUtils.suspendTrigger(cluster.getOpenOverseer().getSolrCloudManager(), ".scheduled_maintenance");
|
CloudTestUtils.suspendTrigger(cloudManager, ".scheduled_maintenance");
|
||||||
|
|
||||||
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
|
||||||
String overseerLeader = (String) overSeerStatus.get("leader");
|
|
||||||
int overseerLeaderIndex = 0;
|
|
||||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
|
||||||
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
|
||||||
if (jetty.getNodeName().equals(overseerLeader)) {
|
|
||||||
overseerLeaderIndex = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
|
|
||||||
ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
|
|
||||||
// aggressively remove all active scheduled triggers
|
// aggressively remove all active scheduled triggers
|
||||||
|
final ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
|
||||||
|
// TODO: is this really safe? is it possible overseer is still in process of adding some to schedule?
|
||||||
scheduledTriggers.removeAll();
|
scheduledTriggers.removeAll();
|
||||||
|
|
||||||
// clear any persisted auto scaling configuration
|
// clear any persisted auto scaling configuration
|
||||||
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
|
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
|
||||||
log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
|
log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
|
||||||
|
|
||||||
|
|
||||||
cluster.getSolrClient().setDefaultCollection(null);
|
cluster.getSolrClient().setDefaultCollection(null);
|
||||||
|
|
||||||
|
|
||||||
waitForSeconds = 1 + random().nextInt(3);
|
waitForSeconds = 1 + random().nextInt(3);
|
||||||
actionConstructorCalled = new CountDownLatch(1);
|
actionConstructorCalled = new CountDownLatch(1);
|
||||||
actionInitCalled = new CountDownLatch(1);
|
actionInitCalled = new CountDownLatch(1);
|
||||||
|
@ -114,7 +103,6 @@ public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
triggerFired = new AtomicBoolean(false);
|
triggerFired = new AtomicBoolean(false);
|
||||||
events.clear();
|
events.clear();
|
||||||
|
|
||||||
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
|
|
||||||
// clear any events or markers
|
// clear any events or markers
|
||||||
// todo: consider the impact of such cleanup on regular cluster restarts
|
// todo: consider the impact of such cleanup on regular cluster restarts
|
||||||
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
|
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
|
||||||
|
@ -134,71 +122,114 @@ public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNodeLostTriggerRestoreState() throws Exception {
|
public void testNodeLostTriggerRestoreState() throws Exception {
|
||||||
// for this test we want to update the trigger so we must assert that the actions were created twice
|
|
||||||
actionInitCalled = new CountDownLatch(2);
|
final String triggerName = "node_lost_restore_trigger";
|
||||||
|
|
||||||
// start a new node
|
// start a new node
|
||||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
final JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
String nodeName = newNode.getNodeName();
|
final String nodeName = newNode.getNodeName();
|
||||||
|
|
||||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
// should be enough to ensure trigger doesn't fire any actions until we replace the trigger
|
||||||
waitForSeconds = 5;
|
waitForSeconds = 500000;
|
||||||
String setTriggerCommand = "{" +
|
CloudTestUtils.assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
(cloudManager,
|
||||||
"'name' : 'node_lost_restore_trigger'," +
|
"{" +
|
||||||
"'event' : 'nodeLost'," +
|
"'set-trigger' : {" +
|
||||||
"'waitFor' : '5s'," + // should be enough for us to update the trigger
|
"'name' : '"+triggerName+"'," +
|
||||||
"'enabled' : true," +
|
"'event' : 'nodeLost'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'waitFor' : '"+waitForSeconds+"s'," +
|
||||||
"}}";
|
"'enabled' : true," +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
NamedList<Object> response = solrClient.request(req);
|
"}}");
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
||||||
|
actionInitCalled.await(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
|
// poll the internal state of the trigger until it run()s at least once and updates
|
||||||
while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
|
// it's internal state to know the node we added is live
|
||||||
Thread.sleep(200);
|
//
|
||||||
}
|
// (this should run roughly once a second)
|
||||||
assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
|
(new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource()))
|
||||||
|
.waitFor("initial trigger never ran to detect new live node", () ->
|
||||||
|
(((Collection<String>) getTriggerState(triggerName).get("lastLiveNodes"))
|
||||||
|
.contains(nodeName)));
|
||||||
|
|
||||||
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
|
// kill our node
|
||||||
int index = -1;
|
cluster.stopJettySolrRunner(newNode);
|
||||||
for (int i = 0; i < jettySolrRunners.size(); i++) {
|
cluster.waitForJettyToStop(newNode);
|
||||||
JettySolrRunner runner = jettySolrRunners.get(i);
|
|
||||||
if (runner == newNode) index = i;
|
|
||||||
}
|
|
||||||
assertFalse(index == -1);
|
|
||||||
JettySolrRunner j = cluster.stopJettySolrRunner(index);
|
|
||||||
cluster.waitForJettyToStop(j);
|
|
||||||
|
|
||||||
// ensure that the old trigger sees the stopped node, todo find a better way to do this
|
// poll the internal state of the trigger until it run()s at least once (more) and updates
|
||||||
Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
|
// it's internal state to know the node we killed is no longer alive
|
||||||
|
//
|
||||||
|
// (this should run roughly once a second of simulated time)
|
||||||
|
(new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource()))
|
||||||
|
.waitFor("initial trigger never ran to detect lost node", () ->
|
||||||
|
! (((Collection<String>) getTriggerState(triggerName).get("lastLiveNodes"))
|
||||||
|
.contains(nodeName)));
|
||||||
|
|
||||||
waitForSeconds = 0;
|
// since we know the nodeLost event has been detected, we can recored the current timestamp
|
||||||
setTriggerCommand = "{" +
|
// (relative to the cluster's time source) and later assert that (restored state) correctly
|
||||||
"'set-trigger' : {" +
|
// tracked that the event happened prior to "now"
|
||||||
"'name' : 'node_lost_restore_trigger'," +
|
final long maxEventTimeNs = cloudManager.getTimeSource().getTimeNs();
|
||||||
"'event' : 'nodeLost'," +
|
|
||||||
"'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
|
// even though our trigger has detected a lost node, the *action* we registered should not have
|
||||||
"'enabled' : true," +
|
// been run yet, due to the large waitFor configuration...
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
assertEquals("initial trigger action should not have fired", false, triggerFired.get());
|
||||||
"}}";
|
assertEquals("initial trigger action latch should not have counted down",
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
1, triggerFiredLatch.getCount());
|
||||||
response = solrClient.request(req);
|
assertEquals("initial trigger action should not have recorded any events: " + events.toString(),
|
||||||
assertEquals(response.get("result").toString(), "success");
|
0, events.size());
|
||||||
|
|
||||||
// wait until the second instance of action is created
|
//
|
||||||
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
// now replace the trigger with a new instance to test that the state gets copied over correctly
|
||||||
fail("Two TriggerAction instances should have been created by now");
|
//
|
||||||
}
|
|
||||||
|
// reset the actionInitCalled counter so we can confirm the second instances is inited
|
||||||
|
actionInitCalled = new CountDownLatch(1);
|
||||||
|
// use a low waitTime to ensure it processes the event quickly.
|
||||||
|
// (this updated property also ensures the set-trigger won't be treated as a No-Op)
|
||||||
|
waitForSeconds = 0 + random().nextInt(3);
|
||||||
|
CloudTestUtils.assertAutoScalingRequest
|
||||||
|
(cloudManager,
|
||||||
|
"{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : '"+triggerName+"'," +
|
||||||
|
"'event' : 'nodeLost'," +
|
||||||
|
"'waitFor' : '"+waitForSeconds+"s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}");
|
||||||
|
|
||||||
|
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
||||||
|
actionInitCalled.await(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// the trigger actions should now (eventually) record that the node is lost
|
||||||
|
assertTrue("Second instance of our trigger never fired the action to process the event",
|
||||||
|
triggerFiredLatch.await(30, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
assertEquals("Wrong number of events recorded: " + events.toString(),
|
||||||
|
1, events.size());
|
||||||
|
|
||||||
|
final TriggerEvent event = events.iterator().next();
|
||||||
|
assertNotNull("null event???", event);
|
||||||
|
assertTrue("Event should have been a nodeLost event: " + event.getClass(),
|
||||||
|
event instanceof NodeLostTrigger.NodeLostEvent);
|
||||||
|
|
||||||
boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
|
assertNotNull("event is missing NODE_NAMES: " + event, event.getProperty(TriggerEvent.NODE_NAMES));
|
||||||
assertTrue("The trigger did not fire at all", await);
|
assertEquals("event has incorrect NODE_NAMES: " + event,
|
||||||
assertTrue(triggerFired.get());
|
Collections.singletonList(nodeName),
|
||||||
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
|
event.getProperty(TriggerEvent.NODE_NAMES));
|
||||||
assertNotNull(nodeLostEvent);
|
|
||||||
List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
|
assertTrue("event TS is too late, should be before (max) expected TS @ "
|
||||||
assertTrue(nodeNames.contains(nodeName));
|
+ maxEventTimeNs + ": " + event,
|
||||||
|
event.getEventTime() < maxEventTimeNs);
|
||||||
|
|
||||||
|
assertNotNull("event is missing EVENT_TIMES: " + event, event.getProperty(TriggerEvent.EVENT_TIMES));
|
||||||
|
assertEquals("event has unexpeted number of EVENT_TIMES: " + event,
|
||||||
|
1, ((Collection)event.getProperty(TriggerEvent.EVENT_TIMES)).size());
|
||||||
|
assertEquals("event's TS doesn't match EVENT_TIMES: " + event,
|
||||||
|
event.getEventTime(),
|
||||||
|
((Collection)event.getProperty(TriggerEvent.EVENT_TIMES)).iterator().next());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -221,9 +252,7 @@ public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
nonOverseerLeaderIndex = i;
|
nonOverseerLeaderIndex = i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
CloudTestUtils.assertAutoScalingRequest(cloudManager, setTriggerCommand);
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
||||||
fail("The TriggerAction should have been created by now");
|
fail("The TriggerAction should have been created by now");
|
||||||
|
@ -247,17 +276,16 @@ public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
actionInitCalled = new CountDownLatch(1);
|
actionInitCalled = new CountDownLatch(1);
|
||||||
|
|
||||||
// update the trigger with exactly the same data
|
// update the trigger with exactly the same data
|
||||||
setTriggerCommand = "{" +
|
CloudTestUtils.assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
(cloudManager,
|
||||||
"'name' : 'node_lost_trigger'," +
|
"{" +
|
||||||
"'event' : 'nodeLost'," +
|
"'set-trigger' : {" +
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
"'name' : 'node_lost_trigger'," +
|
||||||
"'enabled' : true," +
|
"'event' : 'nodeLost'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
"}}";
|
"'enabled' : true," +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
response = solrClient.request(req);
|
"}}");
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
// this should be a no-op so the action should have been created but init should not be called
|
// this should be a no-op so the action should have been created but init should not be called
|
||||||
if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
|
if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
|
||||||
|
@ -301,4 +329,18 @@ public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
super.init();
|
super.init();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method for getting a copy of the current (internal) trigger state of a scheduled trigger.
|
||||||
|
*/
|
||||||
|
private Map<String, Object> getTriggerState(final String name) {
|
||||||
|
final Overseer overseer = cluster.getOpenOverseer();
|
||||||
|
final ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
|
||||||
|
final AutoScaling.Trigger t = scheduledTriggers.getTrigger(name);
|
||||||
|
assertNotNull(name + " is not a currently scheduled trigger", t);
|
||||||
|
assertTrue(name + " is not a TriggerBase w/state: " + t.getClass(),
|
||||||
|
t instanceof TriggerBase);
|
||||||
|
return ((TriggerBase)t).deepCopyState();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -532,10 +532,12 @@ public class SimCloudManager implements SolrCloudManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simulate the effect of restarting Overseer leader - in this case this means restarting the
|
* Simulate the effect of restarting Overseer leader - in this case this means closing the current
|
||||||
* OverseerTriggerThread and optionally killing a node. All background tasks currently in progress
|
* {@link OverseerTriggerThread} (and optionally killing a node) then starting a new
|
||||||
* will be interrupted.
|
* {@link OverseerTriggerThread}.
|
||||||
|
* All background tasks currently in progress will be interrupted.
|
||||||
* @param killNodeId optional nodeId to kill. If null then don't kill any node, just restart the thread
|
* @param killNodeId optional nodeId to kill. If null then don't kill any node, just restart the thread
|
||||||
|
* @see #getOverseerTriggerThread
|
||||||
*/
|
*/
|
||||||
public void simRestartOverseer(String killNodeId) throws Exception {
|
public void simRestartOverseer(String killNodeId) throws Exception {
|
||||||
log.info("=== Restarting OverseerTriggerThread and clearing object cache...");
|
log.info("=== Restarting OverseerTriggerThread and clearing object cache...");
|
||||||
|
@ -900,4 +902,12 @@ public class SimCloudManager implements SolrCloudManager {
|
||||||
IOUtils.closeQuietly(objectCache);
|
IOUtils.closeQuietly(objectCache);
|
||||||
simCloudManagerPool.shutdownNow();
|
simCloudManagerPool.shutdownNow();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Direct access to the current {@link OverseerTriggerThread}
|
||||||
|
* @see #simRestartOverseer
|
||||||
|
*/
|
||||||
|
public OverseerTriggerThread getOverseerTriggerThread() {
|
||||||
|
return ((OverseerTriggerThread) triggerThread.getThread());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.List;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
import org.apache.solr.SolrTestCaseJ4;
|
import org.apache.solr.SolrTestCaseJ4;
|
||||||
|
import org.apache.solr.cloud.CloudTestUtils;
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
import org.apache.solr.common.cloud.Slice;
|
import org.apache.solr.common.cloud.Slice;
|
||||||
|
@ -121,4 +122,15 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
|
||||||
fail("Couldn't get random replica that matched conditions\n" + slice.toString());
|
fail("Couldn't get random replica that matched conditions\n" + slice.toString());
|
||||||
return null; // just to keep the compiler happy - fail will always throw an Exception
|
return null; // just to keep the compiler happy - fail will always throw an Exception
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates & executes an autoscaling request against the current cluster, asserting that
|
||||||
|
* the result is a success
|
||||||
|
*
|
||||||
|
* @param json The request to send
|
||||||
|
* @see CloudTestUtils#assertAutoScalingRequest
|
||||||
|
*/
|
||||||
|
public void assertAutoScalingRequest(final String json) throws IOException {
|
||||||
|
CloudTestUtils.assertAutoScalingRequest(cluster, json);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,11 +17,10 @@
|
||||||
|
|
||||||
package org.apache.solr.cloud.autoscaling.sim;
|
package org.apache.solr.cloud.autoscaling.sim;
|
||||||
|
|
||||||
import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -38,7 +37,6 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
import org.apache.solr.client.solrj.SolrClient;
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
|
||||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||||
|
@ -46,15 +44,17 @@ import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
import org.apache.solr.cloud.CloudTestUtils;
|
import org.apache.solr.cloud.CloudTestUtils;
|
||||||
import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest;
|
|
||||||
import org.apache.solr.cloud.autoscaling.ActionContext;
|
import org.apache.solr.cloud.autoscaling.ActionContext;
|
||||||
|
import org.apache.solr.cloud.autoscaling.AutoScaling;
|
||||||
import org.apache.solr.cloud.autoscaling.CapturedEvent;
|
import org.apache.solr.cloud.autoscaling.CapturedEvent;
|
||||||
import org.apache.solr.cloud.autoscaling.ComputePlanAction;
|
import org.apache.solr.cloud.autoscaling.ComputePlanAction;
|
||||||
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
|
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
|
||||||
|
import org.apache.solr.cloud.autoscaling.NodeAddedTrigger;
|
||||||
import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
|
import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
|
||||||
import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
|
import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
|
||||||
import org.apache.solr.cloud.autoscaling.SearchRateTrigger;
|
import org.apache.solr.cloud.autoscaling.SearchRateTrigger;
|
||||||
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
|
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
|
||||||
|
import org.apache.solr.cloud.autoscaling.TriggerBase;
|
||||||
import org.apache.solr.cloud.autoscaling.TriggerEvent;
|
import org.apache.solr.cloud.autoscaling.TriggerEvent;
|
||||||
import org.apache.solr.cloud.autoscaling.TriggerEventQueue;
|
import org.apache.solr.cloud.autoscaling.TriggerEventQueue;
|
||||||
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
|
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
|
||||||
|
@ -62,7 +62,6 @@ import org.apache.solr.cloud.autoscaling.TriggerValidationException;
|
||||||
import org.apache.solr.common.MapWriter;
|
import org.apache.solr.common.MapWriter;
|
||||||
import org.apache.solr.common.cloud.LiveNodesListener;
|
import org.apache.solr.common.cloud.LiveNodesListener;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.util.NamedList;
|
|
||||||
import org.apache.solr.common.util.TimeSource;
|
import org.apache.solr.common.util.TimeSource;
|
||||||
import org.apache.solr.core.SolrResourceLoader;
|
import org.apache.solr.core.SolrResourceLoader;
|
||||||
import org.apache.solr.util.LogLevel;
|
import org.apache.solr.util.LogLevel;
|
||||||
|
@ -158,30 +157,25 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
SolrClient solrClient = cluster.simGetSolrClient();
|
||||||
|
|
||||||
// first trigger
|
// first trigger
|
||||||
String setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_added_trigger1'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'name' : 'node_added_trigger1'," +
|
||||||
"'waitFor' : '0s'," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '0s'," +
|
||||||
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
|
"'enabled' : true," +
|
||||||
"}}";
|
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
// second trigger
|
assertAutoScalingRequest
|
||||||
setTriggerCommand = "{" +
|
("{" +
|
||||||
"'set-trigger' : {" +
|
"'set-trigger' : {" +
|
||||||
"'name' : 'node_added_trigger2'," +
|
"'name' : 'node_added_trigger2'," +
|
||||||
"'event' : 'nodeAdded'," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'waitFor' : '0s'," +
|
"'waitFor' : '0s'," +
|
||||||
"'enabled' : true," +
|
"'enabled' : true," +
|
||||||
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
|
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
|
||||||
"}}";
|
"}}");
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
|
||||||
response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
// wait until the two instances of action are created
|
// wait until the two instances of action are created
|
||||||
if (!actionInitCalled.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
|
if (!actionInitCalled.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
|
||||||
|
@ -199,29 +193,25 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
actionInitCalled = new CountDownLatch(2);
|
actionInitCalled = new CountDownLatch(2);
|
||||||
triggerFiredLatch = new CountDownLatch(2);
|
triggerFiredLatch = new CountDownLatch(2);
|
||||||
|
|
||||||
setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_lost_trigger1'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeLost'," +
|
"'name' : 'node_lost_trigger1'," +
|
||||||
"'waitFor' : '0s'," +
|
"'event' : 'nodeLost'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '0s'," +
|
||||||
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
|
"'enabled' : true," +
|
||||||
"}}";
|
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_lost_trigger2'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeLost'," +
|
"'name' : 'node_lost_trigger2'," +
|
||||||
"'waitFor' : '0s'," +
|
"'event' : 'nodeLost'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '0s'," +
|
||||||
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
|
"'enabled' : true," +
|
||||||
"}}";
|
"'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
// wait until the two instances of action are created
|
// wait until the two instances of action are created
|
||||||
if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
||||||
|
@ -277,140 +267,184 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
// commentted 190-Dec-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 14-Oct-2018
|
|
||||||
public void testNodeLostTriggerRestoreState() throws Exception {
|
public void testNodeLostTriggerRestoreState() throws Exception {
|
||||||
// for this test we want to update the trigger so we must assert that the actions were created twice
|
|
||||||
actionInitCalled = new CountDownLatch(2);
|
final String triggerName = "node_lost_restore_trigger";
|
||||||
|
|
||||||
// start a new node
|
// should be enough to ensure trigger doesn't fire any actions until we replace the trigger
|
||||||
String nodeName = cluster.simAddNode();
|
waitForSeconds = 500000;
|
||||||
|
assertAutoScalingRequest
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
("{" +
|
||||||
waitForSeconds = 5;
|
"'set-trigger' : {" +
|
||||||
String setTriggerCommand = "{" +
|
"'name' : '"+triggerName+"'," +
|
||||||
"'set-trigger' : {" +
|
"'event' : 'nodeLost'," +
|
||||||
"'name' : 'node_lost_restore_trigger'," +
|
"'waitFor' : '"+waitForSeconds+"s'," +
|
||||||
"'event' : 'nodeLost'," +
|
"'enabled' : true," +
|
||||||
"'waitFor' : '5s'," + // should be enough for us to update the trigger
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
"'enabled' : true," +
|
"}}");
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
|
||||||
"}}";
|
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cluster.getTimeSource());
|
|
||||||
while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
|
|
||||||
timeOut.sleep(200);
|
|
||||||
}
|
|
||||||
assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
|
|
||||||
|
|
||||||
|
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
||||||
|
actionInitCalled.await(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// start a new node that we can kill later
|
||||||
|
final String nodeName = cluster.simAddNode();
|
||||||
|
|
||||||
|
// poll the internal state of the trigger until it run()s at least once and updates
|
||||||
|
// it's internal state to know the node we added is live
|
||||||
|
//
|
||||||
|
// (this should run roughly once a second of simulated time)
|
||||||
|
(new TimeOut(30, TimeUnit.SECONDS, cluster.getTimeSource()))
|
||||||
|
.waitFor("initial trigger never ran to detect new live node", () ->
|
||||||
|
(((Collection<String>) getTriggerState(triggerName).get("lastLiveNodes"))
|
||||||
|
.contains(nodeName)));
|
||||||
|
|
||||||
|
// kill our node
|
||||||
cluster.simRemoveNode(nodeName, false);
|
cluster.simRemoveNode(nodeName, false);
|
||||||
|
|
||||||
|
// poll the internal state of the trigger until it run()s at least once (more) and updates
|
||||||
|
// it's internal state to know the node we killed is no longer alive
|
||||||
|
//
|
||||||
|
// (this should run roughly once a second of simulated time)
|
||||||
|
(new TimeOut(30, TimeUnit.SECONDS, cluster.getTimeSource()))
|
||||||
|
.waitFor("initial trigger never ran to detect lost node", () ->
|
||||||
|
! (((Collection<String>) getTriggerState(triggerName).get("lastLiveNodes"))
|
||||||
|
.contains(nodeName)));
|
||||||
|
|
||||||
|
// since we know the nodeLost event has been detected, we can recored the current timestamp
|
||||||
|
// (relative to the cluster's time source) and later assert that (restored state) correctly
|
||||||
|
// tracked that the event happened prior to "now"
|
||||||
|
final long maxEventTimeNs = cluster.getTimeSource().getTimeNs();
|
||||||
|
|
||||||
|
// even though our trigger has detected a lost node, the *action* we registered should not have
|
||||||
|
// been run yet, due to the large waitFor configuration...
|
||||||
|
assertEquals("initial trigger action should not have fired", false, triggerFired.get());
|
||||||
|
assertEquals("initial trigger action latch should not have counted down",
|
||||||
|
1, triggerFiredLatch.getCount());
|
||||||
|
assertEquals("initial trigger action should not have recorded any events: " + events.toString(),
|
||||||
|
0, events.size());
|
||||||
|
|
||||||
// ensure that the old trigger sees the stopped node, todo find a better way to do this
|
//
|
||||||
timeOut.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
|
// now replace the trigger with a new instance to test that the state gets copied over correctly
|
||||||
|
//
|
||||||
waitForSeconds = 0;
|
|
||||||
setTriggerCommand = "{" +
|
// reset the actionInitCalled counter so we can confirm the second instances is inited
|
||||||
"'set-trigger' : {" +
|
actionInitCalled = new CountDownLatch(1);
|
||||||
"'name' : 'node_lost_restore_trigger'," +
|
// use a low waitTime to ensure it processes the event quickly.
|
||||||
"'event' : 'nodeLost'," +
|
// (this updated property also ensures the set-trigger won't be treated as a No-Op)
|
||||||
"'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
|
waitForSeconds = 0 + random().nextInt(3);
|
||||||
"'enabled' : true," +
|
assertAutoScalingRequest
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
("{" +
|
||||||
"}}";
|
"'set-trigger' : {" +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"'name' : '"+triggerName+"'," +
|
||||||
response = solrClient.request(req);
|
"'event' : 'nodeLost'," +
|
||||||
assertEquals(response.get("result").toString(), "success");
|
"'waitFor' : '"+waitForSeconds+"s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
// wait until the second instance of action is created
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
"}}");
|
||||||
fail("Two TriggerAction instances should have been created by now");
|
|
||||||
}
|
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
||||||
|
actionInitCalled.await(60, TimeUnit.SECONDS));
|
||||||
boolean await = triggerFiredLatch.await(90000 / SPEED, TimeUnit.MILLISECONDS);
|
|
||||||
assertTrue("The trigger did not fire at all", await);
|
// the trigger actions should now (eventually) record that the node is lost
|
||||||
assertTrue(triggerFired.get());
|
assertTrue("Second instance of our trigger never fired the action to process the event",
|
||||||
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
|
triggerFiredLatch.await(30, TimeUnit.SECONDS));
|
||||||
assertNotNull(nodeLostEvent);
|
|
||||||
List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
|
final TriggerEvent event = assertSingleEvent(nodeName, maxEventTimeNs);
|
||||||
assertTrue(nodeNames.contains(nodeName));
|
assertTrue("Event should have been a nodeLost event: " + event.getClass(),
|
||||||
|
event instanceof NodeLostTrigger.NodeLostEvent);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
|
|
||||||
public void testNodeAddedTriggerRestoreState() throws Exception {
|
public void testNodeAddedTriggerRestoreState() throws Exception {
|
||||||
// for this test we want to update the trigger so we must assert that the actions were created twice
|
|
||||||
actionInitCalled = new CountDownLatch(2);
|
final String triggerName = "node_added_restore_trigger";
|
||||||
|
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
// should be enough to ensure trigger doesn't fire any actions until we replace the trigger
|
||||||
waitForSeconds = 5;
|
waitForSeconds = 500000;
|
||||||
String setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_added_restore_trigger'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'name' : '"+triggerName+"'," +
|
||||||
"'waitFor' : '5s'," + // should be enough for us to update the trigger
|
"'event' : 'nodeAdded'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '"+waitForSeconds+"s'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'enabled' : true," +
|
||||||
"}}";
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
||||||
|
actionInitCalled.await(60, TimeUnit.SECONDS));
|
||||||
TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cluster.getTimeSource());
|
|
||||||
while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
|
|
||||||
timeOut.sleep(200);
|
|
||||||
}
|
|
||||||
assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
|
|
||||||
|
|
||||||
// start a new node
|
// start a new node
|
||||||
String newNode = cluster.simAddNode();
|
final String nodeName = cluster.simAddNode();
|
||||||
|
|
||||||
// ensure that the old trigger sees the new node, todo find a better way to do this
|
// poll the internal state of the trigger until it run()s at least once and updates
|
||||||
cluster.getTimeSource().sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
|
// it's internal state to know the node we added is live
|
||||||
|
//
|
||||||
|
// (this should run roughly once a second of simulated time)
|
||||||
|
(new TimeOut(30, TimeUnit.SECONDS, cluster.getTimeSource()))
|
||||||
|
.waitFor("initial trigger never ran to detect new live node", () ->
|
||||||
|
(((Collection<String>) getTriggerState(triggerName).get("lastLiveNodes"))
|
||||||
|
.contains(nodeName)));
|
||||||
|
|
||||||
waitForSeconds = 0;
|
// since we know the nodeAddded event has been detected, we can recored the current timestamp
|
||||||
setTriggerCommand = "{" +
|
// (relative to the cluster's time source) and later assert that (restored state) correctly
|
||||||
"'set-trigger' : {" +
|
// tracked that the event happened prior to "now"
|
||||||
"'name' : 'node_added_restore_trigger'," +
|
final long maxEventTimeNs = cluster.getTimeSource().getTimeNs();
|
||||||
"'event' : 'nodeAdded'," +
|
|
||||||
"'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
|
// even though our trigger has detected an added node, the *action* we registered should not have
|
||||||
"'enabled' : true," +
|
// been run yet, due to the large waitFor configuration...
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
assertEquals("initial trigger action should not have fired", false, triggerFired.get());
|
||||||
"}}";
|
assertEquals("initial trigger action latch should not have counted down",
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
1, triggerFiredLatch.getCount());
|
||||||
response = solrClient.request(req);
|
assertEquals("initial trigger action should not have recorded any events: " + events.toString(),
|
||||||
assertEquals(response.get("result").toString(), "success");
|
0, events.size());
|
||||||
|
|
||||||
// wait until the second instance of action is created
|
//
|
||||||
if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
// now replace the trigger with a new instance to test that the state gets copied over correctly
|
||||||
fail("Two TriggerAction instances should have been created by now");
|
//
|
||||||
}
|
|
||||||
|
// reset the actionInitCalled counter so we can confirm the second instances is inited
|
||||||
|
actionInitCalled = new CountDownLatch(1);
|
||||||
|
// use a low waitTime to ensure it processes the event quickly.
|
||||||
|
// (this updated property also ensures the set-trigger won't be treated as a No-Op)
|
||||||
|
waitForSeconds = 0 + random().nextInt(3);
|
||||||
|
assertAutoScalingRequest
|
||||||
|
("{" +
|
||||||
|
"'set-trigger' : {" +
|
||||||
|
"'name' : '"+triggerName+"'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '"+waitForSeconds+"s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
|
"}}");
|
||||||
|
|
||||||
|
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
||||||
|
actionInitCalled.await(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// the trigger actions should now (eventually) record that the new node is added
|
||||||
|
assertTrue("Second instance of our trigger never fired the action to process the event",
|
||||||
|
triggerFiredLatch.await(30, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
final TriggerEvent event = assertSingleEvent(nodeName, maxEventTimeNs);
|
||||||
|
assertTrue("Event should have been a nodeAdded event: " + event.getClass(),
|
||||||
|
event instanceof NodeAddedTrigger.NodeAddedEvent);
|
||||||
|
|
||||||
boolean await = triggerFiredLatch.await(60000 / SPEED, TimeUnit.MILLISECONDS);
|
|
||||||
assertTrue("The trigger did not fire at all", await);
|
|
||||||
assertTrue(triggerFired.get());
|
|
||||||
TriggerEvent nodeAddedEvent = events.iterator().next();
|
|
||||||
assertNotNull(nodeAddedEvent);
|
|
||||||
List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
|
|
||||||
assertTrue(nodeNames.toString(), nodeNames.contains(newNode));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2018-06-18
|
@LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2018-06-18
|
||||||
public void testNodeAddedTrigger() throws Exception {
|
public void testNodeAddedTrigger() throws Exception {
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
SolrClient solrClient = cluster.simGetSolrClient();
|
||||||
String setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_added_trigger'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'name' : 'node_added_trigger'," +
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'enabled' : true," +
|
||||||
"}}";
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
if (!actionInitCalled.await(5000 / SPEED, TimeUnit.MILLISECONDS)) {
|
if (!actionInitCalled.await(5000 / SPEED, TimeUnit.MILLISECONDS)) {
|
||||||
fail("The TriggerAction should have been created by now");
|
fail("The TriggerAction should have been created by now");
|
||||||
|
@ -430,17 +464,15 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
actionInitCalled = new CountDownLatch(1);
|
actionInitCalled = new CountDownLatch(1);
|
||||||
|
|
||||||
// update the trigger with exactly the same data
|
// update the trigger with exactly the same data
|
||||||
setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_added_trigger'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'name' : 'node_added_trigger'," +
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'enabled' : true," +
|
||||||
"}}";
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
// this should be a no-op so the action should have been created but init should not be called
|
// this should be a no-op so the action should have been created but init should not be called
|
||||||
if (!actionConstructorCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
if (!actionConstructorCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
||||||
|
@ -455,17 +487,15 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
|
@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
|
||||||
public void testNodeLostTrigger() throws Exception {
|
public void testNodeLostTrigger() throws Exception {
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
SolrClient solrClient = cluster.simGetSolrClient();
|
||||||
String setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_lost_trigger'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeLost'," +
|
"'name' : 'node_lost_trigger'," +
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
"'event' : 'nodeLost'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'enabled' : true," +
|
||||||
"}}";
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
if (!actionInitCalled.await(5000 / SPEED, TimeUnit.MILLISECONDS)) {
|
if (!actionInitCalled.await(5000 / SPEED, TimeUnit.MILLISECONDS)) {
|
||||||
fail("The TriggerAction should have been created by now");
|
fail("The TriggerAction should have been created by now");
|
||||||
|
@ -486,18 +516,16 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
actionInitCalled = new CountDownLatch(1);
|
actionInitCalled = new CountDownLatch(1);
|
||||||
|
|
||||||
// update the trigger with exactly the same data
|
// update the trigger with exactly the same data
|
||||||
setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_lost_trigger'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeLost'," +
|
"'name' : 'node_lost_trigger'," +
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
"'event' : 'nodeLost'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'enabled' : true," +
|
||||||
"}}";
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
// this should be a no-op so the action should have been created but init should not be called
|
// this should be a no-op so the action should have been created but init should not be called
|
||||||
if (!actionConstructorCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
if (!actionConstructorCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
||||||
fail("The TriggerAction should have been created by now");
|
fail("The TriggerAction should have been created by now");
|
||||||
|
@ -630,20 +658,17 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
public void testEventQueue() throws Exception {
|
public void testEventQueue() throws Exception {
|
||||||
waitForSeconds = 1;
|
waitForSeconds = 1;
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
SolrClient solrClient = cluster.simGetSolrClient();
|
||||||
String setTriggerCommand = "{" +
|
|
||||||
"'set-trigger' : {" +
|
|
||||||
"'name' : 'node_added_trigger1'," +
|
|
||||||
"'event' : 'nodeAdded'," +
|
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
|
||||||
"'enabled' : true," +
|
|
||||||
"'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
|
|
||||||
"}}";
|
|
||||||
|
|
||||||
String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode();
|
String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||||
|
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
assertAutoScalingRequest
|
||||||
NamedList<Object> response = solrClient.request(req);
|
("{" +
|
||||||
assertEquals(response.get("result").toString(), "success");
|
"'set-trigger' : {" +
|
||||||
|
"'name' : 'node_added_trigger1'," +
|
||||||
|
"'event' : 'nodeAdded'," +
|
||||||
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
|
"'enabled' : true," +
|
||||||
|
"'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
|
||||||
|
"}}");
|
||||||
|
|
||||||
if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
||||||
fail("The TriggerAction should have been created by now");
|
fail("The TriggerAction should have been created by now");
|
||||||
|
@ -686,17 +711,15 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 14-Oct-2018
|
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 14-Oct-2018
|
||||||
public void testEventFromRestoredState() throws Exception {
|
public void testEventFromRestoredState() throws Exception {
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
SolrClient solrClient = cluster.simGetSolrClient();
|
||||||
String setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_added_trigger'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'name' : 'node_added_trigger'," +
|
||||||
"'waitFor' : '10s'," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '10s'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
"'enabled' : true," +
|
||||||
"}}";
|
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
if (!actionInitCalled.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
|
if (!actionInitCalled.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
|
||||||
fail("The TriggerAction should have been created by now");
|
fail("The TriggerAction should have been created by now");
|
||||||
|
@ -849,29 +872,25 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
// set up triggers
|
// set up triggers
|
||||||
|
|
||||||
log.info("====== ADD TRIGGERS");
|
log.info("====== ADD TRIGGERS");
|
||||||
String setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_added_trigger'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'name' : 'node_added_trigger'," +
|
||||||
"'waitFor' : '1s'," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '1s'," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
|
"'enabled' : true," +
|
||||||
"}}";
|
"'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
|
("{" +
|
||||||
"'set-trigger' : {" +
|
"'set-trigger' : {" +
|
||||||
"'name' : 'node_lost_trigger'," +
|
"'name' : 'node_lost_trigger'," +
|
||||||
"'event' : 'nodeLost'," +
|
"'event' : 'nodeLost'," +
|
||||||
"'waitFor' : '1s'," +
|
"'waitFor' : '1s'," +
|
||||||
"'enabled' : true," +
|
"'enabled' : true," +
|
||||||
"'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
|
"'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
|
||||||
"}}";
|
"}}");
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
|
||||||
response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode();
|
overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode();
|
||||||
|
|
||||||
|
@ -949,54 +968,48 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
@Test
|
@Test
|
||||||
public void testListeners() throws Exception {
|
public void testListeners() throws Exception {
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
SolrClient solrClient = cluster.simGetSolrClient();
|
||||||
String setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_added_trigger'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'name' : 'node_added_trigger'," +
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
"'actions' : [" +
|
"'enabled' : true," +
|
||||||
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}," +
|
"'actions' : [" +
|
||||||
"{'name':'test1','class':'" + TestDummyAction.class.getName() + "'}," +
|
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}," +
|
||||||
"]" +
|
"{'name':'test1','class':'" + TestDummyAction.class.getName() + "'}," +
|
||||||
"}}";
|
"]" +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
|
||||||
fail("The TriggerAction should have been created by now");
|
fail("The TriggerAction should have been created by now");
|
||||||
}
|
}
|
||||||
|
|
||||||
String setListenerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-listener' : " +
|
("{" +
|
||||||
"{" +
|
"'set-listener' : " +
|
||||||
"'name' : 'foo'," +
|
"{" +
|
||||||
"'trigger' : 'node_added_trigger'," +
|
"'name' : 'foo'," +
|
||||||
"'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
|
"'trigger' : 'node_added_trigger'," +
|
||||||
"'beforeAction' : 'test'," +
|
"'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
|
||||||
"'afterAction' : ['test', 'test1']," +
|
"'beforeAction' : 'test'," +
|
||||||
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
"'afterAction' : ['test', 'test1']," +
|
||||||
"}" +
|
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||||
"}";
|
"}" +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand);
|
"}");
|
||||||
response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
String setListenerCommand1 = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-listener' : " +
|
("{" +
|
||||||
"{" +
|
"'set-listener' : " +
|
||||||
"'name' : 'bar'," +
|
"{" +
|
||||||
"'trigger' : 'node_added_trigger'," +
|
"'name' : 'bar'," +
|
||||||
"'stage' : ['FAILED','SUCCEEDED']," +
|
"'trigger' : 'node_added_trigger'," +
|
||||||
"'beforeAction' : ['test', 'test1']," +
|
"'stage' : ['FAILED','SUCCEEDED']," +
|
||||||
"'afterAction' : 'test'," +
|
"'beforeAction' : ['test', 'test1']," +
|
||||||
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
"'afterAction' : 'test'," +
|
||||||
"}" +
|
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||||
"}";
|
"}" +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand1);
|
"}");
|
||||||
response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
listenerEvents.clear();
|
listenerEvents.clear();
|
||||||
failDummyAction = false;
|
failDummyAction = false;
|
||||||
|
@ -1113,32 +1126,28 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
SolrClient solrClient = cluster.simGetSolrClient();
|
||||||
failDummyAction = false;
|
failDummyAction = false;
|
||||||
waitForSeconds = 1;
|
waitForSeconds = 1;
|
||||||
String setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'node_added_cooldown_trigger'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'nodeAdded'," +
|
"'name' : 'node_added_cooldown_trigger'," +
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
"'event' : 'nodeAdded'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
"'actions' : [" +
|
"'enabled' : true," +
|
||||||
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
|
"'actions' : [" +
|
||||||
"]" +
|
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
|
||||||
"}}";
|
"]" +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
String setListenerCommand1 = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-listener' : " +
|
("{" +
|
||||||
"{" +
|
"'set-listener' : " +
|
||||||
"'name' : 'bar'," +
|
"{" +
|
||||||
"'trigger' : 'node_added_cooldown_trigger'," +
|
"'name' : 'bar'," +
|
||||||
"'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," +
|
"'trigger' : 'node_added_cooldown_trigger'," +
|
||||||
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
"'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," +
|
||||||
"}" +
|
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||||
"}";
|
"}" +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand1);
|
"}");
|
||||||
response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
listenerCreated = new CountDownLatch(1);
|
listenerCreated = new CountDownLatch(1);
|
||||||
listenerEvents.clear();
|
listenerEvents.clear();
|
||||||
|
@ -1226,39 +1235,35 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
create.process(solrClient);
|
create.process(solrClient);
|
||||||
CloudTestUtils.waitForState(cluster, COLL1, 10, TimeUnit.SECONDS, CloudTestUtils.clusterShape(1, 2, false, true));
|
CloudTestUtils.waitForState(cluster, COLL1, 10, TimeUnit.SECONDS, CloudTestUtils.clusterShape(1, 2, false, true));
|
||||||
|
|
||||||
String setTriggerCommand = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-trigger' : {" +
|
("{" +
|
||||||
"'name' : 'search_rate_trigger'," +
|
"'set-trigger' : {" +
|
||||||
"'event' : 'searchRate'," +
|
"'name' : 'search_rate_trigger'," +
|
||||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
"'event' : 'searchRate'," +
|
||||||
"'enabled' : true," +
|
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||||
"'aboveRate' : 1.0," +
|
"'enabled' : true," +
|
||||||
"'aboveNodeRate' : 1.0," +
|
"'aboveRate' : 1.0," +
|
||||||
"'actions' : [" +
|
"'aboveNodeRate' : 1.0," +
|
||||||
"{'name':'start','class':'" + StartTriggerAction.class.getName() + "'}," +
|
"'actions' : [" +
|
||||||
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
"{'name':'start','class':'" + StartTriggerAction.class.getName() + "'}," +
|
||||||
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||||
"{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
|
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||||
"{'name':'finish','class':'" + FinishTriggerAction.class.getName() + "'}," +
|
"{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
|
||||||
"]" +
|
"{'name':'finish','class':'" + FinishTriggerAction.class.getName() + "'}," +
|
||||||
"}}";
|
"]" +
|
||||||
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
|
"}}");
|
||||||
NamedList<Object> response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
String setListenerCommand1 = "{" +
|
assertAutoScalingRequest
|
||||||
"'set-listener' : " +
|
("{" +
|
||||||
"{" +
|
"'set-listener' : " +
|
||||||
"'name' : 'srt'," +
|
"{" +
|
||||||
"'trigger' : 'search_rate_trigger'," +
|
"'name' : 'srt'," +
|
||||||
"'stage' : ['FAILED','SUCCEEDED']," +
|
"'trigger' : 'search_rate_trigger'," +
|
||||||
"'afterAction': ['compute', 'execute', 'test']," +
|
"'stage' : ['FAILED','SUCCEEDED']," +
|
||||||
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
"'afterAction': ['compute', 'execute', 'test']," +
|
||||||
"}" +
|
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||||
"}";
|
"}" +
|
||||||
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListenerCommand1);
|
"}");
|
||||||
response = solrClient.request(req);
|
|
||||||
assertEquals(response.get("result").toString(), "success");
|
|
||||||
|
|
||||||
// SolrParams query = params(CommonParams.Q, "*:*");
|
// SolrParams query = params(CommonParams.Q, "*:*");
|
||||||
// for (int i = 0; i < 500; i++) {
|
// for (int i = 0; i < 500; i++) {
|
||||||
|
@ -1329,4 +1334,51 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
assertEquals("ADDREPLICA", m._get("params.action", null));
|
assertEquals("ADDREPLICA", m._get("params.action", null));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method for getting a copy of the current (internal) trigger state of a scheduled trigger.
|
||||||
|
*/
|
||||||
|
private Map<String, Object> getTriggerState(final String name) {
|
||||||
|
final AutoScaling.Trigger t = cluster.getOverseerTriggerThread().getScheduledTriggers().getTrigger(name);
|
||||||
|
assertNotNull(name + " is not a currently scheduled trigger", t);
|
||||||
|
assertTrue(name + " is not a TriggerBase w/state: " + t.getClass(),
|
||||||
|
t instanceof TriggerBase);
|
||||||
|
return ((TriggerBase)t).deepCopyState();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method for making some common assertions about {@link #events}:
|
||||||
|
* <ul>
|
||||||
|
* <li>Exactly one event that is not null</li>
|
||||||
|
* <li>Event refers to exactly one expected {@link TriggerEvent#NODE_NAMES}</li>
|
||||||
|
* <li>Event has exactly one {@link TriggerEvent#EVENT_TIMES} (which matches {@link TriggerEvent#getEventTime}) which is less then the <code>maxExpectedEventTimeNs</code></li>
|
||||||
|
* </ul>
|
||||||
|
* @return the event found so that other assertions can be made
|
||||||
|
*/
|
||||||
|
private static TriggerEvent assertSingleEvent(final String expectedNodeName,
|
||||||
|
final long maxExpectedEventTimeNs) {
|
||||||
|
|
||||||
|
assertEquals("Wrong number of events recorded: " + events.toString(),
|
||||||
|
1, events.size());
|
||||||
|
|
||||||
|
final TriggerEvent event = events.iterator().next();
|
||||||
|
assertNotNull("null event???", event);
|
||||||
|
assertNotNull("event is missing NODE_NAMES: " + event, event.getProperty(TriggerEvent.NODE_NAMES));
|
||||||
|
assertEquals("event has incorrect NODE_NAMES: " + event,
|
||||||
|
Collections.singletonList(expectedNodeName),
|
||||||
|
event.getProperty(TriggerEvent.NODE_NAMES));
|
||||||
|
|
||||||
|
assertTrue("event TS is too late, should be before (max) expected TS @ "
|
||||||
|
+ maxExpectedEventTimeNs + ": " + event,
|
||||||
|
event.getEventTime() < maxExpectedEventTimeNs);
|
||||||
|
|
||||||
|
assertNotNull("event is missing EVENT_TIMES: " + event, event.getProperty(TriggerEvent.EVENT_TIMES));
|
||||||
|
assertEquals("event has unexpeted number of EVENT_TIMES: " + event,
|
||||||
|
1, ((Collection)event.getProperty(TriggerEvent.EVENT_TIMES)).size());
|
||||||
|
assertEquals("event's TS doesn't match EVENT_TIMES: " + event,
|
||||||
|
event.getEventTime(),
|
||||||
|
((Collection)event.getProperty(TriggerEvent.EVENT_TIMES)).iterator().next());
|
||||||
|
return event;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue