diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index abcd15f249..d18df7f5bc 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -148,13 +148,15 @@ public class PublisherLease implements Closeable { Record record; int recordCount = 0; - try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { + try { while ((record = recordSet.next()) != null) { recordCount++; baos.reset(); - writer.write(record); - writer.flush(); + try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { + writer.write(record); + writer.flush(); + } final byte[] messageContent = baos.toByteArray(); final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index 54c122231e..64451d5ab7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -28,6 +29,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Callback; @@ -35,6 +37,16 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.MockFlowFile; import org.junit.Assert; import org.junit.Before; @@ -187,4 +199,36 @@ public class TestPublisherLease { verify(producer, times(1)).flush(); } + + + @Test + public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException { + final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8); + + final FlowFile flowFile = new MockFlowFile(1L); + final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8); + + final MockRecordParser readerService = new MockRecordParser(); + readerService.addSchemaField("person_id", RecordFieldType.LONG); + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + + final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger); + final RecordSet recordSet = reader.createRecordSet(); + final RecordSchema schema = reader.getSchema(); + + final String topic = "unit-test"; + final String keyField = "person_id"; + + final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class); + final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class); + + Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer); + + lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic); + + verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any()); + verify(writer, times(2)).write(any(Record.class)); + verify(producer, times(2)).send(any(), any()); + } }