From 3147c131e0ce997259f7bcf31e655d43dd99ef59 Mon Sep 17 00:00:00 2001
From: Chris Hostetter
Date: Tue, 11 Dec 2018 10:38:36 -0700
Subject: [PATCH] SOLR-13054: rewrite TriggerSetPropertiesIntegrationTest

test no longer depends on changing static non-final non-volatile variables
used by multiple threads

test also no longer depends on arbitrary sleep calls, instead threads
await/poll on concurrent signaling objects/queues
---
 .../TriggerSetPropertiesIntegrationTest.java | 245 +++++++++++-------
 1 file changed, 156 insertions(+), 89 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
index 0ee0e1c4fdd..c59e60bcce9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
@@ -20,13 +20,17 @@ package org.apache.solr.cloud.autoscaling;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
@@ -50,8 +54,6 @@ import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
 public class TriggerSetPropertiesIntegrationTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
-
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(2)
@@ -67,134 +69,199 @@ public class TriggerSetPropertiesIntegrationTest extends SolrCloudTestCase {
     assertEquals(response.get("result").toString(), "success");
   }
 
-  private static CountDownLatch getTriggerFiredLatch() {
-    return triggerFiredLatch;
-  }
-
+  /**
+   * Test that we can add/remove triggers to a scheduler, and change the config on the fly, and still get
+   * expected behavior
+   */
   public void testSetProperties() throws Exception {
-    JettySolrRunner runner = cluster.getJettySolrRunner(0);
-    SolrResourceLoader resourceLoader = runner.getCoreContainer().getResourceLoader();
-    SolrCloudManager solrCloudManager = runner.getCoreContainer().getZkController().getSolrCloudManager();
-    AtomicLong diff = new AtomicLong(0);
-    triggerFiredLatch = new CountDownLatch(2); // have the trigger run twice to capture time difference
+    final JettySolrRunner runner = cluster.getJettySolrRunner(0);
+    final SolrResourceLoader resourceLoader = runner.getCoreContainer().getResourceLoader();
+    final SolrCloudManager solrCloudManager = runner.getCoreContainer().getZkController().getSolrCloudManager();
+
     try (ScheduledTriggers scheduledTriggers = new ScheduledTriggers(resourceLoader, solrCloudManager)) {
       AutoScalingConfig config = new AutoScalingConfig(Collections.emptyMap());
       scheduledTriggers.setAutoScalingConfig(config);
-      AutoScaling.Trigger t = new TriggerBase(TriggerEventType.NODELOST, "x") {
-        @Override
-        protected Map<String, Object> getState() {
-          return Collections.singletonMap("x", "y");
-        }
-
-        @Override
-        protected void setState(Map<String, Object> state) {
-
-        }
-
-        @Override
-        public void restoreState(AutoScaling.Trigger old) {
-
-        }
+
+      // Setup a trigger that records the timestamp of each time it was run
+      // we only need 2 timestamps for the test, so limit the queue and make the trigger a No-Op if full
+      final BlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(2);
+      final AutoScaling.Trigger t1 = new MockTrigger(TriggerEventType.NODELOST, "mock-timestamper") {
         @Override
         public void run() {
-          if (getTriggerFiredLatch().getCount() == 0) return;
-          long l = diff.get();
-          diff.set(timeSource.getTimeNs() - l);
-          getTriggerFiredLatch().countDown();
+          log.info("Running {} in {}", this.getName(), Thread.currentThread().getName());
+          timestamps.offer(timeSource.getTimeNs());
         }
       };
-      t.configure(runner.getCoreContainer().getResourceLoader(), runner.getCoreContainer().getZkController().getSolrCloudManager(), Collections.emptyMap());
-      scheduledTriggers.add(t);
-      assertTrue(getTriggerFiredLatch().await(4, TimeUnit.SECONDS));
-      assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS) >= 0);
+      log.info("Configuring simple scheduler and adding trigger: {}", t1.getName());
+      t1.configure(resourceLoader, solrCloudManager, Collections.emptyMap());
+      scheduledTriggers.add(t1);
 
-      // change schedule delay
-      config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS, 4));
+      waitForAndDiffTimestamps("conf(default delay)",
+                               ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS,
+                               timestamps);
+
+      log.info("Reconfiguring scheduler to use 4s delay and clearing queue for trigger: {}", t1.getName());
+      config = config.withProperties(Collections.singletonMap
+                                     (AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS, 4));
       scheduledTriggers.setAutoScalingConfig(config);
-      triggerFiredLatch = new CountDownLatch(2);
-      assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(10, TimeUnit.SECONDS));
-      assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(4) >= 0);
+      timestamps.clear();
 
-      // reset with default properties
-      scheduledTriggers.remove("x"); // remove the old trigger
+      waitForAndDiffTimestamps("conf(four sec delay)",
+                               4, TimeUnit.SECONDS,
+                               timestamps);
+
+      log.info("Removing trigger: {}", t1.getName());
+      scheduledTriggers.remove(t1.getName());
+
+      log.info("Reconfiguring scheduler to use default props");
       config = config.withProperties(ScheduledTriggers.DEFAULT_PROPERTIES);
       scheduledTriggers.setAutoScalingConfig(config);
 
-      // test core thread count
-      List<AutoScaling.Trigger> triggerList = new ArrayList<>();
-      final Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
-      final Set<String> triggerNames = Collections.synchronizedSet(new HashSet<>());
-      triggerFiredLatch = new CountDownLatch(8);
-      for (int i = 0; i < 8; i++) {
-        AutoScaling.Trigger trigger = new MockTrigger(TriggerEventType.NODELOST, "x" + i) {
+      assertTrue("Test sanity check, need default thread pool to be at least 3 so we can " +
+                 "test lowering it by 2", ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE >= 3);
+      final int numTriggers = ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE;
+      final int reducedThreadPoolSize = numTriggers - 2;
+
+      // Setup X instances of a trigger that:
+      //  - records its name as being run
+      //  - skips all remaining execution if its name has already been recorded
+      //  - records the name of the thread that ran it
+      //  - blocks on a cyclic barrier until at least Y instances have run (to hog a thread)
+      // ...to test that the scheduler will add new threads as needed, up to the configured limit
+      //
+      // NOTE: the reason we need X unique instances is because the scheduler won't "re-run" a single
+      // trigger while a previous "run" is still in process
+      final List<AutoScaling.Trigger> triggerList = new ArrayList<>(numTriggers);
+
+      // Use a cyclic barrier gated by an atomic ref so we can swap it out later
+      final AtomicReference<CyclicBarrier> latch = new AtomicReference<>(new CyclicBarrier(numTriggers));
+
+      // variables for tracking state as we go
+      // NOTE: all read/write must be gated by synchronizing on the barrier (ref),
+      // so we can ensure we are reading a consistent view
+      final Set<String> threadNames = Collections.synchronizedSet(new LinkedHashSet<>());
+      final Set<String> triggerNames = Collections.synchronizedSet(new LinkedHashSet<>());
+      final AtomicLong fails = new AtomicLong(0);
+
+      // Use a semaphore to track when each trigger *finishes* so our test thread
+      // can know when to check & clear the tracking state
+      final Semaphore completionSemaphore = new Semaphore(numTriggers);
+
+      for (int i = 0; i < numTriggers; i++) {
+        AutoScaling.Trigger trigger = new MockTrigger(TriggerEventType.NODELOST,
+                                                      "mock-blocking-trigger-" + i) {
           @Override
           public void run() {
-            try {
-              // If core pool size is increased then new threads won't be started if existing threads
-              // aren't busy with tasks. So we make this thread wait longer than necessary
-              // so that the pool is forced to start threads for other triggers
-              Thread.sleep(5000);
-            } catch (InterruptedException e) {
-            }
-            if (triggerNames.add(getName())) {
-              getTriggerFiredLatch().countDown();
+            log.info("Running {} in {}", this.getName(), Thread.currentThread().getName());
+            CyclicBarrier barrier = null;
+            synchronized (latch) {
+              if (!triggerNames.add(this.getName())) {
+                log.info("{}: No-Op since we've already recorded a run", this.getName());
+                return;
+              }
               threadNames.add(Thread.currentThread().getName());
+              barrier = latch.get();
+            }
+
+            try {
+              log.info("{}: waiting on barrier to hog a thread", this.getName());
+              barrier.await(30, TimeUnit.SECONDS);
+              completionSemaphore.release();
+            } catch (Exception e) {
+              fails.incrementAndGet();
+              log.error(this.getName() + ": failure waiting on cyclic barrier: " + e.toString(), e);
             }
           }
         };
+        trigger.configure(resourceLoader, solrCloudManager, Collections.emptyMap());
         triggerList.add(trigger);
+        completionSemaphore.acquire();
+        log.info("Adding trigger {} to scheduler", trigger.getName());
         scheduledTriggers.add(trigger);
       }
-      assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
-      assertEquals("Expected 8 triggers but found: " + triggerNames, 8, triggerNames.size());
-      assertEquals("Expected " + ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE + " threads but found: " + threadNames,
-          ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE, threadNames.size());
+
+      log.info("Waiting on semaphore for all triggers to signal completion...");
+      assertTrue("Timed out waiting for semaphore count to be released",
+                 completionSemaphore.tryAcquire(numTriggers, 60, TimeUnit.SECONDS));
+
+      synchronized (latch) {
+        assertEquals("Unexpected number of trigger names found: " + triggerNames.toString(),
+                     numTriggers, triggerNames.size());
+        assertEquals("Unexpected number of thread names found: " + threadNames.toString(),
+                     numTriggers, threadNames.size());
+        assertEquals("Unexpected number of trigger fails recorded, check logs?",
+                     0, fails.get());
 
-      // change core pool size
-      config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_CORE_POOL_SIZE, 6));
-      scheduledTriggers.setAutoScalingConfig(config);
-      triggerFiredLatch = new CountDownLatch(8);
-      threadNames.clear();
-      triggerNames.clear();
-      assertTrue(getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
-      assertEquals("Expected 8 triggers but found: " + triggerNames, 8, triggerNames.size());
-      assertEquals("Expected 6 threads but found: " + threadNames, 6, threadNames.size());
+        // before releasing the latch, clear the state and update our config to use a lower number of threads
+        log.info("Updating scheduler config to use {} threads", reducedThreadPoolSize);
+        config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_CORE_POOL_SIZE,
+                                                                reducedThreadPoolSize));
+        scheduledTriggers.setAutoScalingConfig(config);
 
-      // reset
-      for (int i = 0; i < 8; i++) {
-        scheduledTriggers.remove(triggerList.get(i).getName());
-      }
+        log.info("Updating cyclic barrier and clearing test state so triggers will 'run' again");
+        latch.set(new CyclicBarrier(reducedThreadPoolSize));
+        threadNames.clear();
+        triggerNames.clear();
+      }
+
+      log.info("Waiting on semaphore for all triggers to signal completion...");
+      assertTrue("Timed out waiting for semaphore count to be released",
+                 completionSemaphore.tryAcquire(numTriggers, 60, TimeUnit.SECONDS));
+
+      synchronized (latch) {
+        assertEquals("Unexpected number of trigger names found: " + triggerNames.toString(),
+                     numTriggers, triggerNames.size());
+        assertEquals("Unexpected number of thread names found: " + threadNames.toString(),
+                     reducedThreadPoolSize, threadNames.size());
+        assertEquals("Unexpected number of trigger fails recorded, check logs?",
+                     0, fails.get());
+      }
     }
   }
 
-  public static class MockTrigger extends TriggerBase {
+
+
+  private static final void waitForAndDiffTimestamps(final String label,
+                                                     final long minExpectedDelta,
+                                                     final TimeUnit minExpectedDeltaUnit,
+                                                     final BlockingQueue<Long> timestamps) {
+    try {
+      log.info(label + ": Waiting for 2 timestamps to be recorded");
+      Long firstTs = timestamps.poll(minExpectedDelta * 3, minExpectedDeltaUnit);
+      assertNotNull(label + ": Couldn't get first timestamp after max allowed polling", firstTs);
+      Long secondTs = timestamps.poll(minExpectedDelta * 3, minExpectedDeltaUnit);
+      assertNotNull(label + ": Couldn't get second timestamp after max allowed polling", secondTs);
+
+      final long deltaInNanos = secondTs - firstTs;
+      final long minExpectedDeltaInNanos = minExpectedDeltaUnit.toNanos(minExpectedDelta);
+      assertTrue(label + ": Delta between timestamps ("+secondTs+"ns - "+firstTs+"ns = "+deltaInNanos+"ns) is not " +
                 "at least as much as min expected delay: " + minExpectedDeltaInNanos + "ns",
+                 deltaInNanos >= minExpectedDeltaInNanos);
+    } catch (InterruptedException e) {
+      log.error(label + ": interrupted", e);
+      fail(label + ": interrupted: " + e.toString());
+    }
+  }
+
+  private static abstract class MockTrigger extends TriggerBase {
 
     public MockTrigger(TriggerEventType eventType, String name) {
       super(eventType, name);
     }
 
     @Override
-    protected Map<String, Object> getState() {
+    protected Map<String, Object> getState() {
       return Collections.emptyMap();
     }
 
     @Override
-    protected void setState(Map<String, Object> state) {
-
-    }
+    protected void setState(Map<String, Object> state) { /* No-Op */ }
 
     @Override
-    public void restoreState(AutoScaling.Trigger old) {
-
-    }
-
-    @Override
-    public void run() {
-
-    }
+    public void restoreState(AutoScaling.Trigger old) { /* No-Op */ }
   }
 }
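
Reviewer note (illustration only, not part of the patch): the "await/poll on concurrent signaling objects/queues" approach named in the commit message boils down to the pattern sketched below. A background task publishes into a bounded BlockingQueue and the test thread polls with a timeout instead of sleeping for a guessed duration. The sketch is self-contained and all names in it (SignalingSketch, recordRun, awaitDeltaNanos) are invented for the example; they do not exist in Solr.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class SignalingSketch {
      // Bounded queue: once two samples are captured, further offer() calls are no-ops,
      // so a long-running background task cannot grow the queue without bound.
      private final BlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(2);

      /** Called from the background thread each time it "runs". */
      void recordRun() {
        timestamps.offer(System.nanoTime());
      }

      /** Called from the test thread: poll with a timeout instead of calling Thread.sleep(). */
      long awaitDeltaNanos(long timeout, TimeUnit unit) throws InterruptedException {
        Long first = timestamps.poll(timeout, unit);
        Long second = timestamps.poll(timeout, unit);
        if (first == null || second == null) {
          throw new IllegalStateException("background task did not report two runs in time");
        }
        return second - first;
      }

      public static void main(String[] args) throws Exception {
        SignalingSketch sketch = new SignalingSketch();
        Thread worker = new Thread(() -> {
          for (int i = 0; i < 2; i++) {
            sketch.recordRun();
            try {
              Thread.sleep(100); // simulated scheduling delay inside the worker, not in the test thread
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
          }
        });
        worker.start();
        System.out.println("observed delta: " + sketch.awaitDeltaNanos(5, TimeUnit.SECONDS) + "ns");
        worker.join();
      }
    }

The CyclicBarrier and Semaphore used in the rewritten test follow the same principle: the test thread blocks on an explicit completion signal, with a generous timeout, rather than assuming how long the scheduler's thread pool needs.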