mirror of https://github.com/apache/nifi.git
NIFI-4639: fresh writer for each output record
This commit is contained in:
parent
8e6649ba15
commit
c9cc76b5c8
|
@ -106,13 +106,15 @@ public class PublisherLease implements Closeable {
|
||||||
Record record;
|
Record record;
|
||||||
int recordCount = 0;
|
int recordCount = 0;
|
||||||
|
|
||||||
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
|
try {
|
||||||
while ((record = recordSet.next()) != null) {
|
while ((record = recordSet.next()) != null) {
|
||||||
recordCount++;
|
recordCount++;
|
||||||
baos.reset();
|
baos.reset();
|
||||||
|
|
||||||
|
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
|
||||||
writer.write(record);
|
writer.write(record);
|
||||||
writer.flush();
|
writer.flush();
|
||||||
|
}
|
||||||
|
|
||||||
final byte[] messageContent = baos.toByteArray();
|
final byte[] messageContent = baos.toByteArray();
|
||||||
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
|
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
@ -29,6 +30,7 @@ import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.kafka.clients.producer.Callback;
|
import org.apache.kafka.clients.producer.Callback;
|
||||||
|
@ -36,6 +38,17 @@ import org.apache.kafka.clients.producer.Producer;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
|
||||||
|
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
|
||||||
|
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||||
|
import org.apache.nifi.serialization.MalformedRecordException;
|
||||||
|
import org.apache.nifi.serialization.RecordReader;
|
||||||
|
import org.apache.nifi.serialization.RecordSetWriter;
|
||||||
|
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||||
|
import org.apache.nifi.serialization.record.Record;
|
||||||
|
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||||
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
import org.apache.nifi.serialization.record.RecordSet;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -191,4 +204,37 @@ public class TestPublisherLease {
|
||||||
|
|
||||||
verify(producer, times(1)).flush();
|
verify(producer, times(1)).flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
|
||||||
|
final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger);
|
||||||
|
|
||||||
|
final FlowFile flowFile = new MockFlowFile(1L);
|
||||||
|
final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
final MockRecordParser readerService = new MockRecordParser();
|
||||||
|
readerService.addSchemaField("person_id", RecordFieldType.LONG);
|
||||||
|
readerService.addSchemaField("name", RecordFieldType.STRING);
|
||||||
|
readerService.addSchemaField("age", RecordFieldType.INT);
|
||||||
|
|
||||||
|
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
|
||||||
|
final RecordSet recordSet = reader.createRecordSet();
|
||||||
|
final RecordSchema schema = reader.getSchema();
|
||||||
|
|
||||||
|
final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age");
|
||||||
|
|
||||||
|
final String topic = "unit-test";
|
||||||
|
final String keyField = "person_id";
|
||||||
|
|
||||||
|
final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
|
||||||
|
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
|
||||||
|
|
||||||
|
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
|
||||||
|
|
||||||
|
lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
|
||||||
|
|
||||||
|
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any());
|
||||||
|
verify(writer, times(2)).write(any(Record.class));
|
||||||
|
verify(producer, times(2)).send(any(), any());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,13 +148,15 @@ public class PublisherLease implements Closeable {
|
||||||
Record record;
|
Record record;
|
||||||
int recordCount = 0;
|
int recordCount = 0;
|
||||||
|
|
||||||
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
|
try {
|
||||||
while ((record = recordSet.next()) != null) {
|
while ((record = recordSet.next()) != null) {
|
||||||
recordCount++;
|
recordCount++;
|
||||||
baos.reset();
|
baos.reset();
|
||||||
|
|
||||||
|
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
|
||||||
writer.write(record);
|
writer.write(record);
|
||||||
writer.flush();
|
writer.flush();
|
||||||
|
}
|
||||||
|
|
||||||
final byte[] messageContent = baos.toByteArray();
|
final byte[] messageContent = baos.toByteArray();
|
||||||
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
|
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
@ -28,6 +29,7 @@ import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.kafka.clients.producer.Callback;
|
import org.apache.kafka.clients.producer.Callback;
|
||||||
|
@ -35,6 +37,17 @@ import org.apache.kafka.clients.producer.Producer;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
|
||||||
|
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
|
||||||
|
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||||
|
import org.apache.nifi.serialization.MalformedRecordException;
|
||||||
|
import org.apache.nifi.serialization.RecordReader;
|
||||||
|
import org.apache.nifi.serialization.RecordSetWriter;
|
||||||
|
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||||
|
import org.apache.nifi.serialization.record.Record;
|
||||||
|
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||||
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
import org.apache.nifi.serialization.record.RecordSet;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -187,4 +200,37 @@ public class TestPublisherLease {
|
||||||
|
|
||||||
verify(producer, times(1)).flush();
|
verify(producer, times(1)).flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
|
||||||
|
final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
final FlowFile flowFile = new MockFlowFile(1L);
|
||||||
|
final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
final MockRecordParser readerService = new MockRecordParser();
|
||||||
|
readerService.addSchemaField("person_id", RecordFieldType.LONG);
|
||||||
|
readerService.addSchemaField("name", RecordFieldType.STRING);
|
||||||
|
readerService.addSchemaField("age", RecordFieldType.INT);
|
||||||
|
|
||||||
|
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
|
||||||
|
final RecordSet recordSet = reader.createRecordSet();
|
||||||
|
final RecordSchema schema = reader.getSchema();
|
||||||
|
|
||||||
|
final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age");
|
||||||
|
|
||||||
|
final String topic = "unit-test";
|
||||||
|
final String keyField = "person_id";
|
||||||
|
|
||||||
|
final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
|
||||||
|
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
|
||||||
|
|
||||||
|
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
|
||||||
|
|
||||||
|
lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
|
||||||
|
|
||||||
|
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any());
|
||||||
|
verify(writer, times(2)).write(any(Record.class));
|
||||||
|
verify(producer, times(2)).send(any(), any());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue