NIFI-1024 NIFI-1062 Fixed PutHDFS processor to properly route failures. Ensured that when a put fails the FlowFile is routed to the 'failure' relationship. Added a validation test and re-enabled a previously ignored test.

Signed-off-by: Bryan Bende <bbende@apache.org>
Authored by Oleg Zhurakousky on 2015-11-10 07:58:28 -05:00; committed by Bryan Bende
parent 6c510fae80
commit 62e3cfc629
2 changed files with 53 additions and 11 deletions


@@ -345,7 +345,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 }
             }
             getLogger().error("Failed to write to HDFS due to {}", t);
-            session.rollback();
+            session.transfer(flowFile, REL_FAILURE);
             context.yield();
         }
     }
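For orientation, here is a minimal, self-contained sketch of the error-handling pattern this change adopts: on a write failure the processor logs the error, transfers the FlowFile to its 'failure' relationship, and yields, rather than rolling the session back (which would return the FlowFile to the incoming queue and retry it indefinitely). The class below is illustrative only; apart from the logging and transfer lines shown in the diff above, the names and surrounding structure are assumptions, not the actual PutHDFS source.

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

// Illustrative processor (not PutHDFS itself) showing failure routing instead of rollback.
// A real processor would also expose these relationships via getRelationships().
public class FailureRoutingSketch extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            // ... write the FlowFile content to the target system here ...
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final Throwable t) {
            getLogger().error("Failed to write to HDFS due to {}", t);
            // Route the failed FlowFile onward so the flow can handle it explicitly;
            // session.rollback() would instead requeue it and retry the same failure.
            session.transfer(flowFile, REL_FAILURE);
            // Yield so the processor backs off briefly before its next attempt.
            context.yield();
        }
    }
}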


@@ -16,30 +16,34 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import org.apache.nifi.processors.hadoop.PutHDFS;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockProcessContext;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class PutHDFSTest {
@@ -154,11 +158,12 @@ public class PutHDFSTest {
     }
 
     // The following only seems to work from cygwin...something about not finding the 'chmod' command.
-    @Ignore
+    @Test
     public void testPutFile() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(PutHDFS.class);
         runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
         runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        runner.setValidateExpressionUsage(false);
         FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");
         Map<String, String> attributes = new HashMap<String, String>();
         attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
@@ -168,6 +173,43 @@
         Configuration config = new Configuration();
         FileSystem fs = FileSystem.get(config);
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        assertTrue(failedFlowFiles.isEmpty());
+
         assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
     }
 
+    @Test
+    public void testPutFileWithException() throws IOException {
+        String dirName = "target/testPutFileWrongPermissions";
+        File file = new File(dirName);
+        file.mkdirs();
+        Configuration config = new Configuration();
+        FileSystem fs = FileSystem.get(config);
+        Path p = new Path(dirName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+        // modify permissions to ensure no one can write to this directory,
+        // forcing IOException downstream
+        fs.setPermission(p, new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+        TestRunner runner = TestRunners.newTestRunner(PutHDFS.class);
+        runner.setProperty(PutHDFS.DIRECTORY, dirName);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        runner.setValidateExpressionUsage(false);
+
+        FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");
+        Map<String, String> attributes = new HashMap<String, String>();
+        attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+
+        runner.enqueue(fis, attributes);
+        runner.run();
+        fis.close();
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        assertFalse(failedFlowFiles.isEmpty());
+
+        fs.setPermission(p, new FsPermission(FsAction.EXECUTE, FsAction.EXECUTE, FsAction.EXECUTE));
+        fs.delete(p, true);
+    }
 }
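As a side note on the new test above: it forces the failure path by stripping write permission from the target directory before running the processor. The standalone sketch below shows just that trick against the local Hadoop FileSystem, outside of NiFi; the class name and paths are illustrative, and the behavior depends on the platform honoring chmod (the cygwin caveat in the test comment applies here too).

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

// Standalone sketch of the permission trick used by testPutFileWithException:
// stripping WRITE from a directory makes FileSystem.create(...) fail, which is
// what drives PutHDFS onto its 'failure' relationship.
public class ReadOnlyDirExample {
    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration()); // local FS by default
        Path dir = new Path("target/readonly-example");      // illustrative path
        fs.mkdirs(dir);
        // Owner, group, and other all READ-only: no one may create files under dir.
        fs.setPermission(dir, new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
        try {
            fs.create(new Path(dir, "should-fail.txt")).close();
            System.out.println("unexpected: write succeeded");
        } catch (IOException e) {
            System.out.println("write rejected as expected: " + e.getMessage());
        } finally {
            // Restore permissions so the directory can be cleaned up.
            fs.setPermission(dir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
            fs.delete(dir, true);
        }
    }
}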