mirror of https://github.com/apache/lucene.git
SOLR-12133: Fix failures in TriggerIntegrationTest.testEventQueue due to race conditions
This commit is contained in:
parent
ae6d29f0ae
commit
83cca5cd06
|
@ -66,6 +66,7 @@ import static org.apache.solr.common.params.AutoScalingParams.ACTION_THROTTLE_PE
|
||||||
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS;
|
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS;
|
||||||
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_CORE_POOL_SIZE;
|
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_CORE_POOL_SIZE;
|
||||||
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS;
|
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS;
|
||||||
|
import static org.apache.solr.common.util.ExecutorUtil.awaitTermination;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Responsible for scheduling active triggers, starting and stopping them and
|
* Responsible for scheduling active triggers, starting and stopping them and
|
||||||
|
@ -497,9 +498,21 @@ public class ScheduledTriggers implements Closeable {
|
||||||
}
|
}
|
||||||
// shutdown and interrupt all running tasks because there's no longer any
|
// shutdown and interrupt all running tasks because there's no longer any
|
||||||
// guarantee about cluster state
|
// guarantee about cluster state
|
||||||
|
log.debug("Shutting down scheduled thread pool executor now");
|
||||||
scheduledThreadPoolExecutor.shutdownNow();
|
scheduledThreadPoolExecutor.shutdownNow();
|
||||||
|
|
||||||
|
log.debug("Shutting down action executor now");
|
||||||
actionExecutor.shutdownNow();
|
actionExecutor.shutdownNow();
|
||||||
|
|
||||||
listeners.close();
|
listeners.close();
|
||||||
|
|
||||||
|
log.debug("Awaiting termination for action executor");
|
||||||
|
awaitTermination(actionExecutor);
|
||||||
|
|
||||||
|
log.debug("Awaiting termination for scheduled thread pool executor");
|
||||||
|
awaitTermination(scheduledThreadPoolExecutor);
|
||||||
|
|
||||||
|
log.debug("ScheduledTriggers closed completely");
|
||||||
}
|
}
|
||||||
|
|
||||||
private class TriggerWrapper implements Runnable, Closeable {
|
private class TriggerWrapper implements Runnable, Closeable {
|
||||||
|
|
|
@ -64,15 +64,16 @@ import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_P
|
||||||
public class TriggerIntegrationTest extends SolrCloudTestCase {
|
public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
private static CountDownLatch actionConstructorCalled;
|
private static volatile CountDownLatch actionConstructorCalled;
|
||||||
private static CountDownLatch actionInitCalled;
|
private static volatile CountDownLatch actionInitCalled;
|
||||||
private static CountDownLatch triggerFiredLatch;
|
private static volatile CountDownLatch triggerFiredLatch;
|
||||||
private static int waitForSeconds = 1;
|
private static volatile int waitForSeconds = 1;
|
||||||
private static CountDownLatch actionStarted;
|
private static volatile CountDownLatch actionStarted;
|
||||||
private static CountDownLatch actionInterrupted;
|
private static volatile CountDownLatch actionInterrupted;
|
||||||
private static CountDownLatch actionCompleted;
|
private static volatile CountDownLatch actionCompleted;
|
||||||
private static AtomicBoolean triggerFired;
|
private static AtomicBoolean triggerFired;
|
||||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||||
|
public static volatile long eventQueueActionWait = 5000;
|
||||||
private static SolrCloudManager cloudManager;
|
private static SolrCloudManager cloudManager;
|
||||||
|
|
||||||
// use the same time source as triggers use
|
// use the same time source as triggers use
|
||||||
|
@ -166,6 +167,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
events.clear();
|
events.clear();
|
||||||
listenerEvents.clear();
|
listenerEvents.clear();
|
||||||
lastActionExecutedAt.set(0);
|
lastActionExecutedAt.set(0);
|
||||||
|
eventQueueActionWait = 5000;
|
||||||
while (cluster.getJettySolrRunners().size() < 2) {
|
while (cluster.getJettySolrRunners().size() < 2) {
|
||||||
// perhaps a test stopped a node but didn't start it back
|
// perhaps a test stopped a node but didn't start it back
|
||||||
// lets start a node
|
// lets start a node
|
||||||
|
@ -415,14 +417,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
public void process(TriggerEvent event, ActionContext actionContext) {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
log.info("-- event: " + event);
|
log.info("-- event: " + event);
|
||||||
events.add(event);
|
events.add(event);
|
||||||
|
long eventQueueActionWaitCopy = eventQueueActionWait;
|
||||||
getActionStarted().countDown();
|
getActionStarted().countDown();
|
||||||
try {
|
try {
|
||||||
Thread.sleep(eventQueueActionWait);
|
log.info("-- Going to sleep for {} ms", eventQueueActionWaitCopy);
|
||||||
|
Thread.sleep(eventQueueActionWaitCopy);
|
||||||
|
log.info("-- Woke up after sleeping for {} ms", eventQueueActionWaitCopy);
|
||||||
triggerFired.compareAndSet(false, true);
|
triggerFired.compareAndSet(false, true);
|
||||||
getActionCompleted().countDown();
|
getActionCompleted().countDown();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
log.info("-- Interrupted");
|
||||||
getActionInterrupted().countDown();
|
getActionInterrupted().countDown();
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -434,10 +439,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static long eventQueueActionWait = 5000;
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
|
|
||||||
public void testEventQueue() throws Exception {
|
public void testEventQueue() throws Exception {
|
||||||
waitForSeconds = 1;
|
waitForSeconds = 1;
|
||||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||||
|
@ -471,6 +473,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
boolean await = actionStarted.await(60, TimeUnit.SECONDS);
|
boolean await = actionStarted.await(60, TimeUnit.SECONDS);
|
||||||
assertTrue("action did not start", await);
|
assertTrue("action did not start", await);
|
||||||
|
eventQueueActionWait = 1;
|
||||||
// event should be there
|
// event should be there
|
||||||
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
|
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
|
||||||
assertNotNull(nodeAddedEvent);
|
assertNotNull(nodeAddedEvent);
|
||||||
|
@ -478,7 +481,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
assertFalse(triggerFired.get());
|
assertFalse(triggerFired.get());
|
||||||
events.clear();
|
events.clear();
|
||||||
actionStarted = new CountDownLatch(1);
|
actionStarted = new CountDownLatch(1);
|
||||||
eventQueueActionWait = 1;
|
|
||||||
// kill overseer leader
|
// kill overseer leader
|
||||||
cluster.stopJettySolrRunner(overseerLeaderIndex);
|
cluster.stopJettySolrRunner(overseerLeaderIndex);
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
|
|
|
@ -73,6 +73,10 @@ public class ExecutorUtil {
|
||||||
|
|
||||||
public static void shutdownAndAwaitTermination(ExecutorService pool) {
|
public static void shutdownAndAwaitTermination(ExecutorService pool) {
|
||||||
pool.shutdown(); // Disable new tasks from being submitted
|
pool.shutdown(); // Disable new tasks from being submitted
|
||||||
|
awaitTermination(pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void awaitTermination(ExecutorService pool) {
|
||||||
boolean shutdown = false;
|
boolean shutdown = false;
|
||||||
while (!shutdown) {
|
while (!shutdown) {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue