NIFI-3909: This closes #1806. If we have a FlowFile with 0 records, ensure that PublishKafkaRecord_0_10 handles the flowfile properly

Signed-off-by: joewitt <joewitt@apache.org>
Mark Payne 2017-05-16 10:49:17 -04:00 committed by joewitt
parent f5f6cab646
commit fb94990e60
3 changed files with 34 additions and 0 deletions


@@ -42,6 +42,10 @@ public class InFlightMessageTracker {
         }
     }
 
+    public void trackEmpty(final FlowFile flowFile) {
+        messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
+    }
+
     public int getAcknowledgedCount(final FlowFile flowFile) {
         final Counts counter = messageCountsByFlowFile.get(flowFile);
         return (counter == null) ? 0 : counter.getAcknowledgedCount();

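For reference, a minimal, self-contained sketch of what the new trackEmpty method accomplishes. SimpleTracker, its String key, and the Counts fields shown here are simplified stand-ins for illustration only, not the actual InFlightMessageTracker, Counts, or FlowFile types from NiFi:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration of the tracker change above; not the real NiFi classes.
public class SimpleTracker {

    private static class Counts {
        private final AtomicInteger sent = new AtomicInteger();         // unused in this sketch
        private final AtomicInteger acknowledged = new AtomicInteger();

        int getAcknowledgedCount() {
            return acknowledged.get();
        }
    }

    private final ConcurrentMap<String, Counts> countsByFlowFile = new ConcurrentHashMap<>();

    // Register a FlowFile that produced no records. putIfAbsent keeps any existing
    // counters intact while guaranteeing that an entry exists, so the FlowFile is
    // "known" to the tracker even though nothing was sent for it.
    public void trackEmpty(final String flowFileId) {
        countsByFlowFile.putIfAbsent(flowFileId, new Counts());
    }

    public int getAcknowledgedCount(final String flowFileId) {
        final Counts counter = countsByFlowFile.get(flowFileId);
        return (counter == null) ? 0 : counter.getAcknowledgedCount();
    }

    public static void main(String[] args) {
        final SimpleTracker tracker = new SimpleTracker();
        tracker.trackEmpty("flowfile-1");
        // The empty FlowFile is tracked with zero acknowledged messages instead of
        // being absent from the tracker entirely.
        System.out.println(tracker.getAcknowledgedCount("flowfile-1")); // prints 0
    }
}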

@@ -102,9 +102,11 @@ public class PublisherLease implements Closeable {
         Record record;
         final RecordSet recordSet = reader.createRecordSet();
+        int recordCount = 0;
         try {
             while ((record = recordSet.next()) != null) {
+                recordCount++;
                 baos.reset();
                 writer.write(record, baos);
@@ -119,6 +121,10 @@ public class PublisherLease implements Closeable {
                     return;
                 }
             }
+
+            if (recordCount == 0) {
+                tracker.trackEmpty(flowFile);
+            }
         } catch (final TokenTooLargeException ttle) {
             tracker.fail(flowFile, ttle);
         } catch (final Exception e) {

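The control flow of the new check, as a standalone sketch. The Iterator, the println calls, and the EmptyFlowFileSketch class are stand-ins for NiFi's RecordSet, the Kafka send, and the tracker call; only the recordCount bookkeeping mirrors the change above. The point is that a FlowFile whose record set yields nothing is still registered with the tracker, so it can later be routed to success with msg.count = 0, as the new test below asserts:

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Simplified control-flow sketch of the change to the publish loop; not the real NiFi API.
public class EmptyFlowFileSketch {

    static void publish(final Iterator<String> records, final String flowFileId) {
        int recordCount = 0;
        while (records.hasNext()) {
            final String record = records.next();
            recordCount++;
            System.out.println("sending record for " + flowFileId + ": " + record);
        }

        // The new branch: no records means the FlowFile must still be tracked,
        // otherwise it would be invisible to the completion logic.
        if (recordCount == 0) {
            System.out.println("trackEmpty(" + flowFileId + ")");
        }
    }

    public static void main(String[] args) {
        publish(List.of("a", "b").iterator(), "flowfile-with-records");
        publish(Collections.<String>emptyIterator(), "empty-flowfile");
    }
}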

@@ -191,6 +191,30 @@ public class TestPublishKafkaRecord_0_10 {
             .count());
     }
 
+    @Test
+    public void testNoRecordsInFlowFile() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue(new byte[0]));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 0);
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("msg.count", "0");
+    }
+
     @Test
     public void testSomeSuccessSomeFailure() throws IOException {