From fb94990e60068eb244cae1d12d3a0790ece29adc Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 16 May 2017 10:49:17 -0400 Subject: [PATCH] NIFI-3909: This closes #1806. If we have a FlowFile with 0 records, ensure that PublishKafkaRecord_0_10 handles the flowfile properly Signed-off-by: joewitt --- .../kafka/pubsub/InFlightMessageTracker.java | 4 ++++ .../kafka/pubsub/PublisherLease.java | 6 +++++ .../pubsub/TestPublishKafkaRecord_0_10.java | 24 +++++++++++++++++++ 3 files changed, 34 insertions(+) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java index e7d5cb7163..58157d9e01 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java @@ -42,6 +42,10 @@ public class InFlightMessageTracker { } } + public void trackEmpty(final FlowFile flowFile) { + messageCountsByFlowFile.putIfAbsent(flowFile, new Counts()); + } + public int getAcknowledgedCount(final FlowFile flowFile) { final Counts counter = messageCountsByFlowFile.get(flowFile); return (counter == null) ? 0 : counter.getAcknowledgedCount(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index f08f7a9208..be2697b1cf 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -102,9 +102,11 @@ public class PublisherLease implements Closeable { Record record; final RecordSet recordSet = reader.createRecordSet(); + int recordCount = 0; try { while ((record = recordSet.next()) != null) { + recordCount++; baos.reset(); writer.write(record, baos); @@ -119,6 +121,10 @@ public class PublisherLease implements Closeable { return; } } + + if (recordCount == 0) { + tracker.trackEmpty(flowFile); + } } catch (final TokenTooLargeException ttle) { tracker.fail(flowFile, ttle); } catch (final Exception e) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java index c1df792101..8c6efb75af 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java @@ -191,6 +191,30 @@ public class TestPublishKafkaRecord_0_10 { .count()); } + @Test + public void testNoRecordsInFlowFile() throws IOException { + final List flowFiles = new ArrayList<>(); + flowFiles.add(runner.enqueue(new byte[0])); + + final Map msgCounts = new HashMap<>(); + msgCounts.put(flowFiles.get(0), 0); + + final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap()); + + when(mockLease.complete()).thenReturn(result); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1); + + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(0)).poison(); + verify(mockLease, times(1)).close(); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).get(0); + mff.assertAttributeEquals("msg.count", "0"); + } + @Test public void testSomeSuccessSomeFailure() throws IOException {