From 74968991d5034c96c481386f76929e9a49cab8a1 Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Tue, 27 Oct 2020 11:40:12 -0400
Subject: [PATCH] NIFI-7952: Allow RecordPath to be used for specifying the
 Insertion Operation and the data to be inserted into Kudu

---
 .../nifi-kudu-processors/pom.xml              |   5 +
 .../kudu/AbstractKuduProcessor.java           | 138 ++++++-----
 .../apache/nifi/processors/kudu/PutKudu.java  | 225 +++++++++++++-----
 3 files changed, 239 insertions(+), 129 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 34dea1e051..bf38d47317 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -83,6 +83,11 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-security-utils</artifactId>
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 520fca394d..0ae8e40568 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -179,8 +179,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {

     protected KuduClient buildClient(final ProcessContext context) {
         final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
-        final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final Integer adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();

         return new KuduClient.KuduClientBuilder(masters)
             .defaultOperationTimeoutMs(operationTimeout)
@@ -295,68 +295,72 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
             if (lowercaseFields) {
                 colName = colName.toLowerCase();
             }
-            int colIdx = this.getColumnIndex(schema, colName);
-            if (colIdx != -1) {
-                ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
-                Type colType = colSchema.getType();
-                if (record.getValue(recordFieldName) == null) {
-                    if (schema.getColumnByIndex(colIdx).isKey()) {
-                        throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
-                    } else if(!schema.getColumnByIndex(colIdx).isNullable()) {
-                        throw new IllegalArgumentException(String.format("Can't set column %s to null ", colName));
-                    }
-                    if (!ignoreNull) {
-                        row.setNull(colName);
-                    }
-                } else {
-                    Object value = record.getValue(recordFieldName);
-                    switch (colType) {
-                        case BOOL:
-                            row.addBoolean(colIdx, DataTypeUtils.toBoolean(value, recordFieldName));
-                            break;
-                        case INT8:
-                            row.addByte(colIdx, DataTypeUtils.toByte(value, recordFieldName));
-                            break;
-                        case INT16:
-                            row.addShort(colIdx, DataTypeUtils.toShort(value, recordFieldName));
-                            break;
-                        case INT32:
-                            row.addInt(colIdx, DataTypeUtils.toInteger(value, recordFieldName));
-                            break;
-                        case INT64:
-                            row.addLong(colIdx, DataTypeUtils.toLong(value, recordFieldName));
-                            break;
-                        case UNIXTIME_MICROS:
-                            DataType fieldType = record.getSchema().getDataType(recordFieldName).get();
-                            Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
-                                () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
-                            row.addTimestamp(colIdx, timestamp);
-                            break;
-                        case STRING:
-                            row.addString(colIdx, DataTypeUtils.toString(value, recordFieldName));
-                            break;
-                        case BINARY:
-                            row.addBinary(colIdx, DataTypeUtils.toString(value, recordFieldName).getBytes());
-                            break;
-                        case FLOAT:
-                            row.addFloat(colIdx, DataTypeUtils.toFloat(value, recordFieldName));
-                            break;
-                        case DOUBLE:
-                            row.addDouble(colIdx, DataTypeUtils.toDouble(value, recordFieldName));
-                            break;
-                        case DECIMAL:
-                            row.addDecimal(colIdx, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
-                            break;
-                        case VARCHAR:
-                            row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName));
-                            break;
-                        case DATE:
-                            row.addDate(colIdx, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
-                            break;
-                        default:
-                            throw new IllegalStateException(String.format("unknown column type %s", colType));
-                    }
+            if (!schema.hasColumn(colName)) {
+                continue;
+            }
+
+            final int columnIndex = schema.getColumnIndex(colName);
+            final ColumnSchema colSchema = schema.getColumnByIndex(columnIndex);
+            final Type colType = colSchema.getType();
+
+            if (record.getValue(recordFieldName) == null) {
+                if (schema.getColumnByIndex(columnIndex).isKey()) {
+                    throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
+                } else if(!schema.getColumnByIndex(columnIndex).isNullable()) {
+                    throw new IllegalArgumentException(String.format("Can't set column %s to null ", colName));
+                }
+
+                if (!ignoreNull) {
+                    row.setNull(colName);
+                }
+            } else {
+                Object value = record.getValue(recordFieldName);
+                switch (colType) {
+                    case BOOL:
+                        row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName));
+                        break;
+                    case INT8:
+                        row.addByte(columnIndex, DataTypeUtils.toByte(value, recordFieldName));
+                        break;
+                    case INT16:
+                        row.addShort(columnIndex, DataTypeUtils.toShort(value, recordFieldName));
+                        break;
+                    case INT32:
+                        row.addInt(columnIndex, DataTypeUtils.toInteger(value, recordFieldName));
+                        break;
+                    case INT64:
+                        row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName));
+                        break;
+                    case UNIXTIME_MICROS:
+                        DataType fieldType = record.getSchema().getDataType(recordFieldName).get();
+                        Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
+                            () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
+                        row.addTimestamp(columnIndex, timestamp);
+                        break;
+                    case STRING:
+                        row.addString(columnIndex, DataTypeUtils.toString(value, recordFieldName));
+                        break;
+                    case BINARY:
+                        row.addBinary(columnIndex, DataTypeUtils.toString(value, recordFieldName).getBytes());
+                        break;
+                    case FLOAT:
+                        row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName));
+                        break;
+                    case DOUBLE:
+                        row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName));
+                        break;
+                    case DECIMAL:
+                        row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
+                        break;
+                    case VARCHAR:
+                        row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName));
+                        break;
+                    case DATE:
+                        row.addDate(columnIndex, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
+                        break;
+                    default:
+                        throw new IllegalStateException(String.format("unknown column type %s", colType));
                 }
             }
         }
@@ -425,14 +429,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
         return alterTable;
     }

-    private int getColumnIndex(Schema columns, String colName) {
-        try {
-            return columns.getColumnIndex(colName);
-        } catch (Exception ex) {
-            return -1;
-        }
-    }
-
     protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         Upsert upsert = kuduTable.newUpsert();
         buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 3e8e19917d..c811b6b441 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -40,34 +40,42 @@ import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
 import org.apache.nifi.security.krb.KerberosAction;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSet;

 import javax.security.auth.login.LoginException;
-import java.io.IOException;
 import java.io.InputStream;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;

 @EventDriven
 @SupportsBatching
@@ -86,7 +94,7 @@ public class PutKudu extends AbstractKuduProcessor {
             .description("The name of the Kudu Table to put data into")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .build();

     public static final PropertyDescriptor RECORD_READER = new Builder()
@@ -113,7 +121,7 @@ public class PutKudu extends AbstractKuduProcessor {
             .description("Convert column names to lowercase when finding index of Kudu table columns")
             .defaultValue("false")
             .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();

@@ -123,10 +131,31 @@ public class PutKudu extends AbstractKuduProcessor {
             "are encountered, the Kudu table will be altered to include new columns for those fields.")
         .defaultValue("false")
         .required(true)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
         .build();

+    static final PropertyDescriptor DATA_RECORD_PATH = new Builder()
+        .name("Data RecordPath")
+        .displayName("Data RecordPath")
+        .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to" +
+            " Kudu instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to Kudu.")
+        .required(false)
+        .addValidator(new RecordPathValidator())
+        .expressionLanguageSupported(NONE)
+        .build();
+
+    static final PropertyDescriptor OPERATION_RECORD_PATH = new Builder()
+        .name("Operation RecordPath")
+        .displayName("Operation RecordPath")
+        .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the Kudu Operation Type. When evaluated, the " +
+            "RecordPath must evaluate to one of the valid Kudu Operation Types, or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property" +
+            " will be ignored.")
+        .required(false)
+        .addValidator(new RecordPathValidator())
+        .expressionLanguageSupported(NONE)
+        .build();
+
     protected static final Validator OperationTypeValidator = new Validator() {
         @Override
         public ValidationResult validate(String subject, String value, ValidationContext context) {
@@ -156,9 +185,10 @@ public class PutKudu extends AbstractKuduProcessor {
         .displayName("Kudu Operation Type")
         .description("Specify operationType for this processor.\n" +
             "Valid values are: " +
-            Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")))
+            Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")) +
+            ". This Property will be ignored if the <Operation RecordPath> property is set.")
         .defaultValue(OperationType.INSERT.toString())
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .addValidator(OperationTypeValidator)
         .build();

@@ -184,7 +214,7 @@ public class PutKudu extends AbstractKuduProcessor {
         .defaultValue("1")
         .required(true)
         .addValidator(StandardValidators.createLongValidator(1, 100000, true))
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .build();

     protected static final PropertyDescriptor BATCH_SIZE = new Builder()
@@ -196,7 +226,7 @@ public class PutKudu extends AbstractKuduProcessor {
         .defaultValue("100")
         .required(true)
         .addValidator(StandardValidators.createLongValidator(1, 100000, true))
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .build();

     protected static final PropertyDescriptor IGNORE_NULL = new Builder()
@@ -205,7 +235,7 @@ public class PutKudu extends AbstractKuduProcessor {
         .defaultValue("false")
         .required(true)
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
        .build();

     protected static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -220,9 +250,11 @@ public class PutKudu extends AbstractKuduProcessor {
     public static final String RECORD_COUNT_ATTR = "record.count";

     // Properties set in onScheduled.
-    protected int batchSize = 100;
-    protected int ffbatch = 1;
-    protected SessionConfiguration.FlushMode flushMode;
+    private volatile int batchSize = 100;
+    private volatile int ffbatch = 1;
+    private volatile SessionConfiguration.FlushMode flushMode;
+    private volatile Function<Record, OperationType> recordPathOperationType;
+    private volatile RecordPath dataRecordPath;

     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -236,6 +268,8 @@ public class PutKudu extends AbstractKuduProcessor {
         properties.add(LOWERCASE_FIELD_NAMES);
         properties.add(HANDLE_SCHEMA_DRIFT);
         properties.add(RECORD_READER);
+        properties.add(DATA_RECORD_PATH);
+        properties.add(OPERATION_RECORD_PATH);
         properties.add(INSERT_OPERATION);
         properties.add(FLUSH_MODE);
         properties.add(FLOWFILE_BATCH_SIZE);
@@ -255,11 +289,22 @@ public class PutKudu extends AbstractKuduProcessor {
     }

     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException, LoginException {
+    public void onScheduled(final ProcessContext context) throws LoginException {
         batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
         createKerberosUserAndOrKuduClient(context);
+
+        final String operationRecordPathValue = context.getProperty(OPERATION_RECORD_PATH).getValue();
+        if (operationRecordPathValue == null) {
+            recordPathOperationType = null;
+        } else {
+            final RecordPath recordPath = RecordPath.compile(operationRecordPathValue);
+            recordPathOperationType = new RecordPathOperationType(recordPath);
+        }
+
+        final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
+        dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
     }

     @Override
@@ -301,83 +346,119 @@ public class PutKudu extends AbstractKuduProcessor {
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {

                 final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
-                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
                 final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
                 final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
-                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+                final boolean handleSchemaDrift = Boolean.parseBoolean(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
+                final Function<Record, OperationType> operationTypeFunction;
+                if (recordPathOperationType == null) {
+                    final OperationType staticOperationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
+                    operationTypeFunction = record -> staticOperationType;
+                } else {
+                    operationTypeFunction = recordPathOperationType;
+                }

                 final RecordSet recordSet = recordReader.createRecordSet();
-                final List<String> fieldNames = recordReader.getSchema().getFieldNames();
                 KuduTable kuduTable = kuduClient.openTable(tableName);

                 // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
                 if (handleSchemaDrift) {
                     final Schema schema = kuduTable.getSchema();
-                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
-                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
-                        lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
-                        .collect(Collectors.toList());
+                    final List<RecordField> missing = recordReader.getSchema().getFields().stream()
+                        .filter(field -> !schema.hasColumn(lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                        .collect(Collectors.toList());
+
                     if (!missing.isEmpty()) {
-                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
-                            new Object[]{missing.size(), tableName});
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift", new Object[]{missing.size(), tableName});
+
                         // Add each column one at a time to avoid failing if some of the missing columns
                         // were created by a concurrent thread or application attempting to handle schema drift.
-                        for (RecordField field : missing) {
+                        for (final RecordField field : missing) {
                             try {
                                 final String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
                                 kuduClient.alterTable(tableName, getAddNullableColumnStatement(columnName, field.getDataType()));
-                            } catch (KuduException e) {
+                            } catch (final KuduException e) {
                                 // Ignore the exception if the column already exists due to concurrent
                                 // threads or applications attempting to handle schema drift.
                                 if (e.getStatus().isAlreadyPresent()) {
-                                    getLogger().info("column already exists in table '{}' while handling schema drift",
-                                        new Object[]{tableName});
+                                    getLogger().info("Column already exists in table '{}' while handling schema drift", new Object[]{tableName});
                                 } else {
                                     throw new ProcessException(e);
                                 }
                             }
                         }
+
                         // Re-open the table to get the new schema.
                         kuduTable = kuduClient.openTable(tableName);
                     }
                 }

-                // In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors.
-                // Because the session is shared across flow files, for batching efficiency, we
-                // need to flush when changing to and from INSERT_IGNORE operation types.
-                // This should be updated and simplified when KUDU-1563 is completed.
-                if (prevOperationType != operationType && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
-                    flushKuduSession(kuduSession, false, pendingRowErrors);
-                    kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
-                }
-                prevOperationType = operationType;
-
                 Record record = recordSet.next();
-                while (record != null) {
-                    Operation operation = createKuduOperation(operationType, record, fieldNames, ignoreNull, lowercaseFields, kuduTable);
-                    // We keep track of mappings between Operations and their origins,
-                    // so that we know which FlowFiles should be marked failure after buffered flush.
-                    operationFlowFileMap.put(operation, flowFile);
+                recordReaderLoop: while (record != null) {
+                    final OperationType operationType = operationTypeFunction.apply(record);

-                    // Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled
-                    // but the buffer is too big" error. This can happen when flush mode is
-                    // MANUAL_FLUSH and a FlowFile has more than one records.
-                    if (numBuffered == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
-                        numBuffered = 0;
-                        flushKuduSession(kuduSession, false, pendingRowErrors);
+                    final List<Record> dataRecords;
+                    if (dataRecordPath == null) {
+                        dataRecords = Collections.singletonList(record);
+                    } else {
+                        final RecordPathResult result = dataRecordPath.evaluate(record);
+                        final List<FieldValue> fieldValues = result.getSelectedFields().collect(Collectors.toList());
+                        if (fieldValues.isEmpty()) {
+                            throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record yielded no results.");
+                        }
+
+                        for (final FieldValue fieldValue : fieldValues) {
+                            final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
+                            if (fieldType != RecordFieldType.RECORD) {
+                                throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type" +
+                                    " " + fieldType);
+                            }
+                        }
+
+                        dataRecords = new ArrayList<>(fieldValues.size());
+                        for (final FieldValue fieldValue : fieldValues) {
+                            dataRecords.add((Record) fieldValue.getValue());
+                        }
                     }

-                    // OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
-                    OperationResponse response = kuduSession.apply(operation);
-                    if (response != null && response.hasRowError()) {
-                        // Stop processing the records on the first error.
-                        // Note that Kudu does not support rolling back of previous operations.
-                        flowFileFailures.put(flowFile, response.getRowError());
-                        break;
+                    for (final Record dataRecord : dataRecords) {
+                        // In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors.
+                        // Because the session is shared across flow files, for batching efficiency, we
+                        // need to flush when changing to and from INSERT_IGNORE operation types.
+                        // This should be updated and simplified when KUDU-1563 is completed.
+                        if (prevOperationType != operationType && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
+                            flushKuduSession(kuduSession, false, pendingRowErrors);
+                            kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
+                        }
+                        prevOperationType = operationType;
+
+                        final List<String> fieldNames = dataRecord.getSchema().getFieldNames();
+                        Operation operation = createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull, lowercaseFields, kuduTable);
+                        // We keep track of mappings between Operations and their origins,
+                        // so that we know which FlowFiles should be marked failure after buffered flush.
+                        operationFlowFileMap.put(operation, flowFile);
+
+                        // Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled
+                        // but the buffer is too big" error. This can happen when flush mode is
+                        // MANUAL_FLUSH and a FlowFile has more than one record.
+                        if (numBuffered == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
+                            numBuffered = 0;
+                            flushKuduSession(kuduSession, false, pendingRowErrors);
+                        }
+
+                        // OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
+                        OperationResponse response = kuduSession.apply(operation);
+                        if (response != null && response.hasRowError()) {
+                            // Stop processing the records on the first error.
+                            // Note that Kudu does not support rolling back of previous operations.
+                            flowFileFailures.put(flowFile, response.getRowError());
+                            break recordReaderLoop;
+                        }
+
+                        numBuffered++;
+                        numRecords.merge(flowFile, 1, Integer::sum);
+                    }

-                    numBuffered++;
-                    numRecords.merge(flowFile, 1, Integer::sum);
                     record = recordSet.next();
                 }
             } catch (Exception ex) {
@@ -460,4 +541,32 @@ public class PutKudu extends AbstractKuduProcessor {
             throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
         }
     }
+
+    private static class RecordPathOperationType implements Function<Record, OperationType> {
+        private final RecordPath recordPath;
+
+        public RecordPathOperationType(final RecordPath recordPath) {
+            this.recordPath = recordPath;
+        }
+
+        @Override
+        public OperationType apply(final Record record) {
+            final RecordPathResult recordPathResult = recordPath.evaluate(record);
+            final List<FieldValue> resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+            if (resultList.isEmpty()) {
+                throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record but got no results");
+            }
+
+            if (resultList.size() > 1) {
+                throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record and received multiple distinct results (" + resultList + ")");
+            }
+
+            final String resultValue = String.valueOf(resultList.get(0).getValue());
+            try {
+                return OperationType.valueOf(resultValue.toUpperCase());
+            } catch (final IllegalArgumentException iae) {
+                throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record to determine Kudu Operation Type but found invalid value: " + resultValue);
+            }
+        }
+    }
 }
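Example usage (an illustrative sketch, not part of the patch; the record shape and the field names "op" and "data" below are hypothetical): given incoming records such as

    {
      "op": "UPSERT",
      "data": {
        "id": 42,
        "name": "example"
      }
    }

setting Operation RecordPath to /op and Data RecordPath to /data makes PutKudu apply an UPSERT containing only the fields of the nested "data" record, with the static Kudu Operation Type property ignored. Each field selected by Data RecordPath must itself be of RECORD type, or the FlowFile is routed to failure; with neither RecordPath property set, the processor behaves as before, writing the entire record using the configured Kudu Operation Type.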