diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml index 1d23041702..36c48656be 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml @@ -63,11 +63,6 @@ 2.0.0-SNAPSHOT provided - - org.apache.nifi - nifi-kerberos-credentials-service-api - provided - org.apache.nifi nifi-record @@ -88,6 +83,12 @@ kudu-client ${kudu.version} + + org.apache.nifi + nifi-kerberos-user-service-api + 2.0.0-SNAPSHOT + provided + diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java index b208186fac..a53444340d 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java @@ -16,6 +16,18 @@ */ package org.apache.nifi.controller.kudu; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import javax.security.auth.login.LoginException; import org.apache.kudu.ColumnSchema; import org.apache.kudu.ColumnTypeAttributes; import org.apache.kudu.Schema; @@ -38,12 +50,11 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.kerberos.KerberosCredentialsService; +import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.lookup.RecordLookupService; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.security.krb.KerberosAction; -import org.apache.nifi.security.krb.KerberosKeytabUser; import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; @@ -52,20 +63,6 @@ import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; -import javax.security.auth.login.LoginException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Base64; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - @CapabilityDescription("Lookup a record from Kudu Server associated with the specified key. Binary columns are base64 encoded. Only one matched row will be returned") @Tags({"lookup", "enrich", "key", "value", "kudu"}) @@ -80,13 +77,12 @@ public class KuduLookupService extends AbstractControllerService implements Reco .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .build(); - public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() - .name("kudu-lu-kerberos-credentials-service") - .displayName("Kerberos Credentials Service") - .description("Specifies the Kerberos Credentials to use for authentication") - .required(false) - .identifiesControllerService(KerberosCredentialsService.class) - .build(); + public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder() + .name("Kerberos User Service") + .description("Specifies the Kerberos Credentials to use for authentication") + .required(false) + .identifiesControllerService(KerberosUserService.class) + .build(); public static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new PropertyDescriptor.Builder() .name("kudu-lu-operations-timeout-ms") @@ -137,7 +133,6 @@ public class KuduLookupService extends AbstractControllerService implements Reco protected List properties; - protected KerberosCredentialsService credentialsService; private volatile KerberosUser kerberosUser; protected String kuduMasters; @@ -154,26 +149,22 @@ public class KuduLookupService extends AbstractControllerService implements Reco protected void init(final ControllerServiceInitializationContext context) { final List properties = new ArrayList<>(); properties.add(KUDU_MASTERS); - properties.add(KERBEROS_CREDENTIALS_SERVICE); + properties.add(KERBEROS_USER_SERVICE); properties.add(KUDU_OPERATION_TIMEOUT_MS); properties.add(KUDU_REPLICA_SELECTION); properties.add(TABLE_NAME); properties.add(RETURN_COLUMNS); - addProperties(properties); this.properties = Collections.unmodifiableList(properties); } - protected void addProperties(List properties) { - } protected void createKuduClient(ConfigurationContext context) throws LoginException { final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); - final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + final KerberosUserService userService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); - if (credentialsService != null) { - final String keytab = credentialsService.getKeytab(); - final String principal = credentialsService.getPrincipal(); - kerberosUser = loginKerberosUser(principal, keytab); + if (userService != null) { + kerberosUser = userService.createKerberosUser(); + kerberosUser.login(); final KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger()); this.kuduClient = kerberosAction.execute(); @@ -182,14 +173,9 @@ public class KuduLookupService extends AbstractControllerService implements Reco } } - protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException { - final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab); - kerberosUser.login(); - return kerberosUser; - } protected KuduClient buildClient(final String masters, final ConfigurationContext context) { - final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final int operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); return new KuduClient.KuduClientBuilder(masters) .defaultOperationTimeoutMs(operationTimeout) @@ -203,10 +189,8 @@ public class KuduLookupService extends AbstractControllerService implements Reco */ @OnEnabled public void onEnabled(final ConfigurationContext context) throws InitializationException { - try { kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); - credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); if (kuduClient == null) { getLogger().debug("Setting up Kudu connection..."); @@ -214,7 +198,7 @@ public class KuduLookupService extends AbstractControllerService implements Reco createKuduClient(context); getLogger().debug("Kudu connection successfully initialized"); } - } catch(Exception ex){ + } catch (final Exception ex) { getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex); throw new InitializationException(ex); } @@ -228,15 +212,14 @@ public class KuduLookupService extends AbstractControllerService implements Reco //Result Schema resultSchema = kuduSchemaToNiFiSchema(tableSchema, columnNames); - - } catch (KuduException e) { + } catch (final KuduException e) { throw new IllegalArgumentException(e); } } @Override public Set getRequiredKeys() { - return new HashSet<>(); + return Collections.emptySet(); } @Override @@ -246,21 +229,17 @@ public class KuduLookupService extends AbstractControllerService implements Reco @Override public Optional lookup(Map coordinates) { - Optional record; - - if (kerberosUser != null) { - final KerberosAction> kerberosAction = new KerberosAction<>(kerberosUser, () -> getRecord(coordinates), getLogger()); - record = kerberosAction.execute(); + if (kerberosUser == null) { + return getRecord(coordinates); } else { - record = getRecord(coordinates); + final KerberosAction> kerberosAction = new KerberosAction<>(kerberosUser, () -> getRecord(coordinates), getLogger()); + return kerberosAction.execute(); } - - return record; } private Optional getRecord(Map coordinates) { //Scanner - KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(table); + final KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(table); builder.setProjectedColumnNames(columnNames); builder.replicaSelection(replicaSelection); @@ -272,20 +251,22 @@ public class KuduLookupService extends AbstractControllerService implements Reco builder.addPredicate(KuduPredicate.newComparisonPredicate(tableSchema.getColumn(key), KuduPredicate.ComparisonOp.EQUAL, value)) ); - KuduScanner kuduScanner = builder.build(); + final KuduScanner kuduScanner = builder.build(); //Run lookup - for ( RowResult row : kuduScanner){ + for (final RowResult row : kuduScanner) { final Map values = new HashMap<>(); - for(String columnName : columnNames){ + for (final String columnName : columnNames) { Object object; - if(row.getColumnType(columnName) == Type.BINARY){ + if (row.getColumnType(columnName) == Type.BINARY) { object = Base64.getEncoder().encodeToString(row.getBinaryCopy(columnName)); } else { object = row.getObject(columnName); } + values.put(columnName, object); } + return Optional.of(new MapRecord(resultSchema, values)); } @@ -293,8 +274,8 @@ public class KuduLookupService extends AbstractControllerService implements Reco return Optional.empty(); } - private List getColumns(String columns){ - if(columns.equals("*")){ + private List getColumns(final String columns) { + if (columns.equals("*")) { return tableSchema .getColumns() .stream().map(ColumnSchema::getName) @@ -306,50 +287,31 @@ public class KuduLookupService extends AbstractControllerService implements Reco private RecordSchema kuduSchemaToNiFiSchema(Schema kuduTableSchema, List columnNames){ final List fields = new ArrayList<>(); - for(String columnName : columnNames) { - if(!kuduTableSchema.hasColumn(columnName)){ + for (final String columnName : columnNames) { + if (!kuduTableSchema.hasColumn(columnName)) { throw new IllegalArgumentException("Column not found in Kudu table schema " + columnName); } - ColumnSchema cs = kuduTableSchema.getColumn(columnName); - switch (cs.getType()) { - case INT8: - fields.add(new RecordField(cs.getName(), RecordFieldType.BYTE.getDataType())); - break; - case INT16: - fields.add(new RecordField(cs.getName(), RecordFieldType.SHORT.getDataType())); - break; - case INT32: - fields.add(new RecordField(cs.getName(), RecordFieldType.INT.getDataType())); - break; - case INT64: - fields.add(new RecordField(cs.getName(), RecordFieldType.LONG.getDataType())); - break; - case DECIMAL: - final ColumnTypeAttributes attributes = cs.getTypeAttributes(); - fields.add(new RecordField(cs.getName(), RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(), attributes.getScale()))); - break; - case UNIXTIME_MICROS: - fields.add(new RecordField(cs.getName(), RecordFieldType.TIMESTAMP.getDataType())); - break; - case BINARY: - case STRING: - case VARCHAR: - fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType())); - break; - case DOUBLE: - fields.add(new RecordField(cs.getName(), RecordFieldType.DOUBLE.getDataType())); - break; - case BOOL: - fields.add(new RecordField(cs.getName(), RecordFieldType.BOOLEAN.getDataType())); - break; - case FLOAT: - fields.add(new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType())); - break; - case DATE: - fields.add(new RecordField(cs.getName(), RecordFieldType.DATE.getDataType())); - break; - } + + final ColumnSchema cs = kuduTableSchema.getColumn(columnName); + final ColumnTypeAttributes attributes = cs.getTypeAttributes(); + + final RecordField field = switch (cs.getType()) { + case INT8 -> new RecordField(cs.getName(), RecordFieldType.BYTE.getDataType()); + case INT16 -> new RecordField(cs.getName(), RecordFieldType.SHORT.getDataType()); + case INT32 -> new RecordField(cs.getName(), RecordFieldType.INT.getDataType()); + case INT64 -> new RecordField(cs.getName(), RecordFieldType.LONG.getDataType()); + case DECIMAL -> new RecordField(cs.getName(), RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(), attributes.getScale())); + case UNIXTIME_MICROS -> new RecordField(cs.getName(), RecordFieldType.TIMESTAMP.getDataType()); + case BINARY, STRING, VARCHAR -> new RecordField(cs.getName(), RecordFieldType.STRING.getDataType()); + case DOUBLE -> new RecordField(cs.getName(), RecordFieldType.DOUBLE.getDataType()); + case BOOL -> new RecordField(cs.getName(), RecordFieldType.BOOLEAN.getDataType()); + case FLOAT -> new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType()); + case DATE -> new RecordField(cs.getName(), RecordFieldType.DATE.getDataType()); + }; + + fields.add(field); } + return new SimpleRecordSchema(fields); } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index dd6801c7b4..54b71e5c29 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -17,6 +17,25 @@ package org.apache.nifi.processors.kudu; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import org.apache.kudu.ColumnSchema; import org.apache.kudu.ColumnTypeAttributes; import org.apache.kudu.Schema; @@ -33,18 +52,12 @@ import org.apache.kudu.client.SessionConfiguration; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor.Builder; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.security.krb.KerberosAction; -import org.apache.nifi.security.krb.KerberosKeytabUser; -import org.apache.nifi.security.krb.KerberosPasswordUser; import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; @@ -53,29 +66,6 @@ import org.apache.nifi.serialization.record.field.FieldConverter; import org.apache.nifi.serialization.record.field.ObjectTimestampFieldConverter; import org.apache.nifi.serialization.record.type.DecimalDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; -import org.apache.nifi.util.StringUtils; - -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Timestamp; -import java.time.LocalDate; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Consumer; public abstract class AbstractKuduProcessor extends AbstractProcessor { @@ -87,14 +77,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .build(); - static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder() - .name("kerberos-credentials-service") - .displayName("Kerberos Credentials Service") - .description("Specifies the Kerberos Credentials to use for authentication") - .required(false) - .identifiesControllerService(KerberosCredentialsService.class) - .build(); - static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder() .name("kerberos-user-service") .displayName("Kerberos User Service") @@ -103,25 +85,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { .required(false) .build(); - static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder() - .name("kerberos-principal") - .displayName("Kerberos Principal") - .description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .build(); - - static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder() - .name("kerberos-password") - .displayName("Kerberos Password") - .description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); - static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new Builder() .name("kudu-operations-timeout-ms") .displayName("Kudu Operation Timeout") @@ -187,25 +150,13 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { protected void createKerberosUserAndOrKuduClient(ProcessContext context) { final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); - if (kerberosUserService != null) { - kerberosUser = kerberosUserService.createKerberosUser(); - kerberosUser.login(); - createKuduClient(context); - } else { - final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); - final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); - final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue(); - - if (credentialsService != null) { - kerberosUser = createKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab(), context); - kerberosUser.login(); // login creates the kudu client as well - } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) { - kerberosUser = createKerberosPasswordUser(kerberosPrincipal, kerberosPassword, context); - kerberosUser.login(); // login creates the kudu client as well - } else { - createKuduClient(context); - } + if (kerberosUserService == null) { + return; } + + kerberosUser = kerberosUserService.createKerberosUser(); + kerberosUser.login(); + createKuduClient(context); } protected void createKuduClient(ProcessContext context) { @@ -250,8 +201,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { ); return new KuduClient.KuduClientBuilder(masters) + .defaultAdminOperationTimeoutMs(adminOperationTimeout) .defaultOperationTimeoutMs(operationTimeout) - .defaultSocketReadTimeoutMs(adminOperationTimeout) .saslProtocolName(saslProtocolName) .workerCount(workerCount) .nioExecutor(nioExecutor) @@ -280,86 +231,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { } } - protected KerberosUser createKerberosKeytabUser(String principal, String keytab, ProcessContext context) { - return new KerberosKeytabUser(principal, keytab) { - @Override - public synchronized void login() { - if (isLoggedIn()) { - return; - } - - super.login(); - createKuduClient(context); - } - }; - } - - protected KerberosUser createKerberosPasswordUser(String principal, String password, ProcessContext context) { - return new KerberosPasswordUser(principal, password) { - @Override - public synchronized void login() { - if (isLoggedIn()) { - return; - } - - super.login(); - createKuduClient(context); - } - }; - } - - @Override - protected Collection customValidate(ValidationContext context) { - final List results = new ArrayList<>(); - - final boolean kerberosPrincipalProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue()); - final boolean kerberosPasswordProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PASSWORD).getValue()); - - if (kerberosPrincipalProvided && !kerberosPasswordProvided) { - results.add(new ValidationResult.Builder() - .subject(KERBEROS_PASSWORD.getDisplayName()) - .valid(false) - .explanation("a password must be provided for the given principal") - .build()); - } - - if (kerberosPasswordProvided && !kerberosPrincipalProvided) { - results.add(new ValidationResult.Builder() - .subject(KERBEROS_PRINCIPAL.getDisplayName()) - .valid(false) - .explanation("a principal must be provided for the given password") - .build()); - } - - final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); - final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); - - if (kerberosCredentialsService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) { - results.add(new ValidationResult.Builder() - .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName()) - .valid(false) - .explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time") - .build()); - } - - if (kerberosUserService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) { - results.add(new ValidationResult.Builder() - .subject(KERBEROS_USER_SERVICE.getDisplayName()) - .valid(false) - .explanation("kerberos principal/password and kerberos user service cannot be configured at the same time") - .build()); - } - - if (kerberosUserService != null && kerberosCredentialsService != null) { - results.add(new ValidationResult.Builder() - .subject(KERBEROS_USER_SERVICE.getDisplayName()) - .valid(false) - .explanation("kerberos user service and kerberos credentials service cannot be configured at the same time") - .build()); - } - - return results; - } @OnStopped public void shutdown() throws Exception { @@ -407,51 +278,28 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { final Optional fieldDataType = record.getSchema().getDataType(recordFieldName); final String dataTypeFormat = fieldDataType.map(DataType::getFormat).orElse(null); switch (colType) { - case BOOL: - row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName)); - break; - case INT8: - row.addByte(columnIndex, DataTypeUtils.toByte(value, recordFieldName)); - break; - case INT16: - row.addShort(columnIndex, DataTypeUtils.toShort(value, recordFieldName)); - break; - case INT32: - row.addInt(columnIndex, DataTypeUtils.toInteger(value, recordFieldName)); - break; - case INT64: - row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName)); - break; - case UNIXTIME_MICROS: + case BOOL -> row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName)); + case INT8 -> row.addByte(columnIndex, DataTypeUtils.toByte(value, recordFieldName)); + case INT16 -> row.addShort(columnIndex, DataTypeUtils.toShort(value, recordFieldName)); + case INT32 -> row.addInt(columnIndex, DataTypeUtils.toInteger(value, recordFieldName)); + case INT64 -> row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName)); + case UNIXTIME_MICROS -> { final Optional optionalDataType = record.getSchema().getDataType(recordFieldName); - final Optional optionalPattern = getTimestampPattern(optionalDataType); + final Optional optionalPattern = getTimestampPattern(optionalDataType.orElse(null)); final Timestamp timestamp = TIMESTAMP_FIELD_CONVERTER.convertField(value, optionalPattern, recordFieldName); row.addTimestamp(columnIndex, timestamp); - break; - case STRING: - row.addString(columnIndex, DataTypeUtils.toString(value, dataTypeFormat)); - break; - case BINARY: - row.addBinary(columnIndex, DataTypeUtils.toString(value, dataTypeFormat).getBytes()); - break; - case FLOAT: - row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName)); - break; - case DOUBLE: - row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName)); - break; - case DECIMAL: - row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, dataTypeFormat))); - break; - case VARCHAR: - row.addVarchar(columnIndex, DataTypeUtils.toString(value, dataTypeFormat)); - break; - case DATE: + } + case STRING -> row.addString(columnIndex, DataTypeUtils.toString(value, dataTypeFormat)); + case BINARY -> row.addBinary(columnIndex, DataTypeUtils.toString(value, dataTypeFormat).getBytes()); + case FLOAT -> row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName)); + case DOUBLE -> row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName)); + case DECIMAL -> row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, dataTypeFormat))); + case VARCHAR -> row.addVarchar(columnIndex, DataTypeUtils.toString(value, dataTypeFormat)); + case DATE -> { final String dateFormat = dataTypeFormat == null ? RecordFieldType.DATE.getDefaultFormat() : dataTypeFormat; row.addDate(columnIndex, getDate(value, recordFieldName, dateFormat)); - break; - default: - throw new IllegalStateException(String.format("unknown column type %s", colType)); + } + default -> throw new IllegalStateException(String.format("unknown column type %s", colType)); } } } @@ -460,20 +308,15 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { /** * Get Timestamp Pattern and override Timestamp Record Field pattern with optional microsecond pattern * - * @param optionalDataType Optional Data Type + * @param dataType Data Type * @return Optional Timestamp Pattern */ - private Optional getTimestampPattern(final Optional optionalDataType) { - String pattern = null; - if (optionalDataType.isPresent()) { - final DataType dataType = optionalDataType.get(); - if (RecordFieldType.TIMESTAMP == dataType.getFieldType()) { - pattern = MICROSECOND_TIMESTAMP_PATTERN; - } else { - pattern = dataType.getFormat(); - } + private Optional getTimestampPattern(final DataType dataType) { + if (dataType == null) { + return Optional.empty(); } - return Optional.ofNullable(pattern); + + return Optional.of(RecordFieldType.TIMESTAMP == dataType.getFieldType() ? MICROSECOND_TIMESTAMP_PATTERN : dataType.getFormat()); } /** @@ -493,33 +336,20 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { * Converts a NiFi DataType to it's equivalent Kudu Type. */ private Type toKuduType(DataType nifiType) { - switch (nifiType.getFieldType()) { - case BOOLEAN: - return Type.BOOL; - case BYTE: - return Type.INT8; - case SHORT: - return Type.INT16; - case INT: - return Type.INT32; - case LONG: - return Type.INT64; - case FLOAT: - return Type.FLOAT; - case DOUBLE: - return Type.DOUBLE; - case DECIMAL: - return Type.DECIMAL; - case TIMESTAMP: - return Type.UNIXTIME_MICROS; - case CHAR: - case STRING: - return Type.STRING; - case DATE: - return Type.DATE; - default: - throw new IllegalArgumentException(String.format("unsupported type %s", nifiType)); - } + return switch (nifiType.getFieldType()) { + case BOOLEAN -> Type.BOOL; + case BYTE -> Type.INT8; + case SHORT -> Type.INT16; + case INT -> Type.INT32; + case LONG -> Type.INT64; + case FLOAT -> Type.FLOAT; + case DOUBLE -> Type.DOUBLE; + case DECIMAL -> Type.DECIMAL; + case TIMESTAMP -> Type.UNIXTIME_MICROS; + case CHAR, STRING -> Type.STRING; + case DATE -> Type.DATE; + default -> throw new IllegalArgumentException(String.format("unsupported type %s", nifiType)); + }; } private ColumnTypeAttributes getKuduTypeAttributes(final DataType nifiType) { diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index 7fba59fa17..3a3273559f 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -17,6 +17,19 @@ package org.apache.nifi.processors.kudu; +import java.io.InputStream; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.security.auth.login.LoginException; import org.apache.kudu.Schema; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduException; @@ -61,23 +74,9 @@ import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSet; -import javax.security.auth.login.LoginException; -import java.io.InputStream; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; - +import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT; import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; import static org.apache.nifi.expression.ExpressionLanguageScope.NONE; -import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT; @SystemResourceConsideration(resource = SystemResource.MEMORY) @SupportsBatching @@ -211,26 +210,27 @@ public class PutKudu extends AbstractKuduProcessor { protected static final PropertyDescriptor FLUSH_MODE = new Builder() .name("Flush Mode") - .description("Set the new flush mode for a kudu session.\n" + - "AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" + - "AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" + - " operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" + - "MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.") + .description(""" + Set the new flush mode for a kudu session. + AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception. + AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory + operations but it may have to wait when the buffer is full and there's another buffer being flushed. + "MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full. + """) .allowableValues(SessionConfiguration.FlushMode.values()) .defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString()) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(true) .build(); protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new Builder() .name("FlowFiles per Batch") - .description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " + + .description("The maximum number of FlowFiles to process in a single execution, between 1 and 100,000. " + "Depending on your memory size, and data size per row set an appropriate batch size " + "for the number of FlowFiles to process per client connection setup." + "Gradually increase this number, only if your FlowFiles typically contain a few records.") .defaultValue("1") .required(true) - .addValidator(StandardValidators.createLongValidator(1, 100000, true)) + .addValidator(StandardValidators.createLongValidator(1, 100_000, true)) .expressionLanguageSupported(ENVIRONMENT) .build(); @@ -282,9 +282,6 @@ public class PutKudu extends AbstractKuduProcessor { properties.add(TABLE_NAME); properties.add(FAILURE_STRATEGY); properties.add(KERBEROS_USER_SERVICE); - properties.add(KERBEROS_CREDENTIALS_SERVICE); - properties.add(KERBEROS_PRINCIPAL); - properties.add(KERBEROS_PASSWORD); properties.add(SKIP_HEAD_LINE); properties.add(LOWERCASE_FIELD_NAMES); properties.add(HANDLE_SCHEMA_DRIFT); @@ -447,7 +444,7 @@ public class PutKudu extends AbstractKuduProcessor { dataRecords = Collections.singletonList(record); } else { final RecordPathResult result = dataRecordPath.evaluate(record); - final List fieldValues = result.getSelectedFields().collect(Collectors.toList()); + final List fieldValues = result.getSelectedFields().toList(); if (fieldValues.isEmpty()) { throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record yielded no results."); } @@ -530,7 +527,7 @@ public class PutKudu extends AbstractKuduProcessor { recordFields = record.getSchema().getFields(); } else { final RecordPathResult recordPathResult = dataRecordPath.evaluate(record); - final List fieldValues = recordPathResult.getSelectedFields().collect(Collectors.toList()); + final List fieldValues = recordPathResult.getSelectedFields().toList(); recordFields = new ArrayList<>(); for (final FieldValue fieldValue : fieldValues) { @@ -548,7 +545,7 @@ public class PutKudu extends AbstractKuduProcessor { final List missing = recordFields.stream() .filter(field -> !schema.hasColumn(lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName())) - .collect(Collectors.toList()); + .toList(); if (missing.isEmpty()) { getLogger().debug("No schema drift detected for {}", flowFile); @@ -649,10 +646,8 @@ public class PutKudu extends AbstractKuduProcessor { boolean lowercaseFields, KuduTable kuduTable) { Operation operation; switch (operationType) { - case INSERT: - operation = kuduTable.newInsert(); - break; - case INSERT_IGNORE: + case INSERT -> operation = kuduTable.newInsert(); + case INSERT_IGNORE -> { // If the target Kudu cluster does not support ignore operations use an insert. // The legacy session based insert ignore will be used instead. if (!supportsInsertIgnoreOp) { @@ -660,24 +655,13 @@ public class PutKudu extends AbstractKuduProcessor { } else { operation = kuduTable.newInsertIgnore(); } - break; - case UPSERT: - operation = kuduTable.newUpsert(); - break; - case UPDATE: - operation = kuduTable.newUpdate(); - break; - case UPDATE_IGNORE: - operation = kuduTable.newUpdateIgnore(); - break; - case DELETE: - operation = kuduTable.newDelete(); - break; - case DELETE_IGNORE: - operation = kuduTable.newDeleteIgnore(); - break; - default: - throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType)); + } + case UPSERT -> operation = kuduTable.newUpsert(); + case UPDATE -> operation = kuduTable.newUpdate(); + case UPDATE_IGNORE -> operation = kuduTable.newUpdateIgnore(); + case DELETE -> operation = kuduTable.newDelete(); + case DELETE_IGNORE -> operation = kuduTable.newDeleteIgnore(); + default -> throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType)); } buildPartialRow(kuduTable.getSchema(), operation.getRow(), record, fieldNames, ignoreNull, lowercaseFields); return operation; @@ -693,7 +677,7 @@ public class PutKudu extends AbstractKuduProcessor { @Override public OperationType apply(final Record record) { final RecordPathResult recordPathResult = recordPath.evaluate(record); - final List resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList()); + final List resultList = recordPathResult.getSelectedFields().distinct().toList(); if (resultList.isEmpty()) { throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record but got no results"); } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java index 1b807d7516..a1efdb71e9 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java @@ -17,47 +17,37 @@ package org.apache.nifi.processors.kudu; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.apache.kudu.Schema; +import org.apache.kudu.client.Delete; import org.apache.kudu.client.DeleteIgnore; +import org.apache.kudu.client.Insert; import org.apache.kudu.client.InsertIgnore; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.Delete; -import org.apache.kudu.client.Insert; import org.apache.kudu.client.Operation; +import org.apache.kudu.client.Update; import org.apache.kudu.client.UpdateIgnore; import org.apache.kudu.client.Upsert; -import org.apache.kudu.client.Update; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.record.Record; -import javax.security.auth.login.AppConfigurationEntry; -import java.security.PrivilegedAction; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.LinkedList; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; - import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class MockPutKudu extends PutKudu { - private KuduSession session; - private LinkedList opQueue; + private final KuduSession session; + private final LinkedList opQueue; // Atomic reference is used as the set and use of the schema are in different thread - private AtomicReference tableSchema = new AtomicReference<>(); - - private boolean loggedIn = false; - private boolean loggedOut = false; + private final AtomicReference tableSchema = new AtomicReference<>(); public MockPutKudu() { this(mock(KuduSession.class)); @@ -78,31 +68,15 @@ public class MockPutKudu extends PutKudu { boolean lowercaseFields, KuduTable kuduTable) { Operation operation = opQueue.poll(); if (operation == null) { - switch (operationType) { - case INSERT: - operation = mock(Insert.class); - break; - case INSERT_IGNORE: - operation = mock(InsertIgnore.class); - break; - case UPSERT: - operation = mock(Upsert.class); - break; - case UPDATE: - operation = mock(Update.class); - break; - case UPDATE_IGNORE: - operation = mock(UpdateIgnore.class); - break; - case DELETE: - operation = mock(Delete.class); - break; - case DELETE_IGNORE: - operation = mock(DeleteIgnore.class); - break; - default: - throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType)); - } + operation = switch (operationType) { + case INSERT -> mock(Insert.class); + case INSERT_IGNORE -> mock(InsertIgnore.class); + case UPSERT -> mock(Upsert.class); + case UPDATE -> mock(Update.class); + case UPDATE_IGNORE -> mock(UpdateIgnore.class); + case DELETE -> mock(Delete.class); + case DELETE_IGNORE -> mock(DeleteIgnore.class); + }; } return operation; } @@ -140,86 +114,6 @@ public class MockPutKudu extends PutKudu { actionOnKuduClient.accept(client); } - public boolean loggedIn() { - return loggedIn; - } - - public boolean loggedOut() { - return loggedOut; - } - - @Override - protected KerberosUser createKerberosKeytabUser(String principal, String keytab, ProcessContext context) { - return createMockKerberosUser(principal); - } - - @Override - protected KerberosUser createKerberosPasswordUser(String principal, String password, ProcessContext context) { - return createMockKerberosUser(principal); - } - - private KerberosUser createMockKerberosUser(final String principal) { - return new KerberosUser() { - - @Override - public void login() { - loggedIn = true; - } - - @Override - public void logout() { - loggedOut = true; - } - - @Override - public T doAs(final PrivilegedAction action) throws IllegalStateException { - return action.run(); - } - - @Override - public T doAs(PrivilegedAction action, ClassLoader contextClassLoader) throws IllegalStateException { - return action.run(); - } - - @Override - public T doAs(final PrivilegedExceptionAction action) throws IllegalStateException, PrivilegedActionException { - try { - return action.run(); - } catch (Exception e) { - throw new PrivilegedActionException(e); - } - } - - @Override - public T doAs(PrivilegedExceptionAction action, ClassLoader contextClassLoader) throws IllegalStateException, PrivilegedActionException { - try { - return action.run(); - } catch (Exception e) { - throw new PrivilegedActionException(e); - } - } - - @Override - public boolean checkTGTAndRelogin() { - return true; - } - - @Override - public boolean isLoggedIn() { - return loggedIn && !loggedOut; - } - - @Override - public String getPrincipal() { - return principal; - } - - @Override - public AppConfigurationEntry getConfigurationEntry() { - return new AppConfigurationEntry("LoginModule", AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, Collections.emptyMap()); - } - }; - } @Override protected KuduSession createKuduSession(final KuduClient client) { diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index eb72477a51..39247984f6 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -17,6 +17,20 @@ package org.apache.nifi.processors.kudu; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; import org.apache.kudu.ColumnSchema; import org.apache.kudu.ColumnTypeAttributes; import org.apache.kudu.Schema; @@ -29,11 +43,8 @@ import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.RowError; import org.apache.kudu.client.RowErrorsAndOverflowStatus; import org.apache.kudu.client.SessionConfiguration.FlushMode; -import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.kerberos.KerberosCredentialsService; -import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -58,29 +69,11 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.mockito.stubbing.OngoingStubbing; -import java.io.IOException; -import java.io.InputStream; -import java.math.BigDecimal; -import java.sql.Timestamp; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION; import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL; import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -148,50 +141,6 @@ public class TestPutKudu { testRunner.enableControllerService(readerFactory); } - @Test - public void testCustomValidate() throws InitializationException { - createRecordReader(1); - - testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal"); - testRunner.assertNotValid(); - - testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL); - testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password"); - testRunner.assertNotValid(); - - testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal"); - testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password"); - testRunner.assertValid(); - - final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab"); - testRunner.addControllerService("kerb", kerberosCredentialsService); - testRunner.enableControllerService(kerberosCredentialsService); - testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb"); - testRunner.assertNotValid(); - - testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL); - testRunner.removeProperty(PutKudu.KERBEROS_PASSWORD); - testRunner.assertValid(); - - final KerberosUserService kerberosUserService = enableKerberosUserService(testRunner); - testRunner.setProperty(PutKudu.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier()); - testRunner.assertNotValid(); - - testRunner.removeProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE); - testRunner.assertValid(); - - testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal"); - testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password"); - testRunner.assertNotValid(); - } - - private KerberosUserService enableKerberosUserService(final TestRunner runner) throws InitializationException { - final KerberosUserService kerberosUserService = mock(KerberosUserService.class); - when(kerberosUserService.getIdentifier()).thenReturn("userService1"); - runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService); - runner.enableControllerService(kerberosUserService); - return kerberosUserService; - } @Test public void testWriteKuduWithDefaults() throws InitializationException { @@ -221,39 +170,6 @@ public class TestPutKudu { assertEquals(ProvenanceEventType.SEND, provEvent.getEventType()); } - @Test - public void testKerberosEnabled() throws InitializationException { - createRecordReader(1); - - final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab"); - testRunner.addControllerService("kerb", kerberosCredentialsService); - testRunner.enableControllerService(kerberosCredentialsService); - - testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb"); - - testRunner.run(1, false); - - final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor(); - assertTrue(proc.loggedIn()); - assertFalse(proc.loggedOut()); - - testRunner.run(1, true, false); - assertTrue(proc.loggedOut()); - } - - @Test - public void testInsecureClient() throws InitializationException { - createRecordReader(1); - - testRunner.run(1, false); - - final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor(); - assertFalse(proc.loggedIn()); - assertFalse(proc.loggedOut()); - - testRunner.run(1, true, false); - assertFalse(proc.loggedOut()); - } @Test @@ -751,7 +667,7 @@ public class TestPutKudu { flowFileResponses = flowFileResponses.subList(sliceSize, flowFileResponses.size()); List batch = new ArrayList<>(); - for (OperationResponse response : slice.stream().flatMap(List::stream).collect(Collectors.toList())) { + for (OperationResponse response : slice.stream().flatMap(List::stream).toList()) { if (batch.size() == batchSize) { flushes.add(batch); batch = new ArrayList<>(); @@ -814,24 +730,4 @@ public class TestPutKudu { public void testKuduPartialFailuresOnManualFlush() throws Exception { testKuduPartialFailure(FlushMode.MANUAL_FLUSH); } - - public static class MockKerberosCredentialsService extends AbstractControllerService implements KerberosCredentialsService { - private final String keytab; - private final String principal; - - public MockKerberosCredentialsService(final String keytab, final String principal) { - this.keytab = keytab; - this.principal = principal; - } - - @Override - public String getKeytab() { - return keytab; - } - - @Override - public String getPrincipal() { - return principal; - } - } }