diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 91fd204ce1..6e71331fee 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -49,6 +49,9 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; +import org.ietf.jgss.GSSException; + +import com.google.common.base.Throwables; import java.io.BufferedInputStream; import java.io.FileNotFoundException; @@ -60,8 +63,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; /** * This processor copies FlowFiles to HDFS. @@ -373,6 +379,17 @@ public class PutHDFS extends AbstractHadoopProcessor { session.transfer(putFlowFile, REL_SUCCESS); + } catch (final IOException e) { + Optional causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor()); + if (causeOptional.isPresent()) { + getLogger().warn("An error occurred while connecting to HDFS. " + + "Rolling back session, and penalizing flow file {}", + new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()}); + session.rollback(true); + } else { + getLogger().error("Failed to access HDFS due to {}", new Object[]{e}); + session.transfer(putFlowFile, REL_FAILURE); + } } catch (final Throwable t) { if (tempDotCopyFile != null) { try { @@ -391,6 +408,22 @@ public class PutHDFS extends AbstractHadoopProcessor { }); } + + /** + * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type, + * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise. + * @param t The throwable to inspect for the cause. + * @return + */ + private Optional findCause(Throwable t, Class expectedCauseType, Predicate causePredicate) { + Stream causalChain = Throwables.getCausalChain(t).stream(); + return causalChain + .filter(expectedCauseType::isInstance) + .map(expectedCauseType::cast) + .filter(causePredicate) + .findFirst(); + } + protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) { try { // Change owner and group of file if configured to do so