diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java index 23f24c3a00d..756617e2870 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java @@ -75,7 +75,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { private static volatile CountDownLatch actionCompleted; private static AtomicBoolean triggerFired; private static Set events = ConcurrentHashMap.newKeySet(); - public static volatile long eventQueueActionWait = 5000; private static SolrCloudManager cloudManager; static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5); @@ -151,7 +150,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { events.clear(); listenerEvents.clear(); lastActionExecutedAt.set(0); - eventQueueActionWait = 5000; while (cluster.getJettySolrRunners().size() < 2) { // perhaps a test stopped a node but didn't start it back // lets start a node @@ -394,25 +392,29 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { } public static class TestEventQueueAction extends TriggerActionBase { - + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static volatile CountDownLatch stall = new CountDownLatch(0); public TestEventQueueAction() { log.info("TestEventQueueAction instantiated"); } @Override public void process(TriggerEvent event, ActionContext actionContext) { - log.info("-- event: " + event); + // make a local copy of the latch so we're using it consistently even as test thread changes tings + final CountDownLatch stallLatch = stall; + log.info("processing: stall={} event={} ", stallLatch, event); events.add(event); - long eventQueueActionWaitCopy = eventQueueActionWait; getActionStarted().countDown(); try { - log.info("-- Going to sleep for {} ms", eventQueueActionWaitCopy); - Thread.sleep(eventQueueActionWaitCopy); - log.info("-- Woke up after sleeping for {} ms", eventQueueActionWaitCopy); - triggerFired.compareAndSet(false, true); + if (stallLatch.await(60, TimeUnit.SECONDS)) { + log.info("Firing trigger event after await()ing 'stall' countdown"); + triggerFired.set(true); + } else { + log.error("Timed out await()ing 'stall' countdown"); + } getActionCompleted().countDown(); } catch (InterruptedException e) { - log.info("-- Interrupted"); + log.info("Interrupted"); getActionInterrupted().countDown(); } } @@ -454,19 +456,32 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time", actionInitCalled.await(60, TimeUnit.SECONDS)); + // setup the trigger action to stall so we can test interupting it w/overseer change + // NOTE: we will never release this latch, instead we expect the interupt on overseer shutdown + TestEventQueueAction.stall = new CountDownLatch(1); + // add node to generate the event JettySolrRunner newNode = cluster.startJettySolrRunner(); cluster.waitForAllNodes(30); assertTrue("Action did not start even after await()ing an excessive amount of time", actionStarted.await(60, TimeUnit.SECONDS)); - eventQueueActionWait = 1; + // event should be there - NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next(); + final TriggerEvent nodeAddedEvent = events.iterator().next(); assertNotNull(nodeAddedEvent); + assertNotNull(nodeAddedEvent.getId()); + assertNotNull(nodeAddedEvent.getEventType()); + assertNotNull(nodeAddedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME)); + // but action did not complete yet so the event is still enqueued assertFalse(triggerFired.get()); + + // we know the event action has started, so we can re-set state for the next instance + // that will run after the overseer change events.clear(); actionStarted = new CountDownLatch(1); + TestEventQueueAction.stall = new CountDownLatch(0); // so replay won't wait + // kill overseer leader JettySolrRunner j = cluster.stopJettySolrRunner(overseerLeaderIndex); cluster.waitForJettyToStop(j); @@ -477,12 +492,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { // it should fire again from enqueued event assertTrue("Action did not (re-)start even after await()ing an excessive amount of time", actionStarted.await(60, TimeUnit.SECONDS)); - TriggerEvent replayedEvent = events.iterator().next(); - assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null); - assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null); + + final TriggerEvent replayedEvent = events.iterator().next(); + assertNotNull(replayedEvent); + assertTrue("Action did not complete even after await()ing an excessive amount of time", actionCompleted.await(60, TimeUnit.SECONDS)); assertTrue(triggerFired.get()); + + assertEquals(nodeAddedEvent.getId(), replayedEvent.getId()); + assertEquals(nodeAddedEvent.getEventTime(), replayedEvent.getEventTime()); + assertEquals(nodeAddedEvent.getEventType(), replayedEvent.getEventType()); + assertEquals(nodeAddedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME), + replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME)); + assertEquals(Boolean.TRUE, replayedEvent.getProperty(TriggerEvent.REPLAYING)); } static Map> listenerEvents = new HashMap<>(); diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimTriggerIntegration.java index 871e0834e5b..f680781e203 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimTriggerIntegration.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimTriggerIntegration.java @@ -677,38 +677,42 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase { } public static class TestEventQueueAction extends TriggerActionBase { - + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static volatile CountDownLatch stall = new CountDownLatch(0); public TestEventQueueAction() { log.info("TestEventQueueAction instantiated"); } @Override public void process(TriggerEvent event, ActionContext actionContext) { - log.info("-- event: " + event); + // make a local copy of the latch so we're using it consistently even as test thread changes tings + final CountDownLatch stallLatch = stall; + log.info("processing: stall={} event={} ", stallLatch, event); events.add(event); getActionStarted().countDown(); try { - Thread.sleep(eventQueueActionWait); - triggerFired.compareAndSet(false, true); + if (stallLatch.await(60, TimeUnit.SECONDS)) { + log.info("Firing trigger event after await()ing 'stall' countdown"); + triggerFired.set(true); + } else { + log.error("Timed out await()ing 'stall' countdown"); + } getActionCompleted().countDown(); } catch (InterruptedException e) { + log.info("Interrupted"); getActionInterrupted().countDown(); - return; } } @Override - public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map args) throws TriggerValidationException { - log.debug("TestTriggerAction init"); + public void init() throws Exception { + log.info("TestEventQueueAction init"); actionInitCalled.countDown(); - super.configure(loader, cloudManager, args); + super.init(); } } - public static long eventQueueActionWait = 5000; - @Test - //@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13072") // this test fails easily public void testEventQueue() throws Exception { waitForSeconds = 1; SolrClient solrClient = cluster.simGetSolrClient(); @@ -729,21 +733,31 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase { assertTrue("Trigger was not init()ed even after await()ing an excessive amount of time", actionInitCalled.await(60, TimeUnit.SECONDS)); - // wait for the trigger to run at least once - cluster.getTimeSource().sleep(2 * waitForSeconds * 1000); - + // setup the trigger action to stall so we can test interupting it w/overseer change + // NOTE: we will never release this latch, instead we expect the interupt on overseer shutdown + TestEventQueueAction.stall = new CountDownLatch(1); + // add node to generate the event - String newNode = cluster.simAddNode(); + final String newNode = cluster.simAddNode(); assertTrue("Action did not start even after await()ing an excessive amount of time", actionStarted.await(60, TimeUnit.SECONDS)); + // event should be there - TriggerEvent nodeAddedEvent = events.iterator().next(); + final TriggerEvent nodeAddedEvent = events.iterator().next(); assertNotNull(nodeAddedEvent); - // but action did not complete yet so the event is still enqueued + assertNotNull(nodeAddedEvent.getId()); + assertNotNull(nodeAddedEvent.getEventType()); + assertNotNull(nodeAddedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME)); + + // but action did not complete yet (due to stall) so the event is still enqueued assertFalse(triggerFired.get()); + + // we know the event action has started, so we can re-set state for the next instance + // that will run after the overseer change events.clear(); actionStarted = new CountDownLatch(1); - eventQueueActionWait = 1; + TestEventQueueAction.stall = new CountDownLatch(0); // so replay won't wait + // kill overseer cluster.simRestartOverseer(overseerLeader); cluster.getTimeSource().sleep(5000); @@ -756,12 +770,21 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase { // it should fire again from enqueued event assertTrue("Action did not (re-)start even after await()ing an excessive amount of time", actionStarted.await(60, TimeUnit.SECONDS)); - TriggerEvent replayedEvent = events.iterator().next(); - assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null); - assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null); + + final TriggerEvent replayedEvent = events.iterator().next(); + assertNotNull(replayedEvent); + assertTrue("Action did not complete even after await()ing an excessive amount of time", actionCompleted.await(60, TimeUnit.SECONDS)); assertTrue(triggerFired.get()); + + assertEquals(nodeAddedEvent.getId(), replayedEvent.getId()); + assertEquals(nodeAddedEvent.getEventTime(), replayedEvent.getEventTime()); + assertEquals(nodeAddedEvent.getEventType(), replayedEvent.getEventType()); + assertEquals(nodeAddedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME), + replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME)); + assertEquals(Boolean.TRUE, replayedEvent.getProperty(TriggerEvent.REPLAYING)); + } @Test