NIFI-7989: Add Update Field Names and Record Writer to UpdateHiveTable processors

NIFI-7989: Only rewrite records if a field name doesn't match a table column name exactly
NIFI-7989: Rewrite records for created tables if Update Field Names is set

This closes #4750.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Matthew Burgess 2021-01-07 18:02:39 -05:00 committed by Peter Turcsanyi
parent 3cc8d767b3
commit b9076ca26e
5 changed files with 761 additions and 30 deletions

View File

@ -27,10 +27,13 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.dbcp.hive.HiveDBCPService; import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -41,8 +44,15 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException; import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException; import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
@ -55,9 +65,12 @@ import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -73,7 +86,10 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' " @WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table name."), + "and 'failure' relationships, and contains the target table name."),
@WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' " @WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned).") + "and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned)."),
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer, only if a Record Writer is specified "
+ "and Update Field Names is 'true'."),
@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile, only if a Record Writer is specified and Update Field Names is 'true'.")
}) })
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@RequiresInstanceClassLoading @RequiresInstanceClassLoading
@ -163,6 +179,31 @@ public class UpdateHiveTable extends AbstractProcessor {
.dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS) .dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS)
.build(); .build();
static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("hive-update-field-names")
.displayName("Update Field Names")
.description("This property indicates whether to update the output schema such that the field names are set to the exact column names from the specified "
+ "table. This should be used if the incoming record field names may not match the table's column names in terms of upper- and lower-case. For example, this property should be "
+ "set to true if the output FlowFile (and target table storage) is Avro format, as Hive/Impala expects the field names to match the column names exactly.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
.name("hive-record-writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer should use Inherit Schema to emulate the inferred schema behavior, i.e. "
+ "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types. If Create Table Strategy is set "
+ "'Create If Not Exists', the Record Writer's output format must match the Record Reader's format in order for the data to be placed in the created table location. Note that "
+ "this property is only used if 'Update Field Names' is set to true and the field names do not all match the column names exactly. If no "
+ "update is needed for any field names (or 'Update Field Names' is false), the Record Writer is not used and instead the input FlowFile is routed to success or failure "
+ "without modification.")
.identifiesControllerService(RecordSetWriterFactory.class)
.dependsOn(UPDATE_FIELD_NAMES, "true")
.required(true)
.build();
static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder() static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder()
.name("hive-external-table-location") .name("hive-external-table-location")
.displayName("External Table Location") .displayName("External Table Location")
@ -237,6 +278,8 @@ public class UpdateHiveTable extends AbstractProcessor {
props.add(TABLE_MANAGEMENT_STRATEGY); props.add(TABLE_MANAGEMENT_STRATEGY);
props.add(EXTERNAL_TABLE_LOCATION); props.add(EXTERNAL_TABLE_LOCATION);
props.add(TABLE_STORAGE_FORMAT); props.add(TABLE_STORAGE_FORMAT);
props.add(UPDATE_FIELD_NAMES);
props.add(RECORD_WRITER_FACTORY);
props.add(QUERY_TIMEOUT); props.add(QUERY_TIMEOUT);
propertyDescriptors = Collections.unmodifiableList(props); propertyDescriptors = Collections.unmodifiableList(props);
@ -257,6 +300,30 @@ public class UpdateHiveTable extends AbstractProcessor {
return relationships; return relationships;
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
final boolean recordWriterFactorySet = validationContext.getProperty(RECORD_WRITER_FACTORY).isSet();
final boolean createIfNotExists = validationContext.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final boolean updateFieldNames = validationContext.getProperty(UPDATE_FIELD_NAMES).asBoolean();
if (!recordWriterFactorySet && updateFieldNames) {
validationResults.add(new ValidationResult.Builder().subject(RECORD_WRITER_FACTORY.getDisplayName())
.explanation("Record Writer must be set if 'Update Field Names' is true").valid(false).build());
}
final String tableManagementStrategy = validationContext.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
final boolean managedTable;
if (!ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
managedTable = MANAGED_TABLE.getValue().equals(tableManagementStrategy);
// Ensure valid configuration for external tables
if (createIfNotExists && !managedTable && !validationContext.getProperty(EXTERNAL_TABLE_LOCATION).isSet()) {
validationResults.add(new ValidationResult.Builder().subject(EXTERNAL_TABLE_LOCATION.getDisplayName())
.explanation("External Table Location must be set when Table Management Strategy is set to External").valid(false).build());
}
}
return validationResults;
}
@Override @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
@ -266,6 +333,7 @@ public class UpdateHiveTable extends AbstractProcessor {
} }
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue(); final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue();
List<String> partitionClauseElements = null; List<String> partitionClauseElements = null;
@ -292,6 +360,15 @@ public class UpdateHiveTable extends AbstractProcessor {
new Object[]{RecordReader.class.getSimpleName(), flowFile}, new Object[]{RecordReader.class.getSimpleName(), flowFile},
rrfe rrfe
); );
// Since we are wrapping the exceptions above there should always be a cause
// but it's possible it might not have a message. This handles that by logging
// the name of the class thrown.
Throwable c = rrfe.getCause();
if (c != null) {
session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
} else {
session.putAttribute(flowFile, "record.error.message", rrfe.getClass().getCanonicalName() + " Thrown");
}
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
return; return;
} }
@ -299,6 +376,10 @@ public class UpdateHiveTable extends AbstractProcessor {
RecordSchema recordSchema = reader.getSchema(); RecordSchema recordSchema = reader.getSchema();
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue()); final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final boolean updateFieldNames = context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
if (recordWriterFactory == null && updateFieldNames) {
throw new ProcessException("Record Writer must be set if 'Update Field Names' is true");
}
final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue(); final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
final boolean managedTable; final boolean managedTable;
if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) { if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
@ -331,8 +412,54 @@ public class UpdateHiveTable extends AbstractProcessor {
final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue(); final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue();
final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class); final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
try (final Connection connection = dbcpService.getConnection()) { try (final Connection connection = dbcpService.getConnection()) {
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, partitionClauseElements, createIfNotExists, externalTableLocation, storageFormat); final Map<String,String> attributes = new HashMap<>(flowFile.getAttributes());
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName); OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(attributes, connection, recordSchema, tableName, partitionClauseElements,
createIfNotExists, externalTableLocation, storageFormat, updateFieldNames);
if (outputMetadataHolder != null) {
// The output schema changed (i.e. field names were updated), so write out the corresponding FlowFile
try {
final FlowFile inputFlowFile = flowFile;
flowFile = session.write(flowFile, (in, out) -> {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
final RecordReader recordReader;
final RecordSetWriter recordSetWriter;
try {
recordReader = recordReaderFactory.createRecordReader(inputFlowFile, in, getLogger());
recordSetWriter = recordWriterFactory.createWriter(getLogger(), outputMetadataHolder.getOutputSchema(), out, attributes);
} catch (Exception e) {
if(e instanceof IOException) {
throw (IOException) e;
}
throw new IOException(new RecordReaderFactoryException("Unable to create RecordReader", e));
}
WriteResult writeResult = updateRecords(recordSchema, outputMetadataHolder, recordReader, recordSetWriter);
recordSetWriter.flush();
recordSetWriter.close();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
attributes.putAll(writeResult.getAttributes());
});
} catch (final Exception e) {
getLogger().error("Failed to process {}; will route to failure", new Object[]{flowFile, e});
// Since we are wrapping the exceptions above there should always be a cause
// but it's possible it might not have a message. This handles that by logging
// the name of the class thrown.
Throwable c = e.getCause();
if (c != null) {
session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
} else {
session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown");
}
session.transfer(flowFile, REL_FAILURE);
return;
}
}
attributes.put(ATTR_OUTPUT_TABLE, tableName);
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL()); session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} }
@ -351,9 +478,9 @@ public class UpdateHiveTable extends AbstractProcessor {
} }
} }
private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema, private synchronized OutputMetadataHolder checkAndUpdateTableSchema(Map<String,String> attributes, final Connection conn, final RecordSchema schema,
final String tableName, List<String> partitionClause, final boolean createIfNotExists, final String tableName, List<String> partitionClause, final boolean createIfNotExists,
final String externalTableLocation, final String storageFormat) throws IOException { final String externalTableLocation, final String storageFormat, final boolean updateFieldNames) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and // Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table // add any columns from the schema that are missing in the table
try (Statement s = conn.createStatement()) { try (Statement s = conn.createStatement()) {
@ -461,7 +588,7 @@ public class UpdateHiveTable extends AbstractProcessor {
String partitionColumnName; String partitionColumnName;
columnName = tableInfo.getString(1); columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) { if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName); partitionColumns.add(columnName);
} }
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name // If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) { if (columnName.startsWith("#")) {
@ -483,7 +610,7 @@ public class UpdateHiveTable extends AbstractProcessor {
for (int i = 0; i < partitionClauseSize; i++) { for (int i = 0; i < partitionClauseSize; i++) {
String partitionName = partitionClause.get(i).split(" ")[0]; String partitionName = partitionClause.get(i).split(" ")[0];
String partitionValue = flowFile.getAttribute(partitionName); String partitionValue = attributes.get(partitionName);
if (StringUtils.isEmpty(partitionValue)) { if (StringUtils.isEmpty(partitionValue)) {
throw new IOException("No value found for partition value attribute '" + partitionName + "'"); throw new IOException("No value found for partition value attribute '" + partitionName + "'");
} }
@ -557,9 +684,84 @@ public class UpdateHiveTable extends AbstractProcessor {
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList); outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
} }
session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath); // If updating field names, return a new RecordSchema, otherwise return null
OutputMetadataHolder outputMetadataHolder;
if (updateFieldNames) {
List<RecordField> inputRecordFields = schema.getFields();
List<RecordField> outputRecordFields = new ArrayList<>();
Map<String,String> fieldMap = new HashMap<>();
boolean needsUpdating = false;
for (RecordField inputRecordField : inputRecordFields) {
final String inputRecordFieldName = inputRecordField.getFieldName();
boolean found = false;
for (String hiveColumnName : hiveColumns) {
if (inputRecordFieldName.equalsIgnoreCase(hiveColumnName)) {
// Set a flag if the field name doesn't match the column name exactly. This overall flag will determine whether
// the records need updating (if true) or not (if false)
if (!inputRecordFieldName.equals(hiveColumnName)) {
needsUpdating = true;
}
fieldMap.put(inputRecordFieldName, hiveColumnName);
outputRecordFields.add(new RecordField(hiveColumnName, inputRecordField.getDataType(), inputRecordField.getDefaultValue(), inputRecordField.isNullable()));
found = true;
break;
}
}
if (!found) {
// If the input field wasn't a Hive table column, add it back to the schema as-is
fieldMap.put(inputRecordFieldName, inputRecordFieldName);
}
}
outputMetadataHolder = needsUpdating ? new OutputMetadataHolder(new SimpleRecordSchema(outputRecordFields), fieldMap)
: null;
} else {
outputMetadataHolder = null;
}
attributes.put(ATTR_OUTPUT_PATH, outputPath);
return outputMetadataHolder;
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
} }
private synchronized WriteResult updateRecords(final RecordSchema inputRecordSchema, final OutputMetadataHolder outputMetadataHolder,
final RecordReader reader, final RecordSetWriter writer) throws IOException {
try {
writer.beginRecordSet();
Record inputRecord;
while((inputRecord = reader.nextRecord()) != null) {
List<RecordField> inputRecordFields = inputRecordSchema.getFields();
Map<String,Object> outputRecordFields = new HashMap<>(inputRecordFields.size());
// Copy values from input field name to output field name
for(Map.Entry<String,String> mapping : outputMetadataHolder.getFieldMap().entrySet()) {
outputRecordFields.put(mapping.getValue(), inputRecord.getValue(mapping.getKey()));
}
Record outputRecord = new MapRecord(outputMetadataHolder.getOutputSchema(), outputRecordFields);
writer.write(outputRecord);
}
return writer.finishRecordSet();
} catch (MalformedRecordException mre) {
throw new IOException("Error reading records: "+mre.getMessage(), mre);
}
}
private static class OutputMetadataHolder {
private final RecordSchema outputSchema;
private final Map<String,String> fieldMap;
public OutputMetadataHolder(RecordSchema outputSchema, Map<String, String> fieldMap) {
this.outputSchema = outputSchema;
this.fieldMap = fieldMap;
}
public RecordSchema getOutputSchema() {
return outputSchema;
}
public Map<String, String> getFieldMap() {
return fieldMap;
}
}
} }

View File

@ -27,10 +27,13 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.dbcp.hive.Hive3DBCPService; import org.apache.nifi.dbcp.hive.Hive3DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -41,8 +44,15 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException; import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException; import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
@ -55,9 +65,12 @@ import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -73,7 +86,10 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' " @WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table name."), + "and 'failure' relationships, and contains the target table name."),
@WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' " @WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned).") + "and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned)."),
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer, only if a Record Writer is specified "
+ "and Update Field Names is 'true'."),
@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile, only if a Record Writer is specified and Update Field Names is 'true'.")
}) })
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@RequiresInstanceClassLoading @RequiresInstanceClassLoading
@ -163,6 +179,31 @@ public class UpdateHive3Table extends AbstractProcessor {
.dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS) .dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS)
.build(); .build();
static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("hive3-update-field-names")
.displayName("Update Field Names")
.description("This property indicates whether to update the output schema such that the field names are set to the exact column names from the specified "
+ "table. This should be used if the incoming record field names may not match the table's column names in terms of upper- and lower-case. For example, this property should be "
+ "set to true if the output FlowFile (and target table storage) is Avro format, as Hive/Impala expects the field names to match the column names exactly.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
.name("hive3-record-writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer should use Inherit Schema to emulate the inferred schema behavior, i.e. "
+ "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types. If Create Table Strategy is set "
+ "'Create If Not Exists', the Record Writer's output format must match the Record Reader's format in order for the data to be placed in the created table location. Note that "
+ "this property is only used if 'Update Field Names' is set to true and the field names do not all match the column names exactly. If no "
+ "update is needed for any field names (or 'Update Field Names' is false), the Record Writer is not used and instead the input FlowFile is routed to success or failure "
+ "without modification.")
.identifiesControllerService(RecordSetWriterFactory.class)
.dependsOn(UPDATE_FIELD_NAMES, "true")
.required(true)
.build();
static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder() static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder()
.name("hive3-external-table-location") .name("hive3-external-table-location")
.displayName("External Table Location") .displayName("External Table Location")
@ -237,6 +278,8 @@ public class UpdateHive3Table extends AbstractProcessor {
props.add(TABLE_MANAGEMENT_STRATEGY); props.add(TABLE_MANAGEMENT_STRATEGY);
props.add(EXTERNAL_TABLE_LOCATION); props.add(EXTERNAL_TABLE_LOCATION);
props.add(TABLE_STORAGE_FORMAT); props.add(TABLE_STORAGE_FORMAT);
props.add(UPDATE_FIELD_NAMES);
props.add(RECORD_WRITER_FACTORY);
props.add(QUERY_TIMEOUT); props.add(QUERY_TIMEOUT);
propertyDescriptors = Collections.unmodifiableList(props); propertyDescriptors = Collections.unmodifiableList(props);
@ -257,6 +300,30 @@ public class UpdateHive3Table extends AbstractProcessor {
return relationships; return relationships;
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
final boolean recordWriterFactorySet = validationContext.getProperty(RECORD_WRITER_FACTORY).isSet();
final boolean createIfNotExists = validationContext.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final boolean updateFieldNames = validationContext.getProperty(UPDATE_FIELD_NAMES).asBoolean();
if (!recordWriterFactorySet && updateFieldNames) {
validationResults.add(new ValidationResult.Builder().subject(RECORD_WRITER_FACTORY.getDisplayName())
.explanation("Record Writer must be set if 'Update Field Names' is true").valid(false).build());
}
final String tableManagementStrategy = validationContext.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
final boolean managedTable;
if (!ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
managedTable = MANAGED_TABLE.getValue().equals(tableManagementStrategy);
// Ensure valid configuration for external tables
if (createIfNotExists && !managedTable && !validationContext.getProperty(EXTERNAL_TABLE_LOCATION).isSet()) {
validationResults.add(new ValidationResult.Builder().subject(EXTERNAL_TABLE_LOCATION.getDisplayName())
.explanation("External Table Location must be set when Table Management Strategy is set to External").valid(false).build());
}
}
return validationResults;
}
@Override @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
@ -266,6 +333,7 @@ public class UpdateHive3Table extends AbstractProcessor {
} }
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue(); final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue();
List<String> partitionClauseElements = null; List<String> partitionClauseElements = null;
@ -292,13 +360,26 @@ public class UpdateHive3Table extends AbstractProcessor {
new Object[]{RecordReader.class.getSimpleName(), flowFile}, new Object[]{RecordReader.class.getSimpleName(), flowFile},
rrfe rrfe
); );
// Since we are wrapping the exceptions above there should always be a cause
// but it's possible it might not have a message. This handles that by logging
// the name of the class thrown.
Throwable c = rrfe.getCause();
if (c != null) {
session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
} else {
session.putAttribute(flowFile, "record.error.message", rrfe.getClass().getCanonicalName() + " Thrown");
}
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
return; return;
} }
RecordSchema recordSchema = reader.getSchema(); final RecordSchema recordSchema = reader.getSchema();
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue()); final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final boolean updateFieldNames = context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
if (recordWriterFactory == null && updateFieldNames) {
throw new ProcessException("Record Writer must be set if 'Update Field Names' is true");
}
final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue(); final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
final boolean managedTable; final boolean managedTable;
if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) { if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
@ -333,8 +414,54 @@ public class UpdateHive3Table extends AbstractProcessor {
final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class); final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
try (final Connection connection = dbcpService.getConnection()) { try (final Connection connection = dbcpService.getConnection()) {
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, partitionClauseElements, createIfNotExists, externalTableLocation, storageFormat); final Map<String,String> attributes = new HashMap<>(flowFile.getAttributes());
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName); OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(attributes, connection, recordSchema, tableName, partitionClauseElements,
createIfNotExists, externalTableLocation, storageFormat, updateFieldNames);
if (outputMetadataHolder != null) {
// The output schema changed (i.e. field names were updated), so write out the corresponding FlowFile
try {
final FlowFile inputFlowFile = flowFile;
flowFile = session.write(flowFile, (in, out) -> {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
final RecordReader recordReader;
final RecordSetWriter recordSetWriter;
try {
recordReader = recordReaderFactory.createRecordReader(inputFlowFile, in, getLogger());
recordSetWriter = recordWriterFactory.createWriter(getLogger(), outputMetadataHolder.getOutputSchema(), out, attributes);
} catch (Exception e) {
if(e instanceof IOException) {
throw (IOException) e;
}
throw new IOException(new RecordReaderFactoryException("Unable to create RecordReader", e));
}
WriteResult writeResult = updateRecords(recordSchema, outputMetadataHolder, recordReader, recordSetWriter);
recordSetWriter.flush();
recordSetWriter.close();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
attributes.putAll(writeResult.getAttributes());
});
} catch (final Exception e) {
getLogger().error("Failed to process {}; will route to failure", new Object[]{flowFile, e});
// Since we are wrapping the exceptions above there should always be a cause
// but it's possible it might not have a message. This handles that by logging
// the name of the class thrown.
Throwable c = e.getCause();
if (c != null) {
session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
} else {
session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown");
}
session.transfer(flowFile, REL_FAILURE);
return;
}
}
attributes.put(ATTR_OUTPUT_TABLE, tableName);
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL()); session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} }
@ -351,9 +478,9 @@ public class UpdateHive3Table extends AbstractProcessor {
} }
} }
private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema, private synchronized OutputMetadataHolder checkAndUpdateTableSchema(Map<String,String> attributes, final Connection conn, final RecordSchema schema,
final String tableName, List<String> partitionClause, final boolean createIfNotExists, final String tableName, List<String> partitionClause, final boolean createIfNotExists,
final String externalTableLocation, final String storageFormat) throws IOException { final String externalTableLocation, final String storageFormat, final boolean updateFieldNames) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and // Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table // add any columns from the schema that are missing in the table
try (Statement s = conn.createStatement()) { try (Statement s = conn.createStatement()) {
@ -461,7 +588,7 @@ public class UpdateHive3Table extends AbstractProcessor {
String partitionColumnName; String partitionColumnName;
columnName = tableInfo.getString(1); columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) { if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName); partitionColumns.add(columnName);
} }
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name // If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) { if (columnName.startsWith("#")) {
@ -483,7 +610,7 @@ public class UpdateHive3Table extends AbstractProcessor {
for (int i = 0; i < partitionClauseSize; i++) { for (int i = 0; i < partitionClauseSize; i++) {
String partitionName = partitionClause.get(i).split(" ")[0]; String partitionName = partitionClause.get(i).split(" ")[0];
String partitionValue = flowFile.getAttribute(partitionName); String partitionValue = attributes.get(partitionName);
if (StringUtils.isEmpty(partitionValue)) { if (StringUtils.isEmpty(partitionValue)) {
throw new IOException("No value found for partition value attribute '" + partitionName + "'"); throw new IOException("No value found for partition value attribute '" + partitionName + "'");
} }
@ -557,9 +684,84 @@ public class UpdateHive3Table extends AbstractProcessor {
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList); outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
} }
session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath); // If updating field names, return a new RecordSchema, otherwise return null
OutputMetadataHolder outputMetadataHolder;
if (updateFieldNames) {
List<RecordField> inputRecordFields = schema.getFields();
List<RecordField> outputRecordFields = new ArrayList<>();
Map<String,String> fieldMap = new HashMap<>();
boolean needsUpdating = false;
for (RecordField inputRecordField : inputRecordFields) {
final String inputRecordFieldName = inputRecordField.getFieldName();
boolean found = false;
for (String hiveColumnName : hiveColumns) {
if (inputRecordFieldName.equalsIgnoreCase(hiveColumnName)) {
// Set a flag if the field name doesn't match the column name exactly. This overall flag will determine whether
// the records need updating (if true) or not (if false)
if (!inputRecordFieldName.equals(hiveColumnName)) {
needsUpdating = true;
}
fieldMap.put(inputRecordFieldName, hiveColumnName);
outputRecordFields.add(new RecordField(hiveColumnName, inputRecordField.getDataType(), inputRecordField.getDefaultValue(), inputRecordField.isNullable()));
found = true;
break;
}
}
if (!found) {
// If the input field wasn't a Hive table column, add it back to the schema as-is
fieldMap.put(inputRecordFieldName, inputRecordFieldName);
}
}
outputMetadataHolder = needsUpdating ? new OutputMetadataHolder(new SimpleRecordSchema(outputRecordFields), fieldMap)
: null;
} else {
outputMetadataHolder = null;
}
attributes.put(ATTR_OUTPUT_PATH, outputPath);
return outputMetadataHolder;
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
} }
private synchronized WriteResult updateRecords(final RecordSchema inputRecordSchema, final OutputMetadataHolder outputMetadataHolder,
final RecordReader reader, final RecordSetWriter writer) throws IOException {
try {
writer.beginRecordSet();
Record inputRecord;
while((inputRecord = reader.nextRecord()) != null) {
List<RecordField> inputRecordFields = inputRecordSchema.getFields();
Map<String,Object> outputRecordFields = new HashMap<>(inputRecordFields.size());
// Copy values from input field name to output field name
for(Map.Entry<String,String> mapping : outputMetadataHolder.getFieldMap().entrySet()) {
outputRecordFields.put(mapping.getValue(), inputRecord.getValue(mapping.getKey()));
}
Record outputRecord = new MapRecord(outputMetadataHolder.getOutputSchema(), outputRecordFields);
writer.write(outputRecord);
}
return writer.finishRecordSet();
} catch (MalformedRecordException mre) {
throw new IOException("Error reading records: "+mre.getMessage(), mre);
}
}
private static class OutputMetadataHolder {
private final RecordSchema outputSchema;
private final Map<String,String> fieldMap;
public OutputMetadataHolder(RecordSchema outputSchema, Map<String, String> fieldMap) {
this.outputSchema = outputSchema;
this.fieldMap = fieldMap;
}
public RecordSchema getOutputSchema() {
return outputSchema;
}
public Map<String, String> getFieldMap() {
return fieldMap;
}
}
} }

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.dbcp.hive.Hive3DBCPService; import org.apache.nifi.dbcp.hive.Hive3DBCPService;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
@ -30,6 +31,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
@ -395,6 +397,104 @@ public class TestUpdateHive3Table {
runner.assertTransferCount(UpdateHive3Table.REL_FAILURE, 1); runner.assertTransferCount(UpdateHive3Table.REL_FAILURE, 1);
} }
@Test
public void testUpdateFields() throws Exception {
final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user_mixedcase.avsc"), StandardCharsets.UTF_8);
schema = new Schema.Parser().parse(avroSchema);
configure(processor, 3);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "users");
RecordSetWriterFactory recordWriter = new CSVRecordSetWriter();
runner.addControllerService("writer", recordWriter);
runner.enableControllerService(recordWriter);
runner.setProperty(UpdateHive3Table.UPDATE_FIELD_NAMES, "true");
runner.assertNotValid();
runner.setProperty(UpdateHive3Table.RECORD_WRITER_FACTORY, "writer");
final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "users");
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users");
flowFile.assertAttributeEquals("record.count", "3");
assertTrue(service.getExecutedStatements().isEmpty());
// Verify the table column names are the field names in the output
assertTrue(flowFile.getContent().startsWith("name,favorite_number,favorite_color,scale\n"));
}
@Test
public void testUpdateFieldsAllNamesMatchTableExists() throws Exception {
final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user.avsc"), StandardCharsets.UTF_8);
schema = new Schema.Parser().parse(avroSchema);
configure(processor, 3);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "users");
RecordSetWriterFactory recordWriter = new CSVRecordSetWriter();
runner.addControllerService("writer", recordWriter);
runner.enableControllerService(recordWriter);
runner.setProperty(UpdateHive3Table.UPDATE_FIELD_NAMES, "true");
runner.assertNotValid();
runner.setProperty(UpdateHive3Table.RECORD_WRITER_FACTORY, "writer");
final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.enqueue("name,favorite_number,favorite_color,scale\n".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "users");
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users");
flowFile.assertAttributeNotExists("record.count");
assertTrue(service.getExecutedStatements().isEmpty());
// Verify the table column names are the field names in the output
assertTrue(flowFile.getContent().startsWith("name,favorite_number,favorite_color,scale\n"));
}
@Test
public void testUpdateFieldsAllNamesMatchCreateTable() throws Exception {
final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user_mixedcase.avsc"), StandardCharsets.UTF_8);
schema = new Schema.Parser().parse(avroSchema);
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "_newTable");
RecordSetWriterFactory recordWriter = new CSVRecordSetWriter();
runner.addControllerService("writer", recordWriter);
runner.enableControllerService(recordWriter);
runner.setProperty(UpdateHive3Table.UPDATE_FIELD_NAMES, "true");
runner.assertNotValid();
runner.setProperty(UpdateHive3Table.RECORD_WRITER_FACTORY, "writer");
runner.setProperty(UpdateHive3Table.CREATE_TABLE, UpdateHive3Table.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHive3Table.TABLE_STORAGE_FORMAT, UpdateHive3Table.AVRO);
final MockHiveConnectionPool service = new MockHiveConnectionPool("_newTable");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.enqueue("name,favorite_number,favorite_color,scale\n".getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "_newTable");
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable");
flowFile.assertAttributeEquals("record.count", "1");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
assertEquals("CREATE TABLE IF NOT EXISTS `_newTable` (`NAME` STRING, `Favorite_number` INT, `favorite_Color` STRING, `scale` DOUBLE) STORED AS AVRO",
statements.get(0));
// Verify the table column names are the field names in the output
assertTrue(flowFile.getContent().startsWith("name,favorite_number,favorite_color,scale\n"));
}
private static final class MockUpdateHive3Table extends UpdateHive3Table { private static final class MockUpdateHive3Table extends UpdateHive3Table {
} }

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "NAME", "type": "string"},
{"name": "Favorite_number", "type": ["int", "null"]},
{"name": "favorite_Color", "type": ["string", "null"]},
{"name": "scale", "type": ["double", "null"]}
]
}

View File

@ -26,10 +26,13 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.dbcp.hive.Hive_1_1DBCPService; import org.apache.nifi.dbcp.hive.Hive_1_1DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -40,9 +43,16 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException; import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException; import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
@ -60,9 +70,12 @@ import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -78,7 +91,10 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' " @WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table name."), + "and 'failure' relationships, and contains the target table name."),
@WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' " @WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned).") + "and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned)."),
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer, only if a Record Writer is specified "
+ "and Update Field Names is 'true'."),
@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile, only if a Record Writer is specified and Update Field Names is 'true'.")
}) })
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@RequiresInstanceClassLoading @RequiresInstanceClassLoading
@ -168,6 +184,31 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
.dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS) .dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS)
.build(); .build();
static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
.name("hive11-update-field-names")
.displayName("Update Field Names")
.description("This property indicates whether to update the output schema such that the field names are set to the exact column names from the specified "
+ "table. This should be used if the incoming record field names may not match the table's column names in terms of upper- and lower-case. For example, this property should be "
+ "set to true if the output FlowFile (and target table storage) is Avro format, as Hive/Impala expects the field names to match the column names exactly.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
.name("hive11-record-writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer should use Inherit Schema to emulate the inferred schema behavior, i.e. "
+ "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types. If Create Table Strategy is set "
+ "'Create If Not Exists', the Record Writer's output format must match the Record Reader's format in order for the data to be placed in the created table location. Note that "
+ "this property is only used if 'Update Field Names' is set to true and the field names do not all match the column names exactly. If no "
+ "update is needed for any field names (or 'Update Field Names' is false), the Record Writer is not used and instead the input FlowFile is routed to success or failure "
+ "without modification.")
.identifiesControllerService(RecordSetWriterFactory.class)
.dependsOn(UPDATE_FIELD_NAMES, "true")
.required(true)
.build();
static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder() static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder()
.name("hive11-external-table-location") .name("hive11-external-table-location")
.displayName("External Table Location") .displayName("External Table Location")
@ -242,6 +283,8 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
props.add(TABLE_MANAGEMENT_STRATEGY); props.add(TABLE_MANAGEMENT_STRATEGY);
props.add(EXTERNAL_TABLE_LOCATION); props.add(EXTERNAL_TABLE_LOCATION);
props.add(TABLE_STORAGE_FORMAT); props.add(TABLE_STORAGE_FORMAT);
props.add(UPDATE_FIELD_NAMES);
props.add(RECORD_WRITER_FACTORY);
props.add(QUERY_TIMEOUT); props.add(QUERY_TIMEOUT);
propertyDescriptors = Collections.unmodifiableList(props); propertyDescriptors = Collections.unmodifiableList(props);
@ -262,6 +305,30 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
return relationships; return relationships;
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
final boolean recordWriterFactorySet = validationContext.getProperty(RECORD_WRITER_FACTORY).isSet();
final boolean createIfNotExists = validationContext.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final boolean updateFieldNames = validationContext.getProperty(UPDATE_FIELD_NAMES).asBoolean();
if (!recordWriterFactorySet && updateFieldNames) {
validationResults.add(new ValidationResult.Builder().subject(RECORD_WRITER_FACTORY.getDisplayName())
.explanation("Record Writer must be set if 'Update Field Names' is true").valid(false).build());
}
final String tableManagementStrategy = validationContext.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
final boolean managedTable;
if (!ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
managedTable = MANAGED_TABLE.getValue().equals(tableManagementStrategy);
// Ensure valid configuration for external tables
if (createIfNotExists && !managedTable && !validationContext.getProperty(EXTERNAL_TABLE_LOCATION).isSet()) {
validationResults.add(new ValidationResult.Builder().subject(EXTERNAL_TABLE_LOCATION.getDisplayName())
.explanation("External Table Location must be set when Table Management Strategy is set to External").valid(false).build());
}
}
return validationResults;
}
@Override @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
@ -271,6 +338,7 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
} }
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue(); final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue();
List<String> partitionClauseElements = null; List<String> partitionClauseElements = null;
@ -297,13 +365,26 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
new Object[]{RecordReader.class.getSimpleName(), flowFile}, new Object[]{RecordReader.class.getSimpleName(), flowFile},
rrfe rrfe
); );
// Since we are wrapping the exceptions above there should always be a cause
// but it's possible it might not have a message. This handles that by logging
// the name of the class thrown.
Throwable c = rrfe.getCause();
if (c != null) {
session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
} else {
session.putAttribute(flowFile, "record.error.message", rrfe.getClass().getCanonicalName() + " Thrown");
}
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
return; return;
} }
RecordSchema recordSchema = reader.getSchema(); final RecordSchema recordSchema = reader.getSchema();
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue()); final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final boolean updateFieldNames = context.getProperty(UPDATE_FIELD_NAMES).asBoolean();
if (recordWriterFactory == null && updateFieldNames) {
throw new ProcessException("Record Writer must be set if 'Update Field Names' is true");
}
final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue(); final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
final boolean managedTable; final boolean managedTable;
if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) { if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
@ -336,8 +417,54 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue(); final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue();
final Hive_1_1DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive_1_1DBCPService.class); final Hive_1_1DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive_1_1DBCPService.class);
try (final Connection connection = dbcpService.getConnection()) { try (final Connection connection = dbcpService.getConnection()) {
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, partitionClauseElements, createIfNotExists, externalTableLocation, storageFormat); final Map<String,String> attributes = new HashMap<>(flowFile.getAttributes());
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName); OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(attributes, connection, recordSchema, tableName, partitionClauseElements,
createIfNotExists, externalTableLocation, storageFormat, updateFieldNames);
if (outputMetadataHolder != null) {
// The output schema changed (i.e. field names were updated), so write out the corresponding FlowFile
try {
final FlowFile inputFlowFile = flowFile;
flowFile = session.write(flowFile, (in, out) -> {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
final RecordReader recordReader;
final RecordSetWriter recordSetWriter;
try {
recordReader = recordReaderFactory.createRecordReader(inputFlowFile, in, getLogger());
recordSetWriter = recordWriterFactory.createWriter(getLogger(), outputMetadataHolder.getOutputSchema(), out, attributes);
} catch (Exception e) {
if(e instanceof IOException) {
throw (IOException) e;
}
throw new IOException(new RecordReaderFactoryException("Unable to create RecordReader", e));
}
WriteResult writeResult = updateRecords(recordSchema, outputMetadataHolder, recordReader, recordSetWriter);
recordSetWriter.flush();
recordSetWriter.close();
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
attributes.putAll(writeResult.getAttributes());
});
} catch (final Exception e) {
getLogger().error("Failed to process {}; will route to failure", new Object[]{flowFile, e});
// Since we are wrapping the exceptions above there should always be a cause
// but it's possible it might not have a message. This handles that by logging
// the name of the class thrown.
Throwable c = e.getCause();
if (c != null) {
session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
} else {
session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown");
}
session.transfer(flowFile, REL_FAILURE);
return;
}
}
attributes.put(ATTR_OUTPUT_TABLE, tableName);
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL()); session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} }
@ -356,9 +483,9 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
} }
} }
private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema, private synchronized OutputMetadataHolder checkAndUpdateTableSchema(Map<String,String> attributes, final Connection conn, final RecordSchema schema,
final String tableName, List<String> partitionClause, final boolean createIfNotExists, final String tableName, List<String> partitionClause, final boolean createIfNotExists,
final String externalTableLocation, final String storageFormat) throws IOException { final String externalTableLocation, final String storageFormat, final boolean updateFieldNames) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and // Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table // add any columns from the schema that are missing in the table
try (Statement s = conn.createStatement()) { try (Statement s = conn.createStatement()) {
@ -466,7 +593,7 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
String partitionColumnName; String partitionColumnName;
columnName = tableInfo.getString(1); columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) { if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName); partitionColumns.add(columnName);
} }
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name // If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) { if (columnName.startsWith("#")) {
@ -488,7 +615,7 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
for (int i = 0; i < partitionClauseSize; i++) { for (int i = 0; i < partitionClauseSize; i++) {
String partitionName = partitionClause.get(i).split(" ")[0]; String partitionName = partitionClause.get(i).split(" ")[0];
String partitionValue = flowFile.getAttribute(partitionName); String partitionValue = attributes.get(partitionName);
if (StringUtils.isEmpty(partitionValue)) { if (StringUtils.isEmpty(partitionValue)) {
throw new IOException("No value found for partition value attribute '" + partitionName + "'"); throw new IOException("No value found for partition value attribute '" + partitionName + "'");
} }
@ -562,7 +689,42 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList); outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
} }
session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath); // If updating field names, return a new RecordSchema, otherwise return null
OutputMetadataHolder outputMetadataHolder;
if (updateFieldNames) {
List<RecordField> inputRecordFields = schema.getFields();
List<RecordField> outputRecordFields = new ArrayList<>();
Map<String,String> fieldMap = new HashMap<>();
boolean needsUpdating = false;
for (RecordField inputRecordField : inputRecordFields) {
final String inputRecordFieldName = inputRecordField.getFieldName();
boolean found = false;
for (String hiveColumnName : hiveColumns) {
if (inputRecordFieldName.equalsIgnoreCase(hiveColumnName)) {
// Set a flag if the field name doesn't match the column name exactly. This overall flag will determine whether
// the records need updating (if true) or not (if false)
if (!inputRecordFieldName.equals(hiveColumnName)) {
needsUpdating = true;
}
fieldMap.put(inputRecordFieldName, hiveColumnName);
outputRecordFields.add(new RecordField(hiveColumnName, inputRecordField.getDataType(), inputRecordField.getDefaultValue(), inputRecordField.isNullable()));
found = true;
break;
}
}
if (!found) {
// If the input field wasn't a Hive table column, add it back to the schema as-is
fieldMap.put(inputRecordFieldName, inputRecordFieldName);
}
}
outputMetadataHolder = needsUpdating ? new OutputMetadataHolder(new SimpleRecordSchema(outputRecordFields), fieldMap)
: null;
} else {
outputMetadataHolder = null;
}
attributes.put(ATTR_OUTPUT_PATH, outputPath);
return outputMetadataHolder;
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
@ -644,7 +806,46 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
} }
return null; return null;
} }
throw new IllegalArgumentException("Error converting Avro type " + dataType.name() + " to Hive type"); throw new IllegalArgumentException("Error converting Avro type " + dataType.name() + " to Hive type");
} }
private synchronized WriteResult updateRecords(final RecordSchema inputRecordSchema, final OutputMetadataHolder outputMetadataHolder,
final RecordReader reader, final RecordSetWriter writer) throws IOException {
try {
writer.beginRecordSet();
Record inputRecord;
while((inputRecord = reader.nextRecord()) != null) {
List<RecordField> inputRecordFields = inputRecordSchema.getFields();
Map<String,Object> outputRecordFields = new HashMap<>(inputRecordFields.size());
// Copy values from input field name to output field name
for(Map.Entry<String,String> mapping : outputMetadataHolder.getFieldMap().entrySet()) {
outputRecordFields.put(mapping.getValue(), inputRecord.getValue(mapping.getKey()));
}
Record outputRecord = new MapRecord(outputMetadataHolder.getOutputSchema(), outputRecordFields);
writer.write(outputRecord);
}
return writer.finishRecordSet();
} catch (MalformedRecordException mre) {
throw new IOException("Error reading records: "+mre.getMessage(), mre);
}
}
private static class OutputMetadataHolder {
private final RecordSchema outputSchema;
private final Map<String,String> fieldMap;
public OutputMetadataHolder(RecordSchema outputSchema, Map<String, String> fieldMap) {
this.outputSchema = outputSchema;
this.fieldMap = fieldMap;
}
public RecordSchema getOutputSchema() {
return outputSchema;
}
public Map<String, String> getFieldMap() {
return fieldMap;
}
}
} }