NIFI-7142: Automatically handle schema drift in the PutKudu processor

Adds a boolean property to the PutKudu processor to optionally
enable automatic schema drift handling.

If set to true, when fields with names that are not in the target
Kudu table are encountered, the Kudu table will be altered to
include new columns for those fields.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4053.
Grant Henke 2020-02-13 09:24:08 -06:00 committed by Pierre Villard
parent fe416d1ea0
commit 268ba1d23e
3 changed files with 121 additions and 12 deletions
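
The heart of the change is a handful of Kudu client calls: compare the incoming record schema against the table schema, and add a nullable column for every field the table lacks. As a minimal, self-contained sketch of that flow against the Kudu Java client (the master address, table name, and column name below are hypothetical placeholders, not taken from this commit):

    import org.apache.kudu.Type;
    import org.apache.kudu.client.AlterTableOptions;
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;
    import org.apache.kudu.client.KuduTable;

    public class SchemaDriftSketch {
        public static void main(String[] args) throws KuduException {
            // Hypothetical master address and table; substitute your own.
            try (KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build()) {
                KuduTable table = client.openTable("example_table");
                // Mirror the processor's behavior: add a nullable column for a
                // field the table does not yet have.
                if (!table.getSchema().hasColumn("new_field")) {
                    AlterTableOptions alter = new AlterTableOptions();
                    alter.addNullableColumn("new_field", Type.STRING);
                    try {
                        client.alterTable("example_table", alter);
                    } catch (KuduException e) {
                        // Another writer may have added the column first; that is fine.
                        if (!e.getStatus().isAlreadyPresent()) {
                            throw e;
                        }
                    }
                }
            }
        }
    }

Adding columns one at a time and tolerating "already present" errors is what lets concurrent writers race safely, which is exactly the pattern the PutKudu diff below follows.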

AbstractKuduProcessor.java

@@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.krb.KerberosAction;
 import org.apache.nifi.security.krb.KerberosKeytabUser;
 import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import javax.security.auth.login.LoginException;
@@ -231,6 +232,35 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
         }
     }
+
+    /**
+     * Converts a NiFi DataType to its equivalent Kudu Type.
+     */
+    protected Type toKuduType(DataType nifiType) {
+        switch (nifiType.getFieldType()) {
+            case BOOLEAN:
+                return Type.BOOL;
+            case BYTE:
+                return Type.INT8;
+            case SHORT:
+                return Type.INT16;
+            case INT:
+                return Type.INT32;
+            case LONG:
+                return Type.INT64;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case TIMESTAMP:
+                return Type.UNIXTIME_MICROS;
+            case CHAR:
+            case STRING:
+                return Type.STRING;
+            default:
+                throw new IllegalArgumentException(String.format("unsupported type %s", nifiType));
+        }
+    }

     private int getColumnIndex(Schema columns, String colName) {
         try {
             return columns.getColumnIndex(colName);

PutKudu.java

@@ -17,7 +17,10 @@
 package org.apache.nifi.processors.kudu;

+import org.apache.kudu.Schema;
+import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.Operation;
@@ -34,6 +37,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
@@ -46,6 +50,7 @@ import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSet;
 import javax.security.auth.login.LoginException;
@@ -59,6 +64,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 @EventDriven
 @SupportsBatching
@@ -108,6 +114,16 @@ public class PutKudu extends AbstractKuduProcessor {
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();

+    protected static final PropertyDescriptor HANDLE_SCHEMA_DRIFT = new Builder()
+            .name("Handle Schema Drift")
+            .description("If set to true, when fields with names that are not in the target Kudu table " +
+                    "are encountered, the Kudu table will be altered to include new columns for those fields.")
+            .defaultValue("false")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
             .name("Insert Operation")
             .displayName("Kudu Operation Type")
@@ -158,7 +174,6 @@ public class PutKudu extends AbstractKuduProcessor {
             .name("Ignore NULL")
             .description("Ignore NULL on Kudu Put Operation, Update only non-Null columns if set true")
             .defaultValue("false")
-            .allowableValues("true", "false")
             .required(true)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -188,6 +203,7 @@ public class PutKudu extends AbstractKuduProcessor {
         properties.add(KERBEROS_CREDENTIALS_SERVICE);
         properties.add(SKIP_HEAD_LINE);
         properties.add(LOWERCASE_FIELD_NAMES);
+        properties.add(HANDLE_SCHEMA_DRIFT);
         properties.add(RECORD_READER);
         properties.add(INSERT_OPERATION);
         properties.add(FLUSH_MODE);
@@ -251,16 +267,52 @@ public class PutKudu extends AbstractKuduProcessor {
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+                final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
+                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile));
+                final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
+                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        // Add each column one at a time to avoid failing if some of the missing columns
+                        // were created by a concurrent thread or application attempting to handle schema drift.
+                        for (RecordField field : missing) {
+                            try {
+                                String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                                AlterTableOptions alter = new AlterTableOptions();
+                                alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
+                                kuduClient.alterTable(tableName, alter);
+                            } catch (KuduException e) {
+                                // Ignore the exception if the column already exists due to concurrent
+                                // threads or applications attempting to handle schema drift.
+                                if (e.getStatus().isAlreadyPresent()) {
+                                    getLogger().info("column already exists in table '{}' while handling schema drift",
+                                            new Object[]{tableName});
+                                } else {
+                                    throw new ProcessException(e);
+                                }
+                            }
+                        }
+                        // Re-open the table to get the new schema.
+                        kuduTable = kuduClient.openTable(tableName);
+                    }
+                }

                 // In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors.
                 // Because the session is shared across flow files, for batching efficiency, we
@@ -348,6 +400,14 @@ public class PutKudu extends AbstractKuduProcessor {
         session.adjustCounter("Records Inserted", totalCount, false);
     }

+    private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
+        PropertyValue evaluatedProperty = context.getProperty(property).evaluateAttributeExpressions(flowFile);
+        if (property.isRequired() && evaluatedProperty == null) {
+            throw new ProcessException(String.format("Property `%s` is required but evaluated to null", property.getDisplayName()));
+        }
+        return evaluatedProperty.getValue();
+    }
+
     protected KuduSession createKuduSession(final KuduClient client) {
         final KuduSession kuduSession = client.newSession();
         kuduSession.setMutationBufferSpace(batchSize);

ITPutKudu.java

@@ -99,9 +99,8 @@ public class ITPutKudu {
         KuduClient client = harness.getClient();
         List<ColumnSchema> columns = new ArrayList<>();
         columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
-        columns.add(new ColumnSchema.ColumnSchemaBuilder("stringVal", Type.STRING).build());
-        columns.add(new ColumnSchema.ColumnSchemaBuilder("num32Val", Type.INT32).build());
-        columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleVal", Type.DOUBLE).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("stringval", Type.STRING).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("num32val", Type.INT32).build());
         Schema schema = new Schema(columns);
         CreateTableOptions opts = new CreateTableOptions()
                 .addHashPartitions(Collections.singletonList("id"), 4);
@@ -113,10 +112,12 @@ public class ITPutKudu {
         readerFactory.addSchemaField("id", RecordFieldType.INT);
         readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
         readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
+        // Add two extra columns to test handleSchemaDrift = true.
         readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
+        readerFactory.addSchemaField("floatVal", RecordFieldType.FLOAT);

         for (int i = 0; i < numOfRecord; i++) {
-            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i);
+            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, 100.88 + i);
         }

         testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -139,6 +140,18 @@ public class ITPutKudu {
         testRunner.setProperty(PutKudu.BATCH_SIZE, "10");
         testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2");

+        // Don't ignore null values.
+        flowFileAttributes.put("kudu.ignore.null", "false");
+        testRunner.setProperty(PutKudu.IGNORE_NULL, "${kudu.ignore.null}");
+
+        // Enable lowercase handling.
+        flowFileAttributes.put("kudu.lowercase.field.names", "true");
+        testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, "${kudu.lowercase.field.names}");
+
+        // Enable schema drift handling.
+        flowFileAttributes.put("kudu.handle.schema.drift", "true");
+        testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT, "${kudu.handle.schema.drift}");
+
         // Increase the thread count to better simulate a production environment
         testRunner.setThreadCount(4);
@@ -163,9 +176,15 @@ public class ITPutKudu {
         final ProvenanceEventRecord provEvent = provEvents.get(0);
         Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());

-        // Verify Kudu record count.
         KuduClient client = harness.getClient();
         KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);

+        // Verify the extra fields were added.
+        Assert.assertEquals(5, kuduTable.getSchema().getColumnCount());
+        Assert.assertTrue(kuduTable.getSchema().hasColumn("doubleval"));
+        Assert.assertTrue(kuduTable.getSchema().hasColumn("floatval"));
+
+        // Verify Kudu record count.
         KuduScanner scanner = client.newScannerBuilder(kuduTable).build();
         int count = 0;
         for (RowResult unused : scanner) {