NIFI-4004: Use RecordReaderFactory without FlowFile.

- Removed FlowFile from RecordReaderFactory, RecordSetWriterFactory, and SchemaAccessStrategy.
- Renamed the variable 'allowableValue' to 'strategy' to better convey its meaning.
- Removed the creation of a temporary FlowFile in ConsumerLease that existed only to resolve the Record Schema.
- Removed the unnecessary 'InputStream content' argument from the
  RecordSetWriterFactory.getSchema method.

This closes #1877.
Authored by Koji Kawamura on 2017-06-01 21:50:17 +09:00; committed by Mark Payne
parent 20d23e836e
commit 1f67cbf628
48 changed files with 211 additions and 206 deletions
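In short, the API now resolves schemas from a plain Map<String, String> of variables instead of a FlowFile. A minimal caller-side sketch of the before/after; the helper class and method here are illustrative, not part of this diff:

import java.io.InputStream;
import java.util.Collections;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;

class CreateReaderSketch {
    RecordReader open(final RecordReaderFactory readerFactory, final FlowFile flowFile,
            final InputStream in, final ComponentLog logger) throws Exception {
        // Before this commit, createRecordReader(flowFile, in, logger) required a FlowFile,
        // forcing FlowFile-less callers such as ConsumerLease to create and later remove a
        // temporary one. Now any attribute map works, and an empty map suffices without one.
        return flowFile == null
                ? readerFactory.createRecordReader(Collections.emptyMap(), in, logger)
                : readerFactory.createRecordReader(flowFile.getAttributes(), in, logger);
    }
}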

View File

@ -20,13 +20,13 @@ package org.apache.nifi.schema.access;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.serialization.record.RecordSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
public class AvroSchemaTextStrategy implements SchemaAccessStrategy {
@ -40,13 +40,14 @@ public class AvroSchemaTextStrategy implements SchemaAccessStrategy {
}
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
final String schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(flowFile).getValue();
public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException {
final String schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(variables).getValue();
if (schemaText == null || schemaText.trim().isEmpty()) {
throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Text");
}
logger.debug("For {} found schema text {}", flowFile, schemaText);
logger.debug("For {} found schema text {}", variables, schemaText);
try {
final Schema avroSchema = new Schema.Parser().parse(schemaText);

View File

@ -190,16 +190,16 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
// use a child FlowFile so that if any error occurs we can route the original untouched FlowFile to retry/failure
child = session.create(originalFlowFile);
final FlowFile writableFlowFile = child;
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
child = session.write(child, (final OutputStream rawOut) -> {
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
Record record = recordReader.nextRecord();
final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, record == null ? null : record.getSchema());
final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile.getAttributes(),
record == null ? null : record.getSchema());
try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) {
try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, out)) {
recordSetWriter.beginRecordSet();
if (record != null) {
recordSetWriter.write(record);

View File

@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
@ -52,7 +51,7 @@ public class CommaSeparatedRecordReader extends AbstractControllerService implem
}
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final List<RecordField> fields = new ArrayList<>();

View File

@ -26,18 +26,12 @@ import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
@ -65,7 +59,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
}
@Override
public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
final Iterator<Object[]> itr = records.iterator();
return new RecordReader() {

View File

@ -20,9 +20,9 @@ package org.apache.nifi.serialization.record;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
@ -54,12 +54,12 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema schema) throws SchemaNotFoundException, IOException {
public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
return new SimpleRecordSchema(Collections.emptyList());
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
return new RecordSetWriter() {
private int recordCount = 0;
private boolean headerWritten = false;

View File

@ -22,9 +22,9 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.stream.io.StreamUtils;
@ -43,7 +43,7 @@ public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy {
}
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final byte[] buffer = new byte[5];
try {
StreamUtils.fillBuffer(contentStream, buffer);

View File

@ -17,7 +17,6 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
@ -25,6 +24,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
@ -46,34 +46,38 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
}
public boolean isFlowFileRequired() {
return true;
}
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final String schemaIdentifier = flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE);
final String schemaVersion = flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE);
final String schemaProtocol = flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
public RecordSchema getSchema(Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
if (schemaIdentifier == null || schemaVersion == null || schemaProtocol == null) {
throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because it is missing one of the following three required attributes: "
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: "
+ SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
}
if (!isNumber(schemaProtocol)) {
throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Protocol Version number");
}
final int protocol = Integer.parseInt(schemaProtocol);
if (protocol != 1) {
throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be 1.");
}
if (!isNumber(schemaIdentifier)) {
throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Schema Identifier number");
}
if (!isNumber(schemaVersion)) {
throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Schema Version number");
}

View File

@ -17,7 +17,6 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.stream.io.StreamUtils;
@ -27,6 +26,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
@ -45,7 +45,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
}
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final byte[] buffer = new byte[13];
try {
StreamUtils.fillBuffer(contentStream, buffer);

View File

@ -20,15 +20,15 @@ package org.apache.nifi.schema.access;
import java.io.IOException;
import java.io.InputStream;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.serialization.record.RecordSchema;
public class InheritSchemaFromRecord implements SchemaAccessStrategy {
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
if (readSchema == null) {
throw new SchemaNotFoundException("Cannot inherit Schema from Record because no schema was found");
}

View File

@ -17,26 +17,28 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Set;
public interface SchemaAccessStrategy {
/**
* Returns the schema for the given FlowFile using the supplied stream of content and configuration
*
* @param flowFile flowfile
* @param contentStream content of flowfile
* @param readSchema the schema that was read from the input FlowFile, or <code>null</code> if there was none
* @return the RecordSchema for the FlowFile
*/
RecordSchema getSchema(FlowFile flowFile, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
/**
* @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream)}.
* Returns the schema for the content, determined using the supplied variables, content stream, and configuration.
*
* @param variables Variables that are used to resolve the Record Schema via Expression Language.
* This can be null or empty.
* @param contentStream The stream which is used to read the serialized content.
* @param readSchema The schema that was read from the input content, or <code>null</code> if there was none.
* @return The RecordSchema for the content.
*/
RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
/**
* @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(Map, InputStream, RecordSchema)}.
*/
Set<SchemaField> getSuppliedSchemaFields();
}
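For illustration, a minimal strategy under the new contract could look like the sketch below. This is a hypothetical example, not part of this commit: it assumes the org.apache.nifi.schema.access package (so SchemaField and SchemaNotFoundException need no imports), and the 'schema.name' key is invented for the example.

package org.apache.nifi.schema.access;

import java.io.InputStream;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.serialization.record.RecordSchema;

public class PassThroughSchemaAccessStrategy implements SchemaAccessStrategy {

    @Override
    public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream,
            final RecordSchema readSchema) throws SchemaNotFoundException {
        // Only the variables map is available now; a real strategy would use entries such
        // as a schema name or registry identifiers to look the schema up.
        final String name = variables == null ? null : variables.get("schema.name");
        if (readSchema == null) {
            throw new SchemaNotFoundException("No schema could be read from the content"
                + (name == null ? "" : " for '" + name + "'"));
        }
        return readSchema;
    }

    @Override
    public Set<SchemaField> getSuppliedSchemaFields() {
        // This strategy supplies nothing beyond what the reader already provides.
        return EnumSet.noneOf(SchemaField.class);
    }
}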

View File

@ -18,13 +18,13 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class SchemaNamePropertyStrategy implements SchemaAccessStrategy {
@ -43,10 +43,10 @@ public class SchemaNamePropertyStrategy implements SchemaAccessStrategy {
}
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue();
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(variables).getValue();
if (schemaName.trim().isEmpty()) {
throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Name.");
throw new SchemaNotFoundException(String.format("%s did not provide appropriate Schema Name", schemaNamePropertyValue));
}
try {

View File

@ -29,6 +29,7 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -456,18 +457,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
// In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
// We don't want to create a new FlowFile for each record that we receive, so we will just create
// a "temporary flowfile" that will be removed in the finally block below and use that to pass to
// the createRecordReader method.
final FlowFile tempFlowFile = session.create();
RecordSetWriter writer = null;
try {
for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
final Record record;
try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
final RecordReader reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger);
record = reader.nextRecord();
} catch (final Exception e) {
handleParseFailure(consumerRecord, session, e);
@ -490,7 +486,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile, recordSchema);
writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
@ -504,7 +500,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
throw new ProcessException(e);
}
writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
writer = writerFactory.createWriter(logger, writeSchema, rawOut);
writer.beginRecordSet();
tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
@ -544,8 +540,6 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
throw new ProcessException(e);
} finally {
session.remove(tempFlowFile);
}
}

View File

@ -325,16 +325,17 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
final Map<String, String> attributes = flowFile.getAttributes();
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger());
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(flowFile, recordSet.getSchema());
final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic);
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException(e);

View File

@ -106,7 +106,7 @@ public class PublisherLease implements Closeable {
Record record;
int recordCount = 0;
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) {
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
while ((record = recordSet.next()) != null) {
recordCount++;
baos.reset();

View File

@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
@ -64,7 +63,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
}
@Override
public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
return new RecordReader() {

View File

@ -20,9 +20,9 @@ package org.apache.nifi.processors.kafka.pubsub.util;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
@ -52,12 +52,12 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
public RecordSchema getSchema(FlowFile flowFile, RecordSchema schema) throws SchemaNotFoundException, IOException {
public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
return null;
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
return new RecordSetWriter() {
@Override

View File

@ -220,7 +220,7 @@ public class FetchParquetTest {
final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class);
when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory");
when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class), any(FlowFile.class), any(OutputStream.class))).thenReturn(recordSetWriter);
when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class), any(OutputStream.class))).thenReturn(recordSetWriter);
testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory);
testRunner.enableControllerService(recordSetWriterFactory);

View File

@ -22,7 +22,6 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -37,6 +36,7 @@ import java.io.InputStream;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
/**
* A RecordReader implementation that allows the user to script the RecordReader instance
@ -52,10 +52,10 @@ public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFa
}
@Override
public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
if (recordFactory.get() != null) {
try {
return recordFactory.get().createRecordReader(flowFile, in, logger);
return recordFactory.get().createRecordReader(variables, in, logger);
} catch (UndeclaredThrowableException ute) {
throw new IOException(ute.getCause());
}

View File

@ -21,6 +21,7 @@ import java.io.OutputStream;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import javax.script.Invocable;
import javax.script.ScriptException;
@ -31,7 +32,6 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -55,10 +55,10 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
@Override
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
if (recordFactory.get() != null) {
try {
return recordFactory.get().createWriter(logger, schema, flowFile, out);
return recordFactory.get().createWriter(logger, schema, out);
} catch (UndeclaredThrowableException ute) {
throw new IOException(ute.getCause());
}
@ -149,14 +149,14 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
}
@Override
public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final RecordSetWriterFactory writerFactory = recordFactory.get();
if (writerFactory == null) {
return null;
}
try {
return writerFactory.getSchema(flowFile, readSchema);
return writerFactory.getSchema(variables, readSchema);
} catch (UndeclaredThrowableException ute) {
throw new IOException(ute.getCause());
}

View File

@ -27,7 +27,6 @@ import org.apache.nifi.script.ScriptingComponentHelper
import org.apache.nifi.script.ScriptingComponentUtils
import org.apache.nifi.serialization.RecordReader
import org.apache.nifi.util.MockComponentLog
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.MockPropertyValue
import org.apache.nifi.util.TestRunners
import org.junit.Before
@ -98,10 +97,9 @@ class ScriptedReaderTest {
recordReaderFactory.initialize initContext
recordReaderFactory.onEnabled configurationContext
MockFlowFile mockFlowFile = new MockFlowFile(1L)
InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
RecordReader recordReader = recordReaderFactory.createRecordReader(mockFlowFile, inStream, logger)
RecordReader recordReader = recordReaderFactory.createRecordReader(Collections.emptyMap(), inStream, logger)
assertNotNull(recordReader)
3.times {
@ -157,8 +155,7 @@ class ScriptedReaderTest {
recordReaderFactory.initialize initContext
recordReaderFactory.onEnabled configurationContext
MockFlowFile mockFlowFile = new MockFlowFile(1L)
mockFlowFile.putAttributes(['record.tag': 'myRecord'])
Map<String, String> schemaVariables = ['record.tag': 'myRecord']
InputStream inStream = new ByteArrayInputStream('''
<root>
@ -180,7 +177,7 @@ class ScriptedReaderTest {
</root>
'''.bytes)
RecordReader recordReader = recordReaderFactory.createRecordReader(mockFlowFile, inStream, logger)
RecordReader recordReader = recordReaderFactory.createRecordReader(schemaVariables, inStream, logger)
assertNotNull(recordReader)
3.times {

View File

@ -30,7 +30,6 @@ import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.MockPropertyValue
import org.apache.nifi.util.TestRunners
import org.junit.Before
@ -100,11 +99,10 @@ class ScriptedRecordSetWriterTest {
recordSetWriterFactory.initialize initContext
recordSetWriterFactory.onEnabled configurationContext
MockFlowFile mockFlowFile = new MockFlowFile(1L)
def schema = recordSetWriterFactory.getSchema(mockFlowFile, null)
def schema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null)
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, mockFlowFile, outputStream)
RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, outputStream)
assertNotNull(recordSetWriter)
def recordSchema = new SimpleRecordSchema(

View File

@ -16,7 +16,6 @@
*/
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
@ -58,7 +57,7 @@ class GroovyRecordReader implements RecordReader {
class GroovyRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
return new GroovyRecordReader()
}
}

View File

@ -18,7 +18,6 @@
import groovy.json.JsonSlurper
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
@ -68,7 +67,7 @@ class GroovyXmlRecordReaderFactory extends AbstractControllerService implements
// Will be set by the ScriptedRecordReaderFactory
ConfigurationContext configurationContext
RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
// Expecting 'schema.text' to be a JSON array full of objects whose keys are the field names and whose values map to a RecordFieldType
def schemaText = configurationContext.properties.find {p -> p.key.dynamic && p.key.name == 'schema.text'}?.getValue()
if (!schemaText) return null
@ -77,7 +76,7 @@ class GroovyXmlRecordReaderFactory extends AbstractControllerService implements
def entry = field.entrySet()[0]
new RecordField(entry.key, RecordFieldType.of(entry.value).dataType)
} as List<RecordField>)
return new GroovyXmlRecordReader(flowFile.getAttribute('record.tag'), recordSchema, inputStream)
return new GroovyXmlRecordReader(variables.get('record.tag'), recordSchema, inputStream)
}
}

View File

@ -17,9 +17,6 @@
import groovy.xml.MarkupBuilder
import java.io.IOException
import java.io.InputStream
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
@ -102,12 +99,12 @@ class GroovyRecordSetWriter implements RecordSetWriter {
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
@Override
public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
return null;
RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
return null
}
@Override
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
return new GroovyRecordSetWriter(out)
}

View File

@ -104,15 +104,16 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
final AtomicInteger recordCount = new AtomicInteger();
final FlowFile original = flowFile;
final Map<String, String> originalAttributes = flowFile.getAttributes();
try {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
final RecordSchema writeSchema = writerFactory.getSchema(original, reader.getSchema());
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
writer.beginRecordSet();
Record record;

View File

@ -116,13 +116,14 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
final AtomicInteger numRecords = new AtomicInteger(0);
final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
final FlowFile original = flowFile;
final Map<String, String> originalAttributes = original.getAttributes();
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
final RecordSchema writeSchema = writerFactory.getSchema(original, reader.getSchema());
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
Record record;
while ((record = reader.nextRecord()) != null) {
@ -135,7 +136,7 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
if (tuple == null) {
FlowFile outFlowFile = session.create(original);
final OutputStream out = session.write(outFlowFile);
recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, original, out);
recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out);
recordSetWriter.beginRecordSet();
tuple = new Tuple<>(outFlowFile, recordSetWriter);

View File

@ -376,9 +376,9 @@ public class ListenTCPRecord extends AbstractProcessor {
String mimeType = null;
WriteResult writeResult = null;
final RecordSchema recordSchema = recordSetWriterFactory.getSchema(flowFile, record.getSchema());
final RecordSchema recordSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), record.getSchema());
try (final OutputStream out = session.write(flowFile);
final RecordSetWriter recordWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, flowFile, out)) {
final RecordSetWriter recordWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, out)) {
// start the record set and write the first record from above
recordWriter.beginRecordSet();

View File

@ -191,9 +191,10 @@ public class PartitionRecord extends AbstractProcessor {
final Map<RecordValueMap, RecordSetWriter> writerMap = new HashMap<>();
try (final InputStream in = session.read(flowFile)) {
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
final Map<String, String> originalAttributes = flowFile.getAttributes();
final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger());
final RecordSchema writeSchema = writerFactory.getSchema(flowFile, reader.getSchema());
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
Record record;
while ((record = reader.nextRecord()) != null) {
@ -221,7 +222,7 @@ public class PartitionRecord extends AbstractProcessor {
final OutputStream out = session.write(childFlowFile);
writer = writerFactory.createWriter(getLogger(), writeSchema, childFlowFile, out);
writer = writerFactory.createWriter(getLogger(), writeSchema, out);
writer.beginRecordSet();
writerMap.put(recordValueMap, writer);
}

View File

@ -251,10 +251,11 @@ public class QueryRecord extends AbstractProcessor {
// Determine the schema for writing the data
final RecordSchema recordSchema;
try (final InputStream rawIn = session.read(original)) {
final RecordReader reader = recordReaderFactory.createRecordReader(original, rawIn, getLogger());
final Map<String, String> originalAttributes = original.getAttributes();
final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, getLogger());
final RecordSchema inputSchema = reader.getSchema();
recordSchema = recordSetWriterFactory.getSchema(original, inputSchema);
recordSchema = recordSetWriterFactory.getSchema(originalAttributes, inputSchema);
} catch (final Exception e) {
getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
session.transfer(original, REL_FAILURE);
@ -294,7 +295,7 @@ public class QueryRecord extends AbstractProcessor {
transformed = session.write(transformed, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, out)) {
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, out)) {
final ResultSetRecordSet resultSet = new ResultSetRecordSet(rs);
writeResultRef.set(resultSetWriter.write(resultSet));
mimeTypeRef.set(resultSetWriter.getMimeType());

View File

@ -135,13 +135,14 @@ public class SplitRecord extends AbstractProcessor {
final int maxRecords = context.getProperty(RECORDS_PER_SPLIT).evaluateAttributeExpressions(original).asInteger();
final List<FlowFile> splits = new ArrayList<>();
final Map<String, String> originalAttributes = original.getAttributes();
try {
session.read(original, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
final RecordSchema schema = writerFactory.getSchema(original, reader.getSchema());
final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
final RecordSet recordSet = reader.createRecordSet();
final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);
@ -154,7 +155,7 @@ public class SplitRecord extends AbstractProcessor {
final WriteResult writeResult;
try (final OutputStream out = session.write(split);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, split, out)) {
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) {
if (maxRecords == 1) {
final Record record = pushbackSet.next();
writeResult = writer.write(record);

View File

@ -419,7 +419,7 @@ public class ValidateRecord extends AbstractProcessor {
}
final OutputStream out = session.write(flowFile);
final RecordSetWriter created = factory.createWriter(getLogger(), inputSchema, flowFile, out);
final RecordSetWriter created = factory.createWriter(getLogger(), inputSchema, out);
created.beginRecordSet();
return created;
}

View File

@ -135,7 +135,7 @@ public class RecordBin {
this.out = new ByteCountingOutputStream(rawOut);
recordWriter = writerFactory.createWriter(logger, record.getSchema(), flowFile, out);
recordWriter = writerFactory.createWriter(logger, record.getSchema(), out);
recordWriter.beginRecordSet();
}

View File

@ -22,10 +22,10 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -260,7 +260,7 @@ public class TestQueryRecord {
}
@Override
public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final List<RecordField> recordFields = columnNames.stream()
.map(name -> new RecordField(name, RecordFieldType.STRING.getDataType()))
.collect(Collectors.toList());
@ -268,7 +268,7 @@ public class TestQueryRecord {
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
return new RecordSetWriter() {
@Override

View File

@ -17,14 +17,16 @@
package org.apache.nifi.serialization;
import java.io.IOException;
import java.io.InputStream;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
/**
* <p>
* A Controller Service that is responsible for creating a {@link RecordReader}.
@ -32,6 +34,26 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
*/
public interface RecordReaderFactory extends ControllerService {
RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException;
/**
* Creates a RecordReader instance to read records from the specified InputStream.
* This method calls {@link #createRecordReader(Map, InputStream, ComponentLog)} with the attributes of the specified FlowFile.
* @param flowFile The attributes of this FlowFile are used to resolve the Record Schema dynamically via Expression Language. This can be null.
* @param in The InputStream containing the Records. This can be null or an empty stream.
* @param logger A logger bound to a component
* @return The created RecordReader instance
*/
default RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
return createRecordReader(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes(), in, logger);
}
/**
* Creates a RecordReader instance to read records from the specified InputStream.
* @param variables A map of variables that is used to resolve the Record Schema dynamically via Expression Language.
* This can be null or empty.
* @param in The InputStream containing the Records. This can be null or an empty stream.
* @param logger A logger bound to a component
* @return The created RecordReader instance
*/
RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException;
}
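On the implementor's side, only the Map-based method needs a body; FlowFile-based call sites are served by the default method above. A minimal sketch, loosely modeled on the MockRecordParser changes in this diff (the no-records reader is illustrative and assumes the RecordReader interface as the mocks here use it):

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

public class EmptyRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {

    @Override
    public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in,
            final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
        // 'variables' stands in for the former FlowFile parameter; a real factory would use
        // it (for example via Expression Language) to resolve the schema.
        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList());
        return new RecordReader() {
            @Override
            public Record nextRecord() {
                return null; // this sketch yields no records
            }

            @Override
            public RecordSchema getSchema() {
                return schema;
            }

            @Override
            public void close() throws IOException {
                in.close();
            }
        };
    }
}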

View File

@ -18,24 +18,28 @@
package org.apache.nifi.serialization;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
/**
* <p>
* A Controller Service that is responsible for creating a {@link RecordSetWriter}. The writer is created
* based on a FlowFile and an InputStream for that FlowFile, but it is important to note that this the FlowFile passed
* to the {@link #createWriter(ComponentLog, FlowFile, InputStream)} may not be the FlowFile that the Writer will writer to.
* Rather, it is the FlowFile and InputStream from which the Writer's Schema should be determined. This is done because most
* Processors that make use of Record Writers also make use of Record Readers and the schema for the output is often determined
* by either reading the schema from the content of the input FlowFile or from referencing attributes of the
* input FlowFile.
* A Controller Service that is responsible for creating a {@link RecordSetWriter}.
* </p>
* <p>A writer is created with a RecordSchema and an OutputStream that the writer will write to.</p>
* <p>
* The schema can be retrieved via the {@link #getSchema(Map, RecordSchema)} method, based on a Map of variables
* and a RecordSchema that was read from the incoming FlowFile.
* </p>
* <p>
* Most processors that make use of Record Writers also make use of Record Readers, and the schema for the output is often determined
* either by referencing attributes of the input FlowFile or by reading its content.
* In this case, if a RecordSchema is already known and available when calling the {@link #getSchema(Map, RecordSchema)} method,
* it should be specified so that it can be reused.
* </p>
*
* <p>
@ -47,18 +51,17 @@ public interface RecordSetWriterFactory extends ControllerService {
/**
* <p>
* Returns the Schema that will be used for writing Records. Note that the FlowFile and InputStream that are given
* may well be different than the FlowFile that the writer will write to. The given FlowFile and InputStream are
* Returns the Schema that will be used for writing Records. The given variables are
* intended to be used for determining the schema that should be used when writing records.
* </p>
*
* @param flowFile the FlowFile from which the schema should be determined.
* @param variables the variables that are used to resolve the Record Schema via Expression Language; can be null or empty
* @param readSchema the schema that was read from the incoming FlowFile, or <code>null</code> if there is no input schema
*
* @return the Schema that should be used for writing Records
* @throws SchemaNotFoundException if unable to find the schema
*/
RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
/**
* <p>
@ -68,11 +71,10 @@ public interface RecordSetWriterFactory extends ControllerService {
* @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service
* because it allows messages to be logged for the component that is calling this Controller Service.
* @param schema the schema that will be used for writing records
* @param flowFile the FlowFile to write to
* @param out the OutputStream to write to
*
* @return a RecordSetWriter that can write record sets to an OutputStream
* @throws IOException if unable to write to the given OutputStream
*/
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException;
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException;
}
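Putting the two writer-side calls together, a typical flow under the new signatures mirrors the AbstractRecordProcessor hunk later in this diff; a condensed sketch with error handling elided:

import java.io.OutputStream;
import java.util.Map;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

class WriterUsageSketch {
    void writeAll(final RecordSetWriterFactory writerFactory, final Map<String, String> attributes,
            final RecordSchema readSchema, final RecordSet recordSet, final OutputStream out,
            final ComponentLog logger) throws Exception {
        // The incoming FlowFile's attributes (or an empty map) replace the FlowFile that
        // the old getSchema/createWriter signatures required.
        final RecordSchema writeSchema = writerFactory.getSchema(attributes, readSchema);
        try (final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, out)) {
            writer.write(recordSet);
        }
    }
}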

View File

@ -31,7 +31,6 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -66,11 +65,11 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ConfigurationContext context) {
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) {
protected SchemaAccessStrategy getSchemaAccessStrategy(String strategy, SchemaRegistry schemaRegistry, ConfigurationContext context) {
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(strategy)) {
return new EmbeddedAvroSchemaAccessStrategy();
} else {
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
}
}
@ -84,12 +83,12 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
}
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
final String schemaAccessStrategy = getConfigurationContext().getProperty(getSchemaAcessStrategyDescriptor()).getValue();
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
return new AvroReaderWithEmbeddedSchema(in);
} else {
final RecordSchema recordSchema = getSchema(flowFile, in, null);
final RecordSchema recordSchema = getSchema(variables, in, null);
final Schema avroSchema;
try {

View File

@ -36,7 +36,6 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaField;
@ -80,7 +79,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
"The FlowFile will have the Avro schema embedded into the content, as is typical with Avro");
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final FlowFile flowFile, final OutputStream out) throws IOException {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final OutputStream out) throws IOException {
final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
final String compressionFormat = getConfigurationContext().getProperty(COMPRESSION_FORMAT).getValue();

View File

@ -20,13 +20,13 @@ package org.apache.nifi.avro;
import java.io.IOException;
import java.io.InputStream;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -36,7 +36,7 @@ public class EmbeddedAvroSchemaAccessStrategy implements SchemaAccessStrategy {
private final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
public RecordSchema getSchema(Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(contentStream, new GenericDatumReader<GenericRecord>());
final Schema avroSchema = dataFileStream.getSchema();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);

View File

@ -23,6 +23,7 @@ import java.io.Reader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.csv.CSVFormat;
@ -30,7 +31,6 @@ import org.apache.commons.csv.CSVParser;
import org.apache.commons.io.input.BOMInputStream;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -53,7 +53,7 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
}
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
public RecordSchema getSchema(Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
if (this.context == null) {
throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema");
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.csv.CSVFormat;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -31,7 +32,6 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
@ -99,23 +99,23 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
}
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
// Use mark/reset on a BufferedInputStream in case we need to read the header from the InputStream.
final BufferedInputStream bufferedIn = new BufferedInputStream(in);
bufferedIn.mark(1024 * 1024);
final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn), null);
final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null);
bufferedIn.reset();
return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat);
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
if (strategy.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
return new CSVHeaderSchemaStrategy(context);
}
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
}
@Override

View File

@ -28,7 +28,6 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
@@ -69,7 +68,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) throws SchemaNotFoundException, IOException {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader);
}

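On the writer side, the schema is resolved from attribute variables first and the writer is then created against a bare OutputStream. A minimal sketch, assuming RecordSet.getSchema() and RecordSetWriter.write(RecordSet) as in the record API (class and method names are illustrative):

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

class WriterUsageSketch {
    // The write schema is resolved from attribute variables up front; the writer
    // itself is then created with only a logger, schema, and target stream.
    WriteResult writeAll(final RecordSetWriterFactory factory, final Map<String, String> attributes,
                         final RecordSet records, final OutputStream out, final ComponentLog log)
            throws IOException, SchemaNotFoundException {
        final RecordSchema schema = factory.getSchema(attributes, records.getSchema());
        try (final RecordSetWriter writer = factory.createWriter(log, schema, out)) {
            return writer.write(records);
        }
    }
}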

@@ -35,7 +35,6 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
@@ -203,20 +202,20 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
if (strategy.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
return createAccessStrategy();
} else {
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
}
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ValidationContext context) {
if (strategy.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
return createAccessStrategy();
} else {
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
}
}
@@ -224,8 +223,9 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
return new SchemaAccessStrategy() {
private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException {
return recordSchema;
}
@@ -237,8 +237,8 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
}
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(flowFile, in, null);
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine);
}
}

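The anonymous class above illustrates the new SchemaAccessStrategy contract: both the variables map and the content stream may be ignored. A named equivalent, as a sketch (the class name is hypothetical, and getSuppliedSchemaFields() is assumed to be the interface's other method, as the schemaFields set above suggests):

import java.io.InputStream;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;

class FixedSchemaStrategy implements SchemaAccessStrategy {
    private final RecordSchema schema;

    FixedSchemaStrategy(final RecordSchema schema) {
        this.schema = schema;
    }

    @Override
    public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream,
                                  final RecordSchema readSchema) throws SchemaNotFoundException {
        return schema; // both the variables and the content stream are ignored
    }

    @Override
    public Set<SchemaField> getSuppliedSchemaFields() {
        return EnumSet.noneOf(SchemaField.class);
    }
}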

@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -34,7 +35,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeUtils;
@@ -128,8 +128,8 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
}
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
final RecordSchema schema = getSchema(flowFile, in, null);
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat);
}


@@ -27,7 +27,6 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
@@ -64,7 +63,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) throws SchemaNotFoundException, IOException {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
}


@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -28,7 +29,6 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeUtils;
@@ -69,7 +69,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
}
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, in, null), dateFormat, timeFormat, timestampFormat);
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
return new JsonTreeRowRecordReader(in, logger, getSchema(variables, in, null), dateFormat, timeFormat, timestampFormat);
}
}


@@ -147,22 +147,22 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
return schemaAccessStrategyList;
}
protected SchemaAccessWriter getSchemaWriteStrategy(final String allowableValue) {
if (allowableValue == null) {
protected SchemaAccessWriter getSchemaWriteStrategy(final String strategy) {
if (strategy == null) {
return null;
}
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
if (strategy.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
return new SchemaNameAsAttribute();
} else if (allowableValue.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
} else if (strategy.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
return new WriteAvroSchemaAttributeStrategy();
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
} else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
return new HortonworksEncodedSchemaReferenceWriter();
} else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
} else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
return new HortonworksAttributeSchemaReferenceWriter();
} else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
} else if (strategy.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
return new ConfluentSchemaRegistryWriter();
} else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) {
} else if (strategy.equalsIgnoreCase(NO_SCHEMA.getValue())) {
return new NopSchemaAccessWriter();
}


@@ -24,7 +24,6 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaField;
@@ -41,6 +40,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
@@ -57,7 +57,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
private volatile ConfigurationContext configurationContext;
private volatile SchemaAccessStrategy schemaAccessStrategy;
private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
private static InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA));
@@ -108,22 +108,17 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
return schemaAccessStrategy;
}
public final RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
public final RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
if (accessStrategy == null) {
throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service");
}
return getSchemaAccessStrategy().getSchema(flowFile, contentStream, readSchema);
return getSchemaAccessStrategy().getSchema(variables, contentStream, readSchema);
}
public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
if (accessStrategy == null) {
throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service");
}
return getSchemaAccessStrategy().getSchema(flowFile, EMPTY_INPUT_STREAM, readSchema);
public RecordSchema getSchema(final Map<String, String> variables, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
return getSchema(variables, EMPTY_INPUT_STREAM, readSchema);
}
@Override
@@ -148,12 +143,12 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
return suppliedFields;
}
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
if (allowableValue == null) {
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
if (strategy == null) {
return null;
}
return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
return SchemaAccessUtils.getSchemaAccessStrategy(strategy, schemaRegistry, context);
}
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {

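The two-argument overload now simply delegates with an empty stream, so callers whose access strategy needs only attributes (e.g. a schema name attribute) can skip the content stream entirely. A caller-side sketch (assuming SchemaRegistryService lives in org.apache.nifi.serialization; names are illustrative):

import java.io.IOException;
import java.util.Map;

import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.serialization.record.RecordSchema;

class RegistryLookupSketch {
    // A null readSchema means there is no inherited schema to fall back on;
    // the service resolves everything from the attribute map.
    RecordSchema lookup(final SchemaRegistryService service, final Map<String, String> attributes)
            throws SchemaNotFoundException, IOException {
        return service.getSchema(attributes, null);
    }
}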

@@ -17,7 +17,6 @@
package org.apache.nifi.text;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -29,10 +28,11 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.access.InheritSchemaFromRecord;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
@@ -77,12 +77,13 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
return new FreeFormTextWriter(textValue, characterSet, out);
}
@Override
public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
return readSchema;
protected SchemaAccessStrategy getSchemaAccessStrategy(String strategy, SchemaRegistry schemaRegistry, ConfigurationContext context) {
return new InheritSchemaFromRecord();
}
}
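Taken together, these changes let a record pipeline run with no FlowFile at all, which is what allows ConsumerLease to drop its temporary FlowFile. A hedged end-to-end sketch combining both factories (class and method names are illustrative, not part of this commit):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

class NoFlowFilePipelineSketch {
    void copyRecords(final RecordReaderFactory readers, final RecordSetWriterFactory writers,
                     final Map<String, String> variables, final InputStream in,
                     final OutputStream out, final ComponentLog log)
            throws IOException, MalformedRecordException, SchemaNotFoundException {
        try (final RecordReader reader = readers.createRecordReader(variables, in, log)) {
            // the write schema is derived from the reader's schema, still with no FlowFile
            final RecordSchema schema = writers.getSchema(variables, reader.getSchema());
            try (final RecordSetWriter writer = writers.createWriter(log, schema, out)) {
                writer.beginRecordSet();
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    writer.write(record);
                }
                writer.finishRecordSet();
            }
        }
    }
}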