HADOOP-16581. Revise ValueQueue to correctly replenish queues that go below the watermark (#1463)

(cherry picked from dd0834696a)
(cherry picked from e31594f20e)
This commit is contained in:
Yuval Degani 2019-09-20 09:55:48 -07:00 committed by Erik Krogen
parent 6ef3204d54
commit b091abc471
2 changed files with 131 additions and 42 deletions

View File

@ -379,13 +379,15 @@ public class ValueQueue <E> {
if (numToFill > 0) { if (numToFill > 0) {
refiller.fillQueueForKey(keyName, ekvs, numToFill); refiller.fillQueueForKey(keyName, ekvs, numToFill);
} }
// Asynch task to fill > lowWatermark
if (i <= (int) (lowWatermark * numValues)) { break;
submitRefillTask(keyName, keyQueue); } else {
} ekvs.add(val);
return ekvs;
} }
ekvs.add(val); }
// Schedule a refill task in case queue has gone below the watermark
if (keyQueue.size() < (int) (lowWatermark * numValues)) {
submitRefillTask(keyName, keyQueue);
} }
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Exception while contacting value generator ", e); throw new IOException("Exception while contacting value generator ", e);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.crypto.key;
import java.io.IOException; import java.io.IOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -31,7 +32,6 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
public class TestValueQueue { public class TestValueQueue {
@ -62,6 +62,18 @@ public class TestValueQueue {
} }
} }
private void waitForRefill(ValueQueue<?> valueQueue, String queueName, int queueSize)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
int size = valueQueue.getSize(queueName);
if (size != queueSize) {
LOG.info("Current ValueQueue size is " + size);
return false;
}
return true;
}, 100, 3000);
}
/** /**
* Verifies that Queue is initially filled to "numInitValues" * Verifies that Queue is initially filled to "numInitValues"
*/ */
@ -69,7 +81,7 @@ public class TestValueQueue {
public void testInitFill() throws Exception { public void testInitFill() throws Exception {
MockFiller filler = new MockFiller(); MockFiller filler = new MockFiller();
ValueQueue<String> vq = ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 300, 1, new ValueQueue<String>(10, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler); SyncGenerationPolicy.ALL, filler);
Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num); Assert.assertEquals(1, filler.getTop().num);
@ -83,7 +95,7 @@ public class TestValueQueue {
public void testWarmUp() throws Exception { public void testWarmUp() throws Exception {
MockFiller filler = new MockFiller(); MockFiller filler = new MockFiller();
ValueQueue<String> vq = ValueQueue<String> vq =
new ValueQueue<String>(10, 0.5f, 300, 1, new ValueQueue<String>(10, 0.5f, 30000, 1,
SyncGenerationPolicy.ALL, filler); SyncGenerationPolicy.ALL, filler);
vq.initializeQueuesForKeys("k1", "k2", "k3"); vq.initializeQueuesForKeys("k1", "k2", "k3");
FillInfo[] fillInfos = FillInfo[] fillInfos =
@ -106,14 +118,17 @@ public class TestValueQueue {
public void testRefill() throws Exception { public void testRefill() throws Exception {
MockFiller filler = new MockFiller(); MockFiller filler = new MockFiller();
ValueQueue<String> vq = ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 300, 1, new ValueQueue<String>(100, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler); SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (10) and an async refill (91)
Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num);
// Trigger refill
vq.getNext("k1");
Assert.assertEquals(1, filler.getTop().num);
Assert.assertEquals(10, filler.getTop().num); Assert.assertEquals(10, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 100);
// Refill task should add 91 values to get to a full queue (10 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(91, filler.getTop().num);
vq.shutdown(); vq.shutdown();
} }
@ -125,10 +140,27 @@ public class TestValueQueue {
public void testNoRefill() throws Exception { public void testNoRefill() throws Exception {
MockFiller filler = new MockFiller(); MockFiller filler = new MockFiller();
ValueQueue<String> vq = ValueQueue<String> vq =
new ValueQueue<String>(10, 0.5f, 300, 1, new ValueQueue<String>(10, 0.5f, 30000, 1,
SyncGenerationPolicy.ALL, filler); SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (5) and an async refill (6)
Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(5, filler.getTop().num); Assert.assertEquals(5, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 6 values to get to a full queue (5 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(6, filler.getTop().num);
// Take another value, queue is still above the watermark
Assert.assertEquals("test", vq.getNext("k1"));
// Wait a while to make sure that no async refills are triggered
try {
waitForRefill(vq, "k1", 10);
} catch (TimeoutException ignored) {
// This is the correct outcome - no refill is expected
}
Assert.assertEquals(null, filler.getTop()); Assert.assertEquals(null, filler.getTop());
vq.shutdown(); vq.shutdown();
} }
@ -140,11 +172,29 @@ public class TestValueQueue {
public void testgetAtMostPolicyALL() throws Exception { public void testgetAtMostPolicyALL() throws Exception {
MockFiller filler = new MockFiller(); MockFiller filler = new MockFiller();
final ValueQueue<String> vq = final ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 300, 1, new ValueQueue<String>(10, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler); SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (1) and an async refill (10)
Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num); Assert.assertEquals(1, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue (1 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(10, filler.getTop().num);
// Drain completely, no further refills triggered
vq.drain("k1");
// Wait a while to make sure that no async refills are triggered
try {
waitForRefill(vq, "k1", 10);
} catch (TimeoutException ignored) {
// This is the correct outcome - no refill is expected
}
Assert.assertNull(filler.getTop());
// Synchronous call: // Synchronous call:
// 1. Synchronously fill returned list // 1. Synchronously fill returned list
// 2. Start another async task to fill the queue in the cache // 2. Start another async task to fill the queue in the cache
@ -154,23 +204,16 @@ public class TestValueQueue {
filler.getTop().num); filler.getTop().num);
// Wait for the async task to finish // Wait for the async task to finish
GenericTestUtils.waitFor(new Supplier<Boolean>() { waitForRefill(vq, "k1", 10);
@Override // Refill task should add 10 values to get to a full queue
public Boolean get() {
int size = vq.getSize("k1");
if (size != 10) {
LOG.info("Current ValueQueue size is " + size);
return false;
}
return true;
}
}, 100, 3000);
Assert.assertEquals("Failed in async call.", 10, filler.getTop().num); Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
// Drain completely after filled by the async thread // Drain completely after filled by the async thread
Assert.assertEquals("Failed to drain completely after async.", 10, vq.drain("k1");
vq.getAtMost("k1", 10).size()); Assert.assertEquals("Failed to drain completely after async.", 0,
// Synchronous call (No Async call since num > lowWatermark) vq.getSize("k1"));
// Synchronous call
Assert.assertEquals("Failed to get all 19.", 19, Assert.assertEquals("Failed to get all 19.", 19,
vq.getAtMost("k1", 19).size()); vq.getAtMost("k1", 19).size());
Assert.assertEquals("Failed in sync call.", 19, filler.getTop().num); Assert.assertEquals("Failed in sync call.", 19, filler.getTop().num);
@ -184,14 +227,29 @@ public class TestValueQueue {
public void testgetAtMostPolicyATLEAST_ONE() throws Exception { public void testgetAtMostPolicyATLEAST_ONE() throws Exception {
MockFiller filler = new MockFiller(); MockFiller filler = new MockFiller();
ValueQueue<String> vq = ValueQueue<String> vq =
new ValueQueue<String>(10, 0.3f, 300, 1, new ValueQueue<String>(10, 0.3f, 30000, 1,
SyncGenerationPolicy.ATLEAST_ONE, filler); SyncGenerationPolicy.ATLEAST_ONE, filler);
// Trigger a prefill (3) and an async refill (8)
Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(3, filler.getTop().num); Assert.assertEquals(3, filler.getTop().num);
// Drain completely
Assert.assertEquals(2, vq.getAtMost("k1", 10).size()); // Wait for the async task to finish
// Asynch Refill call waitForRefill(vq, "k1", 10);
Assert.assertEquals(10, filler.getTop().num); // Refill task should add 8 values to get to a full queue (3 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals("Failed in async call.", 8, filler.getTop().num);
// Drain completely, no further refills triggered
vq.drain("k1");
// Queue is empty, sync will return a single value and trigger a refill
Assert.assertEquals(1, vq.getAtMost("k1", 10).size());
Assert.assertEquals(1, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue
Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
vq.shutdown(); vq.shutdown();
} }
@ -202,16 +260,29 @@ public class TestValueQueue {
public void testgetAtMostPolicyLOW_WATERMARK() throws Exception { public void testgetAtMostPolicyLOW_WATERMARK() throws Exception {
MockFiller filler = new MockFiller(); MockFiller filler = new MockFiller();
ValueQueue<String> vq = ValueQueue<String> vq =
new ValueQueue<String>(10, 0.3f, 300, 1, new ValueQueue<String>(10, 0.3f, 30000, 1,
SyncGenerationPolicy.LOW_WATERMARK, filler); SyncGenerationPolicy.LOW_WATERMARK, filler);
// Trigger a prefill (3) and an async refill (8)
Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(3, filler.getTop().num); Assert.assertEquals(3, filler.getTop().num);
// Drain completely
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 8 values to get to a full queue (3 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals("Failed in async call.", 8, filler.getTop().num);
// Drain completely, no further refills triggered
vq.drain("k1");
// Queue is empty, sync will return 3 values and trigger a refill
Assert.assertEquals(3, vq.getAtMost("k1", 10).size()); Assert.assertEquals(3, vq.getAtMost("k1", 10).size());
// Synchronous call Assert.assertEquals(3, filler.getTop().num);
Assert.assertEquals(1, filler.getTop().num);
// Asynch Refill call // Wait for the async task to finish
Assert.assertEquals(10, filler.getTop().num); waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue
Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
vq.shutdown(); vq.shutdown();
} }
@ -219,11 +290,27 @@ public class TestValueQueue {
public void testDrain() throws Exception { public void testDrain() throws Exception {
MockFiller filler = new MockFiller(); MockFiller filler = new MockFiller();
ValueQueue<String> vq = ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 300, 1, new ValueQueue<String>(10, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler); SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (1) and an async refill (10)
Assert.assertEquals("test", vq.getNext("k1")); Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num); Assert.assertEquals(1, filler.getTop().num);
// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue (1 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(10, filler.getTop().num);
// Drain completely, no further refills triggered
vq.drain("k1"); vq.drain("k1");
// Wait a while to make sure that no async refills are triggered
try {
waitForRefill(vq, "k1", 10);
} catch (TimeoutException ignored) {
// This is the correct outcome - no refill is expected
}
Assert.assertNull(filler.getTop()); Assert.assertNull(filler.getTop());
vq.shutdown(); vq.shutdown();
} }