NIFI-3921: Allow Record Writers to inherit schema from Record

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1902
Authored by Mark Payne on 2017-06-07 13:42:13 -04:00; committed by Matt Burgess
parent 4ed7511bee
commit e7dcb6f6c5
45 changed files with 573 additions and 370 deletions
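The heart of the change: a Record Writer can now obtain its schema from the Record(s) the Record Reader produced, instead of requiring a Schema Registry lookup or schema text. RecordSetWriterFactory.getSchema(...) accepts the read schema, SchemaAccessStrategy.getSchema(...) gains a readSchema argument, and the new "Inherit Record Schema" strategy simply returns it. The sketch below mirrors the calling pattern the affected processors adopt in this commit; the wrapper class, method, and variable names are illustrative and not part of the commit.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

// Hypothetical helper, not part of this commit: hands the reader's schema to the
// writer factory so a writer configured with "Inherit Record Schema" can resolve it.
public class InheritSchemaExample {

    public static WriteResult copyRecords(final RecordReader reader, final RecordSetWriterFactory writerFactory,
            final FlowFile flowFile, final OutputStream out, final ComponentLog logger)
            throws IOException, MalformedRecordException, SchemaNotFoundException {

        // Peek at the first record so its schema can be offered to the writer factory.
        Record record = reader.nextRecord();
        final RecordSchema writeSchema = writerFactory.getSchema(flowFile, record == null ? null : record.getSchema());

        try (final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, out)) {
            writer.beginRecordSet();
            if (record != null) {
                writer.write(record);
            }
            while ((record = reader.nextRecord()) != null) {
                writer.write(record);
            }
            return writer.finishRecordSet();
        }
    }
}

AbstractFetchHDFSRecord and PublishKafkaRecord_0_10 follow this same peek-then-write shape in the hunks below.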


@@ -17,33 +17,6 @@
package org.apache.nifi.avro;
-import org.apache.avro.Conversions;
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Array;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.util.Utf8;
-import org.apache.avro.JsonProperties;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;
@@ -61,6 +34,36 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.avro.Conversions;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AvroTypeUtil {
private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class);
public static final String AVRO_SCHEMA_FORMAT = "avro";
@@ -72,30 +75,142 @@ public class AvroTypeUtil {
private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
private static final String LOGICAL_TYPE_DECIMAL = "decimal";
-public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
+public static Schema extractAvroSchema(final RecordSchema recordSchema) {
if (recordSchema == null) {
throw new IllegalArgumentException("RecordSchema cannot be null");
}
final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat();
if (!schemaFormatOption.isPresent()) {
-throw new SchemaNotFoundException("No Schema Format was present in the RecordSchema");
+return buildAvroSchema(recordSchema);
}
final String schemaFormat = schemaFormatOption.get();
if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) {
-throw new SchemaNotFoundException("Schema provided is not in Avro format");
+return buildAvroSchema(recordSchema);
}
final Optional<String> textOption = recordSchema.getSchemaText();
if (!textOption.isPresent()) {
-throw new SchemaNotFoundException("No Schema text was present in the RecordSchema");
+return buildAvroSchema(recordSchema);
}
final String text = textOption.get();
return new Schema.Parser().parse(text);
}
private static Schema buildAvroSchema(final RecordSchema recordSchema) {
final List<Field> avroFields = new ArrayList<>(recordSchema.getFieldCount());
for (final RecordField recordField : recordSchema.getFields()) {
avroFields.add(buildAvroField(recordField));
}
final Schema avroSchema = Schema.createRecord("nifiRecord", null, "org.apache.nifi", false, avroFields);
return avroSchema;
}
private static Field buildAvroField(final RecordField recordField) {
final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName());
final Field field = new Field(recordField.getFieldName(), schema, null, recordField.getDefaultValue());
for (final String alias : recordField.getAliases()) {
field.addAlias(alias);
}
return field;
}
private static Schema buildAvroSchema(final DataType dataType, final String fieldName) {
final Schema schema;
switch (dataType.getFieldType()) {
case ARRAY:
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
final DataType elementDataType = arrayDataType.getElementType();
if (RecordFieldType.BYTE.equals(elementDataType.getFieldType())) {
schema = Schema.create(Type.BYTES);
} else {
final Schema elementType = buildAvroSchema(elementDataType, fieldName);
schema = Schema.createArray(elementType);
}
break;
case BIGINT:
schema = Schema.create(Type.STRING);
break;
case BOOLEAN:
schema = Schema.create(Type.BOOLEAN);
break;
case BYTE:
schema = Schema.create(Type.INT);
break;
case CHAR:
schema = Schema.create(Type.STRING);
break;
case CHOICE:
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final List<DataType> options = choiceDataType.getPossibleSubTypes();
final List<Schema> unionTypes = new ArrayList<>(options.size());
for (final DataType option : options) {
unionTypes.add(buildAvroSchema(option, fieldName));
}
schema = Schema.createUnion(unionTypes);
break;
case DATE:
schema = Schema.create(Type.INT);
LogicalTypes.date().addToSchema(schema);
break;
case DOUBLE:
schema = Schema.create(Type.DOUBLE);
break;
case FLOAT:
schema = Schema.create(Type.FLOAT);
break;
case INT:
schema = Schema.create(Type.INT);
break;
case LONG:
schema = Schema.create(Type.LONG);
break;
case MAP:
schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName));
break;
case RECORD:
final RecordDataType recordDataType = (RecordDataType) dataType;
final RecordSchema childSchema = recordDataType.getChildSchema();
final List<Field> childFields = new ArrayList<>(childSchema.getFieldCount());
for (final RecordField field : childSchema.getFields()) {
childFields.add(buildAvroField(field));
}
schema = Schema.createRecord(fieldName + "Type", null, "org.apache.nifi", false, childFields);
break;
case SHORT:
schema = Schema.create(Type.INT);
break;
case STRING:
schema = Schema.create(Type.STRING);
break;
case TIME:
schema = Schema.create(Type.INT);
LogicalTypes.timeMillis().addToSchema(schema);
break;
case TIMESTAMP:
schema = Schema.create(Type.LONG);
LogicalTypes.timestampMillis().addToSchema(schema);
break;
default:
return null;
}
return nullable(schema);
}
private static Schema nullable(final Schema schema) {
return Schema.createUnion(Schema.create(Type.NULL), schema);
}
/**
* Returns a DataType for the given Avro Schema
*
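When a RecordSchema carries no Avro schema text, extractAvroSchema() now falls back to buildAvroSchema() instead of throwing SchemaNotFoundException, producing a record named "nifiRecord" in the "org.apache.nifi" namespace whose fields are unions of null and the mapped type. A minimal caller, assuming only the classes shown in this commit (the wrapper class and main method are illustrative):

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

public class ExtractAvroSchemaExample {
    public static void main(final String[] args) {
        // A RecordSchema with no Avro schema text attached.
        final List<RecordField> fields = new ArrayList<>();
        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
        final RecordSchema recordSchema = new SimpleRecordSchema(fields);

        // Falls through to buildAvroSchema(): a record named "nifiRecord" in the
        // "org.apache.nifi" namespace, each field a union of null and its mapped type.
        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
        System.out.println(avroSchema.toString(true));
    }
}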


@@ -40,7 +40,7 @@ public class AvroSchemaTextStrategy implements SchemaAccessStrategy {
}
@Override
-public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
final String schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(flowFile).getValue();
if (schemaText == null || schemaText.trim().isEmpty()) {
throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Text");


@@ -43,6 +43,10 @@ public class SchemaAccessUtils {
+ "found at https://github.com/hortonworks/registry");
public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
"The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
+public static final AllowableValue INHERIT_RECORD_SCHEMA = new AllowableValue("inherit-record-schema", "Inherit Record Schema",
+"The schema used to write records will be the same schema that was given to the Record when the Record was created.");
public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("schema-registry")
@@ -117,6 +121,8 @@ public class SchemaAccessUtils {
public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ProcessContext context) {
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+} else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
+return new InheritSchemaFromRecord();
} else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
@@ -131,6 +137,8 @@ public class SchemaAccessUtils {
public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+} else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
+return new InheritSchemaFromRecord();
} else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
@@ -145,6 +153,8 @@ public class SchemaAccessUtils {
public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+} else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
+return new InheritSchemaFromRecord();
} else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {


@@ -17,44 +17,36 @@
package org.apache.nifi.schema.access;
+import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.RecordSchema;
-public class SchemaTextAsAttribute implements SchemaAccessWriter {
+public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
-private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
@Override
-public void writeHeader(final RecordSchema schema, final OutputStream out) {
+public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
}
@Override
public Map<String, String> getAttributes(final RecordSchema schema) {
-final Optional<String> textFormatOption = schema.getSchemaFormat();
-final Optional<String> textOption = schema.getSchemaText();
-return Collections.singletonMap(textFormatOption.get() + ".schema", textOption.get());
+final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
+final String schemaText = avroSchema.toString();
+return Collections.singletonMap("avro.schema", schemaText);
}
@Override
public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
-final Optional<String> textFormatOption = schema.getSchemaFormat();
-if (!textFormatOption.isPresent()) {
-throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text Format is not present");
-}
-final Optional<String> textOption = schema.getSchemaText();
-if (!textOption.isPresent()) {
-throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text is not present");
-}
}
@Override
public Set<SchemaField> getRequiredSchemaFields() {
-return schemaFields;
+return EnumSet.noneOf(SchemaField.class);
}
}
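The renamed WriteAvroSchemaAttributeStrategy (formerly SchemaTextAsAttribute) no longer requires the schema to carry its own text and format fields; it always converts the RecordSchema through AvroTypeUtil.extractAvroSchema() and exposes the result as a single "avro.schema" attribute. A short illustrative sketch (the wrapper class is hypothetical; the strategy lives in the org.apache.nifi.schema.access package shown above):

import java.util.Map;

import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
import org.apache.nifi.serialization.record.RecordSchema;

// Illustrative wrapper only: shows the single attribute the renamed strategy now produces.
public class SchemaAttributeExample {
    public static Map<String, String> schemaAttribute(final RecordSchema schema) {
        // Always yields an "avro.schema" attribute whose value is the Avro JSON,
        // even when the RecordSchema has no schema text of its own.
        return new WriteAvroSchemaAttributeStrategy().getAttributes(schema);
    }
}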


@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.avro;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Test;
public class TestAvroTypeUtil {
@Test
public void testCreateAvroSchemaPrimitiveTypes() throws SchemaNotFoundException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("int", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));
fields.add(new RecordField("string", RecordFieldType.STRING.getDataType(), "hola", Collections.singleton("greeting")));
fields.add(new RecordField("byte", RecordFieldType.BYTE.getDataType()));
fields.add(new RecordField("char", RecordFieldType.CHAR.getDataType()));
fields.add(new RecordField("short", RecordFieldType.SHORT.getDataType()));
fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
fields.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
fields.add(new RecordField("strings", arrayType));
final DataType mapType = RecordFieldType.MAP.getMapDataType(RecordFieldType.LONG.getDataType());
fields.add(new RecordField("map", mapType));
final List<RecordField> personFields = new ArrayList<>();
personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("dob", RecordFieldType.DATE.getDataType()));
final RecordSchema personSchema = new SimpleRecordSchema(personFields);
final DataType personType = RecordFieldType.RECORD.getRecordDataType(personSchema);
fields.add(new RecordField("person", personType));
final RecordSchema recordSchema = new SimpleRecordSchema(fields);
final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
// everything should be a union, since it's nullable.
for (final Field field : avroSchema.getFields()) {
final Schema fieldSchema = field.schema();
assertEquals(Type.UNION, fieldSchema.getType());
assertTrue("Field " + field.name() + " does not contain NULL type", fieldSchema.getTypes().contains(Schema.create(Type.NULL)));
}
final RecordSchema afterConversion = AvroTypeUtil.createSchema(avroSchema);
assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("int").get());
assertEquals(RecordFieldType.LONG.getDataType(), afterConversion.getDataType("long").get());
assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("string").get());
assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("byte").get());
assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("char").get());
assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("short").get());
assertEquals(RecordFieldType.DOUBLE.getDataType(), afterConversion.getDataType("double").get());
assertEquals(RecordFieldType.FLOAT.getDataType(), afterConversion.getDataType("float").get());
assertEquals(RecordFieldType.TIME.getDataType(), afterConversion.getDataType("time").get());
assertEquals(RecordFieldType.DATE.getDataType(), afterConversion.getDataType("date").get());
assertEquals(RecordFieldType.TIMESTAMP.getDataType(), afterConversion.getDataType("timestamp").get());
assertEquals(arrayType, afterConversion.getDataType("strings").get());
assertEquals(mapType, afterConversion.getDataType("map").get());
assertEquals(personType, afterConversion.getDataType("person").get());
final RecordField stringField = afterConversion.getField("string").get();
assertEquals("hola", stringField.getDefaultValue());
assertEquals(Collections.singleton("greeting"), stringField.getAliases());
}
}


@@ -16,7 +16,22 @@
*/
package org.apache.nifi.processors.hadoop;
-import org.apache.commons.io.input.NullInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,29 +52,11 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
-import java.io.BufferedOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
/**
* Base processor for reading a data from HDFS that can be fetched into records.
*/
@@ -187,7 +184,6 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, new NullInputStream(0));
final StopWatch stopWatch = new StopWatch(true);
@@ -200,22 +196,20 @@
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
-final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
-final RecordSet recordSet = new RecordSet() {
-@Override
-public RecordSchema getSchema() throws IOException {
-return emptySchema;
-}
-@Override
-public Record next() throws IOException {
-return recordReader.nextRecord();
-}
-};
+Record record = recordReader.nextRecord();
+final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, record == null ? null : record.getSchema());
try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) {
-writeResult.set(recordSetWriter.write(recordSet));
+recordSetWriter.beginRecordSet();
+if (record != null) {
+recordSetWriter.write(record);
+}
+while ((record = recordReader.nextRecord()) != null) {
+recordSetWriter.write(record);
+}
+writeResult.set(recordSetWriter.finishRecordSet());
mimeTypeRef.set(recordSetWriter.getMimeType());
}
} catch (Exception e) {
@@ -247,7 +241,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
} catch (final FileNotFoundException | AccessControlException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, e});
-final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage());
+final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage() == null ? e.toString() : e.getMessage());
session.transfer(failureFlowFile, REL_FAILURE);
} catch (final IOException | FlowFileAccessException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to retry", new Object[] {filenameValue, originalFlowFile, e});
@@ -255,7 +249,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
context.yield();
} catch (final Throwable t) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, t});
-final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage());
+final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage() == null ? t.toString() : t.getMessage());
session.transfer(failureFlowFile, REL_FAILURE);
}


@@ -16,6 +16,21 @@
*/
package org.apache.nifi.processors.hadoop;
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -28,8 +43,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
@@ -42,10 +55,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
-import org.apache.nifi.schema.access.SchemaAccessStrategy;
-import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.WriteResult;
@@ -53,32 +63,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
-import java.io.BufferedInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
/**
* Base class for processors that write Records to HDFS.
*/
@@ -156,18 +140,10 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
private volatile String remoteOwner;
private volatile String remoteGroup;
-private volatile SchemaAccessStrategy schemaAccessStrategy;
private volatile Set<Relationship> putHdfsRecordRelationships;
private volatile List<PropertyDescriptor> putHdfsRecordProperties;
-private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
-SCHEMA_NAME_PROPERTY,
-SCHEMA_TEXT_PROPERTY,
-HWX_SCHEMA_REF_ATTRIBUTES,
-HWX_CONTENT_ENCODED_SCHEMA
-));
@Override
protected final void init(final ProcessorInitializationContext context) {
@@ -187,19 +163,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
.description("The parent directory to which files should be written. Will be created if it doesn't exist.")
.build());
-final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
-props.add(new PropertyDescriptor.Builder()
-.fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
-.description("Specifies how to obtain the schema that is to be used for writing the data.")
-.allowableValues(strategies)
-.defaultValue(getDefaultSchemaAccessStrategy().getValue())
-.build());
-props.add(SCHEMA_REGISTRY);
-props.add(SCHEMA_NAME);
-props.add(SCHEMA_TEXT);
final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
props.add(new PropertyDescriptor.Builder()
@@ -216,18 +179,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
this.putHdfsRecordProperties = Collections.unmodifiableList(props);
}
-protected List<AllowableValue> getSchemaAccessStrategyValues() {
-return strategyList;
-}
-protected AllowableValue getDefaultSchemaAccessStrategy() {
-return SCHEMA_NAME_PROPERTY;
-}
-private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
-return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
-}
/**
* @param context the initialization context
* @return the possible compression types
@@ -259,22 +210,11 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
return putHdfsRecordProperties;
}
-@Override
-protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
-return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues());
-}
@OnScheduled
public final void onScheduled(final ProcessContext context) throws IOException {
super.abstractOnScheduled(context);
-final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
-final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
-final String schemaAccess = context.getProperty(descriptor).getValue();
-this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
@@ -365,8 +305,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
HDFSRecordWriter recordWriter = null;
try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
-final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in);
-recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema);
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
@@ -379,8 +317,9 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
}
final RecordSet recordSet = recordReader.createRecordSet();
-writeResult.set(recordWriter.write(recordSet));
+recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, recordReader.getSchema());
+writeResult.set(recordWriter.write(recordSet));
} catch (Exception e) {
exceptionHolder.set(e);
} finally {


@@ -18,7 +18,6 @@
package org.apache.nifi.serialization.record;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
@@ -55,7 +54,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
-public RecordSchema getSchema(final FlowFile flowFile, final InputStream content) throws SchemaNotFoundException, IOException {
+public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema schema) throws SchemaNotFoundException, IOException {
return new SimpleRecordSchema(Collections.emptyList());
}


@@ -47,7 +47,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
}
@Override
-public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final String schemaIdentifier = flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE);
final String schemaVersion = flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE);
final String schemaProtocol = flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);


@@ -45,7 +45,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
}
@Override
-public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final byte[] buffer = new byte[13];
try {
StreamUtils.fillBuffer(contentStream, buffer);


@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import java.io.IOException;
import java.io.InputStream;
import java.util.EnumSet;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.serialization.record.RecordSchema;
public class InheritSchemaFromRecord implements SchemaAccessStrategy {
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
if (readSchema == null) {
throw new SchemaNotFoundException("Cannot inherit Schema from Record because no schema was found");
}
return readSchema;
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return EnumSet.allOf(SchemaField.class);
}
}
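InheritSchemaFromRecord backs the new "Inherit Record Schema" value registered in SchemaAccessUtils: it ignores the FlowFile content entirely and returns whatever schema the reader supplied, or throws SchemaNotFoundException when none is available. A minimal illustrative use, assuming the caller already holds the schema produced by a Record Reader (the helper method is not part of the commit):

import java.io.IOException;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schema.access.InheritSchemaFromRecord;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;

// Illustrative helper, not part of the commit: resolves a write schema with the new strategy.
public class InheritStrategyExample {
    public static RecordSchema resolveWriteSchema(final FlowFile flowFile, final RecordSchema readSchema)
            throws SchemaNotFoundException, IOException {
        final SchemaAccessStrategy strategy = new InheritSchemaFromRecord();
        // The content stream is not consulted; the schema handed over by the reader is returned
        // as-is, or SchemaNotFoundException is thrown if no read schema is available.
        return strategy.getSchema(flowFile, null, readSchema);
    }
}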


@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.serialization.record.RecordSchema;
public class NopSchemaAccessWriter implements SchemaAccessWriter {
@Override
public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
}
@Override
public Map<String, String> getAttributes(RecordSchema schema) {
return Collections.emptyMap();
}
@Override
public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
}
@Override
public Set<SchemaField> getRequiredSchemaFields() {
return EnumSet.noneOf(SchemaField.class);
}
}


@@ -30,9 +30,10 @@ public interface SchemaAccessStrategy {
*
* @param flowFile flowfile
* @param contentStream content of flowfile
+* @param readSchema the schema that was read from the input FlowFile, or <code>null</code> if there was none
* @return the RecordSchema for the FlowFile
*/
-RecordSchema getSchema(FlowFile flowFile, InputStream contentStream) throws SchemaNotFoundException, IOException;
+RecordSchema getSchema(FlowFile flowFile, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
/**
* @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream)}.


@@ -43,7 +43,7 @@ public class SchemaNamePropertyStrategy implements SchemaAccessStrategy {
}
@Override
-public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue();
if (schemaName.trim().isEmpty()) {
throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Name.");


@@ -465,7 +465,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final RecordSchema writeSchema;
try {
-writeSchema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
+writeSchema = writerFactory.getSchema(flowFile, recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);


@@ -59,6 +59,7 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.10.x Producer API. "
@@ -309,6 +310,8 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -323,24 +326,16 @@
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
-final RecordSchema schema;
-final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
-schema = writerFactory.getSchema(flowFile, in);
-} catch (final Exception e) {
-getLogger().error("Failed to determine Schema for writing messages to Kafka for {}; routing to failure", new Object[] {flowFile, e});
-session.transfer(flowFile, REL_FAILURE);
-continue;
-}
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
-final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger());
+final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
-lease.publish(flowFile, reader, writerFactory, schema, messageKeyField, topic);
+final RecordSet recordSet = reader.createRecordSet();
+final RecordSchema schema = writerFactory.getSchema(flowFile, recordSet.getSchema());
+lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic);
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException(e);
}


@@ -33,7 +33,6 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
@@ -96,7 +95,7 @@ public class PublisherLease implements Closeable {
}
}
-void publish(final FlowFile flowFile, final RecordReader reader, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
+void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
final String messageKeyField, final String topic) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker();
@@ -105,7 +104,6 @@ public class PublisherLease implements Closeable {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
Record record;
-final RecordSet recordSet = reader.createRecordSet();
int recordCount = 0;
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) {


@@ -42,10 +42,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -65,7 +65,7 @@ public class TestPublishKafkaRecord_0_10 {
public void setup() throws InitializationException, IOException {
mockPool = mock(PublisherPool.class);
mockLease = mock(PublisherLease.class);
-Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class),
+Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
any(RecordSchema.class), any(String.class), any(String.class));
when(mockPool.obtainPublisher()).thenReturn(mockLease);
@@ -104,7 +104,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
-verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -123,7 +123,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3);
-verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -138,7 +138,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1);
-verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -155,7 +155,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3);
-verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
-verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
@@ -207,7 +207,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
-verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -241,7 +241,7 @@
runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2); runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2); runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2);
verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete(); verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close(); verify(mockLease, times(1)).close();


@@ -18,7 +18,6 @@
package org.apache.nifi.processors.kafka.pubsub.util;
import java.io.IOException;
- import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
@@ -53,7 +52,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
- public RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(FlowFile flowFile, RecordSchema schema) throws SchemaNotFoundException, IOException {
return null;
}


@@ -185,7 +185,7 @@ public class PutParquet extends AbstractPutHDFSRecord {
return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema);
}
- private void applyCommonConfig(final ParquetWriter.Builder builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
+ private void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
builder.withConf(conf);
// Required properties


@@ -45,7 +45,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
- import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -217,7 +216,7 @@ public class FetchParquetTest {
configure(proc);
final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class);
- when(recordSetWriter.write(any(RecordSet.class))).thenThrow(new IOException("IOException"));
+ when(recordSetWriter.write(any(Record.class))).thenThrow(new IOException("IOException"));
final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class);
when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory");


@@ -16,12 +16,25 @@
*/
package org.apache.nifi.processors.parquet;
+ import static org.mockito.Matchers.any;
+ import static org.mockito.Mockito.when;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.nio.charset.StandardCharsets;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+ import org.apache.log4j.BasicConfigurator;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -32,7 +45,6 @@ import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
- import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
@@ -49,21 +61,10 @@ import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.Assert;
import org.junit.Before;
+ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.nio.charset.StandardCharsets;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import static org.mockito.Mockito.any;
- import static org.mockito.Mockito.when;
public class PutParquetTest {
@@ -76,6 +77,10 @@ public class PutParquetTest {
private MockRecordParser readerFactory;
private TestRunner testRunner;
+ @BeforeClass
+ public static void setupLogging() {
+ BasicConfigurator.configure();
+ }
@Before
public void setup() throws IOException, InitializationException {
@@ -108,8 +113,6 @@ public class PutParquetTest {
testRunner.enableControllerService(readerFactory);
testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory");
- testRunner.setProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
- testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, schema.toString());
}
@Test
@@ -325,7 +328,6 @@ public class PutParquetTest {
@Test
public void testValidSchemaWithELShouldBeSuccessful() throws InitializationException, IOException {
configure(proc, 10);
- testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
final String filename = "testValidSchemaWithELShouldBeSuccessful-" + System.currentTimeMillis();
@@ -339,39 +341,6 @@ public class PutParquetTest {
testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
}
- @Test
- public void testSchemaWithELMissingShouldRouteToFailure() throws InitializationException, IOException {
- configure(proc, 10);
- testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
- final String filename = "testSchemaWithELMissingShouldRouteToFailure-" + System.currentTimeMillis();
- // don't provide my.schema as an attribute
- final Map<String,String> flowFileAttributes = new HashMap<>();
- flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
- testRunner.enqueue("trigger", flowFileAttributes);
- testRunner.run();
- testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
- }
- @Test
- public void testInvalidSchemaShouldRouteToFailure() throws InitializationException, IOException {
- configure(proc, 10);
- testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
- final String filename = "testInvalidSchemaShouldRouteToFailure-" + System.currentTimeMillis();
- // don't provide my.schema as an attribute
- final Map<String,String> flowFileAttributes = new HashMap<>();
- flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
- flowFileAttributes.put("my.schema", "NOT A SCHEMA");
- testRunner.enqueue("trigger", flowFileAttributes);
- testRunner.run();
- testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
- }
@Test
public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
configure(proc, 10);
@@ -427,6 +396,7 @@ public class PutParquetTest {
final RecordReader recordReader = Mockito.mock(RecordReader.class);
when(recordReader.createRecordSet()).thenReturn(recordSet);
+ when(recordReader.getSchema()).thenReturn(AvroTypeUtil.createSchema(schema));
final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");


@@ -16,6 +16,15 @@
*/
package org.apache.nifi.record.script;
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.lang.reflect.UndeclaredThrowableException;
+ import java.util.Collection;
+ import java.util.HashSet;
+ import javax.script.Invocable;
+ import javax.script.ScriptException;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -30,15 +39,6 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
- import javax.script.Invocable;
- import javax.script.ScriptException;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.lang.reflect.UndeclaredThrowableException;
- import java.util.Collection;
- import java.util.HashSet;
/**
* A RecordSetWriter implementation that allows the user to script the RecordWriter instance
*/
@@ -149,14 +149,14 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
}
@Override
- public RecordSchema getSchema(FlowFile flowFile, InputStream in) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final RecordSetWriterFactory writerFactory = recordFactory.get();
if (writerFactory == null) {
return null;
}
try {
- return writerFactory.getSchema(flowFile, in);
+ return writerFactory.getSchema(flowFile, readSchema);
} catch (UndeclaredThrowableException ute) {
throw new IOException(ute.getCause());
}


@@ -101,9 +101,7 @@ class ScriptedRecordSetWriterTest {
recordSetWriterFactory.onEnabled configurationContext
MockFlowFile mockFlowFile = new MockFlowFile(1L)
- InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
- def schema = recordSetWriterFactory.getSchema(mockFlowFile, inStream)
+ def schema = recordSetWriterFactory.getSchema(mockFlowFile, null)
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, mockFlowFile, outputStream)


@@ -102,7 +102,7 @@ class GroovyRecordSetWriter implements RecordSetWriter {
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
@Override
- public RecordSchema getSchema(FlowFile flowFile, InputStream inStream) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
return null;
}


@@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
- import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -100,15 +99,6 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordSchema writeSchema;
- try (final InputStream rawIn = session.read(flowFile);
- final InputStream in = new BufferedInputStream(rawIn)) {
- writeSchema = writerFactory.getSchema(flowFile, in);
- } catch (final Exception e) {
- getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
final Map<String, String> attributes = new HashMap<>();
final AtomicInteger recordCount = new AtomicInteger();
@@ -119,9 +109,10 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
- try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger());
- final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
+ try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
+ final RecordSchema writeSchema = writerFactory.getSchema(original, reader.getSchema());
+ try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
writer.beginRecordSet();
Record record;
@@ -135,14 +126,14 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
recordCount.set(writeResult.getRecordCount());
+ }
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("Could not parse incoming data", e);
}
}
});
} catch (final Exception e) {
- getLogger().error("Failed to process {}", new Object[] {flowFile, e});
+ getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
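Editor's note: the hunk above is the core of the change for record processors; the write schema is no longer resolved by re-reading the FlowFile content up front, but is requested from the writer factory using the schema the RecordReader already produced. Below is a minimal, self-contained sketch of that control flow. The SketchReader/SketchWriterFactory types are invented stand-ins for NiFi's RecordReader and RecordSetWriterFactory, not the real API, and a record schema is reduced to a list of field names.

import java.util.List;

// Hypothetical stand-in for a RecordReader: exposes the schema it discovered while parsing.
interface SketchReader {
    List<String> getSchema();
}

// Hypothetical stand-in for the new RecordSetWriterFactory contract: the caller hands in the
// schema it read, and the factory may return it as-is ("inherit") or substitute its own.
interface SketchWriterFactory {
    List<String> getSchema(List<String> readSchema);
}

public class InheritSchemaSketch {
    public static void main(String[] args) {
        SketchReader reader = () -> List.of("id", "name", "email");
        SketchWriterFactory writerFactory = readSchema -> readSchema; // inherit the record's schema

        // Pattern from the diff: derive the write schema from the reader, inside the same
        // callback that streams the records, instead of opening the content twice.
        List<String> writeSchema = writerFactory.getSchema(reader.getSchema());
        System.out.println("writing with schema: " + writeSchema);
    }
}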


@@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
- import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -112,15 +111,7 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordSchema writeSchema;
- try (final InputStream rawIn = session.read(flowFile);
- final InputStream in = new BufferedInputStream(rawIn)) {
- writeSchema = writerFactory.getSchema(flowFile, in);
- } catch (final Exception e) {
- getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
final AtomicInteger numRecords = new AtomicInteger(0);
final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
@@ -131,6 +122,8 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
+ final RecordSchema writeSchema = writerFactory.getSchema(original, reader.getSchema());
Record record;
while ((record = reader.nextRecord()) != null) {
final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext);


@@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
- import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -175,15 +174,6 @@ public class PartitionRecord extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordSchema writeSchema;
- try (final InputStream rawIn = session.read(flowFile);
- final InputStream in = new BufferedInputStream(rawIn)) {
- writeSchema = writerFactory.getSchema(flowFile, in);
- } catch (final Exception e) {
- getLogger().error("Failed to partition records for {}; will route to failure", new Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
final Map<String, RecordPath> recordPaths;
try {
@@ -203,6 +193,8 @@ public class PartitionRecord extends AbstractProcessor {
try (final InputStream in = session.read(flowFile)) {
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+ final RecordSchema writeSchema = writerFactory.getSchema(flowFile, reader.getSchema());
Record record;
while ((record = reader.nextRecord()) != null) {
final Map<String, List<ValueWrapper>> recordMap = new HashMap<>();


@@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.standard;
- import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -72,9 +71,10 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.queryrecord.FlowFileTable;
+ import org.apache.nifi.serialization.RecordReader;
+ import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
- import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
@@ -241,19 +241,19 @@ public class QueryRecord extends AbstractProcessor {
final StopWatch stopWatch = new StopWatch(true);
- final RecordSetWriterFactory resultSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY)
- .asControllerService(RecordSetWriterFactory.class);
- final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
- .asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+ final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
final Set<FlowFile> createdFlowFiles = new HashSet<>();
+ // Determine the schema for writing the data
final RecordSchema recordSchema;
- try (final InputStream rawIn = session.read(original);
- final InputStream in = new BufferedInputStream(rawIn)) {
- recordSchema = resultSetWriterFactory.getSchema(original, in);
+ try (final InputStream rawIn = session.read(original)) {
+ final RecordReader reader = recordReaderFactory.createRecordReader(original, rawIn, getLogger());
+ final RecordSchema inputSchema = reader.getSchema();
+ recordSchema = recordSetWriterFactory.getSchema(original, inputSchema);
} catch (final Exception e) {
getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
session.transfer(original, REL_FAILURE);
@@ -281,9 +281,9 @@ public class QueryRecord extends AbstractProcessor {
final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
final QueryResult queryResult;
if (context.getProperty(CACHE_SCHEMA).asBoolean()) {
- queryResult = queryWithCache(session, original, sql, context, recordParserFactory);
+ queryResult = queryWithCache(session, original, sql, context, recordReaderFactory);
} else {
- queryResult = query(session, original, sql, context, recordParserFactory);
+ queryResult = query(session, original, sql, context, recordReaderFactory);
}
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
@@ -293,7 +293,7 @@ public class QueryRecord extends AbstractProcessor {
transformed = session.write(transformed, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
- try (final RecordSetWriter resultSetWriter = resultSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, out)) {
+ try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, out)) {
final ResultSetRecordSet resultSet = new ResultSetRecordSet(rs);
writeResultRef.set(resultSetWriter.write(resultSet));
mimeTypeRef.set(resultSetWriter.getMimeType());
@@ -362,6 +362,7 @@ public class QueryRecord extends AbstractProcessor {
session.adjustCounter("Records Read", recordsRead, false);
}
private synchronized CachedStatement getStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
final FlowFile flowFile, final RecordReaderFactory recordReaderFactory) throws SQLException {


@@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
- import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -132,15 +131,6 @@ public class SplitRecord extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordSchema schema;
- try (final InputStream rawIn = session.read(original);
- final InputStream in = new BufferedInputStream(rawIn)) {
- schema = writerFactory.getSchema(original, in);
- } catch (final Exception e) {
- getLogger().error("Failed to create Record Writer for {}; routing to failure", new Object[] {original, e});
- session.transfer(original, REL_FAILURE);
- return;
- }
final int maxRecords = context.getProperty(RECORDS_PER_SPLIT).evaluateAttributeExpressions(original).asInteger();
@@ -151,6 +141,8 @@ public class SplitRecord extends AbstractProcessor {
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
+ final RecordSchema schema = writerFactory.getSchema(original, reader.getSchema());
final RecordSet recordSet = reader.createRecordSet();
final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);


@@ -16,6 +16,14 @@
*/
package org.apache.nifi.processors.standard;
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.sql.SQLException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.stream.Collectors;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -38,15 +46,6 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
- import java.util.stream.Collectors;
public class TestQueryRecord {
static {
@@ -261,7 +260,7 @@ public class TestQueryRecord {
}
@Override
- public RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final List<RecordField> recordFields = columnNames.stream()
.map(name -> new RecordField(name, RecordFieldType.STRING.getDataType()))
.collect(Collectors.toList());


@@ -53,11 +53,12 @@ public interface RecordSetWriterFactory extends ControllerService {
* </p>
*
* @param flowFile the FlowFile from which the schema should be determined.
- * @param content the contents of the FlowFile from which to determine the schema
+ * @param readSchema the schema that was read from the incoming FlowFile, or <code>null</code> if there is no input schema
+ *
* @return the Schema that should be used for writing Records
* @throws SchemaNotFoundException if unable to find the schema
*/
- RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException;
+ RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
/**
* <p>
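Editor's note: the revised javadoc describes the contract from the implementer's side: the factory receives the schema that was read (possibly null) and decides whether to inherit it or supply its own. A short illustrative sketch of that inherit-or-fallback behavior follows. The class and method names are invented for this sketch and a schema is reduced to a list of field names; this is not NiFi API, which also consults schema registries and access strategies.

import java.util.List;

// Illustrative only: a writer-factory-like class honoring getSchema(flowFile, readSchema)
// by inheriting the incoming schema when one exists, otherwise using a configured fallback.
public class InheritOrFallbackSketch {

    private final List<String> fallbackSchema;

    public InheritOrFallbackSketch(List<String> fallbackSchema) {
        this.fallbackSchema = fallbackSchema;
    }

    // readSchema may be null when the caller has no input schema, mirroring the javadoc above.
    public List<String> getSchema(List<String> readSchema) {
        return readSchema != null ? readSchema : fallbackSchema;
    }

    public static void main(String[] args) {
        InheritOrFallbackSketch factory = new InheritOrFallbackSketch(List.of("a", "b"));
        System.out.println(factory.getSchema(List.of("x", "y", "z"))); // inherits [x, y, z]
        System.out.println(factory.getSchema(null));                   // falls back to [a, b]
    }
}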


@@ -89,7 +89,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
return new AvroReaderWithEmbeddedSchema(in);
} else {
- final RecordSchema recordSchema = getSchema(flowFile, in);
+ final RecordSchema recordSchema = getSchema(flowFile, in, null);
final Schema avroSchema;
try {


@@ -65,7 +65,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
try {
final Schema avroSchema;
try {
- if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
+ if (recordSchema.getSchemaFormat().isPresent() && recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
final Optional<String> textOption = recordSchema.getSchemaText();
if (textOption.isPresent()) {
avroSchema = compileAvroSchema(textOption.get());


@@ -36,7 +36,7 @@ public class EmbeddedAvroSchemaAccessStrategy implements SchemaAccessStrategy {
private final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(contentStream, new GenericDatumReader<GenericRecord>());
final Schema avroSchema = dataFileStream.getSchema();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);


@@ -53,7 +53,7 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
}
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
if (this.context == null) {
throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema");
}


@@ -89,7 +89,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
// Use Mark/Reset of a BufferedInputStream in case we read from the Input Stream for the header.
final BufferedInputStream bufferedIn = new BufferedInputStream(in);
bufferedIn.mark(1024 * 1024);
- final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn));
+ final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn), null);
bufferedIn.reset();
return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, dateFormat, timeFormat, timestampFormat);


@@ -222,7 +222,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
return recordSchema;
}
@@ -235,7 +235,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
- final RecordSchema schema = getSchema(flowFile, in);
+ final RecordSchema schema = getSchema(flowFile, in, null);
return new GrokRecordReader(in, grok, schema, appendUnmatchedLine);
}
}


@@ -129,7 +129,7 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
- final RecordSchema schema = getSchema(flowFile, in);
+ final RecordSchema schema = getSchema(flowFile, in, null);
return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat);
}


@@ -70,6 +70,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
- return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, in), dateFormat, timeFormat, timestampFormat);
+ return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, in, null), dateFormat, timeFormat, timestampFormat);
}
}


@@ -17,6 +17,10 @@
package org.apache.nifi.serialization;
+ import static org.apache.nifi.schema.access.SchemaAccessUtils.INHERIT_RECORD_SCHEMA;
+ import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+ import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -34,11 +38,12 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
+ import org.apache.nifi.schema.access.NopSchemaAccessWriter;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNameAsAttribute;
import org.apache.nifi.schema.access.SchemaNotFoundException;
- import org.apache.nifi.schema.access.SchemaTextAsAttribute;
+ import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
import org.apache.nifi.serialization.record.RecordSchema;
public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryService {
@@ -58,6 +63,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
"The FlowFile will be given a set of 3 attributes to describe the schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. Note that if "
+ "the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data.");
+ static final AllowableValue NO_SCHEMA = new AllowableValue("no-schema", "Do Not Write Schema", "Do not add any schema-related information to the FlowFile.");
/**
* This constant is just a base spec for the actual PropertyDescriptor.
@@ -67,7 +73,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
private static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new PropertyDescriptor.Builder()
.name("Schema Write Strategy")
.description("Specifies how the schema for a Record should be added to the data.")
- .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
+ .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA)
.defaultValue(SCHEMA_NAME_ATTRIBUTE.getValue())
.required(true)
.build();
@@ -76,7 +82,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
private volatile ConfigurationContext configurationContext;
private volatile SchemaAccessWriter schemaAccessWriter;
- private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
+ private final List<AllowableValue> schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList(
+ SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA));
+ private final List<AllowableValue> schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList(
+ SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY));
@Override
@@ -98,6 +107,11 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
return SCHEMA_NAME_ATTRIBUTE;
}
+ @Override
+ protected AllowableValue getDefaultSchemaAccessStrategy() {
+ return INHERIT_RECORD_SCHEMA;
+ }
protected PropertyDescriptor getSchemaWriteStrategyDescriptor() {
return getPropertyDescriptor(SCHEMA_WRITE_STRATEGY.getName());
}
@@ -121,7 +135,12 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
}
protected List<AllowableValue> getSchemaWriteStrategyValues() {
- return strategyList;
+ return schemaWriteStrategyList;
+ }
+ @Override
+ protected List<AllowableValue> getSchemaAccessStrategyValues() {
+ return schemaAccessStrategyList;
}
protected SchemaAccessWriter getSchemaWriteStrategy(final String allowableValue) {
@@ -132,11 +151,13 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
return new SchemaNameAsAttribute();
} else if (allowableValue.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
- return new SchemaTextAsAttribute();
+ return new WriteAvroSchemaAttributeStrategy();
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
return new HortonworksEncodedSchemaReferenceWriter();
} else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
return new HortonworksAttributeSchemaReferenceWriter();
+ } else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) {
+ return new NopSchemaAccessWriter();
}
return null;
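Editor's note: the hunk above extends the "Schema Write Strategy" dispatch with a "no-schema" value that maps to a writer doing nothing. Below is a small, hedged sketch of that dispatch idea only; the strategy keys other than "no-schema", and the Map-plus-Consumer lookup, are invented for illustration, while the real class switches over AllowableValue constants and returns SchemaAccessWriter instances.

import java.util.Map;
import java.util.function.Consumer;

// Sketch: map a schema-write-strategy name to a small piece of behavior that decorates a
// FlowFile's attributes; the "no-schema" branch intentionally adds nothing (a no-op writer).
public class SchemaWriteStrategySketch {
    public static void main(String[] args) {
        Map<String, Consumer<StringBuilder>> strategies = Map.of(
                "schema-name-attribute", attrs -> attrs.append("schema.name=users\n"),
                "avro-schema-attribute", attrs -> attrs.append("avro.schema={...}\n"),
                "no-schema", attrs -> { /* like NopSchemaAccessWriter: write nothing */ });

        StringBuilder flowFileAttributes = new StringBuilder();
        strategies.get("no-schema").accept(flowFileAttributes);
        System.out.println("attributes written: [" + flowFileAttributes + "]");
    }
}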


@@ -32,6 +32,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
+ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -55,8 +56,10 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
private volatile ConfigurationContext configurationContext;
private volatile SchemaAccessStrategy schemaAccessStrategy;
+ private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
- private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
+ private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
+ SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
protected PropertyDescriptor getSchemaAcessStrategyDescriptor() {
return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
@@ -95,6 +98,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
this.schemaAccessStrategy = getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
}
+ @Override
protected ConfigurationContext getConfigurationContext() {
return configurationContext;
}
@@ -103,13 +107,22 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
return schemaAccessStrategy;
}
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+ public final RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
if (accessStrategy == null) {
throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service");
}
- return getSchemaAccessStrategy().getSchema(flowFile, contentStream);
+ return getSchemaAccessStrategy().getSchema(flowFile, contentStream, readSchema);
+ }
+ public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+ final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
+ if (accessStrategy == null) {
+ throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service");
+ }
+ return getSchemaAccessStrategy().getSchema(flowFile, EMPTY_INPUT_STREAM, readSchema);
}
@Override
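Editor's note: the service above now offers a stream-less getSchema overload that delegates to the strategy with an empty InputStream, so the same access-strategy code path serves both callers that have content and callers that only have a read schema. A compact, self-contained sketch of that overload-delegation pattern is below; the class, the String-typed schema, and the resolve method are invented stand-ins, not NiFi API.

import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Sketch: the no-content overload reuses the content-based resolution logic by passing an
// empty stream, keeping a single code path for schema lookup.
public class EmptyStreamOverloadSketch {

    private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);

    String getSchema(InputStream content, String readSchema) {
        return resolve(content, readSchema);
    }

    String getSchema(String readSchema) {
        // No content to inspect: delegate with an empty stream.
        return getSchema(EMPTY_INPUT_STREAM, readSchema);
    }

    private String resolve(InputStream content, String readSchema) {
        // Hypothetical resolution: prefer the schema read from the record, else derive one.
        return readSchema != null ? readSchema : "schema-derived-from-content";
    }

    public static void main(String[] args) {
        EmptyStreamOverloadSketch service = new EmptyStreamOverloadSketch();
        System.out.println(service.getSchema("inherited-schema"));
    }
}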


@@ -18,11 +18,9 @@
package org.apache.nifi.text;
import java.io.IOException;
- import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
- import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -38,7 +36,6 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
- import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordSchema;
@Tags({"text", "freeform", "expression", "language", "el", "record", "recordset", "resultset", "writer", "serialize"})
@@ -46,8 +43,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
+ "text is able to make use of the Expression Language to reference each of the fields that are available "
+ "in a Record. Each record in the RecordSet will be separated by a single newline character.")
public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory {
- private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(Collections.emptyList());
static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
.name("Text")
.description("The text to use when writing the results. This property will evaluate the Expression Language using any of the fields available in a Record.")
@@ -87,7 +82,7 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i
}
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
- return EMPTY_SCHEMA;
+ public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+ return readSchema;
}
}


@@ -27,7 +27,7 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
- import org.apache.nifi.schema.access.SchemaTextAsAttribute;
+ import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.junit.Assert;
@@ -36,7 +36,7 @@ public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {
@Override
protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException {
- return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new SchemaTextAsAttribute(), out);
+ return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out);
}
@Override


@@ -56,7 +56,7 @@ public class TestCSVHeaderSchemaStrategy {
final RecordSchema schema;
try (final InputStream bais = new ByteArrayInputStream(headerBytes)) {
- schema = strategy.getSchema(null, bais);
+ schema = strategy.getSchema(null, bais, null);
}
final List<String> expectedFieldNames = Arrays.asList("a", "b", "c", "d", "e,z", "f");