NIFI-3250 Fixing logic in StandardFlowFileQueue when migrating flow files to the active queue

This closes #3250.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Bryan Bende 2016-12-22 15:48:45 -05:00 committed by Aldrin Piri
parent af8ed8b7de
commit 978f483ba8
No known key found for this signature in database
GPG Key ID: 531AEBAA4CFE5D00
2 changed files with 62 additions and 6 deletions

View File

@ -448,7 +448,8 @@ public class StandardFlowFileQueue implements FlowFileQueue {
// Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
// to disk, because we want them to be swapped back in in the same order that they were swapped out.
if (activeQueue.size() > swapThreshold - SWAP_RECORD_POLL_SIZE) {
final int activeQueueSize = activeQueue.size();
if (activeQueueSize > 0 && activeQueueSize > swapThreshold - SWAP_RECORD_POLL_SIZE) {
return;
}

View File

@ -74,6 +74,12 @@ public class TestStandardFlowFileQueue {
private TestSwapManager swapManager = null;
private StandardFlowFileQueue queue = null;
private Connection connection = null;
private FlowFileRepository flowFileRepo = null;
private ProvenanceEventRepository provRepo = null;
private ResourceClaimManager claimManager = null;
private ProcessScheduler scheduler = null;
private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
@BeforeClass
@ -86,16 +92,16 @@ public class TestStandardFlowFileQueue {
public void setup() {
provRecords.clear();
final Connection connection = Mockito.mock(Connection.class);
connection = Mockito.mock(Connection.class);
Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
scheduler = Mockito.mock(ProcessScheduler.class);
swapManager = new TestSwapManager();
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class);
final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
flowFileRepo = Mockito.mock(FlowFileRepository.class);
provRepo = Mockito.mock(ProvenanceEventRepository.class);
claimManager = Mockito.mock(ResourceClaimManager.class);
Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
Mockito.doAnswer(new Answer<Object>() {
@ -382,6 +388,55 @@ public class TestStandardFlowFileQueue {
queue.poll(exp);
}
@Test
public void testSwapInWhenThresholdIsLessThanSwapSize() {
// create a queue where the swap threshold is less than 10k
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000);
for (int i = 1; i <= 20000; i++) {
queue.put(new TestFlowFile());
}
assertEquals(1, swapManager.swappedOut.size());
queue.put(new TestFlowFile());
assertEquals(1, swapManager.swappedOut.size());
final Set<FlowFileRecord> exp = new HashSet<>();
// At this point there should be:
// 1k flow files in the active queue
// 9,001 flow files in the swap queue
// 10k flow files swapped to disk
for (int i = 0; i < 999; i++) { //
final FlowFileRecord flowFile = queue.poll(exp);
assertNotNull(flowFile);
assertEquals(1, queue.getUnacknowledgedQueueSize().getObjectCount());
assertEquals(1, queue.getUnacknowledgedQueueSize().getByteCount());
queue.acknowledge(Collections.singleton(flowFile));
assertEquals(0, queue.getUnacknowledgedQueueSize().getObjectCount());
assertEquals(0, queue.getUnacknowledgedQueueSize().getByteCount());
}
assertEquals(0, swapManager.swapInCalledCount);
assertEquals(1, queue.getActiveQueueSize().getObjectCount());
assertNotNull(queue.poll(exp));
assertEquals(0, swapManager.swapInCalledCount);
assertEquals(0, queue.getActiveQueueSize().getObjectCount());
assertEquals(1, swapManager.swapOutCalledCount);
assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
assertEquals(1, swapManager.swapInCalledCount);
assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
assertTrue(swapManager.swappedOut.isEmpty());
queue.poll(exp);
}
@Test
public void testQueueCountsUpdatedWhenIncompleteSwapFile() {
for (int i = 1; i <= 20000; i++) {