NIFI-3760: This closes #1717. Fixed issue that caused session to continually poll from the same connection when using ProcessSession.get(FlowFileFilter, Set)

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-04-28 16:15:44 -04:00 committed by joewitt
parent 40de1b18d9
commit 016ae3191e
5 changed files with 169 additions and 43 deletions

View File

@ -77,6 +77,8 @@ public interface Connection extends Authorizable {
List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
void verifyCanUpdate() throws IllegalStateException;
void verifyCanDelete() throws IllegalStateException;

View File

@ -324,6 +324,11 @@ public final class StandardConnection implements Connection {
return flowFileQueue.poll(filter, expiredRecords);
}
@Override
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
return flowFileQueue.poll(expiredRecords);
}
@Override
public boolean equals(final Object other) {
if (!(other instanceof Connection)) {

View File

@ -1455,7 +1455,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) {
final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % numConnections);
final Set<FlowFileRecord> expired = new HashSet<>();
final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired);
final FlowFileRecord flowFile = conn.poll(expired);
removeExpired(expired, conn);
if (flowFile != null) {
@ -1484,10 +1484,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Connection connection = connections.get(context.getNextIncomingConnectionIndex() % connections.size());
return get(connection, new QueuePoller() {
return get(connection, new ConnectionPoller() {
@Override
public List<FlowFileRecord> poll(final FlowFileQueue queue, final Set<FlowFileRecord> expiredRecords) {
return queue.poll(new FlowFileFilter() {
public List<FlowFileRecord> poll(final Connection connection, final Set<FlowFileRecord> expiredRecords) {
return connection.poll(new FlowFileFilter() {
int polled = 0;
@Override
@ -1505,22 +1505,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public List<FlowFile> get(final FlowFileFilter filter) {
return get(new QueuePoller() {
return get(new ConnectionPoller() {
@Override
public List<FlowFileRecord> poll(final FlowFileQueue queue, final Set<FlowFileRecord> expiredRecords) {
return queue.poll(filter, expiredRecords);
public List<FlowFileRecord> poll(final Connection connection, final Set<FlowFileRecord> expiredRecords) {
return connection.poll(filter, expiredRecords);
}
}, true);
}
private List<FlowFile> get(final Connection connection, final QueuePoller poller, final boolean lockQueue) {
private List<FlowFile> get(final Connection connection, final ConnectionPoller poller, final boolean lockQueue) {
if (lockQueue) {
connection.lock();
}
try {
final Set<FlowFileRecord> expired = new HashSet<>();
final List<FlowFileRecord> newlySelected = poller.poll(connection.getFlowFileQueue(), expired);
final List<FlowFileRecord> newlySelected = poller.poll(connection, expired);
removeExpired(expired, connection);
if (newlySelected.isEmpty() && expired.isEmpty()) {
@ -1539,7 +1539,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
private List<FlowFile> get(final QueuePoller poller, final boolean lockAllQueues) {
private List<FlowFile> get(final ConnectionPoller poller, final boolean lockAllQueues) {
final List<Connection> connections = context.getPollableConnections();
if (lockAllQueues) {
for (final Connection connection : connections) {
@ -1547,10 +1547,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
final int startIndex = context.getNextIncomingConnectionIndex();
try {
for (final Connection conn : connections) {
for (int i = 0; i < connections.size(); i++) {
final int connectionIndex = (startIndex + i) % connections.size();
final Connection conn = connections.get(connectionIndex);
final Set<FlowFileRecord> expired = new HashSet<>();
final List<FlowFileRecord> newlySelected = poller.poll(conn.getFlowFileQueue(), expired);
final List<FlowFileRecord> newlySelected = poller.poll(conn, expired);
removeExpired(expired, conn);
if (newlySelected.isEmpty() && expired.isEmpty()) {
@ -3031,9 +3036,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
* Callback interface used to poll a FlowFileQueue, in order to perform
* functional programming-type of polling a queue
*/
private static interface QueuePoller {
private static interface ConnectionPoller {
List<FlowFileRecord> poll(FlowFileQueue queue, Set<FlowFileRecord> expiredRecords);
List<FlowFileRecord> poll(Connection connection, Set<FlowFileRecord> expiredRecords);
}
private static class Checkpoint {

View File

@ -54,6 +54,7 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.LoggerFactory;
@ -93,6 +94,7 @@ public class TestFileSystemRepository {
}
@Test
@Ignore("Intended for manual testing only, in order to judge changes to performance")
public void testWritePerformance() throws IOException {
final long bytesToWrite = 1_000_000_000L;
final int contentSize = 100;

View File

@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
@ -69,6 +71,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.MissingFlowFileException;
@ -98,6 +101,7 @@ public class TestStandardProcessSession {
private MockContentRepository contentRepo;
private FlowFileQueue flowFileQueue;
private ProcessContext context;
private Connectable connectable;
private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
@ -138,7 +142,6 @@ public class TestStandardProcessSession {
}
@Before
@SuppressWarnings("unchecked")
public void setup() throws IOException {
resourceClaimManager = new StandardResourceClaimManager();
@ -147,33 +150,7 @@ public class TestStandardProcessSession {
final CounterRepository counterRepo = Mockito.mock(CounterRepository.class);
provenanceRepo = new MockProvenanceRepository();
final Connection connection = Mockito.mock(Connection.class);
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
flowFileQueue = Mockito.spy(actualQueue);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
flowFileQueue.put((FlowFileRecord) invocation.getArguments()[0]);
return null;
}
}).when(connection).enqueue(Mockito.any(FlowFileRecord.class));
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
flowFileQueue.putAll((Collection<FlowFileRecord>) invocation.getArguments()[0]);
return null;
}
}).when(connection).enqueue(Mockito.any(Collection.class));
final Connectable dest = Mockito.mock(Connectable.class);
when(connection.getDestination()).thenReturn(dest);
when(connection.getSource()).thenReturn(dest);
final Connection connection = createConnection();
final List<Connection> connList = new ArrayList<>();
connList.add(connection);
@ -181,7 +158,7 @@ public class TestStandardProcessSession {
final ProcessGroup procGroup = Mockito.mock(ProcessGroup.class);
when(procGroup.getIdentifier()).thenReturn("proc-group-identifier-1");
final Connectable connectable = Mockito.mock(Connectable.class);
connectable = Mockito.mock(Connectable.class);
when(connectable.hasIncomingConnection()).thenReturn(true);
when(connectable.getIncomingConnections()).thenReturn(connList);
when(connectable.getProcessGroup()).thenReturn(procGroup);
@ -214,6 +191,141 @@ public class TestStandardProcessSession {
session = new StandardProcessSession(context);
}
@SuppressWarnings("unchecked")
private Connection createConnection() {
final Connection connection = Mockito.mock(Connection.class);
if (flowFileQueue == null) {
final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null,
processScheduler, swapManager, null, 10000);
flowFileQueue = Mockito.spy(actualQueue);
}
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
flowFileQueue.put((FlowFileRecord) invocation.getArguments()[0]);
return null;
}
}).when(connection).enqueue(Mockito.any(FlowFileRecord.class));
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
flowFileQueue.putAll((Collection<FlowFileRecord>) invocation.getArguments()[0]);
return null;
}
}).when(connection).enqueue(Mockito.any(Collection.class));
final Connectable dest = Mockito.mock(Connectable.class);
when(connection.getDestination()).thenReturn(dest);
when(connection.getSource()).thenReturn(dest);
Mockito.doAnswer(new Answer<FlowFile>() {
@Override
public FlowFile answer(InvocationOnMock invocation) throws Throwable {
return flowFileQueue.poll(invocation.getArgumentAt(0, Set.class));
}
}).when(connection).poll(any(Set.class));
Mockito.doAnswer(new Answer<List<FlowFileRecord>>() {
@Override
public List<FlowFileRecord> answer(InvocationOnMock invocation) throws Throwable {
return flowFileQueue.poll(invocation.getArgumentAt(0, FlowFileFilter.class), invocation.getArgumentAt(1, Set.class));
}
}).when(connection).poll(any(FlowFileFilter.class), any(Set.class));
return connection;
}
@Test
@SuppressWarnings("unchecked")
public void testRoundRobinOnSessionGetNoArgs() {
final List<Connection> connList = new ArrayList<>();
final Connection conn1 = createConnection();
final Connection conn2 = createConnection();
connList.add(conn1);
connList.add(conn2);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
flowFileQueue.put(flowFileRecord);
when(connectable.getIncomingConnections()).thenReturn(connList);
session.get();
session.get();
verify(conn1, times(1)).poll(any(Set.class));
verify(conn2, times(1)).poll(any(Set.class));
}
@Test
@SuppressWarnings("unchecked")
public void testRoundRobinOnSessionGetWithCount() {
final List<Connection> connList = new ArrayList<>();
final Connection conn1 = createConnection();
final Connection conn2 = createConnection();
connList.add(conn1);
connList.add(conn2);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
flowFileQueue.put(flowFileRecord);
when(connectable.getIncomingConnections()).thenReturn(connList);
session.get(1);
session.get(1);
verify(conn1, times(1)).poll(any(FlowFileFilter.class), any(Set.class));
verify(conn2, times(1)).poll(any(FlowFileFilter.class), any(Set.class));
}
@Test
@SuppressWarnings("unchecked")
public void testRoundRobinOnSessionGetWithFilter() {
final List<Connection> connList = new ArrayList<>();
final Connection conn1 = createConnection();
final Connection conn2 = createConnection();
connList.add(conn1);
connList.add(conn2);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
flowFileQueue.put(flowFileRecord);
when(connectable.getIncomingConnections()).thenReturn(connList);
final FlowFileFilter filter = ff -> FlowFileFilterResult.ACCEPT_AND_TERMINATE;
session.get(filter);
session.get(filter);
verify(conn1, times(1)).poll(any(FlowFileFilter.class), any(Set.class));
verify(conn2, times(1)).poll(any(FlowFileFilter.class), any(Set.class));
}
@Test
public void testAppendToChildThrowsIOExceptionThenRemove() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()