Fixes incorrect message count in the Provenance reporter. Adds a unit test to verify the fix.

Oscar de la Pena 2015-03-19 10:10:09 +08:00
parent dea9e22475
commit eb5ec703ba
2 changed files with 114 additions and 52 deletions
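The bug, as visible in the diff below: PutKafka flushes messages to Kafka in batches while streaming the FlowFile, but the final leftover batch was passed to producer.send(messages) without being added to the messagesSent counter, so the provenance event's "Sent N messages" detail under-reported the total. The fix adds messagesSent.addAndGet(messages.size()) immediately before that send. A minimal, self-contained sketch of the counting pattern (hypothetical names and batch size, not the actual PutKafka code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: batches are flushed as they fill up, and the leftover batch
// at the end must be counted before it is sent, or the "Sent N messages"
// provenance detail under-reports the total.
public class MessageCountSketch {
    private static final int BATCH_SIZE = 3; // stand-in for PutKafka's batching

    public static void main(String[] args) {
        final AtomicLong messagesSent = new AtomicLong(0L);
        final List<String> messages = new ArrayList<>();

        for (String msg : new String[] { "1", "2", "3", "4" }) {
            messages.add(msg);
            if (messages.size() >= BATCH_SIZE) {
                messagesSent.addAndGet(messages.size()); // count the full batch
                messages.clear();                        // stands in for producer.send(messages)
            }
        }

        if (!messages.isEmpty()) {
            messagesSent.addAndGet(messages.size()); // the fix: count the leftover batch too
            messages.clear();
        }

        System.out.println("Sent " + messagesSent.get() + " messages"); // prints: Sent 4 messages
    }
}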

PutKafka.java

@@ -309,14 +309,9 @@ public class PutKafka extends AbstractProcessor {
                     data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
                 }
 
-                createMessage: if ( data != null ) {
+                if ( data != null ) {
                     // If the message has no data, ignore it.
-                    if ( data.length == 0 ) {
-                        data = null;
-                        baos.reset();
-                        break createMessage;
-                    }
-
+                    if ( data.length != 0 ) {
                     // either we ran out of data or we reached the end of the message.
                     // Either way, create the message because it's ready to send.
                     final KeyedMessage<byte[], byte[]> message;
@@ -357,16 +352,18 @@ public class PutKafka extends AbstractProcessor {
                             // and send them to 'failure'.
                             lastMessageOffset.set(in.getBytesConsumed());
                         }
+                        }
 
                         // reset BAOS so that we can start a new message.
                         baos.reset();
                         data = null;
                     }
                 }
 
                 // If there are messages left, send them
                 if ( !messages.isEmpty() ) {
                     try {
+                        messagesSent.addAndGet(messages.size()); // add count of messages
                         producer.send(messages);
                     } catch (final Exception e) {
                         throw new ProcessException("Failed to send messages to Kafka", e);
@@ -377,7 +374,6 @@ public class PutKafka extends AbstractProcessor {
                 });
 
                 final long nanos = System.nanoTime() - start;
-
                 session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
                 session.transfer(flowFile, REL_SUCCESS);
                 getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
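A side note on the first two hunks: the labeled createMessage: block and its break for empty data are replaced by simply inverting the length check, so the baos.reset()/data = null cleanup at the end of the block runs on every pass instead of being duplicated in the break path. A hedged, self-contained sketch of that control-flow change (simplified names, standard ByteArrayOutputStream instead of NiFi's stream class):

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Sketch of the restructured loop body (hypothetical names): empty data no
// longer needs a labeled break, because the inverted check skips message
// creation while the shared reset still runs for every chunk.
public class EmptyMessageSkipSketch {
    public static void main(String[] args) throws Exception {
        final List<byte[]> messages = new ArrayList<>();
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();

        for (String chunk : new String[] { "1", "", "2" }) {
            baos.write(chunk.getBytes());
            byte[] data = baos.toByteArray();
            if (data != null) {
                if (data.length != 0) {   // was: if (data.length == 0) { ... break createMessage; }
                    messages.add(data);   // only non-empty data becomes a message
                }
                baos.reset();             // single cleanup path for every chunk
            }
        }

        System.out.println(messages.size() + " messages created"); // prints: 2 messages created
    }
}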

TestPutKafka.java

@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import kafka.common.FailedToSendMessageException;
 import kafka.javaapi.producer.Producer;
@@ -32,12 +33,14 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.util.*;
 
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+
 
 public class TestPutKafka {
@@ -140,6 +143,68 @@ public class TestPutKafka {
         assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
     }
 
+    @Test
+    public void testProvenanceReporterMessagesCount(){
+        final TestableProcessor processor = new TestableProcessor();
+        ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter());
+
+        AtomicLong idGenerator = new AtomicLong(0L);
+        SharedSessionState sharedState = new SharedSessionState(processor, idGenerator);
+        Whitebox.setInternalState(sharedState, "provenanceReporter", spyProvenanceReporter);
+        MockFlowFileQueue flowFileQueue = sharedState.getFlowFileQueue();
+
+        MockSessionFactory sessionFactory = Mockito.mock(MockSessionFactory.class);
+        MockProcessSession mockProcessSession = new MockProcessSession(sharedState);
+        Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
+        Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
+
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+
+        MockFlowFile mockFlowFile = mockProcessSession.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+
+        Mockito.verify(spyProvenanceReporter, Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1", "Sent 4 messages");
+    }
+
+    @Test
+    public void testProvenanceReporterWithoutDelimiterMessagesCount(){
+        final TestableProcessor processor = new TestableProcessor();
+        ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter());
+
+        AtomicLong idGenerator = new AtomicLong(0L);
+        SharedSessionState sharedState = new SharedSessionState(processor, idGenerator);
+        Whitebox.setInternalState(sharedState, "provenanceReporter", spyProvenanceReporter);
+        MockFlowFileQueue flowFileQueue = sharedState.getFlowFileQueue();
+
+        MockSessionFactory sessionFactory = Mockito.mock(MockSessionFactory.class);
+        MockProcessSession mockProcessSession = new MockProcessSession(sharedState);
+        Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
+
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
+        Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
+
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+
+        MockFlowFile mockFlowFile = mockProcessSession.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+        Mockito.verify(spyProvenanceReporter, Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1");
+    }
+
     @Test
     @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
@@ -233,4 +298,5 @@ public class TestPutKafka {
             failAfter = successCount;
         }
     }
+
 }
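The new tests work by spying on the provenance reporter: a Mockito spy wraps the real MockProvenanceReporter, Whitebox injects it into the session's shared state, and Mockito.verify then asserts that send(...) was called with the expected "Sent 4 messages" detail. A minimal, self-contained sketch of that spy-and-verify technique (illustrative class names, standard Mockito and JUnit 4 APIs, applied to a plain list instead of the NiFi reporter):

import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.List;
import org.junit.Test;

public class SpyVerifySketch {
    @Test
    public void verifiesInteractionOnSpy() {
        // A spy wraps a real object: calls go through to it, but Mockito records them.
        List<String> reporter = spy(new ArrayList<String>());

        reporter.add("Sent 4 messages"); // stands in for the call made by the code under test

        // Verification asserts the exact method and arguments were invoked.
        verify(reporter, atLeastOnce()).add("Sent 4 messages");
    }
}

Spying (rather than a plain mock) keeps the real MockProvenanceReporter behavior intact while still allowing interaction verification, which is why the tests can run the full TestRunner flow and then check only the provenance call.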