mirror of https://github.com/apache/lucene.git
SOLR-11911: Make sure all processing is completed before asserting.
This commit is contained in:
parent
f9f5e83745
commit
6624773613
|
@ -60,6 +60,7 @@ import org.apache.solr.common.util.Pair;
|
|||
import org.apache.solr.common.util.TimeSource;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -83,8 +84,9 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
public static final int NUM_NODES = 100;
|
||||
|
||||
static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
|
||||
static AtomicInteger triggerFiredCount = new AtomicInteger();
|
||||
static CountDownLatch triggerFiredLatch;
|
||||
static AtomicInteger triggerFinishedCount = new AtomicInteger();
|
||||
static AtomicInteger triggerStartedCount = new AtomicInteger();
|
||||
static CountDownLatch triggerFinishedLatch;
|
||||
static int waitForSeconds;
|
||||
|
||||
@BeforeClass
|
||||
|
@ -95,8 +97,9 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
@Before
|
||||
public void setupTest() throws Exception {
|
||||
waitForSeconds = 5;
|
||||
triggerFiredCount.set(0);
|
||||
triggerFiredLatch = new CountDownLatch(1);
|
||||
triggerStartedCount.set(0);
|
||||
triggerFinishedCount.set(0);
|
||||
triggerFinishedLatch = new CountDownLatch(1);
|
||||
listenerEvents.clear();
|
||||
// disable .scheduled_maintenance
|
||||
String suspendTriggerCommand = "{" +
|
||||
|
@ -129,11 +132,18 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public static class TestTriggerAction extends TriggerActionBase {
|
||||
// Trigger action whose process() updates the static counters and latches declared on the test class.
// The set-trigger commands visible in this file list it as the last entry ('test') of each action chain.
// NOTE(review): this text is a rendered diff without +/- markers, so removed and added lines are
// interleaved. The triggerFired* statements below look like the pre-change body and the
// triggerFinished* statements like their replacement - confirm against the commit.
public static class FinishTriggerAction extends TriggerActionBase {
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext context) throws Exception {
|
||||
triggerFiredCount.incrementAndGet();
|
||||
triggerFiredLatch.countDown();
|
||||
// Count one more finished event; the tests compare this counter with triggerStartedCount.
triggerFinishedCount.incrementAndGet();
|
||||
// setupTest() creates this latch with a count of 1, so the first call lets awaiting tests proceed.
triggerFinishedLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger action that only increments the static triggerStartedCount each time process() is called.
// The set-trigger commands that include it list it as the first action ('start'), and the tests
// poll until triggerStartedCount equals triggerFinishedCount before making their assertions.
public static class StartTriggerAction extends TriggerActionBase {
|
||||
@Override
|
||||
public void process(TriggerEvent event, ActionContext context) throws Exception {
|
||||
// Count one more started event; event and context are not used here.
triggerStartedCount.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -142,14 +152,15 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
SolrClient solrClient = cluster.simGetSolrClient();
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_lost_trigger'," +
|
||||
"'name' : 'node_lost_trigger1'," +
|
||||
"'event' : 'nodeLost'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [" +
|
||||
"{'name':'start','class':'" + StartTriggerAction.class.getName() + "'}," +
|
||||
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
|
||||
"{'name':'test','class':'" + FinishTriggerAction.class.getName() + "'}" +
|
||||
"]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
|
@ -160,7 +171,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
"'set-listener' : " +
|
||||
"{" +
|
||||
"'name' : 'foo'," +
|
||||
"'trigger' : 'node_lost_trigger'," +
|
||||
"'trigger' : 'node_lost_trigger1'," +
|
||||
"'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
|
||||
"'beforeAction' : ['compute', 'execute']," +
|
||||
"'afterAction' : ['compute', 'execute']," +
|
||||
|
@ -223,6 +234,19 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
// wait until started == finished
|
||||
TimeOut timeOut = new TimeOut(20 * waitForSeconds * NUM_NODES, TimeUnit.SECONDS, cluster.getTimeSource());
|
||||
while (!timeOut.hasTimedOut()) {
|
||||
if (triggerStartedCount.get() == triggerFinishedCount.get()) {
|
||||
break;
|
||||
}
|
||||
timeOut.sleep(1000);
|
||||
}
|
||||
if (timeOut.hasTimedOut()) {
|
||||
fail("did not finish processing all events in time: started=" + triggerStartedCount.get() + ", finished=" + triggerFinishedCount.get());
|
||||
}
|
||||
|
||||
|
||||
log.info("Ready after " + CloudTestUtils.waitForState(cluster, collectionName, 30 * nodes.size(), TimeUnit.SECONDS,
|
||||
CloudTestUtils.clusterShape(5, 15)) + "ms");
|
||||
long newMoveReplicaOps = cluster.simGetOpCount(CollectionParams.CollectionAction.MOVEREPLICA.name());
|
||||
|
@ -238,14 +262,15 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
SolrClient solrClient = cluster.simGetSolrClient();
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_added_trigger'," +
|
||||
"'name' : 'node_added_trigger2'," +
|
||||
"'event' : 'nodeAdded'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [" +
|
||||
"{'name':'start','class':'" + StartTriggerAction.class.getName() + "'}," +
|
||||
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
|
||||
"{'name':'test','class':'" + FinishTriggerAction.class.getName() + "'}" +
|
||||
"]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
|
@ -263,20 +288,34 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
log.info("Ready after " + CloudTestUtils.waitForState(cluster, collectionName, 20 * NUM_NODES, TimeUnit.SECONDS,
|
||||
CloudTestUtils.clusterShape(NUM_NODES / 10, NUM_NODES / 8 * 3)) + " ms");
|
||||
|
||||
// start adding nodes
|
||||
int numAddNode = NUM_NODES / 5;
|
||||
List<String> addNodesList = new ArrayList<>(numAddNode);
|
||||
for (int i = 0; i < numAddNode; i++) {
|
||||
addNodesList.add(cluster.simAddNode());
|
||||
cluster.getTimeSource().sleep(5000);
|
||||
}
|
||||
boolean await = triggerFiredLatch.await(1000000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
// wait until at least one event is generated
|
||||
boolean await = triggerFinishedLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
assertTrue("trigger did not fire", await);
|
||||
|
||||
// wait until started == finished
|
||||
TimeOut timeOut = new TimeOut(20 * waitForSeconds * NUM_NODES, TimeUnit.SECONDS, cluster.getTimeSource());
|
||||
while (!timeOut.hasTimedOut()) {
|
||||
if (triggerStartedCount.get() == triggerFinishedCount.get()) {
|
||||
break;
|
||||
}
|
||||
timeOut.sleep(1000);
|
||||
}
|
||||
if (timeOut.hasTimedOut()) {
|
||||
fail("did not finish processing all events in time: started=" + triggerStartedCount.get() + ", finished=" + triggerFinishedCount.get());
|
||||
}
|
||||
|
||||
List<SolrInputDocument> systemColl = cluster.simGetSystemCollection();
|
||||
int startedEventPos = -1;
|
||||
for (int i = 0; i < systemColl.size(); i++) {
|
||||
SolrInputDocument d = systemColl.get(i);
|
||||
if (!"node_added_trigger".equals(d.getFieldValue("event.source_s"))) {
|
||||
if (!"node_added_trigger2".equals(d.getFieldValue("event.source_s"))) {
|
||||
continue;
|
||||
}
|
||||
if ("NODEADDED".equals(d.getFieldValue("event.type_s")) &&
|
||||
|
@ -298,13 +337,13 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
SolrInputDocument finishedEvent = null;
|
||||
long lastNumOps = cluster.simGetOpCount("MOVEREPLICA");
|
||||
while (count-- > 0) {
|
||||
cluster.getTimeSource().sleep(150000);
|
||||
cluster.getTimeSource().sleep(10000);
|
||||
long currentNumOps = cluster.simGetOpCount("MOVEREPLICA");
|
||||
if (currentNumOps == lastNumOps) {
|
||||
int size = systemColl.size() - 1;
|
||||
for (int i = size; i > lastIgnoredPos; i--) {
|
||||
SolrInputDocument d = systemColl.get(i);
|
||||
if (!"node_added_trigger".equals(d.getFieldValue("event.source_s"))) {
|
||||
if (!"node_added_trigger2".equals(d.getFieldValue("event.source_s"))) {
|
||||
continue;
|
||||
}
|
||||
if ("SUCCEEDED".equals(d.getFieldValue("stage_s"))) {
|
||||
|
@ -407,14 +446,15 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
SolrClient solrClient = cluster.simGetSolrClient();
|
||||
String setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'node_lost_trigger'," +
|
||||
"'name' : 'node_lost_trigger3'," +
|
||||
"'event' : 'nodeLost'," +
|
||||
"'waitFor' : '" + waitFor + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'actions' : [" +
|
||||
"{'name':'start','class':'" + StartTriggerAction.class.getName() + "'}," +
|
||||
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
|
||||
"{'name':'test','class':'" + FinishTriggerAction.class.getName() + "'}" +
|
||||
"]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
|
@ -441,8 +481,8 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
cluster.simRemoveNode(nodes.get(i), false);
|
||||
cluster.getTimeSource().sleep(killDelay);
|
||||
}
|
||||
// wait for the trigger to fire
|
||||
boolean await = triggerFiredLatch.await(20 * waitFor * 1000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
// wait for the trigger to fire at least once
|
||||
boolean await = triggerFinishedLatch.await(20 * waitFor * 1000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
assertTrue("trigger did not fire within timeout, " +
|
||||
"waitFor=" + waitFor + ", killDelay=" + killDelay + ", minIgnored=" + minIgnored,
|
||||
await);
|
||||
|
@ -450,7 +490,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
int startedEventPos = -1;
|
||||
for (int i = 0; i < systemColl.size(); i++) {
|
||||
SolrInputDocument d = systemColl.get(i);
|
||||
if (!"node_lost_trigger".equals(d.getFieldValue("event.source_s"))) {
|
||||
if (!"node_lost_trigger3".equals(d.getFieldValue("event.source_s"))) {
|
||||
continue;
|
||||
}
|
||||
if ("NODELOST".equals(d.getFieldValue("event.type_s")) &&
|
||||
|
@ -463,11 +503,22 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
"waitFor=" + waitFor + ", killDelay=" + killDelay + ", minIgnored=" + minIgnored,
|
||||
startedEventPos > -1);
|
||||
SolrInputDocument startedEvent = systemColl.get(startedEventPos);
|
||||
// wait until started == finished
|
||||
TimeOut timeOut = new TimeOut(20 * waitFor * NUM_NODES, TimeUnit.SECONDS, cluster.getTimeSource());
|
||||
while (!timeOut.hasTimedOut()) {
|
||||
if (triggerStartedCount.get() == triggerFinishedCount.get()) {
|
||||
break;
|
||||
}
|
||||
timeOut.sleep(1000);
|
||||
}
|
||||
if (timeOut.hasTimedOut()) {
|
||||
fail("did not finish processing all events in time: started=" + triggerStartedCount.get() + ", finished=" + triggerFinishedCount.get());
|
||||
}
|
||||
int ignored = 0;
|
||||
int lastIgnoredPos = startedEventPos;
|
||||
for (int i = startedEventPos + 1; i < systemColl.size(); i++) {
|
||||
SolrInputDocument d = systemColl.get(i);
|
||||
if (!"node_lost_trigger".equals(d.getFieldValue("event.source_s"))) {
|
||||
if (!"node_lost_trigger3".equals(d.getFieldValue("event.source_s"))) {
|
||||
continue;
|
||||
}
|
||||
if ("NODELOST".equals(d.getFieldValue("event.type_s"))) {
|
||||
|
@ -492,13 +543,13 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
SolrInputDocument finishedEvent = null;
|
||||
long lastNumOps = cluster.simGetOpCount("MOVEREPLICA");
|
||||
while (count-- > 0) {
|
||||
cluster.getTimeSource().sleep(150000);
|
||||
cluster.getTimeSource().sleep(waitFor * 10000);
|
||||
long currentNumOps = cluster.simGetOpCount("MOVEREPLICA");
|
||||
if (currentNumOps == lastNumOps) {
|
||||
int size = systemColl.size() - 1;
|
||||
for (int i = size; i > lastIgnoredPos; i--) {
|
||||
SolrInputDocument d = systemColl.get(i);
|
||||
if (!"node_lost_trigger".equals(d.getFieldValue("event.source_s"))) {
|
||||
if (!"node_lost_trigger3".equals(d.getFieldValue("event.source_s"))) {
|
||||
continue;
|
||||
}
|
||||
if ("SUCCEEDED".equals(d.getFieldValue("stage_s"))) {
|
||||
|
@ -560,7 +611,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
"'actions' : [" +
|
||||
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
|
||||
"{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
|
||||
"{'name':'test','class':'" + FinishTriggerAction.class.getName() + "'}" +
|
||||
"]" +
|
||||
"}}";
|
||||
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
|
@ -580,7 +631,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
|
|||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
|
||||
boolean await = triggerFiredLatch.await(waitForSeconds * 20000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
boolean await = triggerFinishedLatch.await(waitForSeconds * 20000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
// wait for listener to capture the SUCCEEDED stage
|
||||
cluster.getTimeSource().sleep(2000);
|
||||
|
|
Loading…
Reference in New Issue