NIFI-7952: Allow RecordPath to be used for specifying the Insertion Operation and the data to be inserted into Kudu

Mark Payne, 2020-10-27 11:40:12 -04:00 (committed by markap14)
parent bf962f6227
commit 74968991d5
3 changed files with 239 additions and 129 deletions

View File: pom.xml

@@ -83,6 +83,11 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-security-utils</artifactId>

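Note: the new nifi-record-path dependency supplies the RecordPath, RecordPathResult, FieldValue, and RecordPathValidator classes that the PutKudu changes below import and use.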
View File: AbstractKuduProcessor.java

@@ -179,8 +179,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
     protected KuduClient buildClient(final ProcessContext context) {
         final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
-        final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final Integer adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();

         return new KuduClient.KuduClientBuilder(masters)
                 .defaultOperationTimeoutMs(operationTimeout)
@@ -295,14 +295,19 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
             if (lowercaseFields) {
                 colName = colName.toLowerCase();
             }
-            int colIdx = this.getColumnIndex(schema, colName);
-            if (colIdx != -1) {
-                ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
-                Type colType = colSchema.getType();
+            if (!schema.hasColumn(colName)) {
+                continue;
+            }
+
+            final int columnIndex = schema.getColumnIndex(colName);
+            final ColumnSchema colSchema = schema.getColumnByIndex(columnIndex);
+            final Type colType = colSchema.getType();

             if (record.getValue(recordFieldName) == null) {
-                if (schema.getColumnByIndex(colIdx).isKey()) {
+                if (schema.getColumnByIndex(columnIndex).isKey()) {
                     throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
-                } else if(!schema.getColumnByIndex(colIdx).isNullable()) {
+                } else if(!schema.getColumnByIndex(columnIndex).isNullable()) {
                     throw new IllegalArgumentException(String.format("Can't set column %s to null ", colName));
                 }
@@ -313,46 +318,46 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                 Object value = record.getValue(recordFieldName);
                 switch (colType) {
                     case BOOL:
-                        row.addBoolean(colIdx, DataTypeUtils.toBoolean(value, recordFieldName));
+                        row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName));
                         break;
                     case INT8:
-                        row.addByte(colIdx, DataTypeUtils.toByte(value, recordFieldName));
+                        row.addByte(columnIndex, DataTypeUtils.toByte(value, recordFieldName));
                         break;
                     case INT16:
-                        row.addShort(colIdx, DataTypeUtils.toShort(value, recordFieldName));
+                        row.addShort(columnIndex, DataTypeUtils.toShort(value, recordFieldName));
                         break;
                     case INT32:
-                        row.addInt(colIdx, DataTypeUtils.toInteger(value, recordFieldName));
+                        row.addInt(columnIndex, DataTypeUtils.toInteger(value, recordFieldName));
                         break;
                     case INT64:
-                        row.addLong(colIdx, DataTypeUtils.toLong(value, recordFieldName));
+                        row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName));
                         break;
                     case UNIXTIME_MICROS:
                         DataType fieldType = record.getSchema().getDataType(recordFieldName).get();
                         Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
                                 () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
-                        row.addTimestamp(colIdx, timestamp);
+                        row.addTimestamp(columnIndex, timestamp);
                         break;
                     case STRING:
-                        row.addString(colIdx, DataTypeUtils.toString(value, recordFieldName));
+                        row.addString(columnIndex, DataTypeUtils.toString(value, recordFieldName));
                         break;
                     case BINARY:
-                        row.addBinary(colIdx, DataTypeUtils.toString(value, recordFieldName).getBytes());
+                        row.addBinary(columnIndex, DataTypeUtils.toString(value, recordFieldName).getBytes());
                         break;
                     case FLOAT:
-                        row.addFloat(colIdx, DataTypeUtils.toFloat(value, recordFieldName));
+                        row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName));
                         break;
                     case DOUBLE:
-                        row.addDouble(colIdx, DataTypeUtils.toDouble(value, recordFieldName));
+                        row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName));
                         break;
                     case DECIMAL:
-                        row.addDecimal(colIdx, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
+                        row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
                         break;
                     case VARCHAR:
-                        row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName));
+                        row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName));
                         break;
                     case DATE:
-                        row.addDate(colIdx, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
+                        row.addDate(columnIndex, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
                         break;
                     default:
                         throw new IllegalStateException(String.format("unknown column type %s", colType));
@@ -360,7 +365,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                 }
             }
         }
-    }

     /**
      * Converts a NiFi DataType to it's equivalent Kudu Type.
@@ -425,14 +429,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
         return alterTable;
     }

-    private int getColumnIndex(Schema columns, String colName) {
-        try {
-            return columns.getColumnIndex(colName);
-        } catch (Exception ex) {
-            return -1;
-        }
-    }
-
     protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         Upsert upsert = kuduTable.newUpsert();
         buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
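Aside (not part of this commit): in buildPartialRow above, the Kudu column type, not the NiFi field type, selects the PartialRow setter once the hasColumn/getColumnIndex lookup succeeds. A minimal sketch of the coercion path for a single field, assuming record, row, and columnIndex are in scope as in the method body, with a hypothetical field name user_id:

    // Sketch only: a String value such as "42" destined for an INT32 column is
    // coerced by DataTypeUtils before being written to the row.
    final Object value = record.getValue("user_id");                    // e.g. "42"
    row.addInt(columnIndex, DataTypeUtils.toInteger(value, "user_id")); // writes 42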

View File: PutKudu.java

@@ -40,34 +40,42 @@ import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
 import org.apache.nifi.security.krb.KerberosAction;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSet;

 import javax.security.auth.login.LoginException;
-import java.io.IOException;
 import java.io.InputStream;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;

 @EventDriven
 @SupportsBatching
@@ -86,7 +94,7 @@ public class PutKudu extends AbstractKuduProcessor {
             .description("The name of the Kudu Table to put data into")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .build();

     public static final PropertyDescriptor RECORD_READER = new Builder()
@@ -113,7 +121,7 @@ public class PutKudu extends AbstractKuduProcessor {
             .description("Convert column names to lowercase when finding index of Kudu table columns")
             .defaultValue("false")
             .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
@@ -123,10 +131,31 @@ public class PutKudu extends AbstractKuduProcessor {
                     "are encountered, the Kudu table will be altered to include new columns for those fields.")
             .defaultValue("false")
             .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();

+    static final PropertyDescriptor DATA_RECORD_PATH = new Builder()
+            .name("Data RecordPath")
+            .displayName("Data RecordPath")
+            .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to" +
+                " Kudu instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to Kudu.")
+            .required(false)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor OPERATION_RECORD_PATH = new Builder()
+            .name("Operation RecordPath")
+            .displayName("Operation RecordPath")
+            .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the Kudu Operation Type. When evaluated, the " +
+                "RecordPath must evaluate to one of the valid Kudu Operation Types, or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property" +
+                " will be ignored.")
+            .required(false)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
     protected static final Validator OperationTypeValidator = new Validator() {
         @Override
         public ValidationResult validate(String subject, String value, ValidationContext context) {
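Aside (not part of this commit): to make the two new descriptors concrete, suppose Data RecordPath is /payload and each incoming record is shaped like { "operation": "UPSERT", "payload": { "id": 1, "name": "Alice" } } (a hypothetical layout). Only the nested payload record is written to Kudu. A minimal sketch of the selection, using the record-path API imported above and assuming record is an incoming Record:

    // Sketch only: select the sub-record(s) to send to Kudu. Every selected
    // field must itself be of type RECORD, otherwise the FlowFile is routed to
    // failure (see the onTrigger changes later in this diff).
    final RecordPath dataPath = RecordPath.compile("/payload");
    final List<FieldValue> selected = dataPath.evaluate(record)
            .getSelectedFields()
            .collect(Collectors.toList());
    final Record payload = (Record) selected.get(0).getValue(); // written to Kudu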
@@ -156,9 +185,10 @@ public class PutKudu extends AbstractKuduProcessor {
             .displayName("Kudu Operation Type")
             .description("Specify operationType for this processor.\n" +
                     "Valid values are: " +
-                    Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")))
+                    Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")) +
+                    ". This Property will be ignored if the <Operation RecordPath> property is set.")
             .defaultValue(OperationType.INSERT.toString())
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .addValidator(OperationTypeValidator)
             .build();
@@ -184,7 +214,7 @@ public class PutKudu extends AbstractKuduProcessor {
             .defaultValue("1")
             .required(true)
             .addValidator(StandardValidators.createLongValidator(1, 100000, true))
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(VARIABLE_REGISTRY)
             .build();

     protected static final PropertyDescriptor BATCH_SIZE = new Builder()
@@ -196,7 +226,7 @@ public class PutKudu extends AbstractKuduProcessor {
             .defaultValue("100")
             .required(true)
             .addValidator(StandardValidators.createLongValidator(1, 100000, true))
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(VARIABLE_REGISTRY)
             .build();

     protected static final PropertyDescriptor IGNORE_NULL = new Builder()
@@ -205,7 +235,7 @@ public class PutKudu extends AbstractKuduProcessor {
             .defaultValue("false")
             .required(true)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .build();

     protected static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -220,9 +250,11 @@ public class PutKudu extends AbstractKuduProcessor {
     public static final String RECORD_COUNT_ATTR = "record.count";

     // Properties set in onScheduled.
-    protected int batchSize = 100;
-    protected int ffbatch = 1;
-    protected SessionConfiguration.FlushMode flushMode;
+    private volatile int batchSize = 100;
+    private volatile int ffbatch = 1;
+    private volatile SessionConfiguration.FlushMode flushMode;
+    private volatile Function<Record, OperationType> recordPathOperationType;
+    private volatile RecordPath dataRecordPath;

     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -236,6 +268,8 @@ public class PutKudu extends AbstractKuduProcessor {
         properties.add(LOWERCASE_FIELD_NAMES);
         properties.add(HANDLE_SCHEMA_DRIFT);
         properties.add(RECORD_READER);
+        properties.add(DATA_RECORD_PATH);
+        properties.add(OPERATION_RECORD_PATH);
         properties.add(INSERT_OPERATION);
         properties.add(FLUSH_MODE);
         properties.add(FLOWFILE_BATCH_SIZE);
@@ -255,11 +289,22 @@ public class PutKudu extends AbstractKuduProcessor {
     }

     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException, LoginException {
+    public void onScheduled(final ProcessContext context) throws LoginException {
         batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
         createKerberosUserAndOrKuduClient(context);
+
+        final String operationRecordPathValue = context.getProperty(OPERATION_RECORD_PATH).getValue();
+        if (operationRecordPathValue == null) {
+            recordPathOperationType = null;
+        } else {
+            final RecordPath recordPath = RecordPath.compile(operationRecordPathValue);
+            recordPathOperationType = new RecordPathOperationType(recordPath);
+        }
+
+        final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
+        dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
     }

     @Override
@@ -301,47 +346,82 @@ public class PutKudu extends AbstractKuduProcessor {
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {

                 final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
-                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
                 final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
                 final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
-                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+                final boolean handleSchemaDrift = Boolean.parseBoolean(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
+                final Function<Record, OperationType> operationTypeFunction;
+                if (recordPathOperationType == null) {
+                    final OperationType staticOperationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
+                    operationTypeFunction = record -> staticOperationType;
+                } else {
+                    operationTypeFunction = recordPathOperationType;
+                }

                 final RecordSet recordSet = recordReader.createRecordSet();
-                final List<String> fieldNames = recordReader.getSchema().getFieldNames();
                 KuduTable kuduTable = kuduClient.openTable(tableName);

                 // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
                 if (handleSchemaDrift) {
                     final Schema schema = kuduTable.getSchema();
-                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
-                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
-                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
-                            .collect(Collectors.toList());
+                    final List<RecordField> missing = recordReader.getSchema().getFields().stream()
+                        .filter(field -> !schema.hasColumn(lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                        .collect(Collectors.toList());

                     if (!missing.isEmpty()) {
-                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
-                                new Object[]{missing.size(), tableName});
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift", new Object[]{missing.size(), tableName});

                         // Add each column one at a time to avoid failing if some of the missing columns
                         // we created by a concurrent thread or application attempting to handle schema drift.
-                        for (RecordField field : missing) {
+                        for (final RecordField field : missing) {
                             try {
                                 final String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
                                 kuduClient.alterTable(tableName, getAddNullableColumnStatement(columnName, field.getDataType()));
-                            } catch (KuduException e) {
+                            } catch (final KuduException e) {
                                 // Ignore the exception if the column already exists due to concurrent
                                 // threads or applications attempting to handle schema drift.
                                 if (e.getStatus().isAlreadyPresent()) {
-                                    getLogger().info("column already exists in table '{}' while handling schema drift",
-                                            new Object[]{tableName});
+                                    getLogger().info("Column already exists in table '{}' while handling schema drift", new Object[]{tableName});
                                 } else {
                                     throw new ProcessException(e);
                                 }
                             }
                         }

                         // Re-open the table to get the new schema.
                         kuduTable = kuduClient.openTable(tableName);
                     }
                 }

+                Record record = recordSet.next();
+                recordReaderLoop: while (record != null) {
+                    final OperationType operationType = operationTypeFunction.apply(record);
+
+                    final List<Record> dataRecords;
+                    if (dataRecordPath == null) {
+                        dataRecords = Collections.singletonList(record);
+                    } else {
+                        final RecordPathResult result = dataRecordPath.evaluate(record);
+                        final List<FieldValue> fieldValues = result.getSelectedFields().collect(Collectors.toList());
+                        if (fieldValues.isEmpty()) {
+                            throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record yielded no results.");
+                        }
+
+                        for (final FieldValue fieldValue : fieldValues) {
+                            final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
+                            if (fieldType != RecordFieldType.RECORD) {
+                                throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type" +
+                                    " " + fieldType);
+                            }
+                        }
+
+                        dataRecords = new ArrayList<>(fieldValues.size());
+                        for (final FieldValue fieldValue : fieldValues) {
+                            dataRecords.add((Record) fieldValue.getValue());
+                        }
+                    }
+
+                    for (final Record dataRecord : dataRecords) {
                     // In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors.
                     // Because the session is shared across flow files, for batching efficiency, we
                     // need to flush when changing to and from INSERT_IGNORE operation types.
@@ -352,9 +432,8 @@ public class PutKudu extends AbstractKuduProcessor {
                     }
                     prevOperationType = operationType;

-                    Record record = recordSet.next();
-                    while (record != null) {
-                        Operation operation = createKuduOperation(operationType, record, fieldNames, ignoreNull, lowercaseFields, kuduTable);
+                        final List<String> fieldNames = dataRecord.getSchema().getFieldNames();
+                        Operation operation = createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull, lowercaseFields, kuduTable);

                         // We keep track of mappings between Operations and their origins,
                         // so that we know which FlowFiles should be marked failure after buffered flush.
                         operationFlowFileMap.put(operation, flowFile);
@@ -373,11 +452,13 @@ public class PutKudu extends AbstractKuduProcessor {
                             // Stop processing the records on the first error.
                             // Note that Kudu does not support rolling back of previous operations.
                             flowFileFailures.put(flowFile, response.getRowError());
-                            break;
+                            break recordReaderLoop;
                         }

                         numBuffered++;
                         numRecords.merge(flowFile, 1, Integer::sum);
+                    }
+
                     record = recordSet.next();
                 }
             } catch (Exception ex) {
@@ -460,4 +541,32 @@ public class PutKudu extends AbstractKuduProcessor {
             throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
         }
     }
+
+    private static class RecordPathOperationType implements Function<Record, OperationType> {
+        private final RecordPath recordPath;
+
+        public RecordPathOperationType(final RecordPath recordPath) {
+            this.recordPath = recordPath;
+        }
+
+        @Override
+        public OperationType apply(final Record record) {
+            final RecordPathResult recordPathResult = recordPath.evaluate(record);
+            final List<FieldValue> resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+            if (resultList.isEmpty()) {
+                throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record but got no results");
+            }
+
+            if (resultList.size() > 1) {
+                throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record and received multiple distinct results (" + resultList + ")");
+            }
+
+            final String resultValue = String.valueOf(resultList.get(0).getValue());
+            try {
+                return OperationType.valueOf(resultValue.toUpperCase());
+            } catch (final IllegalArgumentException iae) {
+                throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record to determine Kudu Operation Type but found invalid value: " + resultValue);
+            }
+        }
+    }
 }
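Aside (not part of this commit): RecordPathOperationType requires exactly one distinct, valid operation name per record. A hypothetical illustration, where the /op path and the recordWithOp(...) helper are made up for the example:

    // Sketch only: resolve the Kudu operation per record, case-insensitively.
    final Function<Record, OperationType> opFn =
            new RecordPathOperationType(RecordPath.compile("/op"));
    opFn.apply(recordWithOp("upsert")); // -> OperationType.UPSERT
    opFn.apply(recordWithOp("oops"));   // -> throws ProcessException (invalid value)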