NIFI-4639: Updated PublishKafka 1.0 processor to use a fresh writer for each output record as well. This closes #2292.

This commit is contained in:
Mark Payne 2017-12-08 09:13:52 -05:00
parent c9cc76b5c8
commit 113ad5ecfa
2 changed files with 49 additions and 3 deletions

View File

@ -148,13 +148,15 @@ public class PublisherLease implements Closeable {
Record record; Record record;
int recordCount = 0; int recordCount = 0;
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { try {
while ((record = recordSet.next()) != null) { while ((record = recordSet.next()) != null) {
recordCount++; recordCount++;
baos.reset(); baos.reset();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
writer.write(record); writer.write(record);
writer.flush(); writer.flush();
}
final byte[] messageContent = baos.toByteArray(); final byte[] messageContent = baos.toByteArray();
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -28,6 +29,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Callback;
@ -35,6 +37,16 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -187,4 +199,36 @@ public class TestPublisherLease {
verify(producer, times(1)).flush(); verify(producer, times(1)).flush();
} }
@Test
public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8);
final FlowFile flowFile = new MockFlowFile(1L);
final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8);
final MockRecordParser readerService = new MockRecordParser();
readerService.addSchemaField("person_id", RecordFieldType.LONG);
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = reader.getSchema();
final String topic = "unit-test";
final String keyField = "person_id";
final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any());
verify(writer, times(2)).write(any(Record.class));
verify(producer, times(2)).send(any(), any());
}
} }