NIFI-1184 Mock FileSystem for PutHDFSTest

This closes #2892

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
Authored by uday on 2018-07-15 02:24:58 +05:30, committed by Mike Thomsen
parent 4a25402c1a
commit 57ae9b65a0
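
In short: instead of writing to the local filesystem through FileSystem.get(new Configuration()), the tests now construct the processor with an in-memory MockFileSystem, injected via the TestablePutHDFS subclass, and assert against that mock directly. A condensed sketch of the pattern, with names taken from the diff below (the literal flow file content is illustrative only):

    // Condensed from the tests below: inject a mock FileSystem through a test
    // subclass so PutHDFS never touches the local disk.
    FileSystem mockFileSystem = new MockFileSystem();   // backed by an in-memory Map<Path, FileStatus>
    PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
    TestRunner runner = TestRunners.newTestRunner(proc);
    runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
    runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
    Map<String, String> attributes = new HashMap<>();
    attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
    runner.enqueue("some content".getBytes(), attributes);
    runner.run();
    // the write is observable on the injected mock, not on the real filesystem
    assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));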


@@ -17,9 +17,14 @@
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -36,12 +41,14 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -51,7 +58,6 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -59,29 +65,19 @@ public class PutHDFSTest {
private NiFiProperties mockNiFiProperties;
private KerberosProperties kerberosProperties;
@BeforeClass
public static void setUpClass() throws Exception {
/*
* Running Hadoop on Windows requires a special build which produces the required binaries and native modules [1]. Since the functionality
* provided by this module and validated by these tests has no native implications, we do not distribute the required binaries and native modules
* to support running these tests in a Windows environment; therefore they are ignored. You can also get more info from this StackOverflow thread [2]
*
* [1] https://wiki.apache.org/hadoop/Hadoop2OnWindows
* [2] http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path
*/
}
private FileSystem mockFileSystem;
@Before
public void setup() {
mockNiFiProperties = mock(NiFiProperties.class);
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
kerberosProperties = new KerberosProperties(null);
mockFileSystem = new MockFileSystem();
}
@Test
public void testValidators() {
PutHDFS proc = new TestablePutHDFS(kerberosProperties);
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(proc);
Collection<ValidationResult> results;
ProcessContext pc;
@@ -119,7 +115,7 @@ public class PutHDFSTest {
assertTrue(vr.toString().contains("is invalid because short integer must be greater than zero"));
}
proc = new TestablePutHDFS(kerberosProperties);
proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
runner = TestRunners.newTestRunner(proc);
results = new HashSet<>();
runner.setProperty(PutHDFS.DIRECTORY, "/target");
@@ -134,7 +130,7 @@ public class PutHDFSTest {
assertTrue(vr.toString().contains("is invalid because short integer must be greater than zero"));
}
proc = new TestablePutHDFS(kerberosProperties);
proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
runner = TestRunners.newTestRunner(proc);
results = new HashSet<>();
runner.setProperty(PutHDFS.DIRECTORY, "/target");
@@ -149,7 +145,7 @@ public class PutHDFSTest {
assertTrue(vr.toString().contains("is invalid because octal umask [-1] cannot be negative"));
}
proc = new TestablePutHDFS(kerberosProperties);
proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
runner = TestRunners.newTestRunner(proc);
results = new HashSet<>();
runner.setProperty(PutHDFS.DIRECTORY, "/target");
@@ -178,7 +174,7 @@ public class PutHDFSTest {
}
results = new HashSet<>();
proc = new TestablePutHDFS(kerberosProperties);
proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, "/target");
runner.setProperty(PutHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
@@ -196,9 +192,8 @@ public class PutHDFSTest {
@Test
public void testPutFile() throws IOException {
// Refer to comment in the BeforeClass method for an explanation
assumeTrue(isNotWindows());
PutHDFS proc = new TestablePutHDFS(kerberosProperties);
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
@@ -209,9 +204,6 @@ public class PutHDFSTest {
runner.run();
}
Configuration config = new Configuration();
FileSystem fs = FileSystem.get(config);
List<MockFlowFile> failedFlowFiles = runner
.getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
assertTrue(failedFlowFiles.isEmpty());
@@ -219,7 +211,7 @@ public class PutHDFSTest {
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
@@ -234,9 +226,8 @@ public class PutHDFSTest {
@Test
public void testPutFileWithCompression() throws IOException {
// Refer to comment in the BeforeClass method for an explanation
assumeTrue(isNotWindows());
PutHDFS proc = new TestablePutHDFS(kerberosProperties);
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
@@ -248,9 +239,6 @@ public class PutHDFSTest {
runner.run();
}
Configuration config = new Configuration();
FileSystem fs = FileSystem.get(config);
List<MockFlowFile> failedFlowFiles = runner
.getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
assertTrue(failedFlowFiles.isEmpty());
@@ -258,7 +246,7 @@ public class PutHDFSTest {
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(fs.exists(new Path("target/test-classes/randombytes-1.gz")));
assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1.gz")));
assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
}
@@ -266,14 +254,11 @@ public class PutHDFSTest {
@Test
public void testPutFileWithException() throws IOException {
// Refer to comment in the BeforeClass method for an explanation
assumeTrue(isNotWindows());
String dirName = "target/testPutFileWrongPermissions";
File file = new File(dirName);
file.mkdirs();
Configuration config = new Configuration();
FileSystem fs = FileSystem.get(config);
Path p = new Path(dirName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory());
final KerberosProperties testKerberosProperties = kerberosProperties;
TestRunner runner = TestRunners.newTestRunner(new PutHDFS() {
@@ -302,28 +287,24 @@ public class PutHDFSTest {
assertFalse(failedFlowFiles.isEmpty());
assertTrue(failedFlowFiles.get(0).isPenalized());
fs.delete(p, true);
mockFileSystem.delete(p, true);
}
@Test
public void testPutFileWhenDirectoryUsesValidELFunction() throws IOException {
// Refer to comment in the BeforeClass method for an explanation
assumeTrue(isNotWindows());
PutHDFS proc = new TestablePutHDFS(kerberosProperties);
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):substring(0,4)}");
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
Map<String, String> attributes = new HashMap<String, String>();
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
runner.enqueue(fis, attributes);
runner.run();
}
Configuration config = new Configuration();
FileSystem fs = FileSystem.get(config);
List<MockFlowFile> failedFlowFiles = runner
.getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
assertTrue(failedFlowFiles.isEmpty());
@@ -331,7 +312,7 @@ public class PutHDFSTest {
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
assertTrue(mockFileSystem.exists(new Path("target/data_test/randombytes-1")));
assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("target/data_test", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
}
@@ -339,9 +320,8 @@ public class PutHDFSTest {
@Test
public void testPutFileWhenDirectoryUsesUnrecognizedEL() throws IOException {
// Refer to comment in the BeforeClass method for an explanation
assumeTrue(isNotWindows());
PutHDFS proc = new TestablePutHDFS(kerberosProperties);
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(proc);
// this value somehow causes NiFi to not even recognize the EL, and thus it returns successfully from calling
@@ -362,9 +342,8 @@ public class PutHDFSTest {
@Test
public void testPutFileWhenDirectoryUsesInvalidEL() throws IOException {
// Refer to comment in the BeforeClass method for an explanation
assumeTrue(isNotWindows());
PutHDFS proc = new TestablePutHDFS(kerberosProperties);
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(proc);
// the validator should pick up the invalid EL
runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):foo()}");
@@ -372,22 +351,130 @@ public class PutHDFSTest {
runner.assertNotValid();
}
private boolean isNotWindows() {
return !System.getProperty("os.name").startsWith("Windows");
}
private static class TestablePutHDFS extends PutHDFS {
private class TestablePutHDFS extends PutHDFS {
private KerberosProperties testKerberosProperties;
private FileSystem fileSystem;
public TestablePutHDFS(KerberosProperties testKerberosProperties) {
public TestablePutHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) {
this.testKerberosProperties = testKerberosProperties;
this.fileSystem = fileSystem;
}
@Override
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return testKerberosProperties;
}
@Override
protected FileSystem getFileSystem(Configuration config) throws IOException {
return fileSystem;
}
@Override
protected FileSystem getFileSystem() {
return fileSystem;
}
}
private class MockFileSystem extends FileSystem {
private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
@Override
public URI getUri() {
return URI.create("file:///");
}
@Override
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
return null;
}
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
final long blockSize, final Progressable progress) throws IOException {
pathToStatus.put(f, newFile(f));
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
}
@Override
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
return null;
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
if (pathToStatus.containsKey(src)) {
pathToStatus.put(dst, pathToStatus.remove(src));
} else {
return false;
}
return true;
}
@Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
if (pathToStatus.containsKey(f)) {
pathToStatus.remove(f);
} else {
return false;
}
return true;
}
@Override
public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
return null;
}
@Override
public void setWorkingDirectory(final Path new_dir) {
}
@Override
public Path getWorkingDirectory() {
return new Path(new File(".").getAbsolutePath());
}
@Override
public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
return false;
}
@Override
public boolean mkdirs(Path f) throws IOException {
pathToStatus.put(f, newDir(f));
return true;
}
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
final FileStatus fileStatus = pathToStatus.get(f);
if (fileStatus == null) throw new FileNotFoundException();
return fileStatus;
}
@Override
public boolean exists(Path f) throws IOException {
return pathToStatus.containsKey(f);
}
private FileStatus newFile(Path p) {
return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0644), "owner", "group", p);
}
private FileStatus newDir(Path p) {
return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", p);
}
@Override
public long getDefaultBlockSize(Path f) {
return 33554432L;
}
}
static FsPermission perms(short p) {
return new FsPermission(p);
}
}