NIFI-5557: handling expired ticket by rollback and penalization

This commit is contained in:
Endre Zoltan Kovacs 2018-08-28 10:47:59 +02:00 committed by Jeff Storck
parent 2e1005e884
commit 0f55cbfb9f
1 changed files with 33 additions and 0 deletions

View File

@ -49,6 +49,9 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.ietf.jgss.GSSException;
import com.google.common.base.Throwables;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -60,8 +63,11 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
/** /**
* This processor copies FlowFiles to HDFS. * This processor copies FlowFiles to HDFS.
@ -373,6 +379,17 @@ public class PutHDFS extends AbstractHadoopProcessor {
session.transfer(putFlowFile, REL_SUCCESS); session.transfer(putFlowFile, REL_SUCCESS);
} catch (final IOException e) {
Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().warn("An error occurred while connecting to HDFS. "
+ "Rolling back session, and penalizing flow file {}",
new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
session.rollback(true);
} else {
getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
session.transfer(putFlowFile, REL_FAILURE);
}
} catch (final Throwable t) { } catch (final Throwable t) {
if (tempDotCopyFile != null) { if (tempDotCopyFile != null) {
try { try {
@ -391,6 +408,22 @@ public class PutHDFS extends AbstractHadoopProcessor {
}); });
} }
/**
* Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
* and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
* @param t The throwable to inspect for the cause.
* @return
*/
private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
return causalChain
.filter(expectedCauseType::isInstance)
.map(expectedCauseType::cast)
.filter(causePredicate)
.findFirst();
}
protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) { protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
try { try {
// Change owner and group of file if configured to do so // Change owner and group of file if configured to do so