mirror of https://github.com/apache/lucene.git
SOLR-11407: Make sure that the .system collection is not affected by cluster changes.
Wait specifically for listeners to run.
This commit is contained in:
parent
ca22f17662
commit
26e7af1015
|
@ -18,10 +18,13 @@ package org.apache.solr.handler.admin;
|
|||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
|
@ -31,6 +34,7 @@ import org.apache.solr.cloud.autoscaling.ActionContext;
|
|||
import org.apache.solr.cloud.autoscaling.SystemLogListener;
|
||||
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
|
||||
import org.apache.solr.cloud.autoscaling.TriggerEvent;
|
||||
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
|
||||
import org.apache.solr.common.SolrDocument;
|
||||
import org.apache.solr.common.SolrDocumentList;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
|
@ -50,30 +54,53 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
|
||||
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper=DEBUG")
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
|
||||
public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static CountDownLatch actionFiredLatch;
|
||||
private static CountDownLatch listenerFiredLatch;
|
||||
private static CloudSolrClient solrClient;
|
||||
private static String PREFIX = AutoscalingHistoryHandlerTest.class.getSimpleName();
|
||||
private static String COLL_NAME = PREFIX + "_collection";
|
||||
private static String systemCollNode;
|
||||
|
||||
/** Returns the latch currently held in the static {@code actionFiredLatch} field. */
private static CountDownLatch getActionFiredLatch() {
|
||||
return actionFiredLatch;
|
||||
}
|
||||
|
||||
/**
 * Returns the latch currently held in the static {@code listenerFiredLatch} field.
 * {@code TesterListener.onEvent} counts this latch down.
 */
private static CountDownLatch getListenerFiredLatch() {
|
||||
return listenerFiredLatch;
|
||||
}
|
||||
|
||||
/**
 * Class-level cluster setup: configures the cluster with the "conf" configset, stores the
 * cluster's Solr client in {@code solrClient}, and creates the collections used by the test.
 *
 * NOTE(review): this span comes from a diff view, so it appears to hold pre- and post-commit
 * lines side by side (e.g. {@code configureCluster(3)} next to {@code configureCluster(4)},
 * and two {@code createCollection} calls for the system collection) - confirm against the
 * actual file. The node-set based lines create the system collection on the node of runner 0
 * and the test collection on all remaining nodes.
 */
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(3)
|
||||
configureCluster(4)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
solrClient = cluster.getSolrClient();
|
||||
CollectionAdminRequest.createCollection(PREFIX + "_collection", null, 1, 3)
|
||||
// create the system collection and test collection on different nodes, to avoid
|
||||
// any interference from .system replicas being moved around.
|
||||
systemCollNode = cluster.getJettySolrRunner(0).getNodeName();
|
||||
CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL, null, 1, 1)
|
||||
.setCreateNodeSet(systemCollNode)
|
||||
.process(solrClient);
|
||||
CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL, null, 1, 3)
|
||||
Set<String> otherNodes = cluster.getJettySolrRunners().stream().map(JettySolrRunner::getNodeName)
|
||||
.collect(Collectors.toSet());
|
||||
otherNodes.remove(systemCollNode);
|
||||
CollectionAdminRequest.createCollection(COLL_NAME, null, 1, 3)
|
||||
.setCreateNodeSet(String.join(",", otherNodes))
|
||||
.process(solrClient);
|
||||
}
|
||||
|
||||
/**
 * Trigger listener used only to signal the test: every {@code onEvent} call counts down the
 * latch returned by {@code getListenerFiredLatch()}; none of the event arguments are inspected.
 * The test registers it via 'set-listener' commands (names 'node_added1' and 'node_lost1').
 */
public static class TesterListener extends TriggerListenerBase {
|
||||
|
||||
@Override
|
||||
public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
|
||||
getListenerFiredLatch().countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public static class TesterAction extends TriggerActionBase {
|
||||
|
||||
@Override
|
||||
|
@ -85,6 +112,7 @@ public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
|
|||
@Before
|
||||
public void setupTest() throws Exception {
|
||||
actionFiredLatch = new CountDownLatch(1);
|
||||
listenerFiredLatch = new CountDownLatch(1);
|
||||
|
||||
// first trigger
|
||||
String setTriggerCommand = "{" +
|
||||
|
@ -152,6 +180,19 @@ public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
|
|||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
setListenerCommand = "{" +
|
||||
"'set-listener' : " +
|
||||
"{" +
|
||||
"'name' : 'node_added1'," +
|
||||
"'trigger' : '" + PREFIX + "_node_added_trigger'," +
|
||||
"'afterAction' : ['test']," +
|
||||
"'class' : '" + TesterListener.class.getName() + "'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
setListenerCommand = "{" +
|
||||
"'set-listener' : " +
|
||||
"{" +
|
||||
|
@ -166,19 +207,32 @@ public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
|
|||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
setListenerCommand = "{" +
|
||||
"'set-listener' : " +
|
||||
"{" +
|
||||
"'name' : 'node_lost1'," +
|
||||
"'trigger' : '" + PREFIX + "_node_lost_trigger'," +
|
||||
"'afterAction' : ['test']," +
|
||||
"'class' : '" + TesterListener.class.getName() + "'" +
|
||||
"}" +
|
||||
"}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
}
|
||||
|
||||
/**
 * Replaces both static latches with fresh single-count latches, so that the next action and
 * listener invocations can be awaited again ({@code testHistory} calls this before stopping a node).
 *
 * NOTE(review): two signature lines ({@code resetLatch} and {@code resetLatches}) appear here
 * because this span comes from a diff view; presumably only {@code resetLatches} exists in the
 * resulting file - confirm.
 */
private void resetLatch() {
|
||||
private void resetLatches() {
|
||||
actionFiredLatch = new CountDownLatch(1);
|
||||
listenerFiredLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHistory() throws Exception {
|
||||
waitForState("Timed out wait for collection be active", PREFIX + "_collection",
|
||||
waitForState("Timed out wait for collection be active", COLL_NAME,
|
||||
clusterShape(1, 3));
|
||||
waitForState("Timed out wait for collection be active", CollectionAdminParams.SYSTEM_COLL,
|
||||
clusterShape(1, 3));
|
||||
clusterShape(1, 1));
|
||||
|
||||
log.info("### Start add node...");
|
||||
JettySolrRunner jetty = cluster.startJettySolrRunner();
|
||||
|
@ -186,11 +240,18 @@ public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
|
|||
log.info("### Added node " + nodeAddedName);
|
||||
boolean await = actionFiredLatch.await(60, TimeUnit.SECONDS);
|
||||
assertTrue("action did not execute", await);
|
||||
|
||||
await = listenerFiredLatch.await(60, TimeUnit.SECONDS);
|
||||
assertTrue("listener did not execute", await);
|
||||
|
||||
waitForRecovery(COLL_NAME);
|
||||
|
||||
// commit on the history collection
|
||||
Thread.sleep(2000);
|
||||
Thread.sleep(5000);
|
||||
log.info("### Commit .system");
|
||||
solrClient.commit(CollectionAdminParams.SYSTEM_COLL);
|
||||
Thread.sleep(2000);
|
||||
Thread.sleep(5000);
|
||||
|
||||
// verify that new docs exist
|
||||
ModifiableSolrParams query = params(CommonParams.Q, "type:" + SystemLogListener.DOC_TYPE,
|
||||
CommonParams.FQ, "event.source_s:" + PREFIX + "_node_added_trigger");
|
||||
|
@ -239,29 +300,56 @@ public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
|
|||
assertEquals("AFTER_ACTION", docs.get(1).getFieldValue("stage_s"));
|
||||
|
||||
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||
AutoscalingHistoryHandler.COLLECTION_PARAM, CollectionAdminParams.SYSTEM_COLL);
|
||||
docs = solrClient.query(query).getResults();
|
||||
AutoscalingHistoryHandler.COLLECTION_PARAM, COLL_NAME);
|
||||
rsp = solrClient.query(query);
|
||||
docs = rsp.getResults();
|
||||
if (docs.size() != 5) {
|
||||
log.info("Cluster state: " + solrClient.getZkStateReader().getClusterState());
|
||||
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH);
|
||||
log.info("Wrong response: ", rsp);
|
||||
log.info("Full response: " + solrClient.query(query));
|
||||
}
|
||||
assertEquals(5, docs.size());
|
||||
assertEquals("AFTER_ACTION", docs.get(0).getFieldValue("stage_s"));
|
||||
assertEquals("compute_plan", docs.get(0).getFieldValue("action_s"));
|
||||
|
||||
// reset latch
|
||||
resetLatch();
|
||||
// reset latches
|
||||
resetLatches();
|
||||
|
||||
// kill a node
|
||||
String node0Name = cluster.getJettySolrRunner(0).getNodeName();
|
||||
log.info("### Stopping node " + node0Name);
|
||||
cluster.stopJettySolrRunner(0);
|
||||
log.info("### Stopped node " + node0Name);
|
||||
// kill a node where a replica exists
|
||||
ClusterState state = cluster.getSolrClient().getZkStateReader().getClusterState();
|
||||
DocCollection coll = state.getCollection(COLL_NAME);
|
||||
String nodeToKill = null;
|
||||
for (Replica r : coll.getReplicas()) {
|
||||
if (r.isActive(state.getLiveNodes())) {
|
||||
nodeToKill = r.getNodeName();
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertNotNull("no suitable node found", nodeToKill);
|
||||
log.info("### Stopping node " + nodeToKill);
|
||||
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
|
||||
if (cluster.getJettySolrRunner(i).getNodeName().equals(nodeToKill)) {
|
||||
cluster.stopJettySolrRunner(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
log.info("### Stopped node " + nodeToKill);
|
||||
await = actionFiredLatch.await(60, TimeUnit.SECONDS);
|
||||
assertTrue("action did not execute", await);
|
||||
|
||||
await = listenerFiredLatch.await(60, TimeUnit.SECONDS);
|
||||
assertTrue("listener did not execute", await);
|
||||
|
||||
// wait for recovery
|
||||
waitForRecovery(PREFIX + "_collection");
|
||||
waitForRecovery(COLL_NAME);
|
||||
|
||||
Thread.sleep(5000);
|
||||
// commit on the history collection
|
||||
log.info("### Commit .system");
|
||||
solrClient.commit(CollectionAdminParams.SYSTEM_COLL);
|
||||
Thread.sleep(5000);
|
||||
|
||||
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||
AutoscalingHistoryHandler.TRIGGER_PARAM, PREFIX + "_node_lost_trigger");
|
||||
docs = solrClient.query(query).getResults();
|
||||
|
@ -269,8 +357,16 @@ public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
|
|||
|
||||
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH,
|
||||
AutoscalingHistoryHandler.TRIGGER_PARAM, PREFIX + "_node_lost_trigger",
|
||||
AutoscalingHistoryHandler.COLLECTION_PARAM, PREFIX + "_collection");
|
||||
docs = solrClient.query(query).getResults();
|
||||
AutoscalingHistoryHandler.COLLECTION_PARAM, COLL_NAME);
|
||||
rsp = solrClient.query(query);
|
||||
docs = rsp.getResults();
|
||||
if (docs.size() != 5) {
|
||||
log.info("Cluster state: " + solrClient.getZkStateReader().getClusterState());
|
||||
query = params(CommonParams.QT, CommonParams.AUTOSCALING_HISTORY_PATH);
|
||||
log.info("Wrong response: ", rsp);
|
||||
log.info("Full response: " + solrClient.query(query));
|
||||
}
|
||||
|
||||
assertEquals(5, docs.size());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue