mirror of https://github.com/apache/lucene.git
SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability
This commit is contained in:
parent
23707314dd
commit
ed9e5eb75b
|
@ -131,6 +131,8 @@ Other Changes
|
|||
|
||||
* SOLR-12118: Solr Ref-Guide can now use some ivy version props directly as attributes in content (hossman)
|
||||
|
||||
* SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability. (shalin)
|
||||
|
||||
================== 7.3.0 ==================
|
||||
|
||||
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
|
||||
|
|
|
@ -0,0 +1,242 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.metrics.SolrCoreMetricManager;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
|
||||
|
||||
/**
|
||||
* Integration test for {@link MetricTrigger}
|
||||
*/
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||
public class MetricTriggerIntegrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
|
||||
static CountDownLatch listenerCreated = new CountDownLatch(1);
|
||||
private static CountDownLatch triggerFiredLatch;
|
||||
private static int waitForSeconds = 1;
|
||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(2)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
// disable .scheduled_maintenance
|
||||
String suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||
"}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
SolrClient solrClient = cluster.getSolrClient();
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetricTrigger() throws Exception {
|
||||
cluster.waitForAllNodes(5);
|
||||
|
||||
String collectionName = "testMetricTrigger";
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
|
||||
"conf", 2, 2).setMaxShardsPerNode(2);
|
||||
create.process(solrClient);
|
||||
solrClient.setDefaultCollection(collectionName);
|
||||
|
||||
waitForState("Timed out waiting for collection:" + collectionName + " to become active", collectionName, clusterShape(2, 2));
|
||||
|
||||
DocCollection docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||
String shardId = "shard1";
|
||||
Replica replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
|
||||
String coreName = replica.getCoreName();
|
||||
String replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
|
||||
long waitForSeconds = 2 + random().nextInt(5);
|
||||
String registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
|
||||
String tag = "metrics:" + registry + ":INDEX.sizeInBytes";
|
||||
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'metric_trigger'," +
|
||||
"'event' : 'metric'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'metric': '" + tag + "'" +
|
||||
"'above' : 100.0," +
|
||||
"'collection': '" + collectionName + "'" +
|
||||
"'shard':'" + shardId + "'" +
|
||||
"'actions' : [" +
|
||||
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||
"{'name':'test','class':'" + MetricAction.class.getName() + "'}" +
|
||||
"]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
String setListenerCommand1 = "{" +
|
||||
"'set-listener' : " +
|
||||
"{" +
|
||||
"'name' : 'srt'," +
|
||||
"'trigger' : 'metric_trigger'," +
|
||||
"'stage' : ['FAILED','SUCCEEDED']," +
|
||||
"'afterAction': ['compute', 'execute', 'test']," +
|
||||
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
// start more nodes so that we have at least 4
|
||||
for (int i = cluster.getJettySolrRunners().size(); i < 4; i++) {
|
||||
cluster.startJettySolrRunner();
|
||||
}
|
||||
cluster.waitForAllNodes(10);
|
||||
|
||||
List<SolrInputDocument> docs = new ArrayList<>(500);
|
||||
for (int i = 0; i < 500; i++) {
|
||||
docs.add(new SolrInputDocument("id", String.valueOf(i), "x_s", "x" + i));
|
||||
}
|
||||
solrClient.add(docs);
|
||||
solrClient.commit();
|
||||
|
||||
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
// wait for listener to capture the SUCCEEDED stage
|
||||
Thread.sleep(2000);
|
||||
assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
|
||||
CapturedEvent ev = listenerEvents.get("srt").get(0);
|
||||
long now = timeSource.getTimeNs();
|
||||
// verify waitFor
|
||||
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
|
||||
assertEquals(collectionName, ev.event.getProperties().get("collection"));
|
||||
|
||||
// find a new replica and create its metric name
|
||||
docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||
replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
|
||||
coreName = replica.getCoreName();
|
||||
replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
|
||||
registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
|
||||
tag = "metrics:" + registry + ":INDEX.sizeInBytes";
|
||||
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
listenerEvents.clear();
|
||||
|
||||
setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'metric_trigger'," +
|
||||
"'event' : 'metric'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'metric': '" + tag + "'" +
|
||||
"'above' : 100.0," +
|
||||
"'collection': '" + collectionName + "'" +
|
||||
"'shard':'" + shardId + "'" +
|
||||
"'preferredOperation':'addreplica'" +
|
||||
"'actions' : [" +
|
||||
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||
"{'name':'test','class':'" + MetricAction.class.getName() + "'}" +
|
||||
"]" +
|
||||
"}}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
// wait for listener to capture the SUCCEEDED stage
|
||||
Thread.sleep(2000);
|
||||
assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
|
||||
ev = listenerEvents.get("srt").get(0);
|
||||
now = timeSource.getTimeNs();
|
||||
// verify waitFor
|
||||
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
|
||||
assertEquals(collectionName, ev.event.getProperties().get("collection"));
|
||||
docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||
assertEquals(5, docCollection.getReplicas().size());
|
||||
}
|
||||
|
||||
public static class MetricAction extends TriggerActionBase {
|
||||
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext context) throws Exception {
|
||||
try {
|
||||
events.add(event);
|
||||
long currentTimeNanos = timeSource.getTimeNs();
|
||||
long eventTimeNanos = event.getEventTime();
|
||||
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
|
||||
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
|
||||
fail(event.source + " was fired before the configured waitFor period");
|
||||
}
|
||||
triggerFiredLatch.countDown();
|
||||
} catch (Throwable t) {
|
||||
log.debug("--throwable", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestTriggerListener extends TriggerListenerBase {
|
||||
@Override
|
||||
public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
|
||||
super.init(cloudManager, config);
|
||||
listenerCreated.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
|
||||
ActionContext context, Throwable error, String message) {
|
||||
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
|
||||
lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,300 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.Overseer;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
|
||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
||||
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||
public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static CountDownLatch actionConstructorCalled;
|
||||
private static CountDownLatch actionInitCalled;
|
||||
private static CountDownLatch triggerFiredLatch;
|
||||
private static int waitForSeconds = 1;
|
||||
private static AtomicBoolean triggerFired;
|
||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||
private static SolrCloudManager cloudManager;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(2)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
// disable .scheduled_maintenance
|
||||
String suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||
"}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
SolrClient solrClient = cluster.getSolrClient();
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
}
|
||||
|
||||
private static CountDownLatch getTriggerFiredLatch() {
|
||||
return triggerFiredLatch;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupTest() throws Exception {
|
||||
// ensure that exactly 2 jetty nodes are running
|
||||
int numJetties = cluster.getJettySolrRunners().size();
|
||||
log.info("Found {} jetty instances running", numJetties);
|
||||
for (int i = 2; i < numJetties; i++) {
|
||||
int r = random().nextInt(cluster.getJettySolrRunners().size());
|
||||
log.info("Shutdown extra jetty instance at port {}", cluster.getJettySolrRunner(r).getLocalPort());
|
||||
cluster.stopJettySolrRunner(r);
|
||||
}
|
||||
for (int i = cluster.getJettySolrRunners().size(); i < 2; i++) {
|
||||
// start jetty instances
|
||||
cluster.startJettySolrRunner();
|
||||
}
|
||||
cluster.waitForAllNodes(5);
|
||||
|
||||
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
||||
String overseerLeader = (String) overSeerStatus.get("leader");
|
||||
int overseerLeaderIndex = 0;
|
||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
||||
if (jetty.getNodeName().equals(overseerLeader)) {
|
||||
overseerLeaderIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
|
||||
ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
|
||||
// aggressively remove all active scheduled triggers
|
||||
scheduledTriggers.removeAll();
|
||||
|
||||
// clear any persisted auto scaling configuration
|
||||
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
|
||||
log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
|
||||
|
||||
cluster.deleteAllCollections();
|
||||
cluster.getSolrClient().setDefaultCollection(null);
|
||||
|
||||
// restart Overseer. Even though we reset the autoscaling config some already running
|
||||
// trigger threads may still continue to execute and produce spurious events
|
||||
cluster.stopJettySolrRunner(overseerLeaderIndex);
|
||||
Thread.sleep(5000);
|
||||
|
||||
waitForSeconds = 1 + random().nextInt(3);
|
||||
actionConstructorCalled = new CountDownLatch(1);
|
||||
actionInitCalled = new CountDownLatch(1);
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
triggerFired = new AtomicBoolean(false);
|
||||
events.clear();
|
||||
|
||||
while (cluster.getJettySolrRunners().size() < 2) {
|
||||
// perhaps a test stopped a node but didn't start it back
|
||||
// lets start a node
|
||||
cluster.startJettySolrRunner();
|
||||
}
|
||||
|
||||
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
|
||||
// clear any events or markers
|
||||
// todo: consider the impact of such cleanup on regular cluster restarts
|
||||
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
|
||||
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
|
||||
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
|
||||
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
|
||||
}
|
||||
|
||||
private void deleteChildrenRecursively(String path) throws Exception {
|
||||
cloudManager.getDistribStateManager().removeRecursively(path, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeAddedTriggerRestoreState() throws Exception {
|
||||
// for this test we want to update the trigger so we must assert that the actions were created twice
|
||||
actionInitCalled = new CountDownLatch(2);
|
||||
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
waitForSeconds = 5;
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_added_restore_trigger'," +
|
||||
"'event' : 'nodeAdded'," +
|
||||
"'waitFor' : '5s'," + // should be enough for us to update the trigger
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
|
||||
while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
|
||||
Thread.sleep(200);
|
||||
}
|
||||
assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
|
||||
|
||||
// start a new node
|
||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||
|
||||
// ensure that the old trigger sees the new node, todo find a better way to do this
|
||||
Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
|
||||
|
||||
waitForSeconds = 0;
|
||||
setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_added_restore_trigger'," +
|
||||
"'event' : 'nodeAdded'," +
|
||||
"'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
// wait until the second instance of action is created
|
||||
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
||||
fail("Two TriggerAction instances should have been created by now");
|
||||
}
|
||||
|
||||
boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
assertTrue(triggerFired.get());
|
||||
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
|
||||
assertNotNull(nodeAddedEvent);
|
||||
List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
|
||||
assertTrue(nodeNames.contains(newNode.getNodeName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeAddedTrigger() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_added_trigger'," +
|
||||
"'event' : 'nodeAdded'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
||||
fail("The TriggerAction should have been created by now");
|
||||
}
|
||||
|
||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
assertTrue(triggerFired.get());
|
||||
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
|
||||
assertNotNull(nodeAddedEvent);
|
||||
List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
|
||||
assertTrue(nodeNames.contains(newNode.getNodeName()));
|
||||
|
||||
// reset
|
||||
actionConstructorCalled = new CountDownLatch(1);
|
||||
actionInitCalled = new CountDownLatch(1);
|
||||
|
||||
// update the trigger with exactly the same data
|
||||
setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_added_trigger'," +
|
||||
"'event' : 'nodeAdded'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
// this should be a no-op so the action should have been created but init should not be called
|
||||
if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
|
||||
fail("The TriggerAction should have been created by now");
|
||||
}
|
||||
|
||||
assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public static class TestTriggerAction extends TriggerActionBase {
|
||||
|
||||
public TestTriggerAction() {
|
||||
actionConstructorCalled.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||
try {
|
||||
if (triggerFired.compareAndSet(false, true)) {
|
||||
events.add(event);
|
||||
long currentTimeNanos = TriggerIntegrationTest.timeSource.getTimeNs();
|
||||
long eventTimeNanos = event.getEventTime();
|
||||
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
|
||||
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
|
||||
fail(event.source + " was fired before the configured waitFor period");
|
||||
}
|
||||
getTriggerFiredLatch().countDown();
|
||||
} else {
|
||||
fail(event.source + " was fired more than once!");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.debug("--throwable", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Map<String, String> args) {
|
||||
log.info("TestTriggerAction init");
|
||||
actionInitCalled.countDown();
|
||||
super.init(args);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,322 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.Overseer;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
|
||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
|
||||
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||
public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static CountDownLatch actionConstructorCalled;
|
||||
private static CountDownLatch actionInitCalled;
|
||||
private static CountDownLatch triggerFiredLatch;
|
||||
private static int waitForSeconds = 1;
|
||||
private static AtomicBoolean triggerFired;
|
||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||
private static SolrCloudManager cloudManager;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(2)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
// disable .scheduled_maintenance
|
||||
String suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||
"}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
SolrClient solrClient = cluster.getSolrClient();
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
}
|
||||
|
||||
private static CountDownLatch getTriggerFiredLatch() {
|
||||
return triggerFiredLatch;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupTest() throws Exception {
|
||||
// ensure that exactly 2 jetty nodes are running
|
||||
int numJetties = cluster.getJettySolrRunners().size();
|
||||
log.info("Found {} jetty instances running", numJetties);
|
||||
for (int i = 2; i < numJetties; i++) {
|
||||
int r = random().nextInt(cluster.getJettySolrRunners().size());
|
||||
log.info("Shutdown extra jetty instance at port {}", cluster.getJettySolrRunner(r).getLocalPort());
|
||||
cluster.stopJettySolrRunner(r);
|
||||
}
|
||||
for (int i = cluster.getJettySolrRunners().size(); i < 2; i++) {
|
||||
// start jetty instances
|
||||
cluster.startJettySolrRunner();
|
||||
}
|
||||
cluster.waitForAllNodes(5);
|
||||
|
||||
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
||||
String overseerLeader = (String) overSeerStatus.get("leader");
|
||||
int overseerLeaderIndex = 0;
|
||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
||||
if (jetty.getNodeName().equals(overseerLeader)) {
|
||||
overseerLeaderIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
|
||||
ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
|
||||
// aggressively remove all active scheduled triggers
|
||||
scheduledTriggers.removeAll();
|
||||
|
||||
// clear any persisted auto scaling configuration
|
||||
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
|
||||
log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
|
||||
|
||||
cluster.deleteAllCollections();
|
||||
cluster.getSolrClient().setDefaultCollection(null);
|
||||
|
||||
// restart Overseer. Even though we reset the autoscaling config some already running
|
||||
// trigger threads may still continue to execute and produce spurious events
|
||||
cluster.stopJettySolrRunner(overseerLeaderIndex);
|
||||
Thread.sleep(5000);
|
||||
|
||||
waitForSeconds = 1 + random().nextInt(3);
|
||||
actionConstructorCalled = new CountDownLatch(1);
|
||||
actionInitCalled = new CountDownLatch(1);
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
triggerFired = new AtomicBoolean(false);
|
||||
events.clear();
|
||||
|
||||
while (cluster.getJettySolrRunners().size() < 2) {
|
||||
// perhaps a test stopped a node but didn't start it back
|
||||
// lets start a node
|
||||
cluster.startJettySolrRunner();
|
||||
}
|
||||
|
||||
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
|
||||
// clear any events or markers
|
||||
// todo: consider the impact of such cleanup on regular cluster restarts
|
||||
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
|
||||
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
|
||||
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
|
||||
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
|
||||
}
|
||||
|
||||
private void deleteChildrenRecursively(String path) throws Exception {
|
||||
cloudManager.getDistribStateManager().removeRecursively(path, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeLostTriggerRestoreState() throws Exception {
|
||||
// for this test we want to update the trigger so we must assert that the actions were created twice
|
||||
actionInitCalled = new CountDownLatch(2);
|
||||
|
||||
// start a new node
|
||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||
String nodeName = newNode.getNodeName();
|
||||
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
waitForSeconds = 5;
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_lost_restore_trigger'," +
|
||||
"'event' : 'nodeLost'," +
|
||||
"'waitFor' : '5s'," + // should be enough for us to update the trigger
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
|
||||
while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
|
||||
Thread.sleep(200);
|
||||
}
|
||||
assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
|
||||
|
||||
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
|
||||
int index = -1;
|
||||
for (int i = 0; i < jettySolrRunners.size(); i++) {
|
||||
JettySolrRunner runner = jettySolrRunners.get(i);
|
||||
if (runner == newNode) index = i;
|
||||
}
|
||||
assertFalse(index == -1);
|
||||
cluster.stopJettySolrRunner(index);
|
||||
|
||||
// ensure that the old trigger sees the stopped node, todo find a better way to do this
|
||||
Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
|
||||
|
||||
waitForSeconds = 0;
|
||||
setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_lost_restore_trigger'," +
|
||||
"'event' : 'nodeLost'," +
|
||||
"'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
// wait until the second instance of action is created
|
||||
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
||||
fail("Two TriggerAction instances should have been created by now");
|
||||
}
|
||||
|
||||
boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
assertTrue(triggerFired.get());
|
||||
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
|
||||
assertNotNull(nodeLostEvent);
|
||||
List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
|
||||
assertTrue(nodeNames.contains(nodeName));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeLostTrigger() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_lost_trigger'," +
|
||||
"'event' : 'nodeLost'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
||||
String overseerLeader = (String) overSeerStatus.get("leader");
|
||||
int nonOverseerLeaderIndex = 0;
|
||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
||||
if (!jetty.getNodeName().equals(overseerLeader)) {
|
||||
nonOverseerLeaderIndex = i;
|
||||
}
|
||||
}
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
|
||||
fail("The TriggerAction should have been created by now");
|
||||
}
|
||||
|
||||
triggerFired.set(false);
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
String lostNodeName = cluster.getJettySolrRunner(nonOverseerLeaderIndex).getNodeName();
|
||||
cluster.stopJettySolrRunner(nonOverseerLeaderIndex);
|
||||
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
assertTrue(triggerFired.get());
|
||||
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
|
||||
assertNotNull(nodeLostEvent);
|
||||
List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
|
||||
assertTrue(nodeNames.contains(lostNodeName));
|
||||
|
||||
// reset
|
||||
actionConstructorCalled = new CountDownLatch(1);
|
||||
actionInitCalled = new CountDownLatch(1);
|
||||
|
||||
// update the trigger with exactly the same data
|
||||
setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_lost_trigger'," +
|
||||
"'event' : 'nodeLost'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
// this should be a no-op so the action should have been created but init should not be called
|
||||
if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
|
||||
fail("The TriggerAction should have been created by now");
|
||||
}
|
||||
|
||||
assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public static class TestTriggerAction extends TriggerActionBase {
|
||||
|
||||
public TestTriggerAction() {
|
||||
actionConstructorCalled.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||
try {
|
||||
if (triggerFired.compareAndSet(false, true)) {
|
||||
events.add(event);
|
||||
long currentTimeNanos = TriggerIntegrationTest.timeSource.getTimeNs();
|
||||
long eventTimeNanos = event.getEventTime();
|
||||
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
|
||||
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
|
||||
fail(event.source + " was fired before the configured waitFor period");
|
||||
}
|
||||
getTriggerFiredLatch().countDown();
|
||||
} else {
|
||||
fail(event.source + " was fired more than once!");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.debug("--throwable", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Map<String, String> args) {
|
||||
log.info("TestTriggerAction init");
|
||||
actionInitCalled.countDown();
|
||||
super.init(args);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,269 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.cloud.LiveNodesListener;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||
@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
|
||||
public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static CountDownLatch actionInitCalled;
|
||||
private static CountDownLatch triggerFiredLatch;
|
||||
private static CountDownLatch actionConstructorCalled;
|
||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||
private static ZkStateReader zkStateReader;
|
||||
private static ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(2)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
zkStateReader = cluster.getSolrClient().getZkStateReader();
|
||||
// disable .scheduled_maintenance
|
||||
String suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||
"}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
SolrClient solrClient = cluster.getSolrClient();
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
}
|
||||
|
||||
private static CountDownLatch getTriggerFiredLatch() {
|
||||
return triggerFiredLatch;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeMarkersRegistration() throws Exception {
|
||||
// for this test we want to create two triggers so we must assert that the actions were created twice
|
||||
actionInitCalled = new CountDownLatch(2);
|
||||
// similarly we want both triggers to fire
|
||||
triggerFiredLatch = new CountDownLatch(2);
|
||||
actionConstructorCalled = new CountDownLatch(1);
|
||||
TestLiveNodesListener listener = registerLiveNodesListener();
|
||||
|
||||
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
||||
String overseerLeader = (String) overSeerStatus.get("leader");
|
||||
int overseerLeaderIndex = 0;
|
||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
||||
if (jetty.getNodeName().equals(overseerLeader)) {
|
||||
overseerLeaderIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
// add a node
|
||||
JettySolrRunner node = cluster.startJettySolrRunner();
|
||||
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
|
||||
fail("onChange listener didn't execute on cluster change");
|
||||
}
|
||||
assertEquals(1, listener.addedNodes.size());
|
||||
assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
|
||||
// verify that a znode doesn't exist (no trigger)
|
||||
String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
|
||||
assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers", zkClient().exists(pathAdded, true));
|
||||
listener.reset();
|
||||
// stop overseer
|
||||
log.info("====== KILL OVERSEER 1");
|
||||
cluster.stopJettySolrRunner(overseerLeaderIndex);
|
||||
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
|
||||
fail("onChange listener didn't execute on cluster change");
|
||||
}
|
||||
assertEquals(1, listener.lostNodes.size());
|
||||
assertEquals(overseerLeader, listener.lostNodes.iterator().next());
|
||||
assertEquals(0, listener.addedNodes.size());
|
||||
// wait until the new overseer is up
|
||||
Thread.sleep(5000);
|
||||
// verify that a znode does NOT exist - there's no nodeLost trigger,
|
||||
// so the new overseer cleaned up existing nodeLost markers
|
||||
String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
|
||||
assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
|
||||
|
||||
listener.reset();
|
||||
|
||||
// set up triggers
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
|
||||
log.info("====== ADD TRIGGERS");
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_added_triggerMR'," +
|
||||
"'event' : 'nodeAdded'," +
|
||||
"'waitFor' : '1s'," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_lost_triggerMR'," +
|
||||
"'event' : 'nodeLost'," +
|
||||
"'waitFor' : '1s'," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
|
||||
"}}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
|
||||
overseerLeader = (String) overSeerStatus.get("leader");
|
||||
overseerLeaderIndex = 0;
|
||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
|
||||
if (jetty.getNodeName().equals(overseerLeader)) {
|
||||
overseerLeaderIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// create another node
|
||||
log.info("====== ADD NODE 1");
|
||||
JettySolrRunner node1 = cluster.startJettySolrRunner();
|
||||
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
|
||||
fail("onChange listener didn't execute on cluster change");
|
||||
}
|
||||
assertEquals(1, listener.addedNodes.size());
|
||||
assertEquals(node1.getNodeName(), listener.addedNodes.iterator().next());
|
||||
// verify that a znode exists
|
||||
pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1.getNodeName();
|
||||
assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
|
||||
|
||||
Thread.sleep(5000);
|
||||
// nodeAdded marker should be consumed now by nodeAdded trigger
|
||||
assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
|
||||
|
||||
listener.reset();
|
||||
events.clear();
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
// kill overseer again
|
||||
log.info("====== KILL OVERSEER 2");
|
||||
cluster.stopJettySolrRunner(overseerLeaderIndex);
|
||||
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
|
||||
fail("onChange listener didn't execute on cluster change");
|
||||
}
|
||||
|
||||
|
||||
if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
|
||||
fail("Trigger should have fired by now");
|
||||
}
|
||||
assertEquals(1, events.size());
|
||||
TriggerEvent ev = events.iterator().next();
|
||||
List<String> nodeNames = (List<String>) ev.getProperty(TriggerEvent.NODE_NAMES);
|
||||
assertTrue(nodeNames.contains(overseerLeader));
|
||||
assertEquals(TriggerEventType.NODELOST, ev.getEventType());
|
||||
}
|
||||
|
||||
private TestLiveNodesListener registerLiveNodesListener() {
|
||||
TestLiveNodesListener listener = new TestLiveNodesListener();
|
||||
zkStateReader.registerLiveNodesListener(listener);
|
||||
return listener;
|
||||
}
|
||||
|
||||
private static class TestLiveNodesListener implements LiveNodesListener {
|
||||
Set<String> lostNodes = new HashSet<>();
|
||||
Set<String> addedNodes = new HashSet<>();
|
||||
CountDownLatch onChangeLatch = new CountDownLatch(1);
|
||||
|
||||
public void reset() {
|
||||
lostNodes.clear();
|
||||
addedNodes.clear();
|
||||
onChangeLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
|
||||
onChangeLatch.countDown();
|
||||
Set<String> old = new HashSet<>(oldLiveNodes);
|
||||
old.removeAll(newLiveNodes);
|
||||
if (!old.isEmpty()) {
|
||||
lostNodes.addAll(old);
|
||||
}
|
||||
newLiveNodes.removeAll(oldLiveNodes);
|
||||
if (!newLiveNodes.isEmpty()) {
|
||||
addedNodes.addAll(newLiveNodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestEventMarkerAction extends TriggerActionBase {
|
||||
|
||||
public TestEventMarkerAction() {
|
||||
actionConstructorCalled.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||
boolean locked = lock.tryLock();
|
||||
if (!locked) {
|
||||
log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
events.add(event);
|
||||
getTriggerFiredLatch().countDown();
|
||||
} catch (Throwable t) {
|
||||
log.debug("--throwable", t);
|
||||
throw t;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Map<String, String> args) {
|
||||
log.info("TestEventMarkerAction init");
|
||||
actionInitCalled.countDown();
|
||||
super.init(args);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,141 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
|
||||
/**
|
||||
* Integration test for {@link ScheduledTrigger}
|
||||
*/
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
|
||||
public class ScheduledTriggerIntegrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static CountDownLatch triggerFiredLatch;
|
||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||
private static AtomicReference<Map<String, Object>> actionContextPropertiesRef = new AtomicReference<>();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(2)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
// disable .scheduled_maintenance
|
||||
String suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||
"}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
SolrClient solrClient = cluster.getSolrClient();
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScheduledTrigger() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
|
||||
// this collection will place 2 cores on 1st node and 1 core on 2nd node
|
||||
String collectionName = "testScheduledTrigger";
|
||||
CollectionAdminRequest.createCollection(collectionName, 1, 3)
|
||||
.setMaxShardsPerNode(5).process(solrClient);
|
||||
waitForState("", collectionName, clusterShape(1, 3));
|
||||
|
||||
// create a policy which allows only 1 core per node thereby creating a violation for the above collection
|
||||
String setClusterPolicy = "{\n" +
|
||||
" \"set-cluster-policy\" : [\n" +
|
||||
" {\"cores\" : \"<2\", \"node\" : \"#EACH\"}\n" +
|
||||
" ]\n" +
|
||||
"}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicy);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
// start a new node which can be used to balance the cluster as per policy
|
||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||
cluster.waitForAllNodes(10);
|
||||
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'sched_trigger_integration1'," +
|
||||
"'event' : 'scheduled'," +
|
||||
"'startTime' : '" + new Date().toInstant().toString() + "'" +
|
||||
"'every' : '+3SECONDS'" +
|
||||
"'actions' : [" +
|
||||
"{'name' : 'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||
"{'name' : 'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||
"{'name' : 'recorder', 'class': '" + ContextPropertiesRecorderAction.class.getName() + "'}" +
|
||||
"]}}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
assertTrue("ScheduledTrigger did not fire within 20 seconds", triggerFiredLatch.await(20, TimeUnit.SECONDS));
|
||||
assertEquals(1, events.size());
|
||||
Map<String, Object> actionContextProps = actionContextPropertiesRef.get();
|
||||
assertNotNull(actionContextProps);
|
||||
TriggerEvent event = events.iterator().next();
|
||||
List<SolrRequest> operations = (List<SolrRequest>) actionContextProps.get("operations");
|
||||
assertNotNull(operations);
|
||||
assertEquals(1, operations.size());
|
||||
for (SolrRequest operation : operations) {
|
||||
SolrParams params = operation.getParams();
|
||||
assertEquals(newNode.getNodeName(), params.get("targetNode"));
|
||||
}
|
||||
}
|
||||
|
||||
public static class ContextPropertiesRecorderAction extends TriggerActionBase {
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||
actionContextPropertiesRef.set(actionContext.getProperties());
|
||||
try {
|
||||
events.add(event);
|
||||
triggerFiredLatch.countDown();
|
||||
} catch (Throwable t) {
|
||||
log.debug("--throwable", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,217 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.util.concurrent.AtomicDouble;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
|
||||
|
||||
/**
|
||||
* Integration test for {@link SearchRateTrigger}
|
||||
*/
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||
@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
|
||||
public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
|
||||
private static CountDownLatch listenerCreated = new CountDownLatch(1);
|
||||
private static int waitForSeconds = 1;
|
||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||
private static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(5)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
// disable .scheduled_maintenance
|
||||
String suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||
"}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
SolrClient solrClient = cluster.getSolrClient();
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSearchRate() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
String COLL1 = "collection1";
|
||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
|
||||
"conf", 1, 2);
|
||||
create.process(solrClient);
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'search_rate_trigger'," +
|
||||
"'event' : 'searchRate'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'rate' : 1.0," +
|
||||
"'actions' : [" +
|
||||
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||
"{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
|
||||
"]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
String setListenerCommand1 = "{" +
|
||||
"'set-listener' : " +
|
||||
"{" +
|
||||
"'name' : 'srt'," +
|
||||
"'trigger' : 'search_rate_trigger'," +
|
||||
"'stage' : ['FAILED','SUCCEEDED']," +
|
||||
"'afterAction': ['compute', 'execute', 'test']," +
|
||||
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
SolrParams query = params(CommonParams.Q, "*:*");
|
||||
for (int i = 0; i < 500; i++) {
|
||||
solrClient.query(COLL1, query);
|
||||
}
|
||||
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
// wait for listener to capture the SUCCEEDED stage
|
||||
Thread.sleep(5000);
|
||||
List<CapturedEvent> events = listenerEvents.get("srt");
|
||||
assertEquals(listenerEvents.toString(), 4, events.size());
|
||||
assertEquals("AFTER_ACTION", events.get(0).stage.toString());
|
||||
assertEquals("compute", events.get(0).actionName);
|
||||
assertEquals("AFTER_ACTION", events.get(1).stage.toString());
|
||||
assertEquals("execute", events.get(1).actionName);
|
||||
assertEquals("AFTER_ACTION", events.get(2).stage.toString());
|
||||
assertEquals("test", events.get(2).actionName);
|
||||
assertEquals("SUCCEEDED", events.get(3).stage.toString());
|
||||
assertNull(events.get(3).actionName);
|
||||
|
||||
CapturedEvent ev = events.get(0);
|
||||
long now = timeSource.getTimeNs();
|
||||
// verify waitFor
|
||||
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
|
||||
Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get("node");
|
||||
assertNotNull("nodeRates", nodeRates);
|
||||
assertTrue(nodeRates.toString(), nodeRates.size() > 0);
|
||||
AtomicDouble totalNodeRate = new AtomicDouble();
|
||||
nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
|
||||
List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get("replica");
|
||||
assertNotNull("replicaRates", replicaRates);
|
||||
assertTrue(replicaRates.toString(), replicaRates.size() > 0);
|
||||
AtomicDouble totalReplicaRate = new AtomicDouble();
|
||||
replicaRates.forEach(r -> {
|
||||
assertTrue(r.toString(), r.getVariable("rate") != null);
|
||||
totalReplicaRate.addAndGet((Double) r.getVariable("rate"));
|
||||
});
|
||||
Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get("shard");
|
||||
assertNotNull("shardRates", shardRates);
|
||||
assertEquals(shardRates.toString(), 1, shardRates.size());
|
||||
shardRates = (Map<String, Object>) shardRates.get(COLL1);
|
||||
assertNotNull("shardRates", shardRates);
|
||||
assertEquals(shardRates.toString(), 1, shardRates.size());
|
||||
AtomicDouble totalShardRate = new AtomicDouble();
|
||||
shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double) r));
|
||||
Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get("collection");
|
||||
assertNotNull("collectionRates", collectionRates);
|
||||
assertEquals(collectionRates.toString(), 1, collectionRates.size());
|
||||
Double collectionRate = collectionRates.get(COLL1);
|
||||
assertNotNull(collectionRate);
|
||||
assertTrue(collectionRate > 5.0);
|
||||
assertEquals(collectionRate, totalNodeRate.get(), 5.0);
|
||||
assertEquals(collectionRate, totalShardRate.get(), 5.0);
|
||||
assertEquals(collectionRate, totalReplicaRate.get(), 5.0);
|
||||
|
||||
// check operations
|
||||
List<Map<String, Object>> ops = (List<Map<String, Object>>) ev.context.get("properties.operations");
|
||||
assertNotNull(ops);
|
||||
assertTrue(ops.size() > 1);
|
||||
for (Map<String, Object> m : ops) {
|
||||
assertEquals("ADDREPLICA", m.get("params.action"));
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestSearchRateAction extends TriggerActionBase {
|
||||
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext context) throws Exception {
|
||||
try {
|
||||
events.add(event);
|
||||
long currentTimeNanos = timeSource.getTimeNs();
|
||||
long eventTimeNanos = event.getEventTime();
|
||||
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
|
||||
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
|
||||
fail(event.source + " was fired before the configured waitFor period");
|
||||
}
|
||||
triggerFiredLatch.countDown();
|
||||
} catch (Throwable t) {
|
||||
log.debug("--throwable", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestTriggerListener extends TriggerListenerBase {
|
||||
@Override
|
||||
public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
|
||||
super.init(cloudManager, config);
|
||||
listenerCreated.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
|
||||
ActionContext context, Throwable error, String message) {
|
||||
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
|
||||
lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,238 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.params.AutoScalingParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
|
||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
|
||||
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||
public class TriggerCooldownIntegrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
|
||||
static CountDownLatch listenerCreated = new CountDownLatch(1);
|
||||
static boolean failDummyAction = false;
|
||||
private static CountDownLatch actionConstructorCalled = new CountDownLatch(1);
|
||||
private static CountDownLatch actionInitCalled = new CountDownLatch(1);
|
||||
private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
|
||||
private static int waitForSeconds = 1;
|
||||
private static AtomicBoolean triggerFired = new AtomicBoolean();
|
||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(2)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
// disable .scheduled_maintenance
|
||||
String suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||
"}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
SolrClient solrClient = cluster.getSolrClient();
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCooldown() throws Exception {
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
failDummyAction = false;
|
||||
waitForSeconds = 1;
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_added_cooldown_trigger'," +
|
||||
"'event' : 'nodeAdded'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [" +
|
||||
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
|
||||
"]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
String setListenerCommand1 = "{" +
|
||||
"'set-listener' : " +
|
||||
"{" +
|
||||
"'name' : 'bar'," +
|
||||
"'trigger' : 'node_added_cooldown_trigger'," +
|
||||
"'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," +
|
||||
"'class' : '" + TestTriggerListener.class.getName() + "'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
listenerCreated = new CountDownLatch(1);
|
||||
listenerEvents.clear();
|
||||
|
||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
assertTrue(triggerFired.get());
|
||||
// wait for listener to capture the SUCCEEDED stage
|
||||
Thread.sleep(1000);
|
||||
|
||||
List<CapturedEvent> capturedEvents = listenerEvents.get("bar");
|
||||
// we may get a few IGNORED events if other tests caused events within cooldown period
|
||||
assertTrue(capturedEvents.toString(), capturedEvents.size() > 0);
|
||||
long prevTimestamp = capturedEvents.get(capturedEvents.size() - 1).timestamp;
|
||||
|
||||
// reset the trigger and captured events
|
||||
listenerEvents.clear();
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
triggerFired.compareAndSet(true, false);
|
||||
|
||||
JettySolrRunner newNode2 = cluster.startJettySolrRunner();
|
||||
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
// wait for listener to capture the SUCCEEDED stage
|
||||
Thread.sleep(2000);
|
||||
|
||||
// there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
|
||||
capturedEvents = listenerEvents.get("bar");
|
||||
assertEquals(capturedEvents.toString(), 1, capturedEvents.size());
|
||||
CapturedEvent ev = capturedEvents.get(0);
|
||||
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
|
||||
// the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
|
||||
// must be larger than cooldown period
|
||||
assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS));
|
||||
prevTimestamp = ev.timestamp;
|
||||
|
||||
// this also resets the cooldown period
|
||||
long modifiedCooldownPeriodSeconds = 7;
|
||||
String setPropertiesCommand = "{\n" +
|
||||
"\t\"set-properties\" : {\n" +
|
||||
"\t\t\"" + AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS + "\" : " + modifiedCooldownPeriodSeconds + "\n" +
|
||||
"\t}\n" +
|
||||
"}";
|
||||
solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
|
||||
response = solrClient.request(req);
|
||||
|
||||
// reset the trigger and captured events
|
||||
listenerEvents.clear();
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
triggerFired.compareAndSet(true, false);
|
||||
|
||||
JettySolrRunner newNode3 = cluster.startJettySolrRunner();
|
||||
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
triggerFired.compareAndSet(true, false);
|
||||
// add another node
|
||||
JettySolrRunner newNode4 = cluster.startJettySolrRunner();
|
||||
await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
// wait for listener to capture the SUCCEEDED stage
|
||||
Thread.sleep(2000);
|
||||
|
||||
// there must be two SUCCEEDED (due to newNode3 and newNode4) and maybe some ignored events
|
||||
capturedEvents = listenerEvents.get("bar");
|
||||
assertTrue(capturedEvents.toString(), capturedEvents.size() >= 2);
|
||||
// first event should be SUCCEEDED
|
||||
ev = capturedEvents.get(0);
|
||||
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
|
||||
|
||||
ev = capturedEvents.get(capturedEvents.size() - 1);
|
||||
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
|
||||
// the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
|
||||
// must be larger than the modified cooldown period
|
||||
assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(modifiedCooldownPeriodSeconds));
|
||||
}
|
||||
|
||||
public static class TestTriggerAction extends TriggerActionBase {
|
||||
|
||||
public TestTriggerAction() {
|
||||
actionConstructorCalled.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||
try {
|
||||
if (triggerFired.compareAndSet(false, true)) {
|
||||
events.add(event);
|
||||
long currentTimeNanos = timeSource.getTimeNs();
|
||||
long eventTimeNanos = event.getEventTime();
|
||||
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
|
||||
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
|
||||
fail(event.source + " was fired before the configured waitFor period");
|
||||
}
|
||||
triggerFiredLatch.countDown();
|
||||
} else {
|
||||
fail(event.source + " was fired more than once!");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.debug("--throwable", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Map<String, String> args) {
|
||||
log.info("TestTriggerAction init");
|
||||
actionInitCalled.countDown();
|
||||
super.init(args);
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestTriggerListener extends TriggerListenerBase {
|
||||
@Override
|
||||
public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
|
||||
super.init(cloudManager, config);
|
||||
listenerCreated.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
|
||||
ActionContext context, Throwable error, String message) {
|
||||
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
|
||||
lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
|
||||
}
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,195 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.params.AutoScalingParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.junit.BeforeClass;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
|
||||
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||
public class TriggerSetPropertiesIntegrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(5)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
// disable .scheduled_maintenance
|
||||
String suspendTriggerCommand = "{" +
|
||||
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
|
||||
"}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
SolrClient solrClient = cluster.getSolrClient();
|
||||
NamedList<Object> response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
}
|
||||
|
||||
private static CountDownLatch getTriggerFiredLatch() {
|
||||
return triggerFiredLatch;
|
||||
}
|
||||
|
||||
public void testSetProperties() throws Exception {
|
||||
JettySolrRunner runner = cluster.getJettySolrRunner(0);
|
||||
SolrResourceLoader resourceLoader = runner.getCoreContainer().getResourceLoader();
|
||||
SolrCloudManager solrCloudManager = runner.getCoreContainer().getZkController().getSolrCloudManager();
|
||||
AtomicLong diff = new AtomicLong(0);
|
||||
triggerFiredLatch = new CountDownLatch(2); // have the trigger run twice to capture time difference
|
||||
try (ScheduledTriggers scheduledTriggers = new ScheduledTriggers(resourceLoader, solrCloudManager)) {
|
||||
AutoScalingConfig config = new AutoScalingConfig(Collections.emptyMap());
|
||||
scheduledTriggers.setAutoScalingConfig(config);
|
||||
scheduledTriggers.add(new TriggerBase(TriggerEventType.NODELOST, "x", Collections.emptyMap(), resourceLoader, solrCloudManager) {
|
||||
@Override
|
||||
protected Map<String, Object> getState() {
|
||||
return Collections.singletonMap("x", "y");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setState(Map<String, Object> state) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreState(AutoScaling.Trigger old) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (getTriggerFiredLatch().getCount() == 0) return;
|
||||
long l = diff.get();
|
||||
diff.set(timeSource.getTimeNs() - l);
|
||||
getTriggerFiredLatch().countDown();
|
||||
}
|
||||
});
|
||||
assertTrue(getTriggerFiredLatch().await(4, TimeUnit.SECONDS));
|
||||
assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS) >= 0);
|
||||
|
||||
// change schedule delay
|
||||
config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS, 4));
|
||||
scheduledTriggers.setAutoScalingConfig(config);
|
||||
triggerFiredLatch = new CountDownLatch(2);
|
||||
assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(10, TimeUnit.SECONDS));
|
||||
assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(4) >= 0);
|
||||
|
||||
// reset with default properties
|
||||
scheduledTriggers.remove("x"); // remove the old trigger
|
||||
config = config.withProperties(ScheduledTriggers.DEFAULT_PROPERTIES);
|
||||
scheduledTriggers.setAutoScalingConfig(config);
|
||||
|
||||
// test core thread count
|
||||
List<AutoScaling.Trigger> triggerList = new ArrayList<>();
|
||||
final Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
|
||||
final Set<String> triggerNames = Collections.synchronizedSet(new HashSet<>());
|
||||
triggerFiredLatch = new CountDownLatch(8);
|
||||
for (int i = 0; i < 8; i++) {
|
||||
triggerList.add(new MockTrigger(TriggerEventType.NODELOST, "x" + i, Collections.emptyMap(), resourceLoader, solrCloudManager) {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// If core pool size is increased then new threads won't be started if existing threads
|
||||
// aren't busy with tasks. So we make this thread wait longer than necessary
|
||||
// so that the pool is forced to start threads for other triggers
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
if (triggerNames.add(getName())) {
|
||||
getTriggerFiredLatch().countDown();
|
||||
threadNames.add(Thread.currentThread().getName());
|
||||
}
|
||||
}
|
||||
});
|
||||
scheduledTriggers.add(triggerList.get(i));
|
||||
}
|
||||
assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
|
||||
assertEquals("Expected 8 triggers but found: " + triggerNames, 8, triggerNames.size());
|
||||
assertEquals("Expected " + ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE
|
||||
+ " threads but found: " + threadNames,
|
||||
ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE, threadNames.size());
|
||||
|
||||
// change core pool size
|
||||
config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_CORE_POOL_SIZE, 6));
|
||||
scheduledTriggers.setAutoScalingConfig(config);
|
||||
triggerFiredLatch = new CountDownLatch(8);
|
||||
threadNames.clear();
|
||||
triggerNames.clear();
|
||||
assertTrue(getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
|
||||
assertEquals("Expected 8 triggers but found: " + triggerNames, 8, triggerNames.size());
|
||||
assertEquals("Expected 6 threads but found: " + threadNames, 6, threadNames.size());
|
||||
|
||||
// reset
|
||||
for (int i = 0; i < 8; i++) {
|
||||
scheduledTriggers.remove(triggerList.get(i).getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class MockTrigger extends TriggerBase {
|
||||
|
||||
public MockTrigger(TriggerEventType eventType, String name, Map<String, Object> properties, SolrResourceLoader loader, SolrCloudManager cloudManager) {
|
||||
super(eventType, name, properties, loader, cloudManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> getState() {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setState(Map<String, Object> state) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreState(AutoScaling.Trigger old) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue