diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index 58157d9e01..bdb419c152 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -42,6 +42,10 @@ public class InFlightMessageTracker {
         }
     }

+    /**
+     * This method guarantees that the specified FlowFile will be transferred to
+     * the 'success' relationship even if it did not derive any Kafka message.
+     */
     public void trackEmpty(final FlowFile flowFile) {
         messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
     }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index c17c33146d..ea1c087a38 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -38,6 +38,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;

@@ -71,9 +72,21 @@ public class PublisherLease implements Closeable {
             tracker = new InFlightMessageTracker();
         }

-        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+        try {
             byte[] messageContent;
-            try {
+            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
+                if (flowFile.getSize() > maxMessageSize) {
+                    tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
+                    return;
+                }
+                // Send the FlowFile content as it is, to support sending a 0-byte message.
+                messageContent = new byte[(int) flowFile.getSize()];
+                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                publish(flowFile, messageKey, messageContent, topic, tracker);
+                return;
+            }
+
+            try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                 while ((messageContent = demarcator.nextToken()) != null) {
                     publish(flowFile, messageKey, messageContent, topic, tracker);
@@ -82,6 +95,7 @@ public class PublisherLease implements Closeable {
                     return;
                 }
             }
+            tracker.trackEmpty(flowFile);
         } catch (final TokenTooLargeException ttle) {
             tracker.fail(flowFile, ttle);
         }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 836a4b3c7d..ac204f10e1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -81,7 +81,9 @@ public class TestPublisherLease {
             }
         };

-        final FlowFile flowFile = new MockFlowFile(1L);
+        final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+        // Need a size greater than zero to make the lease read the InputStream.
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
         final String topic = "unit-test";
         final byte[] messageKey = null;
         final byte[] demarcatorBytes = null;
@@ -205,6 +207,57 @@ public class TestPublisherLease {
         verify(producer, times(1)).flush();
     }

+    @Test
+    @SuppressWarnings("unchecked")
+    public void testZeroByteMessageSent() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
+            @Override
+            protected void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final AtomicInteger correctMessages = new AtomicInteger(0);
+        final AtomicInteger incorrectMessages = new AtomicInteger(0);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                final ProducerRecord record = invocation.getArgumentAt(0, ProducerRecord.class);
+                final byte[] value = record.value();
+                final String valueString = new String(value, StandardCharsets.UTF_8);
+                if ("".equals(valueString)) {
+                    correctMessages.incrementAndGet();
+                } else {
+                    incorrectMessages.incrementAndGet();
+                }
+
+                return null;
+            }
+        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+
+        final byte[] flowFileContent = new byte[0];
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+
+        assertEquals(0, poisonCount.get());
+
+        verify(producer, times(0)).flush();
+
+        final PublishResult result = lease.complete();
+
+        assertEquals(1, correctMessages.get());
+        assertEquals(0, incorrectMessages.get());
+
+        verify(producer, times(1)).flush();
+    }
+
     @Test
     public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
         final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index 317b274e8a..3ec3d1c53f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -45,6 +45,10 @@ public class InFlightMessageTracker {
         }
     }

+    /**
+     * This method guarantees that the specified FlowFile will be transferred to
+     * the 'success' relationship even if it did not derive any Kafka message.
+     */
     public void trackEmpty(final FlowFile flowFile) {
         messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
     }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index e8d744aa2d..72c90d2646 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -42,6 +42,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;

@@ -113,9 +114,21 @@ public class PublisherLease implements Closeable {
             tracker = new InFlightMessageTracker(logger);
         }

-        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+        try {
             byte[] messageContent;
-            try {
+            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
+                if (flowFile.getSize() > maxMessageSize) {
+                    tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
+                    return;
+                }
+                // Send the FlowFile content as it is, to support sending a 0-byte message.
+                messageContent = new byte[(int) flowFile.getSize()];
+                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                publish(flowFile, messageKey, messageContent, topic, tracker);
+                return;
+            }
+
+            try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                 while ((messageContent = demarcator.nextToken()) != null) {
                     publish(flowFile, messageKey, messageContent, topic, tracker);
@@ -123,6 +136,7 @@ public class PublisherLease implements Closeable {
                     // If we have a failure, don't try to send anything else.
                     return;
                 }
+                tracker.trackEmpty(flowFile);
             }
         } catch (final TokenTooLargeException ttle) {
             tracker.fail(flowFile, ttle);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 105a4d5d3a..d2b52dd228 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -80,7 +80,9 @@ public class TestPublisherLease {
             }
         };

-        final FlowFile flowFile = new MockFlowFile(1L);
+        final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+        // Need a size greater than zero to make the lease read the InputStream.
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
         final String topic = "unit-test";
         final byte[] messageKey = null;
         final byte[] demarcatorBytes = null;
@@ -201,6 +203,57 @@ public class TestPublisherLease {
         verify(producer, times(1)).flush();
     }

+    @Test
+    @SuppressWarnings("unchecked")
+    public void testZeroByteMessageSent() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
+            @Override
+            protected void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final AtomicInteger correctMessages = new AtomicInteger(0);
+        final AtomicInteger incorrectMessages = new AtomicInteger(0);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                final ProducerRecord record = invocation.getArgumentAt(0, ProducerRecord.class);
+                final byte[] value = record.value();
+                final String valueString = new String(value, StandardCharsets.UTF_8);
+                if ("".equals(valueString)) {
+                    correctMessages.incrementAndGet();
+                } else {
+                    incorrectMessages.incrementAndGet();
+                }
+
+                return null;
+            }
+        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+
+        final byte[] flowFileContent = new byte[0];
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+
+        assertEquals(0, poisonCount.get());
+
+        verify(producer, times(0)).flush();
+
+        final PublishResult result = lease.complete();
+
+        assertEquals(1, correctMessages.get());
+        assertEquals(0, incorrectMessages.get());
+
+        verify(producer, times(1)).flush();
+    }
+
     @Test
     public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
         final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index e7d5cb7163..bdb419c152 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -42,6 +42,14 @@ public class InFlightMessageTracker {
         }
     }

+    /**
+     * This method guarantees that the specified FlowFile will be transferred to
+     * the 'success' relationship even if it did not derive any Kafka message.
+     */
+    public void trackEmpty(final FlowFile flowFile) {
+        messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
+    }
+
     public int getAcknowledgedCount(final FlowFile flowFile) {
         final Counts counter = messageCountsByFlowFile.get(flowFile);
         return (counter == null) ? 0 : counter.getAcknowledgedCount();
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 8fb4e67fa1..ca9537cebe 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;

@@ -61,9 +62,21 @@ public class PublisherLease implements Closeable {
             tracker = new InFlightMessageTracker();
         }

-        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+        try {
             byte[] messageContent;
-            try {
+            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
+                if (flowFile.getSize() > maxMessageSize) {
+                    tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
+                    return;
+                }
+                // Send the FlowFile content as it is, to support sending a 0-byte message.
+                messageContent = new byte[(int) flowFile.getSize()];
+                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                publish(flowFile, messageKey, messageContent, topic, tracker);
+                return;
+            }
+
+            try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                 while ((messageContent = demarcator.nextToken()) != null) {
                     publish(flowFile, messageKey, messageContent, topic, tracker);
@@ -72,6 +85,7 @@ public class PublisherLease implements Closeable {
                     return;
                 }
             }
+            tracker.trackEmpty(flowFile);
         } catch (final TokenTooLargeException ttle) {
             tracker.fail(flowFile, ttle);
         }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index c2d143cf31..b67cefeaa8 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -68,7 +68,9 @@ public class TestPublisherLease {
             }
         };

-        final FlowFile flowFile = new MockFlowFile(1L);
+        final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+        // Need a size greater than zero to make the lease read the InputStream.
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
         final String topic = "unit-test";
         final byte[] messageKey = null;
         final byte[] demarcatorBytes = null;
@@ -191,4 +193,56 @@ public class TestPublisherLease {
         verify(producer, times(1)).flush();
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testZeroByteMessageSent() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
+            @Override
+            protected void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final AtomicInteger correctMessages = new AtomicInteger(0);
+        final AtomicInteger incorrectMessages = new AtomicInteger(0);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                final ProducerRecord record = invocation.getArgumentAt(0, ProducerRecord.class);
+                final byte[] value = record.value();
+                final String valueString = new String(value, StandardCharsets.UTF_8);
+                if ("".equals(valueString)) {
+                    correctMessages.incrementAndGet();
+                } else {
+                    incorrectMessages.incrementAndGet();
+                }
+
+                return null;
+            }
+        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+
+        final byte[] flowFileContent = new byte[0];
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+
+        assertEquals(0, poisonCount.get());
+
+        verify(producer, times(0)).flush();
+
+        final PublishResult result = lease.complete();
+
+        assertEquals(1, correctMessages.get());
+        assertEquals(0, incorrectMessages.get());
+
+        verify(producer, times(1)).flush();
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index 317b274e8a..3ec3d1c53f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -45,6 +45,10 @@ public class InFlightMessageTracker {
         }
     }

+    /**
+     * This method guarantees that the specified FlowFile will be transferred to
+     * the 'success' relationship even if it did not derive any Kafka message.
+     */
     public void trackEmpty(final FlowFile flowFile) {
         messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
     }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index e8d744aa2d..2b1cfe2610 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -42,6 +42,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;

@@ -113,9 +114,21 @@ public class PublisherLease implements Closeable {
             tracker = new InFlightMessageTracker(logger);
         }

-        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+        try {
             byte[] messageContent;
-            try {
+            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
+                if (flowFile.getSize() > maxMessageSize) {
+                    tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
+                    return;
+                }
+                // Send the FlowFile content as it is, to support sending a 0-byte message.
+                messageContent = new byte[(int) flowFile.getSize()];
+                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                publish(flowFile, messageKey, messageContent, topic, tracker);
+                return;
+            }
+
+            try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                 while ((messageContent = demarcator.nextToken()) != null) {
                     publish(flowFile, messageKey, messageContent, topic, tracker);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 64451d5ab7..b2e1b0ef1f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -79,7 +79,9 @@ public class TestPublisherLease {
             }
         };

-        final FlowFile flowFile = new MockFlowFile(1L);
+        final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+        // Need a size greater than zero to make the lease read the InputStream.
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
         final String topic = "unit-test";
         final byte[] messageKey = null;
         final byte[] demarcatorBytes = null;
@@ -200,6 +202,56 @@ public class TestPublisherLease {
         verify(producer, times(1)).flush();
     }

+    @Test
+    @SuppressWarnings("unchecked")
+    public void testZeroByteMessageSent() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
+            @Override
+            protected void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final AtomicInteger correctMessages = new AtomicInteger(0);
+        final AtomicInteger incorrectMessages = new AtomicInteger(0);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                final ProducerRecord record = invocation.getArgumentAt(0, ProducerRecord.class);
+                final byte[] value = record.value();
+                final String valueString = new String(value, StandardCharsets.UTF_8);
+                if ("".equals(valueString)) {
+                    correctMessages.incrementAndGet();
+                } else {
+                    incorrectMessages.incrementAndGet();
+                }
+
+                return null;
+            }
+        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+
+        final byte[] flowFileContent = new byte[0];
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+
+        assertEquals(0, poisonCount.get());
+
+        verify(producer, times(0)).flush();
+
+        final PublishResult result = lease.complete();
+
+        assertEquals(1, correctMessages.get());
+        assertEquals(0, incorrectMessages.get());
+
+        verify(producer, times(1)).flush();
+    }

     @Test
     public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {