NIFI-5984: Enabled Kerberos Authentication for PutKudu

This closes #3279
This commit is contained in:
Mark Payne 2019-01-30 10:06:04 -05:00 committed by Jeff Storck
parent 36c0a99e91
commit 3d408f2b30
4 changed files with 221 additions and 37 deletions

View File

@ -25,12 +25,6 @@
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
@ -40,6 +34,11 @@
<artifactId>nifi-processor-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
@ -58,21 +57,24 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-record-utils</artifactId>
<artifactId>nifi-security-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
@ -91,10 +93,5 @@
<version>2.5.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
</dependencies>
</project>

View File

@ -34,6 +34,7 @@ import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -41,21 +42,29 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -65,8 +74,11 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
@EventDriven
@SupportsBatching
@RequiresInstanceClassLoading // Because of calls to UserGroupInformation.setConfiguration
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"put", "database", "NoSQL", "kudu", "HDFS", "record"})
@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
@ -74,23 +86,31 @@ import java.util.stream.Collectors;
" If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute = "record.count", description = "Number of records written to Kudu")
public class PutKudu extends AbstractProcessor {
protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
protected static final PropertyDescriptor KUDU_MASTERS = new Builder()
.name("Kudu Masters")
.description("List all kudu masters's ip with port (e.g. 7051), comma separated")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
protected static final PropertyDescriptor TABLE_NAME = new Builder()
.name("Table Name")
.description("The name of the Kudu Table to put data into")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials to use for authentication")
.required(false)
.identifiesControllerService(KerberosCredentialsService.class)
.build();
public static final PropertyDescriptor RECORD_READER = new Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The service for reading records from incoming flow files.")
@ -98,7 +118,7 @@ public class PutKudu extends AbstractProcessor {
.required(true)
.build();
protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
.name("Skip head line")
.description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " +
"(e.g. \"Treat First Line as Header\" property of CSVReader)")
@ -108,7 +128,7 @@ public class PutKudu extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
.name("Insert Operation")
.description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows")
.allowableValues(OperationType.values())
@ -116,7 +136,7 @@ public class PutKudu extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
protected static final PropertyDescriptor FLUSH_MODE = new Builder()
.name("Flush Mode")
.description("Set the new flush mode for a kudu session.\n" +
"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
@ -149,7 +169,7 @@ public class PutKudu extends AbstractProcessor {
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
@ -171,12 +191,14 @@ public class PutKudu extends AbstractProcessor {
protected KuduClient kuduClient;
protected KuduTable kuduTable;
private volatile KerberosUser kerberosUser;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(KUDU_MASTERS);
properties.add(TABLE_NAME);
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(SKIP_HEAD_LINE);
properties.add(RECORD_READER);
properties.add(INSERT_OPERATION);
@ -197,7 +219,7 @@ public class PutKudu extends AbstractProcessor {
@OnScheduled
public void OnScheduled(final ProcessContext context) throws KuduException {
public void onScheduled(final ProcessContext context) throws IOException, LoginException {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
@ -206,21 +228,48 @@ public class PutKudu extends AbstractProcessor {
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
getLogger().debug("Setting up Kudu connection...");
kuduClient = createClient(kuduMasters);
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
kuduClient = createClient(kuduMasters, credentialsService);
kuduTable = kuduClient.openTable(tableName);
getLogger().debug("Kudu connection successfully initialized");
}
protected KuduClient createClient(final String masters) {
protected KuduClient createClient(final String masters, final KerberosCredentialsService credentialsService) throws LoginException {
if (credentialsService == null) {
return buildClient(masters);
}
final String keytab = credentialsService.getKeytab();
final String principal = credentialsService.getPrincipal();
kerberosUser = loginKerberosUser(principal, keytab);
final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(masters), getLogger());
return kerberosAction.execute();
}
protected KuduClient buildClient(final String masters) {
return new KuduClient.KuduClientBuilder(masters).build();
}
protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
kerberosUser.login();
return kerberosUser;
}
@OnStopped
public final void closeClient() throws KuduException {
if (kuduClient != null) {
getLogger().debug("Closing KuduClient");
kuduClient.close();
kuduClient = null;
public final void closeClient() throws KuduException, LoginException {
try {
if (kuduClient != null) {
getLogger().debug("Closing KuduClient");
kuduClient.close();
kuduClient = null;
}
} finally {
if (kerberosUser != null) {
kerberosUser.logout();
kerberosUser = null;
}
}
}
@ -231,6 +280,22 @@ public class PutKudu extends AbstractProcessor {
return;
}
final KerberosUser user = kerberosUser;
if (user == null) {
trigger(context, session, flowFiles);
return;
}
final PrivilegedExceptionAction<Void> privelegedAction = () -> {
trigger(context, session, flowFiles);
return null;
};
final KerberosAction<Void> action = new KerberosAction<>(user, privelegedAction, getLogger());
action.execute();
}
private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles) throws ProcessException {
final KuduSession kuduSession = getKuduSession(kuduClient);
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
@ -353,13 +418,13 @@ public class PutKudu extends AbstractProcessor {
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
Upsert upsert = kuduTable.newUpsert();
this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames);
return upsert;
}
protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) {
Insert insert = kuduTable.newInsert();
this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames);
return insert;

View File

@ -22,8 +22,13 @@ import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Upsert;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.Record;
import javax.security.auth.login.LoginException;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@ -36,6 +41,9 @@ public class MockPutKudu extends PutKudu {
private KuduSession session;
private LinkedList<Insert> insertQueue;
private boolean loggedIn = false;
private boolean loggedOut = false;
public MockPutKudu() {
this(mock(KuduSession.class));
}
@ -61,18 +69,71 @@ public class MockPutKudu extends PutKudu {
}
@Override
protected KuduClient createClient(final String masters) {
protected KuduClient buildClient(final String masters) {
final KuduClient client = mock(KuduClient.class);
try {
when(client.openTable(anyString())).thenReturn(mock(KuduTable.class));
} catch (final Exception e) {
throw new AssertionError(e);
}
return client;
}
public boolean loggedIn() {
return loggedIn;
}
public boolean loggedOut() {
return loggedOut;
}
@Override
protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
return new KerberosUser() {
@Override
public void login() {
loggedIn = true;
}
@Override
public void logout() {
loggedOut = true;
}
@Override
public <T> T doAs(final PrivilegedAction<T> action) throws IllegalStateException {
return action.run();
}
@Override
public <T> T doAs(final PrivilegedExceptionAction<T> action) throws IllegalStateException, PrivilegedActionException {
try {
return action.run();
} catch (Exception e) {
throw new PrivilegedActionException(e);
}
}
@Override
public boolean checkTGTAndRelogin() {
return true;
}
@Override
public boolean isLoggedIn() {
return loggedIn && !loggedOut;
}
@Override
public String getPrincipal() {
return principal;
}
};
}
@Override
protected KuduSession getKuduSession(KuduClient client) {
return session;

View File

@ -28,8 +28,10 @@ import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.RowErrorsAndOverflowStatus;
import org.apache.kudu.client.SessionConfiguration.FlushMode;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
@ -70,6 +72,8 @@ import java.util.stream.IntStream;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -149,6 +153,42 @@ public class TestPutKudu {
Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
}
@Test
public void testKerberosEnabled() throws InitializationException {
createRecordReader(1);
final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab");
testRunner.addControllerService("kerb", kerberosCredentialsService);
testRunner.enableControllerService(kerberosCredentialsService);
testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb");
testRunner.run(1, false);
final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
assertTrue(proc.loggedIn());
assertFalse(proc.loggedOut());
testRunner.run(1, true, false);
assertTrue(proc.loggedOut());
}
@Test
public void testInsecureClient() throws InitializationException {
createRecordReader(1);
testRunner.run(1, false);
final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
assertFalse(proc.loggedIn());
assertFalse(proc.loggedOut());
testRunner.run(1, true, false);
assertFalse(proc.loggedOut());
}
@Test
public void testInvalidReaderShouldRouteToFailure() throws InitializationException, SchemaNotFoundException, MalformedRecordException, IOException {
createRecordReader(0);
@ -516,4 +556,25 @@ public class TestPutKudu {
public void testKuduPartialFailuresOnManualFlush() throws Exception {
testKuduPartialFailure(FlushMode.MANUAL_FLUSH);
}
public static class MockKerberosCredentialsService extends AbstractControllerService implements KerberosCredentialsService {
private final String keytab;
private final String principal;
public MockKerberosCredentialsService(final String keytab, final String principal) {
this.keytab = keytab;
this.principal = principal;
}
@Override
public String getKeytab() {
return keytab;
}
@Override
public String getPrincipal() {
return principal;
}
}
}