NIFI-13722: Kerberos ticket renewal issue due static thread pool in Iceberg library

This closes #9305.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Mark Bathori 2024-09-09 11:06:41 +02:00 committed by Peter Turcsanyi
parent bfdf4ef597
commit a0ec73097a
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
3 changed files with 39 additions and 34 deletions

View File

@ -20,7 +20,6 @@ package org.apache.nifi.processors.iceberg;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
@ -34,12 +33,15 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.services.iceberg.IcebergCatalogService;
import org.ietf.jgss.GSSException;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
import static org.apache.nifi.processors.iceberg.IcebergUtils.findCause;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
/**
@ -68,7 +70,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme
.description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.")
.build();
protected final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
protected static final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
protected final AtomicReference<UserGroupInformation> ugiReference = new AtomicReference<>();
@OnScheduled
@ -76,27 +78,26 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme
initKerberosCredentials(context);
}
protected void initKerberosCredentials(ProcessContext context) {
protected synchronized void initKerberosCredentials(ProcessContext context) {
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
if (kerberosUserService != null) {
final KerberosUser kerberosUser = kerberosUserService.createKerberosUser();
kerberosUserReference.set(kerberosUser);
KerberosUser kerberosUser;
if (kerberosUserReference.get() == null) {
kerberosUser = kerberosUserService.createKerberosUser();
} else {
kerberosUser = kerberosUserReference.get();
}
try {
ugiReference.set(getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser));
} catch (IOException e) {
throw new ProcessException("Kerberos authentication failed", e);
}
kerberosUserReference.set(kerberosUser);
}
}
@OnStopped
public void onStopped() {
kerberosUserReference.set(null);
ugiReference.set(null);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
@ -115,18 +116,26 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme
});
} catch (Exception e) {
getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
if (!handleAuthErrors(e, session, context)) {
getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
}
}
}
@Override
public String getClassloaderIsolationKey(PropertyContext context) {
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosUserService != null) {
return kerberosUserService.getIdentifier();
try {
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosUserService != null) {
final KerberosUser kerberosUser = kerberosUserService.createKerberosUser();
return kerberosUser.getPrincipal();
}
} catch (IllegalStateException e) {
// the Kerberos controller service is disabled, therefore this part of the isolation key cannot be determined yet
}
return null;
}
@ -135,5 +144,17 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme
return ugiReference.get();
}
protected boolean handleAuthErrors(Throwable t, ProcessSession session, ProcessContext context) {
final Optional<GSSException> causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().info("No valid Kerberos credential found, retrying login", causeOptional.get());
kerberosUserReference.get().checkTGTAndRelogin();
session.rollback();
context.yield();
return true;
}
return false;
}
protected abstract void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException;
}

View File

@ -54,7 +54,6 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.services.iceberg.IcebergCatalogService;
import org.ietf.jgss.GSSException;
import java.io.InputStream;
import java.util.ArrayList;
@ -65,13 +64,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.nifi.processors.iceberg.IcebergUtils.findCause;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getDynamicProperties;
@ -286,13 +283,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
try {
table = loadTable(context, flowFile);
} catch (Exception e) {
final Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().warn("No valid Kerberos credential found, retrying login", causeOptional.get());
initKerberosCredentials(context);
session.rollback();
context.yield();
} else {
if (!handleAuthErrors(e, session, context)) {
getLogger().error("Failed to load table from catalog", e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
@ -320,13 +311,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
final WriteResult result = taskWriter.complete();
appendDataFiles(context, flowFile, table, result);
} catch (Exception e) {
final Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().warn("No valid Kerberos credential found, retrying login", causeOptional.get());
initKerberosCredentials(context);
session.rollback();
context.yield();
} else {
if (!handleAuthErrors(e, session, context)) {
getLogger().error("Exception occurred while writing Iceberg records", e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}

View File

@ -53,7 +53,6 @@ public abstract class AbstractCatalogService extends AbstractControllerService i
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamicallyModifiesClasspath(true)
.build();
protected List<Document> parseConfigFilePaths(String configFilePaths) {