Merge remote-tracking branch 'origin/feature/autoscaling' into feature/autoscaling

This commit is contained in:
Noble Paul 2017-05-25 17:55:10 +09:30
commit b933b60407
7 changed files with 234 additions and 26 deletions

View File

@ -115,6 +115,12 @@ public class AutoScaling {
public boolean isClosed();
public void restoreState(Trigger<E> old);
/**
* Called before a trigger is scheduled. Any heavy object creation or initialisation should
* be done in this method instead of the Trigger's constructor.
*/
public void init();
}
public static class TriggerFactory implements Closeable {

View File

@ -81,6 +81,17 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
}
@Override
public void init() {
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
for (int i = 0; i < o.size(); i++) {
Map<String, String> map = o.get(i);
actions.get(i).init(map);
}
}
}
@Override
public void setListener(AutoScaling.TriggerListener<NodeAddedEvent> listener) {
listenerRef.set(listener);

View File

@ -82,6 +82,17 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
this.eventType = AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
}
@Override
public void init() {
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
for (int i = 0; i < o.size(); i++) {
Map<String, String> map = o.get(i);
actions.get(i).init(map);
}
}
}
@Override
public void setListener(AutoScaling.TriggerListener<NodeLostEvent> listener) {
listenerRef.set(listener);

View File

@ -143,10 +143,7 @@ public class ScheduledTriggers implements Closeable {
return false;
}
});
List<TriggerAction> actions = newTrigger.getActions();
for (TriggerAction action : actions) {
action.init(newTrigger.getProperties());
}
newTrigger.init(); // mark as ready for scheduling
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
}

View File

@ -17,6 +17,7 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.core.CoreContainer;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -36,6 +38,9 @@ import org.junit.Test;
* Test for {@link NodeAddedTrigger}
*/
public class NodeAddedTriggerTest extends SolrCloudTestCase {
private static AtomicBoolean actionConstructorCalled = new AtomicBoolean(false);
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
private AutoScaling.TriggerListener<NodeAddedTrigger.NodeAddedEvent> noFirstRunListener = event -> {
fail("Did not expect the listener to fire on first run!");
@ -49,6 +54,13 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
.configure();
}
@Before
public void beforeTest() throws Exception {
actionConstructorCalled = new AtomicBoolean(false);
actionInitCalled = new AtomicBoolean(false);
actionCloseCalled = new AtomicBoolean(false);
}
@Test
public void testTrigger() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
@ -123,6 +135,56 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
}
}
public void testActionLifecycle() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
Map<String, Object> props = createTriggerProps(0);
List<Map<String, String>> actions = (List<Map<String, String>>) props.get("actions");
Map<String, String> action = new HashMap<>(2);
action.put("name", "testActionInit");
action.put("class", NodeAddedTriggerTest.AssertInitTriggerAction.class.getName());
actions.add(action);
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
assertEquals(true, actionConstructorCalled.get());
assertEquals(false, actionInitCalled.get());
assertEquals(false, actionCloseCalled.get());
trigger.init();
assertEquals(true, actionInitCalled.get());
assertEquals(false, actionCloseCalled.get());
}
assertEquals(true, actionCloseCalled.get());
}
public static class AssertInitTriggerAction implements TriggerAction {
public AssertInitTriggerAction() {
actionConstructorCalled.set(true);
}
@Override
public String getName() {
return "";
}
@Override
public String getClassName() {
return getClass().getName();
}
@Override
public void process(AutoScaling.TriggerEvent event) {
}
@Override
public void close() throws IOException {
actionCloseCalled.compareAndSet(false, true);
}
@Override
public void init(Map<String, String> args) {
actionInitCalled.compareAndSet(false, true);
}
}
@Test
public void testListenerAcceptance() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();

View File

@ -17,6 +17,7 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -28,7 +29,9 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -36,6 +39,9 @@ import org.junit.Test;
* Test for {@link NodeLostTrigger}
*/
public class NodeLostTriggerTest extends SolrCloudTestCase {
private static AtomicBoolean actionConstructorCalled = new AtomicBoolean(false);
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
private AutoScaling.TriggerListener<NodeLostTrigger.NodeLostEvent> noFirstRunListener = event -> {
fail("Did not expect the listener to fire on first run!");
@ -49,6 +55,13 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
.configure();
}
@Before
public void beforeTest() throws Exception {
actionConstructorCalled = new AtomicBoolean(false);
actionInitCalled = new AtomicBoolean(false);
actionCloseCalled = new AtomicBoolean(false);
}
@Test
public void testTrigger() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
@ -136,6 +149,56 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
}
}
public void testActionLifecycle() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
Map<String, Object> props = createTriggerProps(0);
List<Map<String, String>> actions = (List<Map<String, String>>) props.get("actions");
Map<String, String> action = new HashMap<>(2);
action.put("name", "testActionInit");
action.put("class", AssertInitTriggerAction.class.getName());
actions.add(action);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container)) {
assertEquals(true, actionConstructorCalled.get());
assertEquals(false, actionInitCalled.get());
assertEquals(false, actionCloseCalled.get());
trigger.init();
assertEquals(true, actionInitCalled.get());
assertEquals(false, actionCloseCalled.get());
}
assertEquals(true, actionCloseCalled.get());
}
public static class AssertInitTriggerAction implements TriggerAction {
public AssertInitTriggerAction() {
actionConstructorCalled.set(true);
}
@Override
public String getName() {
return "";
}
@Override
public String getClassName() {
return getClass().getName();
}
@Override
public void process(AutoScaling.TriggerEvent event) {
}
@Override
public void close() throws IOException {
actionCloseCalled.compareAndSet(false, true);
}
@Override
public void init(Map<String, String> args) {
actionInitCalled.compareAndSet(false, true);
}
}
@Test
public void testListenerAcceptance() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
@ -196,7 +259,17 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container);
trigger.setListener(noFirstRunListener);
trigger.run();
newNode.stop();
// stop the newly created node
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
for (int i = 0; i < jettySolrRunners.size(); i++) {
JettySolrRunner jettySolrRunner = jettySolrRunners.get(i);
if (newNode == jettySolrRunner) {
cluster.stopJettySolrRunner(i);
break;
}
}
trigger.run(); // this run should detect the lost node
trigger.close(); // close the old trigger

View File

@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@ -40,7 +39,6 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.BeforeClass;
@ -58,7 +56,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_P
public class TriggerIntegrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static CountDownLatch actionCreated;
private static CountDownLatch actionConstructorCalled;
private static CountDownLatch actionInitCalled;
private static CountDownLatch triggerFiredLatch;
private static int waitForSeconds = 1;
private static AtomicBoolean triggerFired;
@ -76,7 +75,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Before
public void setupTest() throws Exception {
waitForSeconds = 1 + random().nextInt(3);
actionCreated = new CountDownLatch(1);
actionConstructorCalled = new CountDownLatch(1);
actionInitCalled = new CountDownLatch(1);
triggerFiredLatch = new CountDownLatch(1);
triggerFired = new AtomicBoolean(false);
eventRef = new AtomicReference<>();
@ -96,7 +96,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Test
public void testTriggerThrottling() throws Exception {
// for this test we want to create two triggers so we must assert that the actions were created twice
TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
TriggerIntegrationTest.actionInitCalled = new CountDownLatch(2);
// similarly we want both triggers to fire
triggerFiredLatch = new CountDownLatch(2);
@ -129,7 +129,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(response.get("result").toString(), "success");
// wait until the two instances of action are created
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
fail("Two TriggerAction instances should have been created by now");
}
@ -141,7 +141,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// reset shared state
lastActionExecutedAt.set(0);
TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
TriggerIntegrationTest.actionInitCalled = new CountDownLatch(2);
triggerFiredLatch = new CountDownLatch(2);
setTriggerCommand = "{" +
@ -169,7 +169,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(response.get("result").toString(), "success");
// wait until the two instances of action are created
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
fail("Two TriggerAction instances should have been created by now");
}
@ -231,7 +231,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Test
public void testNodeLostTriggerRestoreState() throws Exception {
// for this test we want to update the trigger so we must assert that the actions were created twice
TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
TriggerIntegrationTest.actionInitCalled = new CountDownLatch(2);
// start a new node
JettySolrRunner newNode = cluster.startJettySolrRunner();
@ -252,10 +252,10 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(response.get("result").toString(), "success");
TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS);
while (actionCreated.getCount() == 0 && !timeOut.hasTimedOut()) {
while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
Thread.sleep(200);
}
assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds",actionCreated.getCount() > 0);
assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
int index = -1;
@ -283,7 +283,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(response.get("result").toString(), "success");
// wait until the second instance of action is created
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
fail("Two TriggerAction instances should have been created by now");
}
@ -299,7 +299,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Test
public void testNodeAddedTriggerRestoreState() throws Exception {
// for this test we want to update the trigger so we must assert that the actions were created twice
TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
TriggerIntegrationTest.actionInitCalled = new CountDownLatch(2);
CloudSolrClient solrClient = cluster.getSolrClient();
waitForSeconds = 5;
@ -316,10 +316,10 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(response.get("result").toString(), "success");
TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS);
while (actionCreated.getCount() == 0 && !timeOut.hasTimedOut()) {
while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
Thread.sleep(200);
}
assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds",actionCreated.getCount() > 0);
assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
// start a new node
JettySolrRunner newNode = cluster.startJettySolrRunner();
@ -341,7 +341,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(response.get("result").toString(), "success");
// wait until the second instance of action is created
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
fail("Two TriggerAction instances should have been created by now");
}
@ -369,7 +369,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
fail("The TriggerAction should have been created by now");
}
@ -381,6 +381,30 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
newNode.getNodeName(), nodeAddedEvent.getNodeName());
// reset
actionConstructorCalled = new CountDownLatch(1);
actionInitCalled = new CountDownLatch(1);
// update the trigger with exactly the same data
setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger'," +
"'event' : 'nodeAdded'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
"}}";
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
// this should be a no-op so the action should have been created but init should not be called
if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
fail("The TriggerAction should have been created by now");
}
assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
}
@Test
@ -407,7 +431,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
fail("The TriggerAction should have been created by now");
}
@ -420,6 +444,30 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertNotNull(nodeLostEvent);
assertEquals("The node lost trigger was fired but for a different node",
lostNodeName, nodeLostEvent.getNodeName());
// reset
actionConstructorCalled = new CountDownLatch(1);
actionInitCalled = new CountDownLatch(1);
// update the trigger with exactly the same data
setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_lost_trigger'," +
"'event' : 'nodeLost'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
"}}";
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
// this should be a no-op so the action should have been created but init should not be called
if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
fail("The TriggerAction should have been created by now");
}
assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
}
@Test
@ -453,7 +501,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
if (!actionCreated.await(3, TimeUnit.SECONDS)) {
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
fail("The TriggerAction should have been created by now");
}
@ -473,7 +521,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
public static class TestTriggerAction implements TriggerAction {
public TestTriggerAction() {
actionConstructorCalled.countDown();
}
@Override
@ -507,7 +555,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Override
public void init(Map<String, String> args) {
log.info("TestTriggerAction init");
actionCreated.countDown();
actionInitCalled.countDown();
}
}
}