diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 70957cefc2..5a05a0887c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -106,13 +106,15 @@ public class PublisherLease implements Closeable { Record record; int recordCount = 0; - try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { + try { while ((record = recordSet.next()) != null) { recordCount++; baos.reset(); - writer.write(record); - writer.flush(); + try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { + writer.write(record); + writer.flush(); + } final byte[] messageContent = baos.toByteArray(); final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index c2d143cf31..836a4b3c7d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -29,6 +30,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Callback; @@ -36,6 +38,17 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.MockFlowFile; import org.junit.Assert; import org.junit.Before; @@ -191,4 +204,37 @@ public class TestPublisherLease { verify(producer, times(1)).flush(); } + + @Test + public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException { + final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger); + + final FlowFile flowFile = new MockFlowFile(1L); + final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8); + + final MockRecordParser readerService = new MockRecordParser(); + readerService.addSchemaField("person_id", RecordFieldType.LONG); + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + + final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger); + final RecordSet recordSet = reader.createRecordSet(); + final RecordSchema schema = reader.getSchema(); + + final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age"); + + final String topic = "unit-test"; + final String keyField = "person_id"; + + final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class); + final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class); + + Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer); + + lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic); + + verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any()); + verify(writer, times(2)).write(any(Record.class)); + verify(producer, times(2)).send(any(), any()); + } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index abcd15f249..d18df7f5bc 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -148,13 +148,15 @@ public class PublisherLease implements Closeable { Record record; int recordCount = 0; - try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { + try { while ((record = recordSet.next()) != null) { recordCount++; baos.reset(); - writer.write(record); - writer.flush(); + try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { + writer.write(record); + writer.flush(); + } final byte[] messageContent = baos.toByteArray(); final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index 54c122231e..105a4d5d3a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -28,6 +29,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Callback; @@ -35,6 +37,17 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.MockFlowFile; import org.junit.Assert; import org.junit.Before; @@ -187,4 +200,37 @@ public class TestPublisherLease { verify(producer, times(1)).flush(); } + + @Test + public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException { + final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8); + + final FlowFile flowFile = new MockFlowFile(1L); + final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8); + + final MockRecordParser readerService = new MockRecordParser(); + readerService.addSchemaField("person_id", RecordFieldType.LONG); + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + + final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger); + final RecordSet recordSet = reader.createRecordSet(); + final RecordSchema schema = reader.getSchema(); + + final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age"); + + final String topic = "unit-test"; + final String keyField = "person_id"; + + final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class); + final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class); + + Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer); + + lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic); + + verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any()); + verify(writer, times(2)).write(any(Record.class)); + verify(producer, times(2)).send(any(), any()); + } }