From eb5ec703ba0d5c188822a37f6d7eed14af56a594 Mon Sep 17 00:00:00 2001
From: Oscar de la Pena
Date: Thu, 19 Mar 2015 10:10:09 +0800
Subject: [PATCH] Fixes incorrect message count in Provenance reporter. Adds
 unit test to verify fix

---
 .../nifi/processors/kafka/PutKafka.java     | 90 +++++++++----------
 .../nifi/processors/kafka/TestPutKafka.java | 76 ++++++++++++++--
 2 files changed, 114 insertions(+), 52 deletions(-)

diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 4df4e08264..e0b7588c41 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -309,64 +309,61 @@ public class PutKafka extends AbstractProcessor {
                         data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
                     }
 
-                    createMessage: if ( data != null ) {
+                    if ( data != null ) {
                         // If the message has no data, ignore it.
-                        if ( data.length == 0 ) {
-                            data = null;
-                            baos.reset();
-                            break createMessage;
-                        }
-
-                        // either we ran out of data or we reached the end of the message.
-                        // Either way, create the message because it's ready to send.
-                        final KeyedMessage<byte[], byte[]> message;
-                        if ( key == null ) {
-                            message = new KeyedMessage<>(topic, data);
-                        } else {
-                            message = new KeyedMessage<>(topic, keyBytes, data);
-                        }
-
-                        // Add the message to the list of messages ready to send. If we've reached our
-                        // threshold of how many we're willing to send (or if we're out of data), go ahead
-                        // and send the whole List.
-                        messages.add(message);
-                        messageBytes += data.length;
-                        if ( messageBytes >= maxBufferSize || streamFinished ) {
-                            // send the messages, then reset our state.
-                            try {
-                                producer.send(messages);
-                            } catch (final Exception e) {
-                                // we wrap the general exception in ProcessException because we want to separate
-                                // failures in sending messages from general Exceptions that would indicate bugs
-                                // in the Processor. Failure to send a message should be handled appropriately, but
-                                // we don't want to catch the general Exception or RuntimeException in order to catch
-                                // failures from Kafka's Producer.
-                                throw new ProcessException("Failed to send messages to Kafka", e);
+                        if ( data.length != 0 ) {
+                            // either we ran out of data or we reached the end of the message.
+                            // Either way, create the message because it's ready to send.
+                            final KeyedMessage<byte[], byte[]> message;
+                            if (key == null) {
+                                message = new KeyedMessage<>(topic, data);
+                            } else {
+                                message = new KeyedMessage<>(topic, keyBytes, data);
+                            }
+
+                            // Add the message to the list of messages ready to send. If we've reached our
+                            // threshold of how many we're willing to send (or if we're out of data), go ahead
+                            // and send the whole List.
+                            messages.add(message);
+                            messageBytes += data.length;
+                            if (messageBytes >= maxBufferSize || streamFinished) {
+                                // send the messages, then reset our state.
+                                try {
+                                    producer.send(messages);
+                                } catch (final Exception e) {
+                                    // we wrap the general exception in ProcessException because we want to separate
+                                    // failures in sending messages from general Exceptions that would indicate bugs
+                                    // in the Processor. Failure to send a message should be handled appropriately, but
+                                    // we don't want to catch the general Exception or RuntimeException in order to catch
+                                    // failures from Kafka's Producer.
+                                    throw new ProcessException("Failed to send messages to Kafka", e);
+                                }
+
+                                messagesSent.addAndGet(messages.size()); // count number of messages sent
+
+                                // reset state
+                                messages.clear();
+                                messageBytes = 0;
+
+                                // We've successfully sent a batch of messages. Keep track of the byte offset in the
+                                // FlowFile of the last successfully sent message. This way, if the messages cannot
+                                // all be successfully sent, we know where to split off the data. This allows us to then
+                                // split off the first X number of bytes and send to 'success' and then split off the rest
+                                // and send them to 'failure'.
+                                lastMessageOffset.set(in.getBytesConsumed());
+                            }
                         }
-
-                        messagesSent.addAndGet(messages.size()); // count number of messages sent
-
-                        // reset state
-                        messages.clear();
-                        messageBytes = 0;
-
-                        // We've successfully sent a batch of messages. Keep track of the byte offset in the
-                        // FlowFile of the last successfully sent message. This way, if the messages cannot
-                        // all be successfully sent, we know where to split off the data. This allows us to then
-                        // split off the first X number of bytes and send to 'success' and then split off the rest
-                        // and send them to 'failure'.
-                        lastMessageOffset.set(in.getBytesConsumed());
                     }
-                    // reset BAOS so that we can start a new message.
                     baos.reset();
                     data = null;
+                    }
                 }
 
                 // If there are messages left, send them
                 if ( !messages.isEmpty() ) {
                     try {
+                        messagesSent.addAndGet(messages.size()); // add count of messages
                         producer.send(messages);
                     } catch (final Exception e) {
                         throw new ProcessException("Failed to send messages to Kafka", e);
@@ -377,7 +374,6 @@ public class PutKafka extends AbstractProcessor {
         });
 
         final long nanos = System.nanoTime() - start;
-        session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
         session.transfer(flowFile, REL_SUCCESS);
         getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
 
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index b0f239451a..56a5c4b0ae 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import kafka.common.FailedToSendMessageException;
 import kafka.javaapi.producer.Producer;
@@ -32,12 +33,14 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.util.*;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestPutKafka {
 
@@ -139,8 +142,70 @@ public class TestPutKafka {
         assertTrue(Arrays.equals("3".getBytes(), msgs.get(2)));
         assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
     }
-
-
+
+    @Test
+    public void testProvenanceReporterMessagesCount(){
+        final TestableProcessor processor = new TestableProcessor();
+
+        ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter());
+
+        AtomicLong idGenerator = new AtomicLong(0L);
+        SharedSessionState sharedState = new SharedSessionState(processor, idGenerator);
+        Whitebox.setInternalState(sharedState, "provenanceReporter", spyProvenanceReporter);
+        MockFlowFileQueue flowFileQueue = sharedState.getFlowFileQueue();
+        MockSessionFactory sessionFactory = Mockito.mock(MockSessionFactory.class);
+        MockProcessSession mockProcessSession = new MockProcessSession(sharedState);
+        Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
+
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
+        Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
+
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+
+        MockFlowFile mockFlowFile = mockProcessSession.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+        Mockito.verify(spyProvenanceReporter, Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1", "Sent 4 messages");
+    }
+
+    @Test
+    public void testProvenanceReporterWithoutDelimiterMessagesCount(){
+        final TestableProcessor processor = new TestableProcessor();
+
+        ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter());
+
+        AtomicLong idGenerator = new AtomicLong(0L);
+        SharedSessionState sharedState = new SharedSessionState(processor, idGenerator);
+        Whitebox.setInternalState(sharedState, "provenanceReporter", spyProvenanceReporter);
+        MockFlowFileQueue flowFileQueue = sharedState.getFlowFileQueue();
+        MockSessionFactory sessionFactory = Mockito.mock(MockSessionFactory.class);
+        MockProcessSession mockProcessSession = new MockProcessSession(sharedState);
+        Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
+
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
+        Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
+
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+
+        MockFlowFile mockFlowFile = mockProcessSession.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+        Mockito.verify(spyProvenanceReporter, Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1");
+    }
+
     @Test
     @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
     public void testKeyValuePut() {
@@ -233,4 +298,5 @@ public class TestPutKafka {
             failAfter = successCount;
         }
     }
+
 }
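
Note on the fix (not part of the patch): the undercount arose because messagesSent was only incremented when a batch flush was triggered by the maxBufferSize threshold; the leftover batch sent after the read loop was never added to the total, so the provenance event could report fewer messages than were actually delivered. The following is a minimal, self-contained sketch of that counting behavior, not NiFi or Kafka code; BatchCountSketch, MAX_BATCH_BYTES, send(), and the sample messages are hypothetical stand-ins.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class BatchCountSketch {

    // Hypothetical flush threshold, standing in for the processor's max buffer size.
    static final int MAX_BATCH_BYTES = 8;

    static final AtomicLong messagesSent = new AtomicLong(0L);

    public static void main(String[] args) {
        final List<byte[]> batch = new ArrayList<>();
        long batchBytes = 0;

        // Four small messages, mirroring the "Sent 4 messages" expectation in the unit test.
        for (final String msg : new String[] {"1", "2", "3", "4"}) {
            final byte[] data = msg.getBytes();
            batch.add(data);
            batchBytes += data.length;
            if (batchBytes >= MAX_BATCH_BYTES) {
                send(batch);
                messagesSent.addAndGet(batch.size()); // counted only on a threshold flush
                batch.clear();
                batchBytes = 0;
            }
        }

        // The fix: the leftover batch sent after the loop must be counted as well.
        // Before the patch this final send never reached the total, so four 1-byte
        // messages (which never hit the threshold) would be reported as zero.
        if (!batch.isEmpty()) {
            messagesSent.addAndGet(batch.size());
            send(batch);
        }

        System.out.println("Sent " + messagesSent.get() + " messages"); // prints: Sent 4 messages
    }

    // Stand-in for producer.send(messages); the real processor sends KeyedMessages to Kafka.
    static void send(List<byte[]> batch) {
        // no-op in this sketch
    }
}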