From 3d408f2b30fbb35fd507577a7032821044a7dcd9 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 30 Jan 2019 10:06:04 -0500 Subject: [PATCH] NIFI-5984: Enabled Kerberos Authentication for PutKudu This closes #3279 --- .../nifi-kudu-processors/pom.xml | 29 +++-- .../apache/nifi/processors/kudu/PutKudu.java | 103 ++++++++++++++---- .../nifi/processors/kudu/MockPutKudu.java | 65 ++++++++++- .../nifi/processors/kudu/TestPutKudu.java | 61 +++++++++++ 4 files changed, 221 insertions(+), 37 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml index 5871cc0a59..8bb100d9ea 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml @@ -25,12 +25,6 @@ jar - - org.apache.nifi - nifi-distributed-cache-client-service-api - 1.9.0-SNAPSHOT - provided - org.apache.nifi nifi-api @@ -40,6 +34,11 @@ nifi-processor-utils 1.9.0-SNAPSHOT + + org.apache.nifi + nifi-kerberos-credentials-service-api + 1.9.0-SNAPSHOT + org.apache.nifi nifi-record-serialization-service-api @@ -58,21 +57,24 @@ org.apache.nifi nifi-record + + com.google.guava + guava + 18.0 + + org.apache.nifi - nifi-hadoop-record-utils + nifi-security-utils 1.9.0-SNAPSHOT + org.apache.nifi nifi-mock-record-utils 1.9.0-SNAPSHOT test - - org.apache.nifi - nifi-schema-registry-service-api - org.apache.nifi nifi-mock @@ -91,10 +93,5 @@ 2.5.4 test - - com.google.guava - guava - 18.0 - diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index b0eb3f9fe9..9c0c503dfd 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -34,6 +34,7 @@ import org.apache.kudu.client.SessionConfiguration; import org.apache.kudu.client.Upsert; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -41,21 +42,29 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.krb.KerberosAction; +import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSet; +import javax.security.auth.login.LoginException; +import java.io.IOException; import java.io.InputStream; import java.math.BigDecimal; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -65,8 +74,11 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY; + @EventDriven @SupportsBatching +@RequiresInstanceClassLoading // Because of calls to UserGroupInformation.setConfiguration @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({"put", "database", "NoSQL", "kudu", "HDFS", "record"}) @CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " + @@ -74,23 +86,31 @@ import java.util.stream.Collectors; " If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure") @WritesAttribute(attribute = "record.count", description = "Number of records written to Kudu") public class PutKudu extends AbstractProcessor { - protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder() + protected static final PropertyDescriptor KUDU_MASTERS = new Builder() .name("Kudu Masters") .description("List all kudu masters's ip with port (e.g. 7051), comma separated") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .expressionLanguageSupported(VARIABLE_REGISTRY) .build(); - protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + protected static final PropertyDescriptor TABLE_NAME = new Builder() .name("Table Name") .description("The name of the Kudu Table to put data into") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .expressionLanguageSupported(VARIABLE_REGISTRY) .build(); - public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials to use for authentication") + .required(false) + .identifiesControllerService(KerberosCredentialsService.class) + .build(); + + public static final PropertyDescriptor RECORD_READER = new Builder() .name("record-reader") .displayName("Record Reader") .description("The service for reading records from incoming flow files.") @@ -98,7 +118,7 @@ public class PutKudu extends AbstractProcessor { .required(true) .build(); - protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder() + protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder() .name("Skip head line") .description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " + "(e.g. \"Treat First Line as Header\" property of CSVReader)") @@ -108,7 +128,7 @@ public class PutKudu extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder() + protected static final PropertyDescriptor INSERT_OPERATION = new Builder() .name("Insert Operation") .description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows") .allowableValues(OperationType.values()) @@ -116,7 +136,7 @@ public class PutKudu extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder() + protected static final PropertyDescriptor FLUSH_MODE = new Builder() .name("Flush Mode") .description("Set the new flush mode for a kudu session.\n" + "AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" + @@ -149,7 +169,7 @@ public class PutKudu extends AbstractProcessor { .defaultValue("100") .required(true) .addValidator(StandardValidators.createLongValidator(1, 100000, true)) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .expressionLanguageSupported(VARIABLE_REGISTRY) .build(); @@ -171,12 +191,14 @@ public class PutKudu extends AbstractProcessor { protected KuduClient kuduClient; protected KuduTable kuduTable; + private volatile KerberosUser kerberosUser; @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); properties.add(KUDU_MASTERS); properties.add(TABLE_NAME); + properties.add(KERBEROS_CREDENTIALS_SERVICE); properties.add(SKIP_HEAD_LINE); properties.add(RECORD_READER); properties.add(INSERT_OPERATION); @@ -197,7 +219,7 @@ public class PutKudu extends AbstractProcessor { @OnScheduled - public void OnScheduled(final ProcessContext context) throws KuduException { + public void onScheduled(final ProcessContext context) throws IOException, LoginException { final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue()); @@ -206,21 +228,48 @@ public class PutKudu extends AbstractProcessor { flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue()); getLogger().debug("Setting up Kudu connection..."); - kuduClient = createClient(kuduMasters); + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + kuduClient = createClient(kuduMasters, credentialsService); kuduTable = kuduClient.openTable(tableName); getLogger().debug("Kudu connection successfully initialized"); } - protected KuduClient createClient(final String masters) { + protected KuduClient createClient(final String masters, final KerberosCredentialsService credentialsService) throws LoginException { + if (credentialsService == null) { + return buildClient(masters); + } + + final String keytab = credentialsService.getKeytab(); + final String principal = credentialsService.getPrincipal(); + kerberosUser = loginKerberosUser(principal, keytab); + + final KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(masters), getLogger()); + return kerberosAction.execute(); + } + + protected KuduClient buildClient(final String masters) { return new KuduClient.KuduClientBuilder(masters).build(); } + protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException { + final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab); + kerberosUser.login(); + return kerberosUser; + } + @OnStopped - public final void closeClient() throws KuduException { - if (kuduClient != null) { - getLogger().debug("Closing KuduClient"); - kuduClient.close(); - kuduClient = null; + public final void closeClient() throws KuduException, LoginException { + try { + if (kuduClient != null) { + getLogger().debug("Closing KuduClient"); + kuduClient.close(); + kuduClient = null; + } + } finally { + if (kerberosUser != null) { + kerberosUser.logout(); + kerberosUser = null; + } } } @@ -231,6 +280,22 @@ public class PutKudu extends AbstractProcessor { return; } + final KerberosUser user = kerberosUser; + if (user == null) { + trigger(context, session, flowFiles); + return; + } + + final PrivilegedExceptionAction privelegedAction = () -> { + trigger(context, session, flowFiles); + return null; + }; + + final KerberosAction action = new KerberosAction<>(user, privelegedAction, getLogger()); + action.execute(); + } + + private void trigger(final ProcessContext context, final ProcessSession session, final List flowFiles) throws ProcessException { final KuduSession kuduSession = getKuduSession(kuduClient); final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); @@ -353,13 +418,13 @@ public class PutKudu extends AbstractProcessor { - protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List fieldNames) throws IllegalStateException, Exception { + protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List fieldNames) { Upsert upsert = kuduTable.newUpsert(); this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames); return upsert; } - protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List fieldNames) throws IllegalStateException, Exception { + protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List fieldNames) { Insert insert = kuduTable.newInsert(); this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames); return insert; diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java index f805be77b5..091f5c342a 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java @@ -22,8 +22,13 @@ import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Upsert; +import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.record.Record; +import javax.security.auth.login.LoginException; +import java.security.PrivilegedAction; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -36,6 +41,9 @@ public class MockPutKudu extends PutKudu { private KuduSession session; private LinkedList insertQueue; + private boolean loggedIn = false; + private boolean loggedOut = false; + public MockPutKudu() { this(mock(KuduSession.class)); } @@ -61,18 +69,71 @@ public class MockPutKudu extends PutKudu { } @Override - protected KuduClient createClient(final String masters) { + protected KuduClient buildClient(final String masters) { final KuduClient client = mock(KuduClient.class); try { when(client.openTable(anyString())).thenReturn(mock(KuduTable.class)); } catch (final Exception e) { - + throw new AssertionError(e); } return client; } + public boolean loggedIn() { + return loggedIn; + } + + public boolean loggedOut() { + return loggedOut; + } + + @Override + protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException { + return new KerberosUser() { + + @Override + public void login() { + loggedIn = true; + } + + @Override + public void logout() { + loggedOut = true; + } + + @Override + public T doAs(final PrivilegedAction action) throws IllegalStateException { + return action.run(); + } + + @Override + public T doAs(final PrivilegedExceptionAction action) throws IllegalStateException, PrivilegedActionException { + try { + return action.run(); + } catch (Exception e) { + throw new PrivilegedActionException(e); + } + } + + @Override + public boolean checkTGTAndRelogin() { + return true; + } + + @Override + public boolean isLoggedIn() { + return loggedIn && !loggedOut; + } + + @Override + public String getPrincipal() { + return principal; + } + }; + } + @Override protected KuduSession getKuduSession(KuduClient client) { return session; diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index 51908f227e..6fc430c260 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -28,8 +28,10 @@ import org.apache.kudu.client.OperationResponse; import org.apache.kudu.client.RowError; import org.apache.kudu.client.RowErrorsAndOverflowStatus; import org.apache.kudu.client.SessionConfiguration.FlushMode; +import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -70,6 +72,8 @@ import java.util.stream.IntStream; import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION; import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL; import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -149,6 +153,42 @@ public class TestPutKudu { Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType()); } + @Test + public void testKerberosEnabled() throws InitializationException { + createRecordReader(1); + + final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab"); + testRunner.addControllerService("kerb", kerberosCredentialsService); + testRunner.enableControllerService(kerberosCredentialsService); + + testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb"); + + testRunner.run(1, false); + + final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor(); + assertTrue(proc.loggedIn()); + assertFalse(proc.loggedOut()); + + testRunner.run(1, true, false); + assertTrue(proc.loggedOut()); + } + + + @Test + public void testInsecureClient() throws InitializationException { + createRecordReader(1); + + testRunner.run(1, false); + + final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor(); + assertFalse(proc.loggedIn()); + assertFalse(proc.loggedOut()); + + testRunner.run(1, true, false); + assertFalse(proc.loggedOut()); + } + + @Test public void testInvalidReaderShouldRouteToFailure() throws InitializationException, SchemaNotFoundException, MalformedRecordException, IOException { createRecordReader(0); @@ -516,4 +556,25 @@ public class TestPutKudu { public void testKuduPartialFailuresOnManualFlush() throws Exception { testKuduPartialFailure(FlushMode.MANUAL_FLUSH); } + + + public static class MockKerberosCredentialsService extends AbstractControllerService implements KerberosCredentialsService { + private final String keytab; + private final String principal; + + public MockKerberosCredentialsService(final String keytab, final String principal) { + this.keytab = keytab; + this.principal = principal; + } + + @Override + public String getKeytab() { + return keytab; + } + + @Override + public String getPrincipal() { + return principal; + } + } }