diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java index 3352959d17..8fcc8ac7d4 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java @@ -25,18 +25,19 @@ import org.apache.nifi.components.ClassloaderIsolationKeyProvider; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.security.krb.KerberosLoginException; import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.services.iceberg.IcebergCatalogService; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser; import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles; @@ -67,36 +68,33 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme .description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.") .build(); - private volatile KerberosUser kerberosUser; - private volatile UserGroupInformation ugi; + protected final AtomicReference kerberosUserReference = new AtomicReference<>(); + protected final AtomicReference ugiReference = new AtomicReference<>(); @OnScheduled public void onScheduled(final ProcessContext context) { - final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class); + initKerberosCredentials(context); + } + + protected void initKerberosCredentials(ProcessContext context) { final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); + final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class); if (kerberosUserService != null) { - this.kerberosUser = kerberosUserService.createKerberosUser(); + final KerberosUser kerberosUser = kerberosUserService.createKerberosUser(); + kerberosUserReference.set(kerberosUser); try { - this.ugi = getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser); + ugiReference.set(getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser)); } catch (IOException e) { - throw new ProcessException("Kerberos Authentication failed", e); + throw new ProcessException("Kerberos authentication failed", e); } } } @OnStopped public void onStopped() { - if (kerberosUser != null) { - try { - kerberosUser.logout(); - } catch (KerberosLoginException e) { - getLogger().error("Error logging out kerberos user", e); - } finally { - kerberosUser = null; - ugi = null; - } - } + kerberosUserReference.set(null); + ugiReference.set(null); } @Override @@ -106,6 +104,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme return; } + final KerberosUser kerberosUser = kerberosUserReference.get(); if (kerberosUser == null) { doOnTrigger(context, session, flowFile); } else { @@ -132,12 +131,8 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme } private UserGroupInformation getUgi() { - try { - kerberosUser.checkTGTAndRelogin(); - } catch (KerberosLoginException e) { - throw new ProcessException("Unable to re-login with kerberos credentials for " + kerberosUser.getPrincipal(), e); - } - return ugi; + SecurityUtil.checkTGTAndRelogin(getLogger(), kerberosUserReference.get()); + return ugiReference.get(); } protected abstract void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException; diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java index eead5ed61d..ba2680c37e 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java @@ -17,6 +17,7 @@ */ package org.apache.nifi.processors.iceberg; +import com.google.common.base.Throwables; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -25,6 +26,8 @@ import org.apache.nifi.processor.ProcessContext; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; import java.util.stream.Collectors; public class IcebergUtils { @@ -65,4 +68,19 @@ public class IcebergUtils { e -> context.getProperty(e.getKey()).evaluateAttributeExpressions(flowFile).getValue() )); } + + /** + * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type, + * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise. + * + * @param t The throwable to inspect for the cause. + * @return Throwable Cause + */ + public static Optional findCause(Throwable t, Class expectedCauseType, Predicate causePredicate) { + return Throwables.getCausalChain(t).stream() + .filter(expectedCauseType::isInstance) + .map(expectedCauseType::cast) + .filter(causePredicate) + .findFirst(); + } } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java index 450fe2de7c..e459a98db4 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java @@ -53,6 +53,7 @@ import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.services.iceberg.IcebergCatalogService; +import org.ietf.jgss.GSSException; import java.io.InputStream; import java.util.ArrayList; @@ -61,11 +62,13 @@ import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.nifi.processors.iceberg.IcebergUtils.findCause; import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles; import static org.apache.nifi.processors.iceberg.IcebergUtils.getDynamicProperties; @@ -274,8 +277,16 @@ public class PutIceberg extends AbstractIcebergProcessor { try { table = loadTable(context, flowFile); } catch (Exception e) { - getLogger().error("Failed to load table from catalog", e); - session.transfer(session.penalize(flowFile), REL_FAILURE); + final Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().warn("No valid Kerberos credential found, retrying login", causeOptional.get()); + initKerberosCredentials(context); + session.rollback(); + context.yield(); + } else { + getLogger().error("Failed to load table from catalog", e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + } return; } @@ -299,16 +310,24 @@ public class PutIceberg extends AbstractIcebergProcessor { final WriteResult result = taskWriter.complete(); appendDataFiles(context, flowFile, table, result); } catch (Exception e) { - getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files", e); + final Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().warn("No valid Kerberos credential found, retrying login", causeOptional.get()); + initKerberosCredentials(context); + session.rollback(); + context.yield(); + } else { + getLogger().error("Exception occurred while writing Iceberg records", e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + } + try { if (taskWriter != null) { abort(taskWriter.dataFiles(), table); } } catch (Exception ex) { - getLogger().error("Failed to abort uncommitted data files", ex); + getLogger().warn("Failed to abort uncommitted data files", ex); } - - session.transfer(session.penalize(flowFile), REL_FAILURE); return; }