NIFI-7527 AbstractKuduProcessorrefresh TGT deadlock fix: Redesigned locking.

NIFI-7527 Fixed StackOverFlowError due to pacing issue (recursive login before loggedIn flag is set).
NIFI-7527 Refactor: removed redundant kudu client creation.

This closes #4330.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2020-06-12 17:24:59 +02:00 committed by Peter Turcsanyi
parent a126d0a6b6
commit e02ffdd99f
3 changed files with 36 additions and 51 deletions

View File

@ -46,8 +46,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction; import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser; import org.apache.nifi.security.krb.KerberosKeytabUser;
@ -71,6 +69,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
public abstract class AbstractKuduProcessor extends AbstractProcessor { public abstract class AbstractKuduProcessor extends AbstractProcessor {
@ -136,36 +135,23 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
private volatile KerberosUser kerberosUser; private volatile KerberosUser kerberosUser;
protected abstract void onTrigger(ProcessContext context, ProcessSession session, KuduClient kuduClient) throws ProcessException;
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
kuduClientReadLock.lock();
try {
onTrigger(context, session, kuduClient);
} finally {
kuduClientReadLock.unlock();
}
}
protected KerberosUser getKerberosUser() { protected KerberosUser getKerberosUser() {
return this.kerberosUser; return this.kerberosUser;
} }
protected void createKerberosUserAndKuduClient(ProcessContext context) throws LoginException { protected void createKerberosUserAndOrKuduClient(ProcessContext context) throws LoginException {
createKerberosUser(context);
createKuduClient(context);
}
protected void createKerberosUser(ProcessContext context) throws LoginException {
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue(); final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
if (credentialsService != null) { if (credentialsService != null) {
kerberosUser = loginKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab(), context); kerberosUser = createKerberosKeytabUser(credentialsService.getPrincipal(), credentialsService.getKeytab(), context);
kerberosUser.login(); // login creates the kudu client as well
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) { } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, kerberosPassword, context); kerberosUser = createKerberosPasswordUser(kerberosPrincipal, kerberosPassword, context);
kerberosUser.login(); // login creates the kudu client as well
} else {
createKuduClient(context);
} }
} }
@ -202,6 +188,15 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
.build(); .build();
} }
protected void executeOnKuduClient(Consumer<KuduClient> actionOnKuduClient) {
kuduClientReadLock.lock();
try {
actionOnKuduClient.accept(kuduClient);
} finally {
kuduClientReadLock.unlock();
}
}
protected void flushKuduSession(final KuduSession kuduSession, boolean close, final List<RowError> rowErrors) throws KuduException { protected void flushKuduSession(final KuduSession kuduSession, boolean close, final List<RowError> rowErrors) throws KuduException {
final List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush(); final List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush();
@ -215,38 +210,30 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
} }
} }
protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab, ProcessContext context) throws LoginException { protected KerberosUser createKerberosKeytabUser(String principal, String keytab, ProcessContext context) {
final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab) { return new KerberosKeytabUser(principal, keytab) {
@Override @Override
public synchronized boolean checkTGTAndRelogin() throws LoginException { public synchronized void login() throws LoginException {
boolean didRelogin = super.checkTGTAndRelogin(); if (!isLoggedIn()) {
super.login();
if (didRelogin) {
createKuduClient(context); createKuduClient(context);
} }
return didRelogin;
} }
}; };
kerberosUser.login();
return kerberosUser;
} }
protected KerberosUser loginKerberosPasswordUser(final String principal, final String password, ProcessContext context) throws LoginException { protected KerberosUser createKerberosPasswordUser(String principal, String password, ProcessContext context) {
final KerberosUser kerberosUser = new KerberosPasswordUser(principal, password) { return new KerberosPasswordUser(principal, password) {
@Override @Override
public synchronized boolean checkTGTAndRelogin() throws LoginException { public synchronized void login() throws LoginException {
boolean didRelogin = super.checkTGTAndRelogin(); if (!isLoggedIn()) {
super.login();
if (didRelogin) {
createKuduClient(context); createKuduClient(context);
} }
return didRelogin;
} }
}; };
kerberosUser.login();
return kerberosUser;
} }
@Override @Override

View File

@ -259,11 +259,11 @@ public class PutKudu extends AbstractKuduProcessor {
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase()); flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
createKerberosUserAndKuduClient(context); createKerberosUserAndOrKuduClient(context);
} }
@Override @Override
protected void onTrigger(final ProcessContext context, final ProcessSession session, KuduClient kuduClient) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final List<FlowFile> flowFiles = session.get(ffbatch); final List<FlowFile> flowFiles = session.get(ffbatch);
if (flowFiles.isEmpty()) { if (flowFiles.isEmpty()) {
return; return;
@ -271,12 +271,12 @@ public class PutKudu extends AbstractKuduProcessor {
final KerberosUser user = getKerberosUser(); final KerberosUser user = getKerberosUser();
if (user == null) { if (user == null) {
trigger(context, session, flowFiles, kuduClient); executeOnKuduClient(kuduClient -> trigger(context, session, flowFiles, kuduClient));
return; return;
} }
final PrivilegedExceptionAction<Void> privilegedAction = () -> { final PrivilegedExceptionAction<Void> privilegedAction = () -> {
trigger(context, session, flowFiles, kuduClient); executeOnKuduClient(kuduClient -> trigger(context, session, flowFiles, kuduClient));
return null; return null;
}; };

View File

@ -26,12 +26,9 @@ import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Upsert; import org.apache.kudu.client.Upsert;
import org.apache.kudu.client.Update; import org.apache.kudu.client.Update;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import javax.security.auth.login.LoginException;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.security.PrivilegedActionException; import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
@ -39,6 +36,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -103,7 +101,7 @@ public class MockPutKudu extends PutKudu {
} }
@Override @Override
protected void onTrigger(ProcessContext context, ProcessSession session, KuduClient kuduClient) throws ProcessException { protected void executeOnKuduClient(Consumer<KuduClient> actionOnKuduClient) {
final KuduClient client = mock(KuduClient.class); final KuduClient client = mock(KuduClient.class);
try { try {
@ -114,7 +112,7 @@ public class MockPutKudu extends PutKudu {
throw new AssertionError(e); throw new AssertionError(e);
} }
super.onTrigger(context, session, client); actionOnKuduClient.accept(client);
} }
public boolean loggedIn() { public boolean loggedIn() {
@ -126,12 +124,12 @@ public class MockPutKudu extends PutKudu {
} }
@Override @Override
protected KerberosUser loginKerberosKeytabUser(final String principal, final String keytab, ProcessContext context) throws LoginException { protected KerberosUser createKerberosKeytabUser(String principal, String keytab, ProcessContext context) {
return createMockKerberosUser(principal); return createMockKerberosUser(principal);
} }
@Override @Override
protected KerberosUser loginKerberosPasswordUser(String principal, String password, ProcessContext context) throws LoginException { protected KerberosUser createKerberosPasswordUser(String principal, String password, ProcessContext context) {
return createMockKerberosUser(principal); return createMockKerberosUser(principal);
} }