From e02ffdd99fb3e0f561a50903f491b392a1a505cc Mon Sep 17 00:00:00 2001 From: Tamas Palfy Date: Fri, 12 Jun 2020 17:24:59 +0200 Subject: [PATCH] NIFI-7527 AbstractKuduProcessorrefresh TGT deadlock fix: Redesigned locking. NIFI-7527 Fixed StackOverFlowError due to pacing issue (recursive login before loggedIn flag is set). NIFI-7527 Refactor: removed redundant kudu client creation. This closes #4330. Signed-off-by: Peter Turcsanyi --- .../kudu/AbstractKuduProcessor.java | 67 ++++++++----------- .../apache/nifi/processors/kudu/PutKudu.java | 8 +-- .../nifi/processors/kudu/MockPutKudu.java | 12 ++-- 3 files changed, 36 insertions(+), 51 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index b9639e5d06..36268d5202 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -46,8 +46,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.security.krb.KerberosAction; import org.apache.nifi.security.krb.KerberosKeytabUser; @@ -71,6 +69,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; public abstract class AbstractKuduProcessor extends AbstractProcessor { @@ -136,36 +135,23 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { private volatile KerberosUser kerberosUser; - protected abstract void onTrigger(ProcessContext context, ProcessSession session, KuduClient kuduClient) throws ProcessException; - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - kuduClientReadLock.lock(); - try { - onTrigger(context, session, kuduClient); - } finally { - kuduClientReadLock.unlock(); - } - } - protected KerberosUser getKerberosUser() { return this.kerberosUser; } - protected void createKerberosUserAndKuduClient(ProcessContext context) throws LoginException { - createKerberosUser(context); - createKuduClient(context); - } - - protected void createKerberosUser(ProcessContext context) throws LoginException { + protected void createKerberosUserAndOrKuduClient(ProcessContext context) throws LoginException { final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue(); if (credentialsService != null) { - kerberosUser = loginKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab(), context); + kerberosUser = createKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab(), context); + kerberosUser.login(); // login creates the kudu client as well } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) { - kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, kerberosPassword, context); + kerberosUser = createKerberosPasswordUser(kerberosPrincipal, kerberosPassword, context); + kerberosUser.login(); // login creates the kudu client as well + } else { + createKuduClient(context); } } @@ -202,6 +188,15 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { .build(); } + protected void executeOnKuduClient(Consumer actionOnKuduClient) { + kuduClientReadLock.lock(); + try { + actionOnKuduClient.accept(kuduClient); + } finally { + kuduClientReadLock.unlock(); + } + } + protected void flushKuduSession(final KuduSession kuduSession, boolean close, final List rowErrors) throws KuduException { final List responses = close ? kuduSession.close() : kuduSession.flush(); @@ -215,38 +210,30 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { } } - protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab, ProcessContext context) throws LoginException { - final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab) { + protected KerberosUser createKerberosKeytabUser(String principal, String keytab, ProcessContext context) { + return new KerberosKeytabUser(principal, keytab) { @Override - public synchronized boolean checkTGTAndRelogin() throws LoginException { - boolean didRelogin = super.checkTGTAndRelogin(); + public synchronized void login() throws LoginException { + if (!isLoggedIn()) { + super.login(); - if (didRelogin) { createKuduClient(context); } - - return didRelogin; } }; - kerberosUser.login(); - return kerberosUser; } - protected KerberosUser loginKerberosPasswordUser(final String principal, final String password, ProcessContext context) throws LoginException { - final KerberosUser kerberosUser = new KerberosPasswordUser(principal, password) { + protected KerberosUser createKerberosPasswordUser(String principal, String password, ProcessContext context) { + return new KerberosPasswordUser(principal, password) { @Override - public synchronized boolean checkTGTAndRelogin() throws LoginException { - boolean didRelogin = super.checkTGTAndRelogin(); + public synchronized void login() throws LoginException { + if (!isLoggedIn()) { + super.login(); - if (didRelogin) { createKuduClient(context); } - - return didRelogin; } }; - kerberosUser.login(); - return kerberosUser; } @Override diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index c0d7e4664e..064e295c2c 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -259,11 +259,11 @@ public class PutKudu extends AbstractKuduProcessor { batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase()); - createKerberosUserAndKuduClient(context); + createKerberosUserAndOrKuduClient(context); } @Override - protected void onTrigger(final ProcessContext context, final ProcessSession session, KuduClient kuduClient) throws ProcessException { + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final List flowFiles = session.get(ffbatch); if (flowFiles.isEmpty()) { return; @@ -271,12 +271,12 @@ public class PutKudu extends AbstractKuduProcessor { final KerberosUser user = getKerberosUser(); if (user == null) { - trigger(context, session, flowFiles, kuduClient); + executeOnKuduClient(kuduClient -> trigger(context, session, flowFiles, kuduClient)); return; } final PrivilegedExceptionAction privilegedAction = () -> { - trigger(context, session, flowFiles, kuduClient); + executeOnKuduClient(kuduClient -> trigger(context, session, flowFiles, kuduClient)); return null; }; diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java index 4f634fd873..31b9ac7530 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java @@ -26,12 +26,9 @@ import org.apache.kudu.client.Insert; import org.apache.kudu.client.Upsert; import org.apache.kudu.client.Update; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.record.Record; -import javax.security.auth.login.LoginException; import java.security.PrivilegedAction; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; @@ -39,6 +36,7 @@ import java.util.Arrays; import java.util.List; import java.util.LinkedList; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; @@ -103,7 +101,7 @@ public class MockPutKudu extends PutKudu { } @Override - protected void onTrigger(ProcessContext context, ProcessSession session, KuduClient kuduClient) throws ProcessException { + protected void executeOnKuduClient(Consumer actionOnKuduClient) { final KuduClient client = mock(KuduClient.class); try { @@ -114,7 +112,7 @@ public class MockPutKudu extends PutKudu { throw new AssertionError(e); } - super.onTrigger(context, session, client); + actionOnKuduClient.accept(client); } public boolean loggedIn() { @@ -126,12 +124,12 @@ public class MockPutKudu extends PutKudu { } @Override - protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab, ProcessContext context) throws LoginException { + protected KerberosUser createKerberosKeytabUser(String principal, String keytab, ProcessContext context) { return createMockKerberosUser(principal); } @Override - protected KerberosUser loginKerberosPasswordUser(String principal, String password, ProcessContext context) throws LoginException { + protected KerberosUser createKerberosPasswordUser(String principal, String password, ProcessContext context) { return createMockKerberosUser(principal); }