diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java index f0d2e916ec..acdcec6a60 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java @@ -77,6 +77,8 @@ public interface Connection extends Authorizable { List poll(FlowFileFilter filter, Set expiredRecords); + FlowFileRecord poll(Set expiredRecords); + void verifyCanUpdate() throws IllegalStateException; void verifyCanDelete() throws IllegalStateException; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 8b81c81a92..728e8cf2b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -324,6 +324,11 @@ public final class StandardConnection implements Connection { return flowFileQueue.poll(filter, expiredRecords); } + @Override + public FlowFileRecord poll(final Set expiredRecords) { + return flowFileQueue.poll(expiredRecords); + } + @Override public boolean equals(final Object other) { if (!(other instanceof Connection)) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 3a51816da7..512461f8a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1455,7 +1455,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) { final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % numConnections); final Set expired = new HashSet<>(); - final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired); + final FlowFileRecord flowFile = conn.poll(expired); removeExpired(expired, conn); if (flowFile != null) { @@ -1484,10 +1484,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Connection connection = connections.get(context.getNextIncomingConnectionIndex() % connections.size()); - return get(connection, new QueuePoller() { + return get(connection, new ConnectionPoller() { @Override - public List poll(final FlowFileQueue queue, final Set expiredRecords) { - return queue.poll(new FlowFileFilter() { + public List poll(final Connection connection, final Set expiredRecords) { + return connection.poll(new FlowFileFilter() { int polled = 0; @Override @@ -1505,22 +1505,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public List get(final FlowFileFilter filter) { - return get(new QueuePoller() { + return get(new ConnectionPoller() { @Override - public List poll(final FlowFileQueue queue, final Set expiredRecords) { - return queue.poll(filter, expiredRecords); + public List poll(final Connection connection, final Set expiredRecords) { + return connection.poll(filter, expiredRecords); } }, true); } - private List get(final Connection connection, final QueuePoller poller, final boolean lockQueue) { + private List get(final Connection connection, final ConnectionPoller poller, final boolean lockQueue) { if (lockQueue) { connection.lock(); } try { final Set expired = new HashSet<>(); - final List newlySelected = poller.poll(connection.getFlowFileQueue(), expired); + final List newlySelected = poller.poll(connection, expired); removeExpired(expired, connection); if (newlySelected.isEmpty() && expired.isEmpty()) { @@ -1539,7 +1539,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } - private List get(final QueuePoller poller, final boolean lockAllQueues) { + private List get(final ConnectionPoller poller, final boolean lockAllQueues) { final List connections = context.getPollableConnections(); if (lockAllQueues) { for (final Connection connection : connections) { @@ -1547,10 +1547,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } + final int startIndex = context.getNextIncomingConnectionIndex(); + try { - for (final Connection conn : connections) { + for (int i = 0; i < connections.size(); i++) { + final int connectionIndex = (startIndex + i) % connections.size(); + final Connection conn = connections.get(connectionIndex); + final Set expired = new HashSet<>(); - final List newlySelected = poller.poll(conn.getFlowFileQueue(), expired); + final List newlySelected = poller.poll(conn, expired); removeExpired(expired, conn); if (newlySelected.isEmpty() && expired.isEmpty()) { @@ -3031,9 +3036,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE * Callback interface used to poll a FlowFileQueue, in order to perform * functional programming-type of polling a queue */ - private static interface QueuePoller { + private static interface ConnectionPoller { - List poll(FlowFileQueue queue, Set expiredRecords); + List poll(Connection connection, Set expiredRecords); } private static class Checkpoint { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 4357aa33c5..87ab29b30d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -54,6 +54,7 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.NiFiProperties; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.LoggerFactory; @@ -93,6 +94,7 @@ public class TestFileSystemRepository { } @Test + @Ignore("Intended for manual testing only, in order to judge changes to performance") public void testWritePerformance() throws IOException { final long bytesToWrite = 1_000_000_000L; final int contentSize = 100; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index e0c9ffee5e..1dcb9919dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.notNull; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; @@ -69,6 +71,7 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.MissingFlowFileException; @@ -98,6 +101,7 @@ public class TestStandardProcessSession { private MockContentRepository contentRepo; private FlowFileQueue flowFileQueue; private ProcessContext context; + private Connectable connectable; private ProvenanceEventRepository provenanceRepo; private MockFlowFileRepository flowFileRepo; @@ -138,7 +142,6 @@ public class TestStandardProcessSession { } @Before - @SuppressWarnings("unchecked") public void setup() throws IOException { resourceClaimManager = new StandardResourceClaimManager(); @@ -147,33 +150,7 @@ public class TestStandardProcessSession { final CounterRepository counterRepo = Mockito.mock(CounterRepository.class); provenanceRepo = new MockProvenanceRepository(); - final Connection connection = Mockito.mock(Connection.class); - final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); - - final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class); - final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000); - flowFileQueue = Mockito.spy(actualQueue); - when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - flowFileQueue.put((FlowFileRecord) invocation.getArguments()[0]); - return null; - } - }).when(connection).enqueue(Mockito.any(FlowFileRecord.class)); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - flowFileQueue.putAll((Collection) invocation.getArguments()[0]); - return null; - } - }).when(connection).enqueue(Mockito.any(Collection.class)); - - final Connectable dest = Mockito.mock(Connectable.class); - when(connection.getDestination()).thenReturn(dest); - when(connection.getSource()).thenReturn(dest); + final Connection connection = createConnection(); final List connList = new ArrayList<>(); connList.add(connection); @@ -181,7 +158,7 @@ public class TestStandardProcessSession { final ProcessGroup procGroup = Mockito.mock(ProcessGroup.class); when(procGroup.getIdentifier()).thenReturn("proc-group-identifier-1"); - final Connectable connectable = Mockito.mock(Connectable.class); + connectable = Mockito.mock(Connectable.class); when(connectable.hasIncomingConnection()).thenReturn(true); when(connectable.getIncomingConnections()).thenReturn(connList); when(connectable.getProcessGroup()).thenReturn(procGroup); @@ -214,6 +191,141 @@ public class TestStandardProcessSession { session = new StandardProcessSession(context); } + @SuppressWarnings("unchecked") + private Connection createConnection() { + final Connection connection = Mockito.mock(Connection.class); + + if (flowFileQueue == null) { + final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class); + final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); + + final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, + processScheduler, swapManager, null, 10000); + flowFileQueue = Mockito.spy(actualQueue); + } + + when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); + + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + flowFileQueue.put((FlowFileRecord) invocation.getArguments()[0]); + return null; + } + }).when(connection).enqueue(Mockito.any(FlowFileRecord.class)); + + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + flowFileQueue.putAll((Collection) invocation.getArguments()[0]); + return null; + } + }).when(connection).enqueue(Mockito.any(Collection.class)); + + final Connectable dest = Mockito.mock(Connectable.class); + when(connection.getDestination()).thenReturn(dest); + when(connection.getSource()).thenReturn(dest); + + Mockito.doAnswer(new Answer() { + @Override + public FlowFile answer(InvocationOnMock invocation) throws Throwable { + return flowFileQueue.poll(invocation.getArgumentAt(0, Set.class)); + } + }).when(connection).poll(any(Set.class)); + + Mockito.doAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return flowFileQueue.poll(invocation.getArgumentAt(0, FlowFileFilter.class), invocation.getArgumentAt(1, Set.class)); + } + }).when(connection).poll(any(FlowFileFilter.class), any(Set.class)); + + return connection; + } + + @Test + @SuppressWarnings("unchecked") + public void testRoundRobinOnSessionGetNoArgs() { + final List connList = new ArrayList<>(); + final Connection conn1 = createConnection(); + final Connection conn2 = createConnection(); + connList.add(conn1); + connList.add(conn2); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + + flowFileQueue.put(flowFileRecord); + flowFileQueue.put(flowFileRecord); + + when(connectable.getIncomingConnections()).thenReturn(connList); + + session.get(); + session.get(); + + verify(conn1, times(1)).poll(any(Set.class)); + verify(conn2, times(1)).poll(any(Set.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testRoundRobinOnSessionGetWithCount() { + final List connList = new ArrayList<>(); + final Connection conn1 = createConnection(); + final Connection conn2 = createConnection(); + connList.add(conn1); + connList.add(conn2); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + + flowFileQueue.put(flowFileRecord); + flowFileQueue.put(flowFileRecord); + + when(connectable.getIncomingConnections()).thenReturn(connList); + + session.get(1); + session.get(1); + + verify(conn1, times(1)).poll(any(FlowFileFilter.class), any(Set.class)); + verify(conn2, times(1)).poll(any(FlowFileFilter.class), any(Set.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testRoundRobinOnSessionGetWithFilter() { + final List connList = new ArrayList<>(); + final Connection conn1 = createConnection(); + final Connection conn2 = createConnection(); + connList.add(conn1); + connList.add(conn2); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + + flowFileQueue.put(flowFileRecord); + flowFileQueue.put(flowFileRecord); + + when(connectable.getIncomingConnections()).thenReturn(connList); + + final FlowFileFilter filter = ff -> FlowFileFilterResult.ACCEPT_AND_TERMINATE; + + session.get(filter); + session.get(filter); + + verify(conn1, times(1)).poll(any(FlowFileFilter.class), any(Set.class)); + verify(conn2, times(1)).poll(any(FlowFileFilter.class), any(Set.class)); + } + @Test public void testAppendToChildThrowsIOExceptionThenRemove() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()