NIFI-2754

- Migrating swap to active prior to swapping if necessary.
- This closes #1000.
This commit is contained in:
Peter Wicks 2016-09-09 22:10:01 -06:00 committed by Matt Gilman
parent 67a47dbead
commit 8a28395e9f
2 changed files with 28 additions and 1 deletions

View File

@ -389,7 +389,8 @@ public class StandardFlowFileQueue implements FlowFileQueue {
if (flowFile != null) {
incrementActiveQueueSize(-1, -flowFile.getSize());
}
} while (isExpired);
}
while (isExpired);
if (!expiredRecords.isEmpty()) {
incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
@ -547,6 +548,8 @@ public class StandardFlowFileQueue implements FlowFileQueue {
return;
}
migrateSwapToActive();
final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
int originalSwapQueueCount = swapQueue.size();

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -503,6 +504,29 @@ public class TestStandardFlowFileQueue {
assertNull(status.getFailureReason());
}
@Test(timeout = 5000)
public void testListFlowFilesResultsLimitedCollection() throws InterruptedException {
Collection<FlowFileRecord> tff = new ArrayList<>();
//Swap Size is 10000 records, so 30000 is equal to 3 swap files.
for (int i = 0; i < 30000; i++) {
tff.add(new TestFlowFile());
}
queue.putAll(tff);
final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100);
assertNotNull(status);
assertEquals(30000, status.getQueueSize().getObjectCount());
while (status.getState() != ListFlowFileState.COMPLETE) {
Thread.sleep(100);
}
assertEquals(100, status.getFlowFileSummaries().size());
assertEquals(100, status.getCompletionPercentage());
assertNull(status.getFailureReason());
}
private class TestSwapManager implements FlowFileSwapManager {
private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();