NIFI-12939: Retry Kerberos login on authentication failure in Iceberg processors

Update rollback and context yield calls

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #8553
This commit is contained in:
Mark Bathori 2024-03-24 20:57:32 +01:00 committed by Matt Burgess
parent 0c01dfe585
commit 2a5b61136b
3 changed files with 61 additions and 29 deletions

View File

@ -25,18 +25,19 @@ import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.services.iceberg.IcebergCatalogService;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
@ -67,36 +68,33 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme
.description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.")
.build();
private volatile KerberosUser kerberosUser;
private volatile UserGroupInformation ugi;
protected final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
protected final AtomicReference<UserGroupInformation> ugiReference = new AtomicReference<>();
@OnScheduled
public void onScheduled(final ProcessContext context) {
final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
initKerberosCredentials(context);
}
protected void initKerberosCredentials(ProcessContext context) {
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
if (kerberosUserService != null) {
this.kerberosUser = kerberosUserService.createKerberosUser();
final KerberosUser kerberosUser = kerberosUserService.createKerberosUser();
kerberosUserReference.set(kerberosUser);
try {
this.ugi = getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser);
ugiReference.set(getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser));
} catch (IOException e) {
throw new ProcessException("Kerberos Authentication failed", e);
throw new ProcessException("Kerberos authentication failed", e);
}
}
}
@OnStopped
public void onStopped() {
if (kerberosUser != null) {
try {
kerberosUser.logout();
} catch (KerberosLoginException e) {
getLogger().error("Error logging out kerberos user", e);
} finally {
kerberosUser = null;
ugi = null;
}
}
kerberosUserReference.set(null);
ugiReference.set(null);
}
@Override
@ -106,6 +104,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme
return;
}
final KerberosUser kerberosUser = kerberosUserReference.get();
if (kerberosUser == null) {
doOnTrigger(context, session, flowFile);
} else {
@ -132,12 +131,8 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme
}
private UserGroupInformation getUgi() {
try {
kerberosUser.checkTGTAndRelogin();
} catch (KerberosLoginException e) {
throw new ProcessException("Unable to re-login with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
return ugi;
SecurityUtil.checkTGTAndRelogin(getLogger(), kerberosUserReference.get());
return ugiReference.get();
}
protected abstract void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException;

View File

@ -17,6 +17,7 @@
*/
package org.apache.nifi.processors.iceberg;
import com.google.common.base.Throwables;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -25,6 +26,8 @@ import org.apache.nifi.processor.ProcessContext;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class IcebergUtils {
@ -65,4 +68,19 @@ public class IcebergUtils {
e -> context.getProperty(e.getKey()).evaluateAttributeExpressions(flowFile).getValue()
));
}
/**
* Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
* and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
*
* @param t The throwable to inspect for the cause.
* @return Throwable Cause
*/
public static <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
return Throwables.getCausalChain(t).stream()
.filter(expectedCauseType::isInstance)
.map(expectedCauseType::cast)
.filter(causePredicate)
.findFirst();
}
}

View File

@ -53,6 +53,7 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.services.iceberg.IcebergCatalogService;
import org.ietf.jgss.GSSException;
import java.io.InputStream;
import java.util.ArrayList;
@ -61,11 +62,13 @@ import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.nifi.processors.iceberg.IcebergUtils.findCause;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getDynamicProperties;
@ -274,8 +277,16 @@ public class PutIceberg extends AbstractIcebergProcessor {
try {
table = loadTable(context, flowFile);
} catch (Exception e) {
final Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().warn("No valid Kerberos credential found, retrying login", causeOptional.get());
initKerberosCredentials(context);
session.rollback();
context.yield();
} else {
getLogger().error("Failed to load table from catalog", e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
return;
}
@ -299,16 +310,24 @@ public class PutIceberg extends AbstractIcebergProcessor {
final WriteResult result = taskWriter.complete();
appendDataFiles(context, flowFile, table, result);
} catch (Exception e) {
getLogger().error("Exception occurred while writing iceberg records. Removing uncommitted data files", e);
final Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().warn("No valid Kerberos credential found, retrying login", causeOptional.get());
initKerberosCredentials(context);
session.rollback();
context.yield();
} else {
getLogger().error("Exception occurred while writing Iceberg records", e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
try {
if (taskWriter != null) {
abort(taskWriter.dataFiles(), table);
}
} catch (Exception ex) {
getLogger().error("Failed to abort uncommitted data files", ex);
getLogger().warn("Failed to abort uncommitted data files", ex);
}
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}