diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java index 8fcc8ac7d4..fdea6b07d4 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java @@ -20,7 +20,6 @@ package org.apache.nifi.processors.iceberg; import org.apache.hadoop.security.UserGroupInformation; import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.ClassloaderIsolationKeyProvider; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.context.PropertyContext; @@ -34,12 +33,15 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.services.iceberg.IcebergCatalogService; +import org.ietf.jgss.GSSException; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser; +import static org.apache.nifi.processors.iceberg.IcebergUtils.findCause; import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles; /** @@ -68,7 +70,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme .description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.") .build(); - protected final AtomicReference kerberosUserReference = new AtomicReference<>(); + protected static final AtomicReference kerberosUserReference = new AtomicReference<>(); protected final AtomicReference ugiReference = new AtomicReference<>(); @OnScheduled @@ -76,27 +78,26 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme initKerberosCredentials(context); } - protected void initKerberosCredentials(ProcessContext context) { + protected synchronized void initKerberosCredentials(ProcessContext context) { final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class); if (kerberosUserService != null) { - final KerberosUser kerberosUser = kerberosUserService.createKerberosUser(); - kerberosUserReference.set(kerberosUser); + KerberosUser kerberosUser; + if (kerberosUserReference.get() == null) { + kerberosUser = kerberosUserService.createKerberosUser(); + } else { + kerberosUser = kerberosUserReference.get(); + } try { ugiReference.set(getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser)); } catch (IOException e) { throw new ProcessException("Kerberos authentication failed", e); } + kerberosUserReference.set(kerberosUser); } } - @OnStopped - public void onStopped() { - kerberosUserReference.set(null); - ugiReference.set(null); - } - @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final FlowFile flowFile = session.get(); @@ -115,18 +116,26 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme }); } catch (Exception e) { - getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e); - session.transfer(session.penalize(flowFile), REL_FAILURE); + if (!handleAuthErrors(e, session, context)) { + getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + } } } } @Override public String getClassloaderIsolationKey(PropertyContext context) { - final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); - if (kerberosUserService != null) { - return kerberosUserService.getIdentifier(); + try { + final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); + if (kerberosUserService != null) { + final KerberosUser kerberosUser = kerberosUserService.createKerberosUser(); + return kerberosUser.getPrincipal(); + } + } catch (IllegalStateException e) { + // the Kerberos controller service is disabled, therefore this part of the isolation key cannot be determined yet } + return null; } @@ -135,5 +144,17 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme return ugiReference.get(); } + protected boolean handleAuthErrors(Throwable t, ProcessSession session, ProcessContext context) { + final Optional causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().info("No valid Kerberos credential found, retrying login", causeOptional.get()); + kerberosUserReference.get().checkTGTAndRelogin(); + session.rollback(); + context.yield(); + return true; + } + return false; + } + protected abstract void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException; } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java index 0145124dcf..7f9153fd84 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java @@ -54,7 +54,6 @@ import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.services.iceberg.IcebergCatalogService; -import org.ietf.jgss.GSSException; import java.io.InputStream; import java.util.ArrayList; @@ -65,13 +64,11 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.nifi.processors.iceberg.IcebergUtils.findCause; import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles; import static org.apache.nifi.processors.iceberg.IcebergUtils.getDynamicProperties; @@ -286,13 +283,7 @@ public class PutIceberg extends AbstractIcebergProcessor { try { table = loadTable(context, flowFile); } catch (Exception e) { - final Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); - if (causeOptional.isPresent()) { - getLogger().warn("No valid Kerberos credential found, retrying login", causeOptional.get()); - initKerberosCredentials(context); - session.rollback(); - context.yield(); - } else { + if (!handleAuthErrors(e, session, context)) { getLogger().error("Failed to load table from catalog", e); session.transfer(session.penalize(flowFile), REL_FAILURE); } @@ -320,13 +311,7 @@ public class PutIceberg extends AbstractIcebergProcessor { final WriteResult result = taskWriter.complete(); appendDataFiles(context, flowFile, table, result); } catch (Exception e) { - final Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); - if (causeOptional.isPresent()) { - getLogger().warn("No valid Kerberos credential found, retrying login", causeOptional.get()); - initKerberosCredentials(context); - session.rollback(); - context.yield(); - } else { + if (!handleAuthErrors(e, session, context)) { getLogger().error("Exception occurred while writing Iceberg records", e); session.transfer(session.penalize(flowFile), REL_FAILURE); } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java index 7afc68a6a0..22c400c754 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java @@ -53,7 +53,6 @@ public abstract class AbstractCatalogService extends AbstractControllerService i .required(false) .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .dynamicallyModifiesClasspath(true) .build(); protected List parseConfigFilePaths(String configFilePaths) {