mirror of https://github.com/apache/nifi.git
NIFI-7029 Add kerberos password property to NiFi Kudu components
This closes #4097.
This commit is contained in:
parent
c74e71306e
commit
5b93e537d5
|
@ -17,26 +17,29 @@
|
||||||
|
|
||||||
package org.apache.nifi.processors.kudu;
|
package org.apache.nifi.processors.kudu;
|
||||||
|
|
||||||
import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.kudu.ColumnSchema;
|
import org.apache.kudu.ColumnSchema;
|
||||||
import org.apache.kudu.Schema;
|
import org.apache.kudu.Schema;
|
||||||
import org.apache.kudu.Type;
|
import org.apache.kudu.Type;
|
||||||
import org.apache.kudu.client.AsyncKuduClient;
|
import org.apache.kudu.client.AsyncKuduClient;
|
||||||
|
import org.apache.kudu.client.Delete;
|
||||||
|
import org.apache.kudu.client.Insert;
|
||||||
import org.apache.kudu.client.KuduClient;
|
import org.apache.kudu.client.KuduClient;
|
||||||
import org.apache.kudu.client.KuduTable;
|
|
||||||
import org.apache.kudu.client.KuduSession;
|
|
||||||
import org.apache.kudu.client.KuduException;
|
import org.apache.kudu.client.KuduException;
|
||||||
|
import org.apache.kudu.client.KuduSession;
|
||||||
|
import org.apache.kudu.client.KuduTable;
|
||||||
import org.apache.kudu.client.OperationResponse;
|
import org.apache.kudu.client.OperationResponse;
|
||||||
import org.apache.kudu.client.PartialRow;
|
import org.apache.kudu.client.PartialRow;
|
||||||
import org.apache.kudu.client.RowError;
|
import org.apache.kudu.client.RowError;
|
||||||
import org.apache.kudu.client.SessionConfiguration;
|
import org.apache.kudu.client.SessionConfiguration;
|
||||||
import org.apache.kudu.client.Delete;
|
|
||||||
import org.apache.kudu.client.Insert;
|
|
||||||
import org.apache.kudu.client.Upsert;
|
|
||||||
import org.apache.kudu.client.Update;
|
import org.apache.kudu.client.Update;
|
||||||
|
import org.apache.kudu.client.Upsert;
|
||||||
|
import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyDescriptor.Builder;
|
import org.apache.nifi.components.PropertyDescriptor.Builder;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.expression.AttributeExpression;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
import org.apache.nifi.kerberos.KerberosCredentialsService;
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
@ -44,17 +47,21 @@ import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.security.krb.KerberosAction;
|
import org.apache.nifi.security.krb.KerberosAction;
|
||||||
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
import org.apache.nifi.security.krb.KerberosKeytabUser;
|
||||||
|
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||||
import org.apache.nifi.security.krb.KerberosUser;
|
import org.apache.nifi.security.krb.KerberosUser;
|
||||||
import org.apache.nifi.serialization.record.DataType;
|
import org.apache.nifi.serialization.record.DataType;
|
||||||
import org.apache.nifi.serialization.record.Record;
|
import org.apache.nifi.serialization.record.Record;
|
||||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||||
|
import org.apache.nifi.util.StringUtils;
|
||||||
|
|
||||||
import javax.security.auth.login.LoginException;
|
import javax.security.auth.login.LoginException;
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
|
@ -74,6 +81,25 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
||||||
.identifiesControllerService(KerberosCredentialsService.class)
|
.identifiesControllerService(KerberosCredentialsService.class)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||||
|
.name("kerberos-principal")
|
||||||
|
.displayName("Kerberos Principal")
|
||||||
|
.description("The principal to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
|
||||||
|
.name("kerberos-password")
|
||||||
|
.displayName("Kerberos Password")
|
||||||
|
.description("The password to use when specifying the principal and password directly in the processor for authenticating via Kerberos.")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.sensitive(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new Builder()
|
static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new Builder()
|
||||||
.name("kudu-operations-timeout-ms")
|
.name("kudu-operations-timeout-ms")
|
||||||
.displayName("Kudu Operation Timeout")
|
.displayName("Kudu Operation Timeout")
|
||||||
|
@ -109,12 +135,16 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
||||||
public void createKuduClient(ProcessContext context) throws LoginException {
|
public void createKuduClient(ProcessContext context) throws LoginException {
|
||||||
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
|
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
|
||||||
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||||
|
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||||
|
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
||||||
|
|
||||||
if (credentialsService != null) {
|
if (credentialsService != null) {
|
||||||
final String keytab = credentialsService.getKeytab();
|
kerberosUser = loginKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab());
|
||||||
final String principal = credentialsService.getPrincipal();
|
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
|
||||||
kerberosUser = loginKerberosUser(principal, keytab);
|
kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, kerberosPassword);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (kerberosUser != null) {
|
||||||
final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
|
final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
|
||||||
this.kuduClient = kerberosAction.execute();
|
this.kuduClient = kerberosAction.execute();
|
||||||
} else {
|
} else {
|
||||||
|
@ -146,12 +176,54 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
|
protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab) throws LoginException {
|
||||||
final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
|
final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
|
||||||
kerberosUser.login();
|
kerberosUser.login();
|
||||||
return kerberosUser;
|
return kerberosUser;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected KerberosUser loginKerberosPasswordUser(final String principal, final String password) throws LoginException {
|
||||||
|
final KerberosUser kerberosUser = new KerberosPasswordUser(principal, password);
|
||||||
|
kerberosUser.login();
|
||||||
|
return kerberosUser;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||||
|
final List<ValidationResult> results = new ArrayList<>();
|
||||||
|
|
||||||
|
final boolean kerberosPrincipalProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue());
|
||||||
|
final boolean kerberosPasswordProvided = !StringUtils.isBlank(context.getProperty(KERBEROS_PASSWORD).getValue());
|
||||||
|
|
||||||
|
if (kerberosPrincipalProvided && !kerberosPasswordProvided) {
|
||||||
|
results.add(new ValidationResult.Builder()
|
||||||
|
.subject(KERBEROS_PASSWORD.getDisplayName())
|
||||||
|
.valid(false)
|
||||||
|
.explanation("a password must be provided for the given principal")
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (kerberosPasswordProvided && !kerberosPrincipalProvided) {
|
||||||
|
results.add(new ValidationResult.Builder()
|
||||||
|
.subject(KERBEROS_PRINCIPAL.getDisplayName())
|
||||||
|
.valid(false)
|
||||||
|
.explanation("a principal must be provided for the given password")
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
|
||||||
|
|
||||||
|
if (kerberosCredentialsService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
|
||||||
|
results.add(new ValidationResult.Builder()
|
||||||
|
.subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
|
||||||
|
.valid(false)
|
||||||
|
.explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time")
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
@OnStopped
|
@OnStopped
|
||||||
public void shutdown() throws Exception {
|
public void shutdown() throws Exception {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -231,6 +231,8 @@ public class PutKudu extends AbstractKuduProcessor {
|
||||||
properties.add(KUDU_MASTERS);
|
properties.add(KUDU_MASTERS);
|
||||||
properties.add(TABLE_NAME);
|
properties.add(TABLE_NAME);
|
||||||
properties.add(KERBEROS_CREDENTIALS_SERVICE);
|
properties.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||||
|
properties.add(KERBEROS_PRINCIPAL);
|
||||||
|
properties.add(KERBEROS_PASSWORD);
|
||||||
properties.add(SKIP_HEAD_LINE);
|
properties.add(SKIP_HEAD_LINE);
|
||||||
properties.add(LOWERCASE_FIELD_NAMES);
|
properties.add(LOWERCASE_FIELD_NAMES);
|
||||||
properties.add(HANDLE_SCHEMA_DRIFT);
|
properties.add(HANDLE_SCHEMA_DRIFT);
|
||||||
|
|
|
@ -117,7 +117,16 @@ public class MockPutKudu extends PutKudu {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
|
protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab) throws LoginException {
|
||||||
|
return createMockKerberosUser(principal);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected KerberosUser loginKerberosPasswordUser(String principal, String password) throws LoginException {
|
||||||
|
return createMockKerberosUser(principal);
|
||||||
|
}
|
||||||
|
|
||||||
|
private KerberosUser createMockKerberosUser(final String principal) {
|
||||||
return new KerberosUser() {
|
return new KerberosUser() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -131,6 +131,32 @@ public class TestPutKudu {
|
||||||
testRunner.enableControllerService(readerFactory);
|
testRunner.enableControllerService(readerFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomValidate() throws InitializationException {
|
||||||
|
createRecordReader(1);
|
||||||
|
|
||||||
|
testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
|
||||||
|
testRunner.assertNotValid();
|
||||||
|
|
||||||
|
testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL);
|
||||||
|
testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
|
||||||
|
testRunner.assertNotValid();
|
||||||
|
|
||||||
|
testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
|
||||||
|
testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
|
||||||
|
testRunner.assertValid();
|
||||||
|
|
||||||
|
final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab");
|
||||||
|
testRunner.addControllerService("kerb", kerberosCredentialsService);
|
||||||
|
testRunner.enableControllerService(kerberosCredentialsService);
|
||||||
|
testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb");
|
||||||
|
testRunner.assertNotValid();
|
||||||
|
|
||||||
|
testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL);
|
||||||
|
testRunner.removeProperty(PutKudu.KERBEROS_PASSWORD);
|
||||||
|
testRunner.assertValid();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteKuduWithDefaults() throws IOException, InitializationException {
|
public void testWriteKuduWithDefaults() throws IOException, InitializationException {
|
||||||
createRecordReader(100);
|
createRecordReader(100);
|
||||||
|
|
Loading…
Reference in New Issue