NIFI-2751 NIFI-2848 Get batch of flow files in a round-robin manner

This closes #1111
This commit is contained in:
Pierre Villard 2016-10-06 22:06:45 +02:00 committed by Matt Burgess
parent c13cfa6ea6
commit 6aefc0b910
1 changed files with 32 additions and 2 deletions

View File

@ -1250,7 +1250,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final List<Connection> connections = context.getPollableConnections(); final List<Connection> connections = context.getPollableConnections();
final int numConnections = connections.size(); final int numConnections = connections.size();
for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) { for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) {
final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % connections.size()); final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % numConnections);
final Set<FlowFileRecord> expired = new HashSet<>(); final Set<FlowFileRecord> expired = new HashSet<>();
final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired); final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired);
removeExpired(expired, conn); removeExpired(expired, conn);
@ -1273,7 +1273,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return Collections.emptyList(); return Collections.emptyList();
} }
return get(new QueuePoller() { // get batch of flow files in a round-robin manner
final List<Connection> connections = context.getPollableConnections();
final Connection connection = connections.get(context.getNextIncomingConnectionIndex() % connections.size());
return get(connection, new QueuePoller() {
@Override @Override
public List<FlowFileRecord> poll(final FlowFileQueue queue, final Set<FlowFileRecord> expiredRecords) { public List<FlowFileRecord> poll(final FlowFileQueue queue, final Set<FlowFileRecord> expiredRecords) {
return queue.poll(new FlowFileFilter() { return queue.poll(new FlowFileFilter() {
@ -1302,6 +1306,32 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}, true); }, true);
} }
private List<FlowFile> get(final Connection connection, final QueuePoller poller, final boolean lockQueue) {
if (lockQueue) {
connection.lock();
}
try {
final Set<FlowFileRecord> expired = new HashSet<>();
final List<FlowFileRecord> newlySelected = poller.poll(connection.getFlowFileQueue(), expired);
removeExpired(expired, connection);
if (newlySelected.isEmpty() && expired.isEmpty()) {
return new ArrayList<>();
}
for (final FlowFileRecord flowFile : newlySelected) {
registerDequeuedRecord(flowFile, connection);
}
return new ArrayList<FlowFile>(newlySelected);
} finally {
if (lockQueue) {
connection.unlock();
}
}
}
private List<FlowFile> get(final QueuePoller poller, final boolean lockAllQueues) { private List<FlowFile> get(final QueuePoller poller, final boolean lockAllQueues) {
final List<Connection> connections = context.getPollableConnections(); final List<Connection> connections = context.getPollableConnections();
if (lockAllQueues) { if (lockAllQueues) {