NIFI-6552 - Kudu Put Operations

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3610.
This commit is contained in:
SandishKumarHN 2019-07-28 18:15:07 -07:00 committed by Pierre Villard
parent fc3477bd69
commit 26b203616e
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
6 changed files with 401 additions and 197 deletions

View File

@ -46,7 +46,7 @@
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.7.0</version>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kudu;
import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.AsyncKuduClient;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Upsert;
import org.apache.kudu.client.Update;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.Record;
import javax.security.auth.login.LoginException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.List;
public abstract class AbstractKuduProcessor extends AbstractProcessor {
static final PropertyDescriptor KUDU_MASTERS = new Builder()
.name("Kudu Masters")
.description("Comma separated addresses of the Kudu masters to connect to.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials to use for authentication")
.required(false)
.identifiesControllerService(KerberosCredentialsService.class)
.build();
static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new Builder()
.name("kudu-operations-timeout-ms")
.displayName("Kudu Operation Timeout")
.description("Default timeout used for user operations (using sessions and scanners)")
.required(false)
.defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS) + "ms")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS = new Builder()
.name("kudu-keep-alive-period-timeout-ms")
.displayName("Kudu Keep Alive Period Timeout")
.description("Default timeout used for user operations")
.required(false)
.defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS) + "ms")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected KuduClient kuduClient;
private volatile KerberosUser kerberosUser;
public KerberosUser getKerberosUser() {
return this.kerberosUser;
}
public KuduClient getKuduClient() {
return this.kuduClient;
}
public void createKuduClient(ProcessContext context) throws LoginException {
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
if (credentialsService == null) {
return;
}
final String keytab = credentialsService.getKeytab();
final String principal = credentialsService.getPrincipal();
kerberosUser = loginKerberosUser(principal, keytab);
final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
this.kuduClient = kerberosAction.execute();
}
protected KuduClient buildClient(final String masters, final ProcessContext context) {
final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final Integer adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
return new KuduClient.KuduClientBuilder(masters)
.defaultOperationTimeoutMs(operationTimeout)
.defaultSocketReadTimeoutMs(adminOperationTimeout)
.build();
}
protected void flushKuduSession(final KuduSession kuduSession, boolean close, final List<RowError> rowErrors) throws KuduException {
final List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush();
if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
} else {
responses.stream()
.filter(OperationResponse::hasRowError)
.map(OperationResponse::getRowError)
.forEach(rowErrors::add);
}
}
protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
kerberosUser.login();
return kerberosUser;
}
@OnStopped
public void shutdown() throws Exception {
try {
if (this.kuduClient != null) {
getLogger().debug("Closing KuduClient");
this.kuduClient.close();
this.kuduClient = null;
}
} finally {
if (kerberosUser != null) {
kerberosUser.logout();
kerberosUser = null;
}
}
}
@VisibleForTesting
protected void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames, Boolean ignoreNull) {
for (String colName : fieldNames) {
int colIdx = this.getColumnIndex(schema, colName);
if (colIdx != -1) {
ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
Type colType = colSchema.getType();
if (record.getValue(colName) == null) {
if (schema.getColumnByIndex(colIdx).isKey()) {
throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
} else if(!schema.getColumnByIndex(colIdx).isNullable()) {
throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
}
if (!ignoreNull) {
row.setNull(colName);
continue;
}
} else {
switch (colType.getDataType(colSchema.getTypeAttributes())) {
case BOOL:
row.addBoolean(colIdx, record.getAsBoolean(colName));
break;
case FLOAT:
row.addFloat(colIdx, record.getAsFloat(colName));
break;
case DOUBLE:
row.addDouble(colIdx, record.getAsDouble(colName));
break;
case BINARY:
row.addBinary(colIdx, record.getAsString(colName).getBytes());
break;
case INT8:
row.addByte(colIdx, record.getAsInt(colName).byteValue());
break;
case INT16:
row.addShort(colIdx, record.getAsInt(colName).shortValue());
break;
case INT32:
row.addInt(colIdx, record.getAsInt(colName));
break;
case INT64:
case UNIXTIME_MICROS:
row.addLong(colIdx, record.getAsLong(colName));
break;
case STRING:
row.addString(colIdx, record.getAsString(colName));
break;
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
row.addDecimal(colIdx, new BigDecimal(record.getAsString(colName)));
break;
default:
throw new IllegalStateException(String.format("unknown column type %s", colType));
}
}
}
}
}
private int getColumnIndex(Schema columns, String colName) {
try {
return columns.getColumnIndex(colName);
} catch (Exception ex) {
return -1;
}
}
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
Upsert upsert = kuduTable.newUpsert();
buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull);
return upsert;
}
protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
Insert insert = kuduTable.newInsert();
buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames, ignoreNull);
return insert;
}
protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
Delete delete = kuduTable.newDelete();
buildPartialRow(kuduTable.getSchema(), delete.getRow(), record, fieldNames, ignoreNull);
return delete;
}
protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
Update update = kuduTable.newUpdate();
buildPartialRow(kuduTable.getSchema(), update.getRow(), record, fieldNames, ignoreNull);
return update;
}
}

View File

@ -20,5 +20,7 @@ package org.apache.nifi.processors.kudu;
public enum OperationType {
INSERT,
INSERT_IGNORE,
UPSERT;
UPSERT,
UPDATE,
DELETE;
}

View File

@ -17,20 +17,13 @@
package org.apache.nifi.processors.kudu;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
@ -39,20 +32,16 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@ -62,10 +51,8 @@ import org.apache.nifi.serialization.record.RecordSet;
import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -82,14 +69,8 @@ import java.util.stream.Collectors;
"to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." +
" If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute = "record.count", description = "Number of records written to Kudu")
public class PutKudu extends AbstractProcessor {
protected static final PropertyDescriptor KUDU_MASTERS = new Builder()
.name("Kudu Masters")
.description("List all kudu masters's ip with port (e.g. 7051), comma separated")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public class PutKudu extends AbstractKuduProcessor {
protected static final PropertyDescriptor TABLE_NAME = new Builder()
.name("Table Name")
@ -99,20 +80,13 @@ public class PutKudu extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials to use for authentication")
.required(false)
.identifiesControllerService(KerberosCredentialsService.class)
.build();
public static final PropertyDescriptor RECORD_READER = new Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The service for reading records from incoming flow files.")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
@ -127,9 +101,10 @@ public class PutKudu extends AbstractProcessor {
protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
.name("Insert Operation")
.displayName("Kudu Operation Type")
.description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows")
.allowableValues(OperationType.values())
.defaultValue(OperationType.INSERT.toString())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@ -142,10 +117,11 @@ public class PutKudu extends AbstractProcessor {
"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
.allowableValues(SessionConfiguration.FlushMode.values())
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new PropertyDescriptor.Builder()
protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new Builder()
.name("FlowFiles per Batch")
.description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
"Depending on your memory size, and data size per row set an appropriate batch size " +
@ -157,7 +133,7 @@ public class PutKudu extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
protected static final PropertyDescriptor BATCH_SIZE = new Builder()
.name("Batch Size")
.displayName("Max Records per Batch")
.description("The maximum number of Records to process in a single Kudu-client batch, between 1 - 100000. " +
@ -169,6 +145,15 @@ public class PutKudu extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor IGNORE_NULL = new Builder()
.name("Ignore NULL")
.description("Ignore NULL on Kudu Put Operation, Update only non-Null columns if set true")
.defaultValue("false")
.allowableValues("true", "false")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
protected static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -183,12 +168,10 @@ public class PutKudu extends AbstractProcessor {
protected OperationType operationType;
protected SessionConfiguration.FlushMode flushMode;
protected int batchSize = 100;
protected int ffbatch = 1;
protected KuduClient kuduClient;
private volatile KerberosUser kerberosUser;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
@ -201,7 +184,9 @@ public class PutKudu extends AbstractProcessor {
properties.add(FLUSH_MODE);
properties.add(FLOWFILE_BATCH_SIZE);
properties.add(BATCH_SIZE);
properties.add(IGNORE_NULL);
properties.add(KUDU_OPERATION_TIMEOUT_MS);
properties.add(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS);
return properties;
}
@ -213,58 +198,15 @@ public class PutKudu extends AbstractProcessor {
return rels;
}
protected KerberosUser kerberosUser;
protected KuduSession kuduSession;
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException, LoginException {
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
getLogger().debug("Setting up Kudu connection...");
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
kuduClient = createClient(kuduMasters, credentialsService);
getLogger().debug("Kudu connection successfully initialized");
}
protected KuduClient createClient(final String masters, final KerberosCredentialsService credentialsService) throws LoginException {
if (credentialsService == null) {
return buildClient(masters);
}
final String keytab = credentialsService.getKeytab();
final String principal = credentialsService.getPrincipal();
kerberosUser = loginKerberosUser(principal, keytab);
final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(masters), getLogger());
return kerberosAction.execute();
}
protected KuduClient buildClient(final String masters) {
return new KuduClient.KuduClientBuilder(masters).build();
}
protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
kerberosUser.login();
return kerberosUser;
}
@OnStopped
public final void closeClient() throws KuduException, LoginException {
try {
if (kuduClient != null) {
getLogger().debug("Closing KuduClient");
kuduClient.close();
kuduClient = null;
}
} finally {
if (kerberosUser != null) {
kerberosUser.logout();
kerberosUser = null;
}
}
createKuduClient(context);
}
@Override
@ -273,6 +215,7 @@ public class PutKudu extends AbstractProcessor {
if (flowFiles.isEmpty()) {
return;
}
kerberosUser = getKerberosUser();
final KerberosUser user = kerberosUser;
if (user == null) {
@ -290,9 +233,11 @@ public class PutKudu extends AbstractProcessor {
}
private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles) throws ProcessException {
final KuduSession kuduSession = getKuduSession(kuduClient);
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final KuduClient kuduClient = getKuduClient();
kuduSession = getKuduSession(kuduClient);
final Map<FlowFile, Integer> numRecords = new HashMap<>();
final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
@ -300,6 +245,8 @@ public class PutKudu extends AbstractProcessor {
int numBuffered = 0;
final List<RowError> pendingRowErrors = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final List<String> fieldNames = recordReader.getSchema().getFieldNames();
@ -309,10 +256,7 @@ public class PutKudu extends AbstractProcessor {
Record record = recordSet.next();
while (record != null) {
Operation operation = operationType == OperationType.UPSERT
? upsertRecordToKudu(kuduTable, record, fieldNames)
: insertRecordToKudu(kuduTable, record, fieldNames);
Operation operation = getKuduOperationType(operationType, record, fieldNames, ignoreNull, kuduTable);
// We keep track of mappings between Operations and their origins,
// so that we know which FlowFiles should be marked failure after buffered flush.
operationFlowFileMap.put(operation, flowFile);
@ -399,91 +343,20 @@ public class PutKudu extends AbstractProcessor {
return kuduSession;
}
private void flushKuduSession(final KuduSession kuduSession, boolean close, final List<RowError> rowErrors) throws KuduException {
final List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush();
if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
} else {
responses.stream()
.filter(OperationResponse::hasRowError)
.map(OperationResponse::getRowError)
.forEach(rowErrors::add);
}
}
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
Upsert upsert = kuduTable.newUpsert();
this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames);
return upsert;
}
protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
Insert insert = kuduTable.newInsert();
this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames);
return insert;
}
void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames) {
for (String colName : fieldNames) {
int colIdx = this.getColumnIndex(schema, colName);
if (colIdx != -1) {
ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
Type colType = colSchema.getType();
if (record.getValue(colName) == null) {
row.setNull(colName);
continue;
}
switch (colType.getDataType(colSchema.getTypeAttributes())) {
case BOOL:
row.addBoolean(colIdx, record.getAsBoolean(colName));
break;
case FLOAT:
row.addFloat(colIdx, record.getAsFloat(colName));
break;
case DOUBLE:
row.addDouble(colIdx, record.getAsDouble(colName));
break;
case BINARY:
row.addBinary(colIdx, record.getAsString(colName).getBytes());
break;
case INT8:
row.addByte(colIdx, record.getAsInt(colName).byteValue());
break;
case INT16:
row.addShort(colIdx, record.getAsInt(colName).shortValue());
break;
case INT32:
row.addInt(colIdx, record.getAsInt(colName));
break;
case INT64:
case UNIXTIME_MICROS:
row.addLong(colIdx, record.getAsLong(colName));
break;
case STRING:
row.addString(colIdx, record.getAsString(colName));
break;
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
row.addDecimal(colIdx, new BigDecimal(record.getAsString(colName)));
break;
default:
throw new IllegalStateException(String.format("unknown column type %s", colType));
}
}
}
}
private int getColumnIndex(Schema columns, String colName) {
try {
return columns.getColumnIndex(colName);
} catch (Exception ex) {
return -1;
private Operation getKuduOperationType(OperationType operationType, Record record, List<String> fieldNames, Boolean ignoreNull, KuduTable kuduTable) {
switch (operationType) {
case DELETE:
return deleteRecordFromKudu(kuduTable, record, fieldNames, ignoreNull);
case INSERT:
return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
case INSERT_IGNORE:
return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
case UPSERT:
return upsertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
case UPDATE:
return updateRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
default:
throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
}
}
}

View File

@ -17,11 +17,14 @@
package org.apache.nifi.processors.kudu;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Upsert;
import org.apache.kudu.client.Update;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.Record;
@ -30,14 +33,15 @@ import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.LinkedList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MockPutKudu extends PutKudu {
private KuduSession session;
private LinkedList<Insert> insertQueue;
@ -58,18 +62,41 @@ public class MockPutKudu extends PutKudu {
}
@Override
protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
Insert insert = insertQueue.poll();
return insert != null ? insert : mock(Insert.class);
}
@Override
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
return mock(Upsert.class);
}
@Override
protected KuduClient buildClient(final String masters) {
protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
return mock(Delete.class);
}
@Override
protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
return mock(Update.class);
}
@Override
public KuduClient buildClient(final String masters, ProcessContext context) {
final KuduClient client = mock(KuduClient.class);
try {
when(client.openTable(anyString())).thenReturn(mock(KuduTable.class));
} catch (final Exception e) {
throw new AssertionError(e);
}
return client;
}
@Override
public KuduClient getKuduClient() {
final KuduClient client = mock(KuduClient.class);
try {
@ -138,4 +165,4 @@ public class MockPutKudu extends PutKudu {
protected KuduSession getKuduSession(KuduClient client) {
return session;
}
}
}

View File

@ -17,7 +17,7 @@
package org.apache.nifi.processors.kudu;
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
@ -74,7 +74,7 @@ import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -86,20 +86,23 @@ public class TestPutKudu {
public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal";
private TestRunner testRunner;
private MockPutKudu processor;
private MockRecordParser readerFactory;
@Before
public void setUp() {
public void setUp() throws InitializationException {
processor = new MockPutKudu();
testRunner = TestRunners.newTestRunner(processor);
setUpTestRunner(testRunner);
}
private void setUpTestRunner(TestRunner testRunner) {
private void setUpTestRunner(TestRunner testRunner) throws InitializationException {
testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS);
testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE);
testRunner.setProperty(PutKudu.IGNORE_NULL, "true");
testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory");
testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT.toString());
}
@ -173,7 +176,6 @@ public class TestPutKudu {
assertTrue(proc.loggedOut());
}
@Test
public void testInsecureClient() throws InitializationException {
createRecordReader(1);
@ -326,6 +328,42 @@ public class TestPutKudu {
flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50");
}
@Test
public void testDeleteFlowFiles() throws Exception {
createRecordReader(50);
testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.record.delete}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("kudu.record.delete", "DELETE");
testRunner.enqueue("string".getBytes(), attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
flowFile.assertContentEquals("string".getBytes());
flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50");
}
@Test
public void testUpdateFlowFiles() throws Exception {
createRecordReader(50);
testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.record.update}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("kudu.record.update", "UPDATE");
testRunner.enqueue("string".getBytes(), attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
flowFile.assertContentEquals("string".getBytes());
flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50");
}
@Test
public void testBuildRow() {
buildPartialRow((long) 1, "foo", (short) 10);
@ -348,11 +386,11 @@ public class TestPutKudu {
private void buildPartialRow(Long id, String name, Short age) {
final Schema kuduSchema = new Schema(Arrays.asList(
new ColumnSchemaBuilder("id", Type.INT64).key(true).build(),
new ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
new ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(),
new ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(),
new ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes(
new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64).key(true).build(),
new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
new ColumnSchema.ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(),
new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(),
new ColumnSchema.ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes(
new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build()
).build()));
@ -369,11 +407,12 @@ public class TestPutKudu {
values.put("age", age);
values.put("updated_at", System.currentTimeMillis() * 1000);
values.put("score", 10000L);
new PutKudu().buildPartialRow(
processor.buildPartialRow(
kuduSchema,
kuduSchema.newPartialRow(),
new MapRecord(schema, values),
schema.getFieldNames()
schema.getFieldNames(),
true
);
}
@ -393,12 +432,12 @@ public class TestPutKudu {
EXCEPTION
}
private LinkedList<OperationResponse> queueInsert(MockPutKudu kudu, KuduSession session, boolean sync, ResultCode... results) throws Exception {
private LinkedList<OperationResponse> queueInsert(MockPutKudu putKudu, KuduSession session, boolean sync, ResultCode... results) throws Exception {
LinkedList<OperationResponse> responses = new LinkedList<>();
for (ResultCode result : results) {
boolean ok = result == OK;
Tuple<Insert, OperationResponse> tuple = insert(ok);
kudu.queue(tuple.getKey());
putKudu.queue(tuple.getKey());
if (result == EXCEPTION) {
when(session.apply(tuple.getKey())).thenThrow(mock(KuduException.class));
@ -526,6 +565,7 @@ public class TestPutKudu {
testRunner.run(numFlowFiles);
testRunner.assertTransferCount(PutKudu.REL_FAILURE, 3);
List<MockFlowFile> failedFlowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_FAILURE);
failedFlowFiles.get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "2");
failedFlowFiles.get(1).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, sync ? "1" : "2");
@ -557,7 +597,6 @@ public class TestPutKudu {
testKuduPartialFailure(FlushMode.MANUAL_FLUSH);
}
public static class MockKerberosCredentialsService extends AbstractControllerService implements KerberosCredentialsService {
private final String keytab;
private final String principal;