mirror of https://github.com/apache/nifi.git
NIFI-11271: This closes #7795. Removed deprecated Kerberos-related properties and updated to make use of KerberosUserService; some code cleanup to bring the code up to date with Java 21 recommendations, such as using Stream.toList() instead of Stream.collect(Collectors.toList()) and enhanced switch statements
Signed-off-by: Joseph Witt <joewitt@apache.org>
parent ff05a5d158
commit 16c527271b
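The commit message above names two Java 21-era cleanups that recur throughout the diff: replacing Stream.collect(Collectors.toList()) with Stream.toList(), and converting classic break-based switch statements into enhanced (arrow) switch expressions. The following self-contained sketch illustrates both idioms side by side; the KuduType enum and the string mapping are illustrative stand-ins and are not taken from the NiFi or Kudu sources in this diff.

import java.util.List;
import java.util.stream.Stream;

public class ModernizationSketch {

    // Hypothetical stand-in for a column type enum; not the real Kudu Type enum.
    enum KuduType { INT8, INT16, INT32, STRING }

    // Classic switch statement: mutable result variable, explicit break per case.
    static String toRecordFieldTypeLegacy(final KuduType type) {
        final String fieldType;
        switch (type) {
            case INT8:
                fieldType = "BYTE";
                break;
            case INT16:
                fieldType = "SHORT";
                break;
            case INT32:
                fieldType = "INT";
                break;
            default:
                fieldType = "STRING";
                break;
        }
        return fieldType;
    }

    // Enhanced switch expression: each arm yields a value and fall-through is impossible.
    static String toRecordFieldType(final KuduType type) {
        return switch (type) {
            case INT8 -> "BYTE";
            case INT16 -> "SHORT";
            case INT32 -> "INT";
            case STRING -> "STRING";
        };
    }

    public static void main(final String[] args) {
        // Stream.toList() replaces collect(Collectors.toList()) and returns an
        // unmodifiable List, which is sufficient wherever the result is only read.
        final List<String> tags = Stream.of("lookup", "enrich", "kudu").toList();
        System.out.println(tags);
        System.out.println(toRecordFieldType(KuduType.INT32));
        System.out.println(toRecordFieldTypeLegacy(KuduType.INT8));
    }
}

Both rewrites preserve behavior; the enhanced switch additionally forces exhaustive handling of enum constants at compile time, which is why several default branches in the diff below become compile-checked arms.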
@@ -63,11 +63,6 @@
            <version>2.0.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record</artifactId>
@@ -88,6 +83,12 @@
            <artifactId>kudu-client</artifactId>
            <version>${kudu.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-kerberos-user-service-api</artifactId>
            <version>2.0.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <profiles>
        <profile>
@ -16,6 +16,18 @@
|
|||
*/
|
||||
package org.apache.nifi.controller.kudu;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.security.auth.login.LoginException;
|
||||
import org.apache.kudu.ColumnSchema;
|
||||
import org.apache.kudu.ColumnTypeAttributes;
|
||||
import org.apache.kudu.Schema;
|
||||
|
@ -38,12 +50,11 @@ import org.apache.nifi.controller.AbstractControllerService;
|
|||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.lookup.RecordLookupService;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.security.krb.KerberosAction;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
|
@ -52,20 +63,6 @@ import org.apache.nifi.serialization.record.RecordField;
|
|||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@CapabilityDescription("Lookup a record from Kudu Server associated with the specified key. Binary columns are base64 encoded. Only one matched row will be returned")
|
||||
@Tags({"lookup", "enrich", "key", "value", "kudu"})
|
||||
|
@@ -80,13 +77,12 @@ public class KuduLookupService extends AbstractControllerService implements Reco
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .build();

    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
            .name("kudu-lu-kerberos-credentials-service")
            .displayName("Kerberos Credentials Service")
            .description("Specifies the Kerberos Credentials to use for authentication")
            .required(false)
            .identifiesControllerService(KerberosCredentialsService.class)
            .build();
    public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
            .name("Kerberos User Service")
            .description("Specifies the Kerberos Credentials to use for authentication")
            .required(false)
            .identifiesControllerService(KerberosUserService.class)
            .build();

    public static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new PropertyDescriptor.Builder()
            .name("kudu-lu-operations-timeout-ms")
@ -137,7 +133,6 @@ public class KuduLookupService extends AbstractControllerService implements Reco
|
|||
|
||||
protected List<PropertyDescriptor> properties;
|
||||
|
||||
protected KerberosCredentialsService credentialsService;
|
||||
private volatile KerberosUser kerberosUser;
|
||||
|
||||
protected String kuduMasters;
|
||||
|
@@ -154,26 +149,22 @@ public class KuduLookupService extends AbstractControllerService implements Reco
    protected void init(final ControllerServiceInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(KUDU_MASTERS);
        properties.add(KERBEROS_CREDENTIALS_SERVICE);
        properties.add(KERBEROS_USER_SERVICE);
        properties.add(KUDU_OPERATION_TIMEOUT_MS);
        properties.add(KUDU_REPLICA_SELECTION);
        properties.add(TABLE_NAME);
        properties.add(RETURN_COLUMNS);
        addProperties(properties);
        this.properties = Collections.unmodifiableList(properties);
    }

    protected void addProperties(List<PropertyDescriptor> properties) {
    }

    protected void createKuduClient(ConfigurationContext context) throws LoginException {
        final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
        final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
        final KerberosUserService userService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);

        if (credentialsService != null) {
            final String keytab = credentialsService.getKeytab();
            final String principal = credentialsService.getPrincipal();
            kerberosUser = loginKerberosUser(principal, keytab);
        if (userService != null) {
            kerberosUser = userService.createKerberosUser();
            kerberosUser.login();

            final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
            this.kuduClient = kerberosAction.execute();
@ -182,14 +173,9 @@ public class KuduLookupService extends AbstractControllerService implements Reco
|
|||
}
|
||||
}
|
||||
|
||||
protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
|
||||
final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
|
||||
kerberosUser.login();
|
||||
return kerberosUser;
|
||||
}
|
||||
|
||||
protected KuduClient buildClient(final String masters, final ConfigurationContext context) {
|
||||
final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||
final int operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||
|
||||
return new KuduClient.KuduClientBuilder(masters)
|
||||
.defaultOperationTimeoutMs(operationTimeout)
|
||||
|
@ -203,10 +189,8 @@ public class KuduLookupService extends AbstractControllerService implements Reco
|
|||
*/
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
||||
|
||||
try {
|
||||
kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
|
||||
credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
if (kuduClient == null) {
|
||||
getLogger().debug("Setting up Kudu connection...");
|
||||
|
@ -214,7 +198,7 @@ public class KuduLookupService extends AbstractControllerService implements Reco
|
|||
createKuduClient(context);
|
||||
getLogger().debug("Kudu connection successfully initialized");
|
||||
}
|
||||
} catch(Exception ex){
|
||||
} catch (final Exception ex) {
|
||||
getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex);
|
||||
throw new InitializationException(ex);
|
||||
}
|
||||
|
@ -228,15 +212,14 @@ public class KuduLookupService extends AbstractControllerService implements Reco
|
|||
|
||||
//Result Schema
|
||||
resultSchema = kuduSchemaToNiFiSchema(tableSchema, columnNames);
|
||||
|
||||
} catch (KuduException e) {
|
||||
} catch (final KuduException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getRequiredKeys() {
|
||||
return new HashSet<>();
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@@ -246,21 +229,17 @@ public class KuduLookupService extends AbstractControllerService implements Reco

    @Override
    public Optional<Record> lookup(Map<String, Object> coordinates) {
        Optional<Record> record;

        if (kerberosUser != null) {
            final KerberosAction<Optional<Record>> kerberosAction = new KerberosAction<>(kerberosUser, () -> getRecord(coordinates), getLogger());
            record = kerberosAction.execute();
        if (kerberosUser == null) {
            return getRecord(coordinates);
        } else {
            record = getRecord(coordinates);
            final KerberosAction<Optional<Record>> kerberosAction = new KerberosAction<>(kerberosUser, () -> getRecord(coordinates), getLogger());
            return kerberosAction.execute();
        }

        return record;
    }
|
||||
|
||||
private Optional<Record> getRecord(Map<String, Object> coordinates) {
|
||||
//Scanner
|
||||
KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(table);
|
||||
final KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(table);
|
||||
|
||||
builder.setProjectedColumnNames(columnNames);
|
||||
builder.replicaSelection(replicaSelection);
|
||||
|
@ -272,20 +251,22 @@ public class KuduLookupService extends AbstractControllerService implements Reco
|
|||
builder.addPredicate(KuduPredicate.newComparisonPredicate(tableSchema.getColumn(key), KuduPredicate.ComparisonOp.EQUAL, value))
|
||||
);
|
||||
|
||||
KuduScanner kuduScanner = builder.build();
|
||||
final KuduScanner kuduScanner = builder.build();
|
||||
|
||||
//Run lookup
|
||||
for ( RowResult row : kuduScanner){
|
||||
for (final RowResult row : kuduScanner) {
|
||||
final Map<String, Object> values = new HashMap<>();
|
||||
for(String columnName : columnNames){
|
||||
for (final String columnName : columnNames) {
|
||||
Object object;
|
||||
if(row.getColumnType(columnName) == Type.BINARY){
|
||||
if (row.getColumnType(columnName) == Type.BINARY) {
|
||||
object = Base64.getEncoder().encodeToString(row.getBinaryCopy(columnName));
|
||||
} else {
|
||||
object = row.getObject(columnName);
|
||||
}
|
||||
|
||||
values.put(columnName, object);
|
||||
}
|
||||
|
||||
return Optional.of(new MapRecord(resultSchema, values));
|
||||
}
|
||||
|
||||
|
@ -293,8 +274,8 @@ public class KuduLookupService extends AbstractControllerService implements Reco
|
|||
return Optional.empty();
|
||||
}
|
||||
|
||||
private List<String> getColumns(String columns){
|
||||
if(columns.equals("*")){
|
||||
private List<String> getColumns(final String columns) {
|
||||
if (columns.equals("*")) {
|
||||
return tableSchema
|
||||
.getColumns()
|
||||
.stream().map(ColumnSchema::getName)
|
||||
|
@ -306,50 +287,31 @@ public class KuduLookupService extends AbstractControllerService implements Reco
|
|||
|
||||
private RecordSchema kuduSchemaToNiFiSchema(Schema kuduTableSchema, List<String> columnNames){
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
for(String columnName : columnNames) {
|
||||
if(!kuduTableSchema.hasColumn(columnName)){
|
||||
for (final String columnName : columnNames) {
|
||||
if (!kuduTableSchema.hasColumn(columnName)) {
|
||||
throw new IllegalArgumentException("Column not found in Kudu table schema " + columnName);
|
||||
}
|
||||
ColumnSchema cs = kuduTableSchema.getColumn(columnName);
|
||||
switch (cs.getType()) {
|
||||
case INT8:
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.BYTE.getDataType()));
|
||||
break;
|
||||
case INT16:
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.SHORT.getDataType()));
|
||||
break;
|
||||
case INT32:
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.INT.getDataType()));
|
||||
break;
|
||||
case INT64:
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.LONG.getDataType()));
|
||||
break;
|
||||
case DECIMAL:
|
||||
final ColumnTypeAttributes attributes = cs.getTypeAttributes();
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(), attributes.getScale())));
|
||||
break;
|
||||
case UNIXTIME_MICROS:
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.TIMESTAMP.getDataType()));
|
||||
break;
|
||||
case BINARY:
|
||||
case STRING:
|
||||
case VARCHAR:
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType()));
|
||||
break;
|
||||
case DOUBLE:
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.DOUBLE.getDataType()));
|
||||
break;
|
||||
case BOOL:
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.BOOLEAN.getDataType()));
|
||||
break;
|
||||
case FLOAT:
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType()));
|
||||
break;
|
||||
case DATE:
|
||||
fields.add(new RecordField(cs.getName(), RecordFieldType.DATE.getDataType()));
|
||||
break;
|
||||
}
|
||||
|
||||
final ColumnSchema cs = kuduTableSchema.getColumn(columnName);
|
||||
final ColumnTypeAttributes attributes = cs.getTypeAttributes();
|
||||
|
||||
final RecordField field = switch (cs.getType()) {
|
||||
case INT8 -> new RecordField(cs.getName(), RecordFieldType.BYTE.getDataType());
|
||||
case INT16 -> new RecordField(cs.getName(), RecordFieldType.SHORT.getDataType());
|
||||
case INT32 -> new RecordField(cs.getName(), RecordFieldType.INT.getDataType());
|
||||
case INT64 -> new RecordField(cs.getName(), RecordFieldType.LONG.getDataType());
|
||||
case DECIMAL -> new RecordField(cs.getName(), RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(), attributes.getScale()));
|
||||
case UNIXTIME_MICROS -> new RecordField(cs.getName(), RecordFieldType.TIMESTAMP.getDataType());
|
||||
case BINARY, STRING, VARCHAR -> new RecordField(cs.getName(), RecordFieldType.STRING.getDataType());
|
||||
case DOUBLE -> new RecordField(cs.getName(), RecordFieldType.DOUBLE.getDataType());
|
||||
case BOOL -> new RecordField(cs.getName(), RecordFieldType.BOOLEAN.getDataType());
|
||||
case FLOAT -> new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType());
|
||||
case DATE -> new RecordField(cs.getName(), RecordFieldType.DATE.getDataType());
|
||||
};
|
||||
|
||||
fields.add(field);
|
||||
}
|
||||
|
||||
return new SimpleRecordSchema(fields);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,25 @@
|
|||
|
||||
package org.apache.nifi.processors.kudu;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Date;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDate;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Consumer;
|
||||
import org.apache.kudu.ColumnSchema;
|
||||
import org.apache.kudu.ColumnTypeAttributes;
|
||||
import org.apache.kudu.Schema;
|
||||
|
@ -33,18 +52,12 @@ import org.apache.kudu.client.SessionConfiguration;
|
|||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyDescriptor.Builder;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.security.krb.KerberosAction;
|
||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
|
@ -53,29 +66,6 @@ import org.apache.nifi.serialization.record.field.FieldConverter;
|
|||
import org.apache.nifi.serialization.record.field.ObjectTimestampFieldConverter;
|
||||
import org.apache.nifi.serialization.record.type.DecimalDataType;
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Date;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDate;
|
||||
import java.time.ZoneId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
||||
|
||||
|
@ -87,14 +77,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
|||
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder()
|
||||
.name("kerberos-credentials-service")
|
||||
.displayName("Kerberos Credentials Service")
|
||||
.description("Specifies the Kerberos Credentials to use for authentication")
|
||||
.required(false)
|
||||
.identifiesControllerService(KerberosCredentialsService.class)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-user-service")
|
||||
.displayName("Kerberos User Service")
|
||||
|
@ -103,25 +85,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-principal")
|
||||
.displayName("Kerberos Principal")
|
||||
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-password")
|
||||
.displayName("Kerberos Password")
|
||||
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new Builder()
|
||||
.name("kudu-operations-timeout-ms")
|
||||
.displayName("Kudu Operation Timeout")
|
||||
|
@@ -187,25 +150,13 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {

    protected void createKerberosUserAndOrKuduClient(ProcessContext context) {
        final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
        if (kerberosUserService != null) {
            kerberosUser = kerberosUserService.createKerberosUser();
            kerberosUser.login();
            createKuduClient(context);
        } else {
            final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
            final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
            final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();

            if (credentialsService != null) {
                kerberosUser = createKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab(), context);
                kerberosUser.login(); // login creates the kudu client as well
            } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
                kerberosUser = createKerberosPasswordUser(kerberosPrincipal, kerberosPassword, context);
                kerberosUser.login(); // login creates the kudu client as well
            } else {
                createKuduClient(context);
            }
        if (kerberosUserService == null) {
            return;
        }

        kerberosUser = kerberosUserService.createKerberosUser();
        kerberosUser.login();
        createKuduClient(context);
    }
|
||||
|
||||
protected void createKuduClient(ProcessContext context) {
|
||||
|
@ -250,8 +201,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
|||
);
|
||||
|
||||
return new KuduClient.KuduClientBuilder(masters)
|
||||
.defaultAdminOperationTimeoutMs(adminOperationTimeout)
|
||||
.defaultOperationTimeoutMs(operationTimeout)
|
||||
.defaultSocketReadTimeoutMs(adminOperationTimeout)
|
||||
.saslProtocolName(saslProtocolName)
|
||||
.workerCount(workerCount)
|
||||
.nioExecutor(nioExecutor)
|
||||
|
@ -280,86 +231,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
protected KerberosUser createKerberosKeytabUser(String principal, String keytab, ProcessContext context) {
|
||||
return new KerberosKeytabUser(principal, keytab) {
|
||||
@Override
|
||||
public synchronized void login() {
|
||||
if (isLoggedIn()) {
|
||||
return;
|
||||
}
|
||||
|
||||
super.login();
|
||||
createKuduClient(context);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected KerberosUser createKerberosPasswordUser(String principal, String password, ProcessContext context) {
|
||||
return new KerberosPasswordUser(principal, password) {
|
||||
@Override
|
||||
public synchronized void login() {
|
||||
if (isLoggedIn()) {
|
||||
return;
|
||||
}
|
||||
|
||||
super.login();
|
||||
createKuduClient(context);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||
final List<ValidationResult> results = new ArrayList<>();
|
||||
|
||||
final boolean kerberosPrincipalProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue());
|
||||
final boolean kerberosPasswordProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PASSWORD).getValue());
|
||||
|
||||
if (kerberosPrincipalProvided && !kerberosPasswordProvided) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_PASSWORD.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("a password must be provided for the given principal")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (kerberosPasswordProvided && !kerberosPrincipalProvided) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_PRINCIPAL.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("a principal must be provided for the given password")
|
||||
.build());
|
||||
}
|
||||
|
||||
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
|
||||
|
||||
if (kerberosCredentialsService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (kerberosUserService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_USER_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("kerberos principal/password and kerberos user service cannot be configured at the same time")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (kerberosUserService != null && kerberosCredentialsService != null) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_USER_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("kerberos user service and kerberos credentials service cannot be configured at the same time")
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void shutdown() throws Exception {
|
||||
|
@ -407,51 +278,28 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
|||
final Optional<DataType> fieldDataType = record.getSchema().getDataType(recordFieldName);
|
||||
final String dataTypeFormat = fieldDataType.map(DataType::getFormat).orElse(null);
|
||||
switch (colType) {
|
||||
case BOOL:
|
||||
row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName));
|
||||
break;
|
||||
case INT8:
|
||||
row.addByte(columnIndex, DataTypeUtils.toByte(value, recordFieldName));
|
||||
break;
|
||||
case INT16:
|
||||
row.addShort(columnIndex, DataTypeUtils.toShort(value, recordFieldName));
|
||||
break;
|
||||
case INT32:
|
||||
row.addInt(columnIndex, DataTypeUtils.toInteger(value, recordFieldName));
|
||||
break;
|
||||
case INT64:
|
||||
row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName));
|
||||
break;
|
||||
case UNIXTIME_MICROS:
|
||||
case BOOL -> row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName));
|
||||
case INT8 -> row.addByte(columnIndex, DataTypeUtils.toByte(value, recordFieldName));
|
||||
case INT16 -> row.addShort(columnIndex, DataTypeUtils.toShort(value, recordFieldName));
|
||||
case INT32 -> row.addInt(columnIndex, DataTypeUtils.toInteger(value, recordFieldName));
|
||||
case INT64 -> row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName));
|
||||
case UNIXTIME_MICROS -> {
|
||||
final Optional<DataType> optionalDataType = record.getSchema().getDataType(recordFieldName);
|
||||
final Optional<String> optionalPattern = getTimestampPattern(optionalDataType);
|
||||
final Optional<String> optionalPattern = getTimestampPattern(optionalDataType.orElse(null));
|
||||
final Timestamp timestamp = TIMESTAMP_FIELD_CONVERTER.convertField(value, optionalPattern, recordFieldName);
|
||||
row.addTimestamp(columnIndex, timestamp);
|
||||
break;
|
||||
case STRING:
|
||||
row.addString(columnIndex, DataTypeUtils.toString(value, dataTypeFormat));
|
||||
break;
|
||||
case BINARY:
|
||||
row.addBinary(columnIndex, DataTypeUtils.toString(value, dataTypeFormat).getBytes());
|
||||
break;
|
||||
case FLOAT:
|
||||
row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName));
|
||||
break;
|
||||
case DOUBLE:
|
||||
row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName));
|
||||
break;
|
||||
case DECIMAL:
|
||||
row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, dataTypeFormat)));
|
||||
break;
|
||||
case VARCHAR:
|
||||
row.addVarchar(columnIndex, DataTypeUtils.toString(value, dataTypeFormat));
|
||||
break;
|
||||
case DATE:
|
||||
}
|
||||
case STRING -> row.addString(columnIndex, DataTypeUtils.toString(value, dataTypeFormat));
|
||||
case BINARY -> row.addBinary(columnIndex, DataTypeUtils.toString(value, dataTypeFormat).getBytes());
|
||||
case FLOAT -> row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName));
|
||||
case DOUBLE -> row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName));
|
||||
case DECIMAL -> row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, dataTypeFormat)));
|
||||
case VARCHAR -> row.addVarchar(columnIndex, DataTypeUtils.toString(value, dataTypeFormat));
|
||||
case DATE -> {
|
||||
final String dateFormat = dataTypeFormat == null ? RecordFieldType.DATE.getDefaultFormat() : dataTypeFormat;
|
||||
row.addDate(columnIndex, getDate(value, recordFieldName, dateFormat));
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException(String.format("unknown column type %s", colType));
|
||||
}
|
||||
default -> throw new IllegalStateException(String.format("unknown column type %s", colType));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -460,20 +308,15 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
|||
/**
|
||||
* Get Timestamp Pattern and override Timestamp Record Field pattern with optional microsecond pattern
|
||||
*
|
||||
* @param optionalDataType Optional Data Type
|
||||
* @param dataType Data Type
|
||||
* @return Optional Timestamp Pattern
|
||||
*/
|
||||
private Optional<String> getTimestampPattern(final Optional<DataType> optionalDataType) {
|
||||
String pattern = null;
|
||||
if (optionalDataType.isPresent()) {
|
||||
final DataType dataType = optionalDataType.get();
|
||||
if (RecordFieldType.TIMESTAMP == dataType.getFieldType()) {
|
||||
pattern = MICROSECOND_TIMESTAMP_PATTERN;
|
||||
} else {
|
||||
pattern = dataType.getFormat();
|
||||
}
|
||||
private Optional<String> getTimestampPattern(final DataType dataType) {
|
||||
if (dataType == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.ofNullable(pattern);
|
||||
|
||||
return Optional.of(RecordFieldType.TIMESTAMP == dataType.getFieldType() ? MICROSECOND_TIMESTAMP_PATTERN : dataType.getFormat());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -493,33 +336,20 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
|||
* Converts a NiFi DataType to it's equivalent Kudu Type.
|
||||
*/
|
||||
private Type toKuduType(DataType nifiType) {
|
||||
switch (nifiType.getFieldType()) {
|
||||
case BOOLEAN:
|
||||
return Type.BOOL;
|
||||
case BYTE:
|
||||
return Type.INT8;
|
||||
case SHORT:
|
||||
return Type.INT16;
|
||||
case INT:
|
||||
return Type.INT32;
|
||||
case LONG:
|
||||
return Type.INT64;
|
||||
case FLOAT:
|
||||
return Type.FLOAT;
|
||||
case DOUBLE:
|
||||
return Type.DOUBLE;
|
||||
case DECIMAL:
|
||||
return Type.DECIMAL;
|
||||
case TIMESTAMP:
|
||||
return Type.UNIXTIME_MICROS;
|
||||
case CHAR:
|
||||
case STRING:
|
||||
return Type.STRING;
|
||||
case DATE:
|
||||
return Type.DATE;
|
||||
default:
|
||||
throw new IllegalArgumentException(String.format("unsupported type %s", nifiType));
|
||||
}
|
||||
return switch (nifiType.getFieldType()) {
|
||||
case BOOLEAN -> Type.BOOL;
|
||||
case BYTE -> Type.INT8;
|
||||
case SHORT -> Type.INT16;
|
||||
case INT -> Type.INT32;
|
||||
case LONG -> Type.INT64;
|
||||
case FLOAT -> Type.FLOAT;
|
||||
case DOUBLE -> Type.DOUBLE;
|
||||
case DECIMAL -> Type.DECIMAL;
|
||||
case TIMESTAMP -> Type.UNIXTIME_MICROS;
|
||||
case CHAR, STRING -> Type.STRING;
|
||||
case DATE -> Type.DATE;
|
||||
default -> throw new IllegalArgumentException(String.format("unsupported type %s", nifiType));
|
||||
};
|
||||
}
|
||||
|
||||
private ColumnTypeAttributes getKuduTypeAttributes(final DataType nifiType) {
|
||||
|
|
|
@ -17,6 +17,19 @@
|
|||
|
||||
package org.apache.nifi.processors.kudu;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.security.auth.login.LoginException;
|
||||
import org.apache.kudu.Schema;
|
||||
import org.apache.kudu.client.KuduClient;
|
||||
import org.apache.kudu.client.KuduException;
|
||||
|
@ -61,23 +74,9 @@ import org.apache.nifi.serialization.record.RecordField;
|
|||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.io.InputStream;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
|
||||
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
|
||||
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
|
||||
import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
|
||||
|
||||
@SystemResourceConsideration(resource = SystemResource.MEMORY)
|
||||
@SupportsBatching
|
||||
|
@ -211,26 +210,27 @@ public class PutKudu extends AbstractKuduProcessor {
|
|||
|
||||
protected static final PropertyDescriptor FLUSH_MODE = new Builder()
|
||||
.name("Flush Mode")
|
||||
.description("Set the new flush mode for a kudu session.\n" +
|
||||
"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
|
||||
"AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" +
|
||||
" operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" +
|
||||
"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
|
||||
.description("""
|
||||
Set the new flush mode for a kudu session.
|
||||
AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.
|
||||
AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory
|
||||
operations but it may have to wait when the buffer is full and there's another buffer being flushed.
|
||||
"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.
|
||||
""")
|
||||
.allowableValues(SessionConfiguration.FlushMode.values())
|
||||
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new Builder()
|
||||
.name("FlowFiles per Batch")
|
||||
.description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
|
||||
.description("The maximum number of FlowFiles to process in a single execution, between 1 and 100,000. " +
|
||||
"Depending on your memory size, and data size per row set an appropriate batch size " +
|
||||
"for the number of FlowFiles to process per client connection setup." +
|
||||
"Gradually increase this number, only if your FlowFiles typically contain a few records.")
|
||||
.defaultValue("1")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
|
||||
.addValidator(StandardValidators.createLongValidator(1, 100_000, true))
|
||||
.expressionLanguageSupported(ENVIRONMENT)
|
||||
.build();
|
||||
|
||||
|
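The Flush Mode description in the hunk above explains the three Kudu session flush modes (AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH). As a point of reference, a minimal sketch of how a flush mode is applied through the Kudu client API follows; the master address kudu-master:7051 is an illustrative placeholder, not a value from this diff.

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.SessionConfiguration;

public class FlushModeSketch {
    public static void main(final String[] args) throws KuduException {
        // Hypothetical master address; replace with a real Kudu master list.
        try (KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build()) {
            final KuduSession session = client.newSession();
            // AUTO_FLUSH_BACKGROUND buffers operations and flushes them asynchronously,
            // matching the processor's default value for the Flush Mode property.
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
            session.close();
        }
    }
}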
@ -282,9 +282,6 @@ public class PutKudu extends AbstractKuduProcessor {
|
|||
properties.add(TABLE_NAME);
|
||||
properties.add(FAILURE_STRATEGY);
|
||||
properties.add(KERBEROS_USER_SERVICE);
|
||||
properties.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
properties.add(KERBEROS_PRINCIPAL);
|
||||
properties.add(KERBEROS_PASSWORD);
|
||||
properties.add(SKIP_HEAD_LINE);
|
||||
properties.add(LOWERCASE_FIELD_NAMES);
|
||||
properties.add(HANDLE_SCHEMA_DRIFT);
|
||||
|
@ -447,7 +444,7 @@ public class PutKudu extends AbstractKuduProcessor {
|
|||
dataRecords = Collections.singletonList(record);
|
||||
} else {
|
||||
final RecordPathResult result = dataRecordPath.evaluate(record);
|
||||
final List<FieldValue> fieldValues = result.getSelectedFields().collect(Collectors.toList());
|
||||
final List<FieldValue> fieldValues = result.getSelectedFields().toList();
|
||||
if (fieldValues.isEmpty()) {
|
||||
throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record yielded no results.");
|
||||
}
|
||||
|
@ -530,7 +527,7 @@ public class PutKudu extends AbstractKuduProcessor {
|
|||
recordFields = record.getSchema().getFields();
|
||||
} else {
|
||||
final RecordPathResult recordPathResult = dataRecordPath.evaluate(record);
|
||||
final List<FieldValue> fieldValues = recordPathResult.getSelectedFields().collect(Collectors.toList());
|
||||
final List<FieldValue> fieldValues = recordPathResult.getSelectedFields().toList();
|
||||
|
||||
recordFields = new ArrayList<>();
|
||||
for (final FieldValue fieldValue : fieldValues) {
|
||||
|
@ -548,7 +545,7 @@ public class PutKudu extends AbstractKuduProcessor {
|
|||
|
||||
final List<RecordField> missing = recordFields.stream()
|
||||
.filter(field -> !schema.hasColumn(lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
|
||||
.collect(Collectors.toList());
|
||||
.toList();
|
||||
|
||||
if (missing.isEmpty()) {
|
||||
getLogger().debug("No schema drift detected for {}", flowFile);
|
||||
|
@ -649,10 +646,8 @@ public class PutKudu extends AbstractKuduProcessor {
|
|||
boolean lowercaseFields, KuduTable kuduTable) {
|
||||
Operation operation;
|
||||
switch (operationType) {
|
||||
case INSERT:
|
||||
operation = kuduTable.newInsert();
|
||||
break;
|
||||
case INSERT_IGNORE:
|
||||
case INSERT -> operation = kuduTable.newInsert();
|
||||
case INSERT_IGNORE -> {
|
||||
// If the target Kudu cluster does not support ignore operations use an insert.
|
||||
// The legacy session based insert ignore will be used instead.
|
||||
if (!supportsInsertIgnoreOp) {
|
||||
|
@ -660,24 +655,13 @@ public class PutKudu extends AbstractKuduProcessor {
|
|||
} else {
|
||||
operation = kuduTable.newInsertIgnore();
|
||||
}
|
||||
break;
|
||||
case UPSERT:
|
||||
operation = kuduTable.newUpsert();
|
||||
break;
|
||||
case UPDATE:
|
||||
operation = kuduTable.newUpdate();
|
||||
break;
|
||||
case UPDATE_IGNORE:
|
||||
operation = kuduTable.newUpdateIgnore();
|
||||
break;
|
||||
case DELETE:
|
||||
operation = kuduTable.newDelete();
|
||||
break;
|
||||
case DELETE_IGNORE:
|
||||
operation = kuduTable.newDeleteIgnore();
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
|
||||
}
|
||||
case UPSERT -> operation = kuduTable.newUpsert();
|
||||
case UPDATE -> operation = kuduTable.newUpdate();
|
||||
case UPDATE_IGNORE -> operation = kuduTable.newUpdateIgnore();
|
||||
case DELETE -> operation = kuduTable.newDelete();
|
||||
case DELETE_IGNORE -> operation = kuduTable.newDeleteIgnore();
|
||||
default -> throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
|
||||
}
|
||||
buildPartialRow(kuduTable.getSchema(), operation.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
|
||||
return operation;
|
||||
|
@ -693,7 +677,7 @@ public class PutKudu extends AbstractKuduProcessor {
|
|||
@Override
|
||||
public OperationType apply(final Record record) {
|
||||
final RecordPathResult recordPathResult = recordPath.evaluate(record);
|
||||
final List<FieldValue> resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
|
||||
final List<FieldValue> resultList = recordPathResult.getSelectedFields().distinct().toList();
|
||||
if (resultList.isEmpty()) {
|
||||
throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record but got no results");
|
||||
}
|
||||
|
|
|
@ -17,47 +17,37 @@
|
|||
|
||||
package org.apache.nifi.processors.kudu;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import org.apache.kudu.Schema;
|
||||
import org.apache.kudu.client.Delete;
|
||||
import org.apache.kudu.client.DeleteIgnore;
|
||||
import org.apache.kudu.client.Insert;
|
||||
import org.apache.kudu.client.InsertIgnore;
|
||||
import org.apache.kudu.client.KuduClient;
|
||||
import org.apache.kudu.client.KuduSession;
|
||||
import org.apache.kudu.client.KuduTable;
|
||||
import org.apache.kudu.client.Delete;
|
||||
import org.apache.kudu.client.Insert;
|
||||
import org.apache.kudu.client.Operation;
|
||||
import org.apache.kudu.client.Update;
|
||||
import org.apache.kudu.client.UpdateIgnore;
|
||||
import org.apache.kudu.client.Upsert;
|
||||
import org.apache.kudu.client.Update;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class MockPutKudu extends PutKudu {
|
||||
|
||||
private KuduSession session;
|
||||
private LinkedList<Operation> opQueue;
|
||||
private final KuduSession session;
|
||||
private final LinkedList<Operation> opQueue;
|
||||
|
||||
// Atomic reference is used as the set and use of the schema are in different thread
|
||||
private AtomicReference<Schema> tableSchema = new AtomicReference<>();
|
||||
|
||||
private boolean loggedIn = false;
|
||||
private boolean loggedOut = false;
|
||||
private final AtomicReference<Schema> tableSchema = new AtomicReference<>();
|
||||
|
||||
public MockPutKudu() {
|
||||
this(mock(KuduSession.class));
|
||||
|
@ -78,31 +68,15 @@ public class MockPutKudu extends PutKudu {
|
|||
boolean lowercaseFields, KuduTable kuduTable) {
|
||||
Operation operation = opQueue.poll();
|
||||
if (operation == null) {
|
||||
switch (operationType) {
|
||||
case INSERT:
|
||||
operation = mock(Insert.class);
|
||||
break;
|
||||
case INSERT_IGNORE:
|
||||
operation = mock(InsertIgnore.class);
|
||||
break;
|
||||
case UPSERT:
|
||||
operation = mock(Upsert.class);
|
||||
break;
|
||||
case UPDATE:
|
||||
operation = mock(Update.class);
|
||||
break;
|
||||
case UPDATE_IGNORE:
|
||||
operation = mock(UpdateIgnore.class);
|
||||
break;
|
||||
case DELETE:
|
||||
operation = mock(Delete.class);
|
||||
break;
|
||||
case DELETE_IGNORE:
|
||||
operation = mock(DeleteIgnore.class);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
|
||||
}
|
||||
operation = switch (operationType) {
|
||||
case INSERT -> mock(Insert.class);
|
||||
case INSERT_IGNORE -> mock(InsertIgnore.class);
|
||||
case UPSERT -> mock(Upsert.class);
|
||||
case UPDATE -> mock(Update.class);
|
||||
case UPDATE_IGNORE -> mock(UpdateIgnore.class);
|
||||
case DELETE -> mock(Delete.class);
|
||||
case DELETE_IGNORE -> mock(DeleteIgnore.class);
|
||||
};
|
||||
}
|
||||
return operation;
|
||||
}
|
||||
|
@ -140,86 +114,6 @@ public class MockPutKudu extends PutKudu {
|
|||
actionOnKuduClient.accept(client);
|
||||
}
|
||||
|
||||
public boolean loggedIn() {
|
||||
return loggedIn;
|
||||
}
|
||||
|
||||
public boolean loggedOut() {
|
||||
return loggedOut;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected KerberosUser createKerberosKeytabUser(String principal, String keytab, ProcessContext context) {
|
||||
return createMockKerberosUser(principal);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected KerberosUser createKerberosPasswordUser(String principal, String password, ProcessContext context) {
|
||||
return createMockKerberosUser(principal);
|
||||
}
|
||||
|
||||
private KerberosUser createMockKerberosUser(final String principal) {
|
||||
return new KerberosUser() {
|
||||
|
||||
@Override
|
||||
public void login() {
|
||||
loggedIn = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logout() {
|
||||
loggedOut = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T doAs(final PrivilegedAction<T> action) throws IllegalStateException {
|
||||
return action.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T doAs(PrivilegedAction<T> action, ClassLoader contextClassLoader) throws IllegalStateException {
|
||||
return action.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T doAs(final PrivilegedExceptionAction<T> action) throws IllegalStateException, PrivilegedActionException {
|
||||
try {
|
||||
return action.run();
|
||||
} catch (Exception e) {
|
||||
throw new PrivilegedActionException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T doAs(PrivilegedExceptionAction<T> action, ClassLoader contextClassLoader) throws IllegalStateException, PrivilegedActionException {
|
||||
try {
|
||||
return action.run();
|
||||
} catch (Exception e) {
|
||||
throw new PrivilegedActionException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkTGTAndRelogin() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLoggedIn() {
|
||||
return loggedIn && !loggedOut;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPrincipal() {
|
||||
return principal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AppConfigurationEntry getConfigurationEntry() {
|
||||
return new AppConfigurationEntry("LoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, Collections.emptyMap());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected KuduSession createKuduSession(final KuduClient client) {
|
||||
|
|
|
@ -17,6 +17,20 @@
|
|||
|
||||
package org.apache.nifi.processors.kudu;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Timestamp;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.kudu.ColumnSchema;
|
||||
import org.apache.kudu.ColumnTypeAttributes;
|
||||
import org.apache.kudu.Schema;
|
||||
|
@ -29,11 +43,8 @@ import org.apache.kudu.client.PartialRow;
|
|||
import org.apache.kudu.client.RowError;
|
||||
import org.apache.kudu.client.RowErrorsAndOverflowStatus;
|
||||
import org.apache.kudu.client.SessionConfiguration.FlushMode;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
|
@ -58,29 +69,11 @@ import org.junit.jupiter.api.Test;
|
|||
import org.mockito.Mockito;
|
||||
import org.mockito.stubbing.OngoingStubbing;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Timestamp;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
|
||||
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
|
||||
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -148,50 +141,6 @@ public class TestPutKudu {
|
|||
testRunner.enableControllerService(readerFactory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomValidate() throws InitializationException {
|
||||
createRecordReader(1);
|
||||
|
||||
testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
|
||||
testRunner.assertNotValid();
|
||||
|
||||
testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL);
|
||||
testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
|
||||
testRunner.assertNotValid();
|
||||
|
||||
testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
|
||||
testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
|
||||
testRunner.assertValid();
|
||||
|
||||
final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab");
|
||||
testRunner.addControllerService("kerb", kerberosCredentialsService);
|
||||
testRunner.enableControllerService(kerberosCredentialsService);
|
||||
testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb");
|
||||
testRunner.assertNotValid();
|
||||
|
||||
testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL);
|
||||
testRunner.removeProperty(PutKudu.KERBEROS_PASSWORD);
|
||||
testRunner.assertValid();
|
||||
|
||||
final KerberosUserService kerberosUserService = enableKerberosUserService(testRunner);
|
||||
testRunner.setProperty(PutKudu.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
|
||||
testRunner.assertNotValid();
|
||||
|
||||
testRunner.removeProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE);
|
||||
testRunner.assertValid();
|
||||
|
||||
testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
|
||||
testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
|
||||
testRunner.assertNotValid();
|
||||
}
|
||||
|
||||
private KerberosUserService enableKerberosUserService(final TestRunner runner) throws InitializationException {
|
||||
final KerberosUserService kerberosUserService = mock(KerberosUserService.class);
|
||||
when(kerberosUserService.getIdentifier()).thenReturn("userService1");
|
||||
runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService);
|
||||
runner.enableControllerService(kerberosUserService);
|
||||
return kerberosUserService;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteKuduWithDefaults() throws InitializationException {
|
||||
|
@ -221,39 +170,6 @@ public class TestPutKudu {
|
|||
assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKerberosEnabled() throws InitializationException {
|
||||
createRecordReader(1);
|
||||
|
||||
final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab");
|
||||
testRunner.addControllerService("kerb", kerberosCredentialsService);
|
||||
testRunner.enableControllerService(kerberosCredentialsService);
|
||||
|
||||
testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb");
|
||||
|
||||
testRunner.run(1, false);
|
||||
|
||||
final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
|
||||
assertTrue(proc.loggedIn());
|
||||
assertFalse(proc.loggedOut());
|
||||
|
||||
testRunner.run(1, true, false);
|
||||
assertTrue(proc.loggedOut());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsecureClient() throws InitializationException {
|
||||
createRecordReader(1);
|
||||
|
||||
testRunner.run(1, false);
|
||||
|
||||
final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
|
||||
assertFalse(proc.loggedIn());
|
||||
assertFalse(proc.loggedOut());
|
||||
|
||||
testRunner.run(1, true, false);
|
||||
assertFalse(proc.loggedOut());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
|
@ -751,7 +667,7 @@ public class TestPutKudu {
|
|||
flowFileResponses = flowFileResponses.subList(sliceSize, flowFileResponses.size());
|
||||
|
||||
List<OperationResponse> batch = new ArrayList<>();
|
||||
for (OperationResponse response : slice.stream().flatMap(List::stream).collect(Collectors.toList())) {
|
||||
for (OperationResponse response : slice.stream().flatMap(List::stream).toList()) {
|
||||
if (batch.size() == batchSize) {
|
||||
flushes.add(batch);
|
||||
batch = new ArrayList<>();
|
||||
|
@ -814,24 +730,4 @@ public class TestPutKudu {
|
|||
public void testKuduPartialFailuresOnManualFlush() throws Exception {
|
||||
testKuduPartialFailure(FlushMode.MANUAL_FLUSH);
|
||||
}
|
||||
|
||||
public static class MockKerberosCredentialsService extends AbstractControllerService implements KerberosCredentialsService {
|
||||
private final String keytab;
|
||||
private final String principal;
|
||||
|
||||
public MockKerberosCredentialsService(final String keytab, final String principal) {
|
||||
this.keytab = keytab;
|
||||
this.principal = principal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKeytab() {
|
||||
return keytab;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPrincipal() {
|
||||
return principal;
|
||||
}
|
||||
}
|
||||
}