diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java index 1ddd8a3c673..fa0010215de 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java @@ -379,13 +379,15 @@ public class ValueQueue { if (numToFill > 0) { refiller.fillQueueForKey(keyName, ekvs, numToFill); } - // Asynch task to fill > lowWatermark - if (i <= (int) (lowWatermark * numValues)) { - submitRefillTask(keyName, keyQueue); - } - return ekvs; + + break; + } else { + ekvs.add(val); } - ekvs.add(val); + } + // Schedule a refill task in case queue has gone below the watermark + if (keyQueue.size() < (int) (lowWatermark * numValues)) { + submitRefillTask(keyName, keyQueue); } } catch (Exception e) { throw new IOException("Exception while contacting value generator ", e); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java index abc4ebf9b4d..55a9280d626 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java @@ -20,6 +20,7 @@ package org.apache.hadoop.crypto.key; import java.io.IOException; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -31,7 +32,6 @@ import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Test; -import com.google.common.base.Supplier; import com.google.common.collect.Sets; public class TestValueQueue { @@ -62,6 +62,18 @@ public class TestValueQueue { } } + private void waitForRefill(ValueQueue valueQueue, String queueName, int queueSize) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> { + int size = valueQueue.getSize(queueName); + if (size != queueSize) { + LOG.info("Current ValueQueue size is " + size); + return false; + } + return true; + }, 100, 3000); + } + /** * Verifies that Queue is initially filled to "numInitValues" */ @@ -69,7 +81,7 @@ public class TestValueQueue { public void testInitFill() throws Exception { MockFiller filler = new MockFiller(); ValueQueue vq = - new ValueQueue(10, 0.1f, 300, 1, + new ValueQueue(10, 0.1f, 30000, 1, SyncGenerationPolicy.ALL, filler); Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals(1, filler.getTop().num); @@ -83,7 +95,7 @@ public class TestValueQueue { public void testWarmUp() throws Exception { MockFiller filler = new MockFiller(); ValueQueue vq = - new ValueQueue(10, 0.5f, 300, 1, + new ValueQueue(10, 0.5f, 30000, 1, SyncGenerationPolicy.ALL, filler); vq.initializeQueuesForKeys("k1", "k2", "k3"); FillInfo[] fillInfos = @@ -106,14 +118,17 @@ public class TestValueQueue { public void testRefill() throws Exception { MockFiller filler = new MockFiller(); ValueQueue vq = - new ValueQueue(10, 0.1f, 300, 1, + new ValueQueue(100, 0.1f, 30000, 1, SyncGenerationPolicy.ALL, filler); + // Trigger a prefill (10) and an async refill (91) Assert.assertEquals("test", vq.getNext("k1")); - Assert.assertEquals(1, filler.getTop().num); - // Trigger refill - vq.getNext("k1"); - Assert.assertEquals(1, filler.getTop().num); Assert.assertEquals(10, filler.getTop().num); + + // Wait for the async task to finish + waitForRefill(vq, "k1", 100); + // Refill task should add 91 values to get to a full queue (10 produced by + // the prefill to the low watermark, 1 consumed by getNext()) + Assert.assertEquals(91, filler.getTop().num); vq.shutdown(); } @@ -125,10 +140,27 @@ public class TestValueQueue { public void testNoRefill() throws Exception { MockFiller filler = new MockFiller(); ValueQueue vq = - new ValueQueue(10, 0.5f, 300, 1, + new ValueQueue(10, 0.5f, 30000, 1, SyncGenerationPolicy.ALL, filler); + // Trigger a prefill (5) and an async refill (6) Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals(5, filler.getTop().num); + + // Wait for the async task to finish + waitForRefill(vq, "k1", 10); + // Refill task should add 6 values to get to a full queue (5 produced by + // the prefill to the low watermark, 1 consumed by getNext()) + Assert.assertEquals(6, filler.getTop().num); + + // Take another value, queue is still above the watermark + Assert.assertEquals("test", vq.getNext("k1")); + + // Wait a while to make sure that no async refills are triggered + try { + waitForRefill(vq, "k1", 10); + } catch (TimeoutException ignored) { + // This is the correct outcome - no refill is expected + } Assert.assertEquals(null, filler.getTop()); vq.shutdown(); } @@ -140,11 +172,29 @@ public class TestValueQueue { public void testgetAtMostPolicyALL() throws Exception { MockFiller filler = new MockFiller(); final ValueQueue vq = - new ValueQueue(10, 0.1f, 300, 1, + new ValueQueue(10, 0.1f, 30000, 1, SyncGenerationPolicy.ALL, filler); + // Trigger a prefill (1) and an async refill (10) Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals(1, filler.getTop().num); + // Wait for the async task to finish + waitForRefill(vq, "k1", 10); + // Refill task should add 10 values to get to a full queue (1 produced by + // the prefill to the low watermark, 1 consumed by getNext()) + Assert.assertEquals(10, filler.getTop().num); + + // Drain completely, no further refills triggered + vq.drain("k1"); + + // Wait a while to make sure that no async refills are triggered + try { + waitForRefill(vq, "k1", 10); + } catch (TimeoutException ignored) { + // This is the correct outcome - no refill is expected + } + Assert.assertNull(filler.getTop()); + // Synchronous call: // 1. Synchronously fill returned list // 2. Start another async task to fill the queue in the cache @@ -154,23 +204,16 @@ public class TestValueQueue { filler.getTop().num); // Wait for the async task to finish - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - int size = vq.getSize("k1"); - if (size != 10) { - LOG.info("Current ValueQueue size is " + size); - return false; - } - return true; - } - }, 100, 3000); + waitForRefill(vq, "k1", 10); + // Refill task should add 10 values to get to a full queue Assert.assertEquals("Failed in async call.", 10, filler.getTop().num); // Drain completely after filled by the async thread - Assert.assertEquals("Failed to drain completely after async.", 10, - vq.getAtMost("k1", 10).size()); - // Synchronous call (No Async call since num > lowWatermark) + vq.drain("k1"); + Assert.assertEquals("Failed to drain completely after async.", 0, + vq.getSize("k1")); + + // Synchronous call Assert.assertEquals("Failed to get all 19.", 19, vq.getAtMost("k1", 19).size()); Assert.assertEquals("Failed in sync call.", 19, filler.getTop().num); @@ -184,14 +227,29 @@ public class TestValueQueue { public void testgetAtMostPolicyATLEAST_ONE() throws Exception { MockFiller filler = new MockFiller(); ValueQueue vq = - new ValueQueue(10, 0.3f, 300, 1, + new ValueQueue(10, 0.3f, 30000, 1, SyncGenerationPolicy.ATLEAST_ONE, filler); + // Trigger a prefill (3) and an async refill (8) Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals(3, filler.getTop().num); - // Drain completely - Assert.assertEquals(2, vq.getAtMost("k1", 10).size()); - // Asynch Refill call - Assert.assertEquals(10, filler.getTop().num); + + // Wait for the async task to finish + waitForRefill(vq, "k1", 10); + // Refill task should add 8 values to get to a full queue (3 produced by + // the prefill to the low watermark, 1 consumed by getNext()) + Assert.assertEquals("Failed in async call.", 8, filler.getTop().num); + + // Drain completely, no further refills triggered + vq.drain("k1"); + + // Queue is empty, sync will return a single value and trigger a refill + Assert.assertEquals(1, vq.getAtMost("k1", 10).size()); + Assert.assertEquals(1, filler.getTop().num); + + // Wait for the async task to finish + waitForRefill(vq, "k1", 10); + // Refill task should add 10 values to get to a full queue + Assert.assertEquals("Failed in async call.", 10, filler.getTop().num); vq.shutdown(); } @@ -202,16 +260,29 @@ public class TestValueQueue { public void testgetAtMostPolicyLOW_WATERMARK() throws Exception { MockFiller filler = new MockFiller(); ValueQueue vq = - new ValueQueue(10, 0.3f, 300, 1, + new ValueQueue(10, 0.3f, 30000, 1, SyncGenerationPolicy.LOW_WATERMARK, filler); + // Trigger a prefill (3) and an async refill (8) Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals(3, filler.getTop().num); - // Drain completely + + // Wait for the async task to finish + waitForRefill(vq, "k1", 10); + // Refill task should add 8 values to get to a full queue (3 produced by + // the prefill to the low watermark, 1 consumed by getNext()) + Assert.assertEquals("Failed in async call.", 8, filler.getTop().num); + + // Drain completely, no further refills triggered + vq.drain("k1"); + + // Queue is empty, sync will return 3 values and trigger a refill Assert.assertEquals(3, vq.getAtMost("k1", 10).size()); - // Synchronous call - Assert.assertEquals(1, filler.getTop().num); - // Asynch Refill call - Assert.assertEquals(10, filler.getTop().num); + Assert.assertEquals(3, filler.getTop().num); + + // Wait for the async task to finish + waitForRefill(vq, "k1", 10); + // Refill task should add 10 values to get to a full queue + Assert.assertEquals("Failed in async call.", 10, filler.getTop().num); vq.shutdown(); } @@ -219,11 +290,27 @@ public class TestValueQueue { public void testDrain() throws Exception { MockFiller filler = new MockFiller(); ValueQueue vq = - new ValueQueue(10, 0.1f, 300, 1, + new ValueQueue(10, 0.1f, 30000, 1, SyncGenerationPolicy.ALL, filler); + // Trigger a prefill (1) and an async refill (10) Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals(1, filler.getTop().num); + + // Wait for the async task to finish + waitForRefill(vq, "k1", 10); + // Refill task should add 10 values to get to a full queue (1 produced by + // the prefill to the low watermark, 1 consumed by getNext()) + Assert.assertEquals(10, filler.getTop().num); + + // Drain completely, no further refills triggered vq.drain("k1"); + + // Wait a while to make sure that no async refills are triggered + try { + waitForRefill(vq, "k1", 10); + } catch (TimeoutException ignored) { + // This is the correct outcome - no refill is expected + } Assert.assertNull(filler.getTop()); vq.shutdown(); }