mirror of https://github.com/apache/nifi.git
NIFI-3658: Created processor for converting records between data formats with like schemas
Signed-off-by: Matt Burgess <mattyb149@apache.org> NIFI-3658: Incorporated PR review feedback; added counter; clarified documentation Signed-off-by: Matt Burgess <mattyb149@apache.org> This closes #1668
This commit is contained in:
parent
7e7f9a5deb
commit
b93cf7bbdb
|
@ -0,0 +1,161 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||||
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.io.StreamCallback;
|
||||||
|
import org.apache.nifi.serialization.MalformedRecordException;
|
||||||
|
import org.apache.nifi.serialization.RecordReader;
|
||||||
|
import org.apache.nifi.serialization.RecordSetWriter;
|
||||||
|
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||||
|
import org.apache.nifi.serialization.RowRecordReaderFactory;
|
||||||
|
import org.apache.nifi.serialization.WriteResult;
|
||||||
|
|
||||||
|
@EventDriven
|
||||||
|
@SupportsBatching
|
||||||
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
|
@SideEffectFree
|
||||||
|
@Tags({"convert", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"})
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
|
||||||
|
@WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile")
|
||||||
|
})
|
||||||
|
@CapabilityDescription("Converts records from one data format to another using configured Record Reader and Record Write Controller Services. "
|
||||||
|
+ "The Reader and Writer must be configured with \"matching\" schemas. By this, we mean the schemas must have the same field names. The types of the fields "
|
||||||
|
+ "do not have to be the same if a field value can be coerced from one type to another. For instance, if the input schema has a field named \"balance\" of type double, "
|
||||||
|
+ "the output schema can have a field named \"balance\" with a type of string, double, or float. If any field is present in the input that is not present in the output, "
|
||||||
|
+ "the field will be left out of the output. If any field is specified in the output schema but is not present in the input data/schema, then the field will not be "
|
||||||
|
+ "present in the output or will have a null value, depending on the writer.")
|
||||||
|
public class ConvertRecord extends AbstractProcessor {
|
||||||
|
|
||||||
|
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
|
||||||
|
.name("record-reader")
|
||||||
|
.displayName("Record Reader")
|
||||||
|
.description("Specifies the Controller Service to use for reading incoming data")
|
||||||
|
.identifiesControllerService(RowRecordReaderFactory.class)
|
||||||
|
.required(true)
|
||||||
|
.build();
|
||||||
|
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
|
||||||
|
.name("record-writer")
|
||||||
|
.displayName("Record Writer")
|
||||||
|
.description("Specifies the Controller Service to use for writing out the records")
|
||||||
|
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||||
|
.required(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
|
.name("success")
|
||||||
|
.description("FlowFiles that are successfully transformed will be routed to this relationship")
|
||||||
|
.build();
|
||||||
|
static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||||
|
.name("failure")
|
||||||
|
.description("If a FlowFile cannot be transformed from the configured input format to the configured output format, "
|
||||||
|
+ "the unchanged FlowFile will be routed to this relationship")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
|
properties.add(RECORD_READER);
|
||||||
|
properties.add(RECORD_WRITER);
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
final Set<Relationship> relationships = new HashSet<>();
|
||||||
|
relationships.add(REL_SUCCESS);
|
||||||
|
relationships.add(REL_FAILURE);
|
||||||
|
return relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
|
FlowFile flowFile = session.get();
|
||||||
|
if (flowFile == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final RowRecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RowRecordReaderFactory.class);
|
||||||
|
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||||
|
final RecordSetWriter writer = writerFactory.createWriter(getLogger());
|
||||||
|
|
||||||
|
final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
|
||||||
|
|
||||||
|
final FlowFile original = flowFile;
|
||||||
|
try {
|
||||||
|
flowFile = session.write(flowFile, new StreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(final InputStream in, final OutputStream out) throws IOException {
|
||||||
|
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
|
||||||
|
|
||||||
|
final WriteResult writeResult = writer.write(reader.createRecordSet(), out);
|
||||||
|
writeResultRef.set(writeResult);
|
||||||
|
|
||||||
|
} catch (final MalformedRecordException e) {
|
||||||
|
throw new ProcessException("Could not parse incoming data", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (final ProcessException e) {
|
||||||
|
getLogger().error("Failed to convert {}", new Object[] {flowFile, e});
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final WriteResult writeResult = writeResultRef.get();
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
|
||||||
|
attributes.putAll(writeResult.getAttributes());
|
||||||
|
|
||||||
|
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||||
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
session.adjustCounter("Records Converted", writeResult.getRecordCount(), false);
|
||||||
|
getLogger().info("Successfully converted {} records for {}", new Object[] {writeResult.getRecordCount(), flowFile});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,6 +18,7 @@ org.apache.nifi.processors.standard.CompressContent
|
||||||
org.apache.nifi.processors.standard.ControlRate
|
org.apache.nifi.processors.standard.ControlRate
|
||||||
org.apache.nifi.processors.standard.ConvertCharacterSet
|
org.apache.nifi.processors.standard.ConvertCharacterSet
|
||||||
org.apache.nifi.processors.standard.ConvertJSONToSQL
|
org.apache.nifi.processors.standard.ConvertJSONToSQL
|
||||||
|
org.apache.nifi.processors.standard.ConvertRecord
|
||||||
org.apache.nifi.processors.standard.DebugFlow
|
org.apache.nifi.processors.standard.DebugFlow
|
||||||
org.apache.nifi.processors.standard.DetectDuplicate
|
org.apache.nifi.processors.standard.DetectDuplicate
|
||||||
org.apache.nifi.processors.standard.DistributeLoad
|
org.apache.nifi.processors.standard.DistributeLoad
|
||||||
|
|
|
@ -0,0 +1,126 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.nifi.processors.standard.util.record.MockRecordParser;
|
||||||
|
import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
|
||||||
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestConvertRecord {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSuccessfulConversion() throws InitializationException {
|
||||||
|
final MockRecordParser readerService = new MockRecordParser();
|
||||||
|
final MockRecordWriter writerService = new MockRecordWriter("header", false);
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
|
||||||
|
runner.addControllerService("reader", readerService);
|
||||||
|
runner.enableControllerService(readerService);
|
||||||
|
runner.addControllerService("writer", writerService);
|
||||||
|
runner.enableControllerService(writerService);
|
||||||
|
|
||||||
|
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
|
||||||
|
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
|
||||||
|
|
||||||
|
readerService.addSchemaField("name", RecordFieldType.STRING);
|
||||||
|
readerService.addSchemaField("age", RecordFieldType.INT);
|
||||||
|
|
||||||
|
readerService.addRecord("John Doe", 48);
|
||||||
|
readerService.addRecord("Jane Doe", 47);
|
||||||
|
readerService.addRecord("Jimmy Doe", 14);
|
||||||
|
|
||||||
|
runner.enqueue("");
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
|
||||||
|
|
||||||
|
out.assertAttributeEquals("record.count", "3");
|
||||||
|
out.assertAttributeEquals("mime.type", "text/plain");
|
||||||
|
out.assertContentEquals("header\nJohn Doe,48\nJane Doe,47\nJimmy Doe,14\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadFailure() throws InitializationException {
|
||||||
|
final MockRecordParser readerService = new MockRecordParser(2);
|
||||||
|
final MockRecordWriter writerService = new MockRecordWriter("header", false);
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
|
||||||
|
runner.addControllerService("reader", readerService);
|
||||||
|
runner.enableControllerService(readerService);
|
||||||
|
runner.addControllerService("writer", writerService);
|
||||||
|
runner.enableControllerService(writerService);
|
||||||
|
|
||||||
|
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
|
||||||
|
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
|
||||||
|
|
||||||
|
readerService.addSchemaField("name", RecordFieldType.STRING);
|
||||||
|
readerService.addSchemaField("age", RecordFieldType.INT);
|
||||||
|
|
||||||
|
readerService.addRecord("John Doe", 48);
|
||||||
|
readerService.addRecord("Jane Doe", 47);
|
||||||
|
readerService.addRecord("Jimmy Doe", 14);
|
||||||
|
|
||||||
|
final MockFlowFile original = runner.enqueue("hello");
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
// Original FlowFile should be routed to 'failure' relationship without modification
|
||||||
|
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
|
||||||
|
assertTrue(original == out);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteFailure() throws InitializationException {
|
||||||
|
final MockRecordParser readerService = new MockRecordParser();
|
||||||
|
final MockRecordWriter writerService = new MockRecordWriter("header", false, 2);
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
|
||||||
|
runner.addControllerService("reader", readerService);
|
||||||
|
runner.enableControllerService(readerService);
|
||||||
|
runner.addControllerService("writer", writerService);
|
||||||
|
runner.enableControllerService(writerService);
|
||||||
|
|
||||||
|
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
|
||||||
|
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
|
||||||
|
|
||||||
|
readerService.addSchemaField("name", RecordFieldType.STRING);
|
||||||
|
readerService.addSchemaField("age", RecordFieldType.INT);
|
||||||
|
|
||||||
|
readerService.addRecord("John Doe", 48);
|
||||||
|
readerService.addRecord("Jane Doe", 47);
|
||||||
|
readerService.addRecord("Jimmy Doe", 14);
|
||||||
|
|
||||||
|
final MockFlowFile original = runner.enqueue("hello");
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
// Original FlowFile should be routed to 'failure' relationship without modification
|
||||||
|
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
|
||||||
|
assertTrue(original == out);
|
||||||
|
}
|
||||||
|
}
|
|
@ -31,9 +31,21 @@ import org.apache.nifi.serialization.record.RecordSet;
|
||||||
|
|
||||||
public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
|
public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
|
||||||
private final String header;
|
private final String header;
|
||||||
|
private final int failAfterN;
|
||||||
|
private final boolean quoteValues;
|
||||||
|
|
||||||
public MockRecordWriter(final String header) {
|
public MockRecordWriter(final String header) {
|
||||||
|
this(header, true, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MockRecordWriter(final String header, final boolean quoteValues) {
|
||||||
|
this(header, quoteValues, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
|
||||||
this.header = header;
|
this.header = header;
|
||||||
|
this.quoteValues = quoteValues;
|
||||||
|
this.failAfterN = failAfterN;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -48,13 +60,20 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
|
||||||
final int numCols = rs.getSchema().getFieldCount();
|
final int numCols = rs.getSchema().getFieldCount();
|
||||||
Record record = null;
|
Record record = null;
|
||||||
while ((record = rs.next()) != null) {
|
while ((record = rs.next()) != null) {
|
||||||
recordCount++;
|
if (++recordCount > failAfterN && failAfterN > -1) {
|
||||||
|
throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written");
|
||||||
|
}
|
||||||
|
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (final String fieldName : record.getSchema().getFieldNames()) {
|
for (final String fieldName : record.getSchema().getFieldNames()) {
|
||||||
final String val = record.getAsString(fieldName);
|
final String val = record.getAsString(fieldName);
|
||||||
|
if (quoteValues) {
|
||||||
out.write("\"".getBytes());
|
out.write("\"".getBytes());
|
||||||
out.write(val.getBytes());
|
out.write(val.getBytes());
|
||||||
out.write("\"".getBytes());
|
out.write("\"".getBytes());
|
||||||
|
} else {
|
||||||
|
out.write(val.getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
if (i++ < numCols - 1) {
|
if (i++ < numCols - 1) {
|
||||||
out.write(",".getBytes());
|
out.write(",".getBytes());
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.nifi.serialization.record.Record;
|
import org.apache.nifi.serialization.record.Record;
|
||||||
import org.apache.nifi.serialization.record.RecordSchema;
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
import org.apache.nifi.serialization.record.RecordSet;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -51,4 +52,29 @@ public interface RecordReader extends Closeable {
|
||||||
* @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse the underlying data
|
* @throws MalformedRecordException if an unrecoverable failure occurs when trying to parse the underlying data
|
||||||
*/
|
*/
|
||||||
RecordSchema getSchema() throws MalformedRecordException;
|
RecordSchema getSchema() throws MalformedRecordException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a RecordSet that returns the records in this Record Reader in a streaming fashion
|
||||||
|
*/
|
||||||
|
default RecordSet createRecordSet() {
|
||||||
|
return new RecordSet() {
|
||||||
|
@Override
|
||||||
|
public RecordSchema getSchema() throws IOException {
|
||||||
|
try {
|
||||||
|
return RecordReader.this.getSchema();
|
||||||
|
} catch (final MalformedRecordException mre) {
|
||||||
|
throw new IOException(mre);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Record next() throws IOException {
|
||||||
|
try {
|
||||||
|
return RecordReader.this.nextRecord();
|
||||||
|
} catch (final MalformedRecordException mre) {
|
||||||
|
throw new IOException(mre);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue