mirror of https://github.com/apache/lucene.git
SOLR-12923: harden testEventQueue by replacing the arbitrary sleep call with a countdown latch
This commit is contained in:
parent
235b15acfc
commit
7f7357696f
|
@ -75,7 +75,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
private static volatile CountDownLatch actionCompleted;
|
private static volatile CountDownLatch actionCompleted;
|
||||||
private static AtomicBoolean triggerFired;
|
private static AtomicBoolean triggerFired;
|
||||||
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
|
||||||
public static volatile long eventQueueActionWait = 5000;
|
|
||||||
private static SolrCloudManager cloudManager;
|
private static SolrCloudManager cloudManager;
|
||||||
|
|
||||||
static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
|
static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
|
||||||
|
@ -151,7 +150,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
events.clear();
|
events.clear();
|
||||||
listenerEvents.clear();
|
listenerEvents.clear();
|
||||||
lastActionExecutedAt.set(0);
|
lastActionExecutedAt.set(0);
|
||||||
eventQueueActionWait = 5000;
|
|
||||||
while (cluster.getJettySolrRunners().size() < 2) {
|
while (cluster.getJettySolrRunners().size() < 2) {
|
||||||
// perhaps a test stopped a node but didn't start it back
|
// perhaps a test stopped a node but didn't start it back
|
||||||
// lets start a node
|
// lets start a node
|
||||||
|
@ -394,25 +392,29 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TestEventQueueAction extends TriggerActionBase {
|
public static class TestEventQueueAction extends TriggerActionBase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
public static volatile CountDownLatch stall = new CountDownLatch(0);
|
||||||
public TestEventQueueAction() {
|
public TestEventQueueAction() {
|
||||||
log.info("TestEventQueueAction instantiated");
|
log.info("TestEventQueueAction instantiated");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(TriggerEvent event, ActionContext actionContext) {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
log.info("-- event: " + event);
|
// make a local copy of the latch so we're using it consistently even as test thread changes tings
|
||||||
|
final CountDownLatch stallLatch = stall;
|
||||||
|
log.info("processing: stall={} event={} ", stallLatch, event);
|
||||||
events.add(event);
|
events.add(event);
|
||||||
long eventQueueActionWaitCopy = eventQueueActionWait;
|
|
||||||
getActionStarted().countDown();
|
getActionStarted().countDown();
|
||||||
try {
|
try {
|
||||||
log.info("-- Going to sleep for {} ms", eventQueueActionWaitCopy);
|
if (stallLatch.await(60, TimeUnit.SECONDS)) {
|
||||||
Thread.sleep(eventQueueActionWaitCopy);
|
log.info("Firing trigger event after await()ing 'stall' countdown");
|
||||||
log.info("-- Woke up after sleeping for {} ms", eventQueueActionWaitCopy);
|
triggerFired.set(true);
|
||||||
triggerFired.compareAndSet(false, true);
|
} else {
|
||||||
|
log.error("Timed out await()ing 'stall' countdown");
|
||||||
|
}
|
||||||
getActionCompleted().countDown();
|
getActionCompleted().countDown();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
log.info("-- Interrupted");
|
log.info("Interrupted");
|
||||||
getActionInterrupted().countDown();
|
getActionInterrupted().countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -454,19 +456,32 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
||||||
actionInitCalled.await(60, TimeUnit.SECONDS));
|
actionInitCalled.await(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// setup the trigger action to stall so we can test interupting it w/overseer change
|
||||||
|
// NOTE: we will never release this latch, instead we expect the interupt on overseer shutdown
|
||||||
|
TestEventQueueAction.stall = new CountDownLatch(1);
|
||||||
|
|
||||||
// add node to generate the event
|
// add node to generate the event
|
||||||
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
JettySolrRunner newNode = cluster.startJettySolrRunner();
|
||||||
cluster.waitForAllNodes(30);
|
cluster.waitForAllNodes(30);
|
||||||
assertTrue("Action did not start even after await()ing an excessive amount of time",
|
assertTrue("Action did not start even after await()ing an excessive amount of time",
|
||||||
actionStarted.await(60, TimeUnit.SECONDS));
|
actionStarted.await(60, TimeUnit.SECONDS));
|
||||||
eventQueueActionWait = 1;
|
|
||||||
// event should be there
|
// event should be there
|
||||||
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
|
final TriggerEvent nodeAddedEvent = events.iterator().next();
|
||||||
assertNotNull(nodeAddedEvent);
|
assertNotNull(nodeAddedEvent);
|
||||||
|
assertNotNull(nodeAddedEvent.getId());
|
||||||
|
assertNotNull(nodeAddedEvent.getEventType());
|
||||||
|
assertNotNull(nodeAddedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME));
|
||||||
|
|
||||||
// but action did not complete yet so the event is still enqueued
|
// but action did not complete yet so the event is still enqueued
|
||||||
assertFalse(triggerFired.get());
|
assertFalse(triggerFired.get());
|
||||||
|
|
||||||
|
// we know the event action has started, so we can re-set state for the next instance
|
||||||
|
// that will run after the overseer change
|
||||||
events.clear();
|
events.clear();
|
||||||
actionStarted = new CountDownLatch(1);
|
actionStarted = new CountDownLatch(1);
|
||||||
|
TestEventQueueAction.stall = new CountDownLatch(0); // so replay won't wait
|
||||||
|
|
||||||
// kill overseer leader
|
// kill overseer leader
|
||||||
JettySolrRunner j = cluster.stopJettySolrRunner(overseerLeaderIndex);
|
JettySolrRunner j = cluster.stopJettySolrRunner(overseerLeaderIndex);
|
||||||
cluster.waitForJettyToStop(j);
|
cluster.waitForJettyToStop(j);
|
||||||
|
@ -477,12 +492,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
|
||||||
// it should fire again from enqueued event
|
// it should fire again from enqueued event
|
||||||
assertTrue("Action did not (re-)start even after await()ing an excessive amount of time",
|
assertTrue("Action did not (re-)start even after await()ing an excessive amount of time",
|
||||||
actionStarted.await(60, TimeUnit.SECONDS));
|
actionStarted.await(60, TimeUnit.SECONDS));
|
||||||
TriggerEvent replayedEvent = events.iterator().next();
|
|
||||||
assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
|
final TriggerEvent replayedEvent = events.iterator().next();
|
||||||
assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
|
assertNotNull(replayedEvent);
|
||||||
|
|
||||||
assertTrue("Action did not complete even after await()ing an excessive amount of time",
|
assertTrue("Action did not complete even after await()ing an excessive amount of time",
|
||||||
actionCompleted.await(60, TimeUnit.SECONDS));
|
actionCompleted.await(60, TimeUnit.SECONDS));
|
||||||
assertTrue(triggerFired.get());
|
assertTrue(triggerFired.get());
|
||||||
|
|
||||||
|
assertEquals(nodeAddedEvent.getId(), replayedEvent.getId());
|
||||||
|
assertEquals(nodeAddedEvent.getEventTime(), replayedEvent.getEventTime());
|
||||||
|
assertEquals(nodeAddedEvent.getEventType(), replayedEvent.getEventType());
|
||||||
|
assertEquals(nodeAddedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME),
|
||||||
|
replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME));
|
||||||
|
assertEquals(Boolean.TRUE, replayedEvent.getProperty(TriggerEvent.REPLAYING));
|
||||||
}
|
}
|
||||||
|
|
||||||
static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
|
static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
|
||||||
|
|
|
@ -677,38 +677,42 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TestEventQueueAction extends TriggerActionBase {
|
public static class TestEventQueueAction extends TriggerActionBase {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
public static volatile CountDownLatch stall = new CountDownLatch(0);
|
||||||
public TestEventQueueAction() {
|
public TestEventQueueAction() {
|
||||||
log.info("TestEventQueueAction instantiated");
|
log.info("TestEventQueueAction instantiated");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(TriggerEvent event, ActionContext actionContext) {
|
public void process(TriggerEvent event, ActionContext actionContext) {
|
||||||
log.info("-- event: " + event);
|
// make a local copy of the latch so we're using it consistently even as test thread changes tings
|
||||||
|
final CountDownLatch stallLatch = stall;
|
||||||
|
log.info("processing: stall={} event={} ", stallLatch, event);
|
||||||
events.add(event);
|
events.add(event);
|
||||||
getActionStarted().countDown();
|
getActionStarted().countDown();
|
||||||
try {
|
try {
|
||||||
Thread.sleep(eventQueueActionWait);
|
if (stallLatch.await(60, TimeUnit.SECONDS)) {
|
||||||
triggerFired.compareAndSet(false, true);
|
log.info("Firing trigger event after await()ing 'stall' countdown");
|
||||||
|
triggerFired.set(true);
|
||||||
|
} else {
|
||||||
|
log.error("Timed out await()ing 'stall' countdown");
|
||||||
|
}
|
||||||
getActionCompleted().countDown();
|
getActionCompleted().countDown();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
log.info("Interrupted");
|
||||||
getActionInterrupted().countDown();
|
getActionInterrupted().countDown();
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> args) throws TriggerValidationException {
|
public void init() throws Exception {
|
||||||
log.debug("TestTriggerAction init");
|
log.info("TestEventQueueAction init");
|
||||||
actionInitCalled.countDown();
|
actionInitCalled.countDown();
|
||||||
super.configure(loader, cloudManager, args);
|
super.init();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static long eventQueueActionWait = 5000;
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
//@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13072") // this test fails easily
|
|
||||||
public void testEventQueue() throws Exception {
|
public void testEventQueue() throws Exception {
|
||||||
waitForSeconds = 1;
|
waitForSeconds = 1;
|
||||||
SolrClient solrClient = cluster.simGetSolrClient();
|
SolrClient solrClient = cluster.simGetSolrClient();
|
||||||
|
@ -729,21 +733,31 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time",
|
||||||
actionInitCalled.await(60, TimeUnit.SECONDS));
|
actionInitCalled.await(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// wait for the trigger to run at least once
|
// setup the trigger action to stall so we can test interupting it w/overseer change
|
||||||
cluster.getTimeSource().sleep(2 * waitForSeconds * 1000);
|
// NOTE: we will never release this latch, instead we expect the interupt on overseer shutdown
|
||||||
|
TestEventQueueAction.stall = new CountDownLatch(1);
|
||||||
|
|
||||||
// add node to generate the event
|
// add node to generate the event
|
||||||
String newNode = cluster.simAddNode();
|
final String newNode = cluster.simAddNode();
|
||||||
assertTrue("Action did not start even after await()ing an excessive amount of time",
|
assertTrue("Action did not start even after await()ing an excessive amount of time",
|
||||||
actionStarted.await(60, TimeUnit.SECONDS));
|
actionStarted.await(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// event should be there
|
// event should be there
|
||||||
TriggerEvent nodeAddedEvent = events.iterator().next();
|
final TriggerEvent nodeAddedEvent = events.iterator().next();
|
||||||
assertNotNull(nodeAddedEvent);
|
assertNotNull(nodeAddedEvent);
|
||||||
// but action did not complete yet so the event is still enqueued
|
assertNotNull(nodeAddedEvent.getId());
|
||||||
|
assertNotNull(nodeAddedEvent.getEventType());
|
||||||
|
assertNotNull(nodeAddedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME));
|
||||||
|
|
||||||
|
// but action did not complete yet (due to stall) so the event is still enqueued
|
||||||
assertFalse(triggerFired.get());
|
assertFalse(triggerFired.get());
|
||||||
|
|
||||||
|
// we know the event action has started, so we can re-set state for the next instance
|
||||||
|
// that will run after the overseer change
|
||||||
events.clear();
|
events.clear();
|
||||||
actionStarted = new CountDownLatch(1);
|
actionStarted = new CountDownLatch(1);
|
||||||
eventQueueActionWait = 1;
|
TestEventQueueAction.stall = new CountDownLatch(0); // so replay won't wait
|
||||||
|
|
||||||
// kill overseer
|
// kill overseer
|
||||||
cluster.simRestartOverseer(overseerLeader);
|
cluster.simRestartOverseer(overseerLeader);
|
||||||
cluster.getTimeSource().sleep(5000);
|
cluster.getTimeSource().sleep(5000);
|
||||||
|
@ -756,12 +770,21 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||||
// it should fire again from enqueued event
|
// it should fire again from enqueued event
|
||||||
assertTrue("Action did not (re-)start even after await()ing an excessive amount of time",
|
assertTrue("Action did not (re-)start even after await()ing an excessive amount of time",
|
||||||
actionStarted.await(60, TimeUnit.SECONDS));
|
actionStarted.await(60, TimeUnit.SECONDS));
|
||||||
TriggerEvent replayedEvent = events.iterator().next();
|
|
||||||
assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
|
final TriggerEvent replayedEvent = events.iterator().next();
|
||||||
assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
|
assertNotNull(replayedEvent);
|
||||||
|
|
||||||
assertTrue("Action did not complete even after await()ing an excessive amount of time",
|
assertTrue("Action did not complete even after await()ing an excessive amount of time",
|
||||||
actionCompleted.await(60, TimeUnit.SECONDS));
|
actionCompleted.await(60, TimeUnit.SECONDS));
|
||||||
assertTrue(triggerFired.get());
|
assertTrue(triggerFired.get());
|
||||||
|
|
||||||
|
assertEquals(nodeAddedEvent.getId(), replayedEvent.getId());
|
||||||
|
assertEquals(nodeAddedEvent.getEventTime(), replayedEvent.getEventTime());
|
||||||
|
assertEquals(nodeAddedEvent.getEventType(), replayedEvent.getEventType());
|
||||||
|
assertEquals(nodeAddedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME),
|
||||||
|
replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME));
|
||||||
|
assertEquals(Boolean.TRUE, replayedEvent.getProperty(TriggerEvent.REPLAYING));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue