From e5ed62a98fe0c15fcba865edfe00eac673a6ac51 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 27 Dec 2017 17:38:57 +0900 Subject: [PATCH] NIFI-4724: Support 0 byte message with PublishKafka Before this fix, PublishKafka (0.9) and PublishKafka_0_10 fail on empty incoming FlowFiles with a 'transfer relationship not specified' error, because the internal 'publish' method is never called: StreamDemarcator does not emit any token for empty content, regardless of whether a demarcator is set or not. As for PublishKafka_0_11 and PublishKafka_1_0, empty FlowFiles are transferred to the 'success' relationship, but no Kafka message is sent to Kafka. Since Kafka allows empty (0 byte body) messages, NiFi should be able to send them, too. This commit changes the current behavior described above to the following, for all PublishKafka_* processors: - If a demarcator is not set, publish the incoming FlowFile content as it is. This enables sending an empty Kafka message. - If a demarcator is set, send each token as a separate message. Even if no token is found (empty incoming FlowFile), transfer the FlowFile to 'success'. This closes #2362. 
Signed-off-by: Mark Payne --- .../kafka/pubsub/InFlightMessageTracker.java | 4 ++ .../kafka/pubsub/PublisherLease.java | 18 +++++- .../kafka/pubsub/TestPublisherLease.java | 55 +++++++++++++++++- .../kafka/pubsub/InFlightMessageTracker.java | 4 ++ .../kafka/pubsub/PublisherLease.java | 18 +++++- .../kafka/pubsub/TestPublisherLease.java | 55 +++++++++++++++++- .../kafka/pubsub/InFlightMessageTracker.java | 8 +++ .../kafka/pubsub/PublisherLease.java | 18 +++++- .../kafka/pubsub/TestPublisherLease.java | 56 ++++++++++++++++++- .../kafka/pubsub/InFlightMessageTracker.java | 4 ++ .../kafka/pubsub/PublisherLease.java | 17 +++++- .../kafka/pubsub/TestPublisherLease.java | 54 +++++++++++++++++- 12 files changed, 299 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java index 58157d9e01..bdb419c152 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java @@ -42,6 +42,10 @@ public class InFlightMessageTracker { } } + /** + * This method guarantees that the specified FlowFile to be transferred to + * 'success' relationship even if it did not derive any Kafka message. 
+ */ public void trackEmpty(final FlowFile flowFile) { messageCountsByFlowFile.putIfAbsent(flowFile, new Counts()); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index c17c33146d..ea1c087a38 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -38,6 +38,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.exception.TokenTooLargeException; import org.apache.nifi.stream.io.util.StreamDemarcator; @@ -71,9 +72,21 @@ public class PublisherLease implements Closeable { tracker = new InFlightMessageTracker(); } - try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { + try { byte[] messageContent; - try { + if (demarcatorBytes == null || demarcatorBytes.length == 0) { + if (flowFile.getSize() > maxMessageSize) { + tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes.")); + return; + } + // Send FlowFile content as it is, to support sending 0 byte message. 
+ messageContent = new byte[(int) flowFile.getSize()]; + StreamUtils.fillBuffer(flowFileContent, messageContent); + publish(flowFile, messageKey, messageContent, topic, tracker); + return; + } + + try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { while ((messageContent = demarcator.nextToken()) != null) { publish(flowFile, messageKey, messageContent, topic, tracker); @@ -82,6 +95,7 @@ public class PublisherLease implements Closeable { return; } } + tracker.trackEmpty(flowFile); } catch (final TokenTooLargeException ttle) { tracker.fail(flowFile, ttle); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index 836a4b3c7d..ac204f10e1 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -81,7 +81,9 @@ public class TestPublisherLease { } }; - final FlowFile flowFile = new MockFlowFile(1L); + final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L)); + // Need a size grater than zero to make the lease reads the InputStream. 
+ Mockito.when(flowFile.getSize()).thenReturn(1L); final String topic = "unit-test"; final byte[] messageKey = null; final byte[] demarcatorBytes = null; @@ -205,6 +207,57 @@ public class TestPublisherLease { verify(producer, times(1)).flush(); } + @Test + @SuppressWarnings("unchecked") + public void testZeroByteMessageSent() throws IOException { + final AtomicInteger poisonCount = new AtomicInteger(0); + + final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) { + @Override + protected void poison() { + poisonCount.incrementAndGet(); + super.poison(); + } + }; + + final AtomicInteger correctMessages = new AtomicInteger(0); + final AtomicInteger incorrectMessages = new AtomicInteger(0); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final ProducerRecord record = invocation.getArgumentAt(0, ProducerRecord.class); + final byte[] value = record.value(); + final String valueString = new String(value, StandardCharsets.UTF_8); + if ("".equals(valueString)) { + correctMessages.incrementAndGet(); + } else { + incorrectMessages.incrementAndGet(); + } + + return null; + } + }).when(producer).send(any(ProducerRecord.class), any(Callback.class)); + + final FlowFile flowFile = new MockFlowFile(1L); + final String topic = "unit-test"; + final byte[] messageKey = null; + final byte[] demarcatorBytes = null; + + final byte[] flowFileContent = new byte[0]; + lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic); + + assertEquals(0, poisonCount.get()); + + verify(producer, times(0)).flush(); + + final PublishResult result = lease.complete(); + + assertEquals(1, correctMessages.get()); + assertEquals(0, incorrectMessages.get()); + + verify(producer, times(1)).flush(); + } + @Test public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException { final PublisherLease lease = new 
PublisherLease(producer, 1024 * 1024, 10L, logger); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java index 317b274e8a..3ec3d1c53f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java @@ -45,6 +45,10 @@ public class InFlightMessageTracker { } } + /** + * This method guarantees that the specified FlowFile to be transferred to + * 'success' relationship even if it did not derive any Kafka message. + */ public void trackEmpty(final FlowFile flowFile) { messageCountsByFlowFile.putIfAbsent(flowFile, new Counts()); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index e8d744aa2d..72c90d2646 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -42,6 +42,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.exception.TokenTooLargeException; import 
org.apache.nifi.stream.io.util.StreamDemarcator; @@ -113,9 +114,21 @@ public class PublisherLease implements Closeable { tracker = new InFlightMessageTracker(logger); } - try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { + try { byte[] messageContent; - try { + if (demarcatorBytes == null || demarcatorBytes.length == 0) { + if (flowFile.getSize() > maxMessageSize) { + tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes.")); + return; + } + // Send FlowFile content as it is, to support sending 0 byte message. + messageContent = new byte[(int) flowFile.getSize()]; + StreamUtils.fillBuffer(flowFileContent, messageContent); + publish(flowFile, messageKey, messageContent, topic, tracker); + return; + } + + try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { while ((messageContent = demarcator.nextToken()) != null) { publish(flowFile, messageKey, messageContent, topic, tracker); @@ -123,6 +136,7 @@ public class PublisherLease implements Closeable { // If we have a failure, don't try to send anything else. 
return; } + tracker.trackEmpty(flowFile); } } catch (final TokenTooLargeException ttle) { tracker.fail(flowFile, ttle); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index 105a4d5d3a..d2b52dd228 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -80,7 +80,9 @@ public class TestPublisherLease { } }; - final FlowFile flowFile = new MockFlowFile(1L); + final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L)); + // Need a size grater than zero to make the lease reads the InputStream. + Mockito.when(flowFile.getSize()).thenReturn(1L); final String topic = "unit-test"; final byte[] messageKey = null; final byte[] demarcatorBytes = null; @@ -201,6 +203,57 @@ public class TestPublisherLease { verify(producer, times(1)).flush(); } + @Test + @SuppressWarnings("unchecked") + public void testZeroByteMessageSent() throws IOException { + final AtomicInteger poisonCount = new AtomicInteger(0); + + final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) { + @Override + protected void poison() { + poisonCount.incrementAndGet(); + super.poison(); + } + }; + + final AtomicInteger correctMessages = new AtomicInteger(0); + final AtomicInteger incorrectMessages = new AtomicInteger(0); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final ProducerRecord record = invocation.getArgumentAt(0, ProducerRecord.class); + final byte[] value = record.value(); + final String 
valueString = new String(value, StandardCharsets.UTF_8); + if ("".equals(valueString)) { + correctMessages.incrementAndGet(); + } else { + incorrectMessages.incrementAndGet(); + } + + return null; + } + }).when(producer).send(any(ProducerRecord.class), any(Callback.class)); + + final FlowFile flowFile = new MockFlowFile(1L); + final String topic = "unit-test"; + final byte[] messageKey = null; + final byte[] demarcatorBytes = null; + + final byte[] flowFileContent = new byte[0]; + lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic); + + assertEquals(0, poisonCount.get()); + + verify(producer, times(0)).flush(); + + final PublishResult result = lease.complete(); + + assertEquals(1, correctMessages.get()); + assertEquals(0, incorrectMessages.get()); + + verify(producer, times(1)).flush(); + } + @Test public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException { final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java index e7d5cb7163..bdb419c152 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java @@ -42,6 +42,14 @@ public class InFlightMessageTracker { } } + /** + * This method guarantees that the specified FlowFile to be transferred to + * 'success' relationship even if it did not derive any Kafka message. 
+ */ + public void trackEmpty(final FlowFile flowFile) { + messageCountsByFlowFile.putIfAbsent(flowFile, new Counts()); + } + public int getAcknowledgedCount(final FlowFile flowFile) { final Counts counter = messageCountsByFlowFile.get(flowFile); return (counter == null) ? 0 : counter.getAcknowledgedCount(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 8fb4e67fa1..ca9537cebe 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.exception.TokenTooLargeException; import org.apache.nifi.stream.io.util.StreamDemarcator; @@ -61,9 +62,21 @@ public class PublisherLease implements Closeable { tracker = new InFlightMessageTracker(); } - try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { + try { byte[] messageContent; - try { + if (demarcatorBytes == null || demarcatorBytes.length == 0) { + if (flowFile.getSize() > maxMessageSize) { + tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes.")); + return; + } + // Send FlowFile content as it is, to support sending 0 byte message. 
+ messageContent = new byte[(int) flowFile.getSize()]; + StreamUtils.fillBuffer(flowFileContent, messageContent); + publish(flowFile, messageKey, messageContent, topic, tracker); + return; + } + + try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { while ((messageContent = demarcator.nextToken()) != null) { publish(flowFile, messageKey, messageContent, topic, tracker); @@ -72,6 +85,7 @@ public class PublisherLease implements Closeable { return; } } + tracker.trackEmpty(flowFile); } catch (final TokenTooLargeException ttle) { tracker.fail(flowFile, ttle); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index c2d143cf31..b67cefeaa8 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -68,7 +68,9 @@ public class TestPublisherLease { } }; - final FlowFile flowFile = new MockFlowFile(1L); + final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L)); + // Need a size grater than zero to make the lease reads the InputStream. 
+ Mockito.when(flowFile.getSize()).thenReturn(1L); final String topic = "unit-test"; final byte[] messageKey = null; final byte[] demarcatorBytes = null; @@ -191,4 +193,56 @@ public class TestPublisherLease { verify(producer, times(1)).flush(); } + + @Test + @SuppressWarnings("unchecked") + public void testZeroByteMessageSent() throws IOException { + final AtomicInteger poisonCount = new AtomicInteger(0); + + final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) { + @Override + protected void poison() { + poisonCount.incrementAndGet(); + super.poison(); + } + }; + + final AtomicInteger correctMessages = new AtomicInteger(0); + final AtomicInteger incorrectMessages = new AtomicInteger(0); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final ProducerRecord record = invocation.getArgumentAt(0, ProducerRecord.class); + final byte[] value = record.value(); + final String valueString = new String(value, StandardCharsets.UTF_8); + if ("".equals(valueString)) { + correctMessages.incrementAndGet(); + } else { + incorrectMessages.incrementAndGet(); + } + + return null; + } + }).when(producer).send(any(ProducerRecord.class), any(Callback.class)); + + final FlowFile flowFile = new MockFlowFile(1L); + final String topic = "unit-test"; + final byte[] messageKey = null; + final byte[] demarcatorBytes = null; + + final byte[] flowFileContent = new byte[0]; + lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic); + + assertEquals(0, poisonCount.get()); + + verify(producer, times(0)).flush(); + + final PublishResult result = lease.complete(); + + assertEquals(1, correctMessages.get()); + assertEquals(0, incorrectMessages.get()); + + verify(producer, times(1)).flush(); + } + } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java index 317b274e8a..3ec3d1c53f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java @@ -45,6 +45,10 @@ public class InFlightMessageTracker { } } + /** + * This method guarantees that the specified FlowFile to be transferred to + * 'success' relationship even if it did not derive any Kafka message. + */ public void trackEmpty(final FlowFile flowFile) { messageCountsByFlowFile.putIfAbsent(flowFile, new Counts()); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index e8d744aa2d..2b1cfe2610 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -42,6 +42,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.exception.TokenTooLargeException; import org.apache.nifi.stream.io.util.StreamDemarcator; @@ -113,9 +114,21 @@ public class PublisherLease implements Closeable { tracker = new InFlightMessageTracker(logger); } - try (final StreamDemarcator demarcator = new 
StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { + try { byte[] messageContent; - try { + if (demarcatorBytes == null || demarcatorBytes.length == 0) { + if (flowFile.getSize() > maxMessageSize) { + tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes.")); + return; + } + // Send FlowFile content as it is, to support sending 0 byte message. + messageContent = new byte[(int) flowFile.getSize()]; + StreamUtils.fillBuffer(flowFileContent, messageContent); + publish(flowFile, messageKey, messageContent, topic, tracker); + return; + } + + try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { while ((messageContent = demarcator.nextToken()) != null) { publish(flowFile, messageKey, messageContent, topic, tracker); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index 64451d5ab7..b2e1b0ef1f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -79,7 +79,9 @@ public class TestPublisherLease { } }; - final FlowFile flowFile = new MockFlowFile(1L); + final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L)); + // Need a size grater than zero to make the lease reads the InputStream. 
+ Mockito.when(flowFile.getSize()).thenReturn(1L); final String topic = "unit-test"; final byte[] messageKey = null; final byte[] demarcatorBytes = null; @@ -200,6 +202,56 @@ public class TestPublisherLease { verify(producer, times(1)).flush(); } + @Test + @SuppressWarnings("unchecked") + public void testZeroByteMessageSent() throws IOException { + final AtomicInteger poisonCount = new AtomicInteger(0); + + final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) { + @Override + protected void poison() { + poisonCount.incrementAndGet(); + super.poison(); + } + }; + + final AtomicInteger correctMessages = new AtomicInteger(0); + final AtomicInteger incorrectMessages = new AtomicInteger(0); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + final ProducerRecord record = invocation.getArgumentAt(0, ProducerRecord.class); + final byte[] value = record.value(); + final String valueString = new String(value, StandardCharsets.UTF_8); + if ("".equals(valueString)) { + correctMessages.incrementAndGet(); + } else { + incorrectMessages.incrementAndGet(); + } + + return null; + } + }).when(producer).send(any(ProducerRecord.class), any(Callback.class)); + + final FlowFile flowFile = new MockFlowFile(1L); + final String topic = "unit-test"; + final byte[] messageKey = null; + final byte[] demarcatorBytes = null; + + final byte[] flowFileContent = new byte[0]; + lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic); + + assertEquals(0, poisonCount.get()); + + verify(producer, times(0)).flush(); + + final PublishResult result = lease.complete(); + + assertEquals(1, correctMessages.get()); + assertEquals(0, incorrectMessages.get()); + + verify(producer, times(1)).flush(); + } @Test public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {