From 62e3cfc6291d1f30ac92f29cf790336644205919 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 10 Nov 2015 07:58:28 -0500 Subject: [PATCH] NIFI-1024 NIFI-1062 Fixed PutHDFS processor to properly route failures. Ensured that during put failures the FlowFile is routed to 'failure' relationship. Added validation test Re-enabled previously ignored test. Signed-off-by: Bryan Bende --- .../nifi/processors/hadoop/PutHDFS.java | 2 +- .../nifi/processors/hadoop/PutHDFSTest.java | 62 ++++++++++++++++--- 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index bedf1b9458..4b929bdcb7 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -345,7 +345,7 @@ public class PutHDFS extends AbstractHadoopProcessor { } } getLogger().error("Failed to write to HDFS due to {}", t); - session.rollback(); + session.transfer(flowFile, REL_FAILURE); context.yield(); } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 0b5fee8539..2eff5c341f 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -16,30 +16,34 @@ */ package org.apache.nifi.processors.hadoop; -import org.apache.nifi.processors.hadoop.PutHDFS; - import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.util.MockProcessContext; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; public class PutHDFSTest { @@ -154,11 +158,12 @@ public class PutHDFSTest { } // The following only seems to work from cygwin...something about not finding the 'chmod' command. - @Ignore + @Test public void testPutFile() throws IOException { TestRunner runner = TestRunners.newTestRunner(PutHDFS.class); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + runner.setValidateExpressionUsage(false); FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1"); Map attributes = new HashMap(); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); @@ -168,6 +173,43 @@ public class PutHDFSTest { Configuration config = new Configuration(); FileSystem fs = FileSystem.get(config); + + List failedFlowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); + assertTrue(failedFlowFiles.isEmpty()); + assertTrue(fs.exists(new Path("target/test-classes/randombytes-1"))); } + + @Test + public void testPutFileWithException() throws IOException { + String dirName = "target/testPutFileWrongPermissions"; + File file = new File(dirName); + file.mkdirs(); + Configuration config = new Configuration(); + FileSystem fs = FileSystem.get(config); + Path p = new Path(dirName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); + + // modify permissions to ensure no one can write to this directory, + // forcing IOException downstream + fs.setPermission(p, new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ)); + + TestRunner runner = TestRunners.newTestRunner(PutHDFS.class); + runner.setProperty(PutHDFS.DIRECTORY, dirName); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + runner.setValidateExpressionUsage(false); + FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1"); + Map attributes = new HashMap(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + fis.close(); + + List failedFlowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); + assertFalse(failedFlowFiles.isEmpty()); + + fs.setPermission(p, new FsPermission(FsAction.EXECUTE, FsAction.EXECUTE, FsAction.EXECUTE)); + fs.delete(p, true); + } }