mirror of https://github.com/apache/nifi.git
NIFI-5557 Added test in PutHDFSTest for IOException with a nested GSSException
Resolved most of the code warnings in PutHDFSTest This closes #2971.
This commit is contained in:
parent
0f55cbfb9f
commit
e24388aa7f
|
@ -30,19 +30,21 @@ import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
import org.apache.nifi.hadoop.KerberosProperties;
|
import org.apache.nifi.hadoop.KerberosProperties;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.MockProcessContext;
|
import org.apache.nifi.util.MockProcessContext;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.ietf.jgss.GSSException;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.security.sasl.SaslException;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
|
@ -57,20 +59,16 @@ import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
public class PutHDFSTest {
|
public class PutHDFSTest {
|
||||||
|
|
||||||
private NiFiProperties mockNiFiProperties;
|
|
||||||
private KerberosProperties kerberosProperties;
|
private KerberosProperties kerberosProperties;
|
||||||
private FileSystem mockFileSystem;
|
private FileSystem mockFileSystem;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
mockNiFiProperties = mock(NiFiProperties.class);
|
|
||||||
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
|
|
||||||
kerberosProperties = new KerberosProperties(null);
|
kerberosProperties = new KerberosProperties(null);
|
||||||
mockFileSystem = new MockFileSystem();
|
mockFileSystem = new MockFileSystem();
|
||||||
}
|
}
|
||||||
|
@ -191,14 +189,12 @@ public class PutHDFSTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutFile() throws IOException {
|
public void testPutFile() throws IOException {
|
||||||
// Refer to comment in the BeforeClass method for an explanation
|
|
||||||
|
|
||||||
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
||||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
|
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
|
||||||
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
||||||
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
|
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
|
||||||
Map<String, String> attributes = new HashMap<String, String>();
|
Map<String, String> attributes = new HashMap<>();
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
||||||
runner.enqueue(fis, attributes);
|
runner.enqueue(fis, attributes);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
@ -225,15 +221,13 @@ public class PutHDFSTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutFileWithCompression() throws IOException {
|
public void testPutFileWithCompression() throws IOException {
|
||||||
// Refer to comment in the BeforeClass method for an explanation
|
|
||||||
|
|
||||||
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
||||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
|
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
|
||||||
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
||||||
runner.setProperty(PutHDFS.COMPRESSION_CODEC, "GZIP");
|
runner.setProperty(PutHDFS.COMPRESSION_CODEC, "GZIP");
|
||||||
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
|
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
|
||||||
Map<String, String> attributes = new HashMap<String, String>();
|
Map<String, String> attributes = new HashMap<>();
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
||||||
runner.enqueue(fis, attributes);
|
runner.enqueue(fis, attributes);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
@ -252,31 +246,60 @@ public class PutHDFSTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutFileWithException() throws IOException {
|
public void testPutFileWithGSSException() throws IOException {
|
||||||
// Refer to comment in the BeforeClass method for an explanation
|
FileSystem noCredentialsFileSystem = new MockFileSystem() {
|
||||||
|
@Override
|
||||||
|
public FileStatus getFileStatus(Path path) throws IOException {
|
||||||
|
throw new IOException("ioe", new SaslException("sasle", new GSSException(13)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, noCredentialsFileSystem));
|
||||||
|
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
|
||||||
|
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
||||||
|
|
||||||
|
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
||||||
|
runner.enqueue(fis, attributes);
|
||||||
|
runner.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
// assert no flowfiles transferred to outgoing relationships
|
||||||
|
runner.assertTransferCount(PutHDFS.REL_SUCCESS, 0);
|
||||||
|
runner.assertTransferCount(PutHDFS.REL_FAILURE, 0);
|
||||||
|
// assert the input flowfile was penalized
|
||||||
|
List<MockFlowFile> penalizedFlowFiles = runner.getPenalizedFlowFiles();
|
||||||
|
assertEquals(1, penalizedFlowFiles.size());
|
||||||
|
assertEquals("randombytes-1", penalizedFlowFiles.iterator().next().getAttribute(CoreAttributes.FILENAME.key()));
|
||||||
|
// assert the processor's queue is not empty
|
||||||
|
assertFalse(runner.isQueueEmpty());
|
||||||
|
assertEquals(1, runner.getQueueSize().getObjectCount());
|
||||||
|
// assert the input file is back on the queue
|
||||||
|
ProcessSession session = runner.getProcessSessionFactory().createSession();
|
||||||
|
FlowFile queuedFlowFile = session.get();
|
||||||
|
assertNotNull(queuedFlowFile);
|
||||||
|
assertEquals("randombytes-1", queuedFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
|
||||||
|
session.rollback();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutFileWithProcessException() throws IOException {
|
||||||
String dirName = "target/testPutFileWrongPermissions";
|
String dirName = "target/testPutFileWrongPermissions";
|
||||||
File file = new File(dirName);
|
File file = new File(dirName);
|
||||||
file.mkdirs();
|
file.mkdirs();
|
||||||
Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory());
|
Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory());
|
||||||
|
|
||||||
final KerberosProperties testKerberosProperties = kerberosProperties;
|
TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem) {
|
||||||
TestRunner runner = TestRunners.newTestRunner(new PutHDFS() {
|
|
||||||
@Override
|
@Override
|
||||||
protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name, FlowFile flowFile) {
|
protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name, FlowFile flowFile) {
|
||||||
throw new ProcessException("Forcing Exception to get thrown in order to verify proper handling");
|
throw new ProcessException("Forcing Exception to get thrown in order to verify proper handling");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
|
||||||
return testKerberosProperties;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
runner.setProperty(PutHDFS.DIRECTORY, dirName);
|
runner.setProperty(PutHDFS.DIRECTORY, dirName);
|
||||||
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
||||||
|
|
||||||
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
|
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
|
||||||
Map<String, String> attributes = new HashMap<String, String>();
|
Map<String, String> attributes = new HashMap<>();
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
||||||
runner.enqueue(fis, attributes);
|
runner.enqueue(fis, attributes);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
@ -292,13 +315,11 @@ public class PutHDFSTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutFileWhenDirectoryUsesValidELFunction() throws IOException {
|
public void testPutFileWhenDirectoryUsesValidELFunction() throws IOException {
|
||||||
// Refer to comment in the BeforeClass method for an explanation
|
|
||||||
|
|
||||||
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
||||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):substring(0,4)}");
|
runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):substring(0,4)}");
|
||||||
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
||||||
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
|
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
|
||||||
Map<String, String> attributes = new HashMap<>();
|
Map<String, String> attributes = new HashMap<>();
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
||||||
runner.enqueue(fis, attributes);
|
runner.enqueue(fis, attributes);
|
||||||
|
@ -319,8 +340,6 @@ public class PutHDFSTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutFileWhenDirectoryUsesUnrecognizedEL() throws IOException {
|
public void testPutFileWhenDirectoryUsesUnrecognizedEL() throws IOException {
|
||||||
// Refer to comment in the BeforeClass method for an explanation
|
|
||||||
|
|
||||||
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
||||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
|
|
||||||
|
@ -329,8 +348,8 @@ public class PutHDFSTest {
|
||||||
runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
|
runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
|
||||||
|
|
||||||
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
||||||
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
|
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
|
||||||
Map<String, String> attributes = new HashMap<String, String>();
|
Map<String, String> attributes = new HashMap<>();
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
||||||
runner.enqueue(fis, attributes);
|
runner.enqueue(fis, attributes);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
@ -340,9 +359,7 @@ public class PutHDFSTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutFileWhenDirectoryUsesInvalidEL() throws IOException {
|
public void testPutFileWhenDirectoryUsesInvalidEL() {
|
||||||
// Refer to comment in the BeforeClass method for an explanation
|
|
||||||
|
|
||||||
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
||||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
// the validator should pick up the invalid EL
|
// the validator should pick up the invalid EL
|
||||||
|
@ -356,7 +373,7 @@ public class PutHDFSTest {
|
||||||
private KerberosProperties testKerberosProperties;
|
private KerberosProperties testKerberosProperties;
|
||||||
private FileSystem fileSystem;
|
private FileSystem fileSystem;
|
||||||
|
|
||||||
public TestablePutHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) {
|
TestablePutHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) {
|
||||||
this.testKerberosProperties = testKerberosProperties;
|
this.testKerberosProperties = testKerberosProperties;
|
||||||
this.fileSystem = fileSystem;
|
this.fileSystem = fileSystem;
|
||||||
}
|
}
|
||||||
|
@ -367,7 +384,7 @@ public class PutHDFSTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected FileSystem getFileSystem(Configuration config) throws IOException {
|
protected FileSystem getFileSystem(Configuration config) {
|
||||||
return fileSystem;
|
return fileSystem;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -386,24 +403,24 @@ public class PutHDFSTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
|
public FSDataInputStream open(final Path f, final int bufferSize) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
|
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
|
||||||
final long blockSize, final Progressable progress) throws IOException {
|
final long blockSize, final Progressable progress) {
|
||||||
pathToStatus.put(f, newFile(f));
|
pathToStatus.put(f, newFile(f));
|
||||||
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
|
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
|
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean rename(final Path src, final Path dst) throws IOException {
|
public boolean rename(final Path src, final Path dst) {
|
||||||
if (pathToStatus.containsKey(src)) {
|
if (pathToStatus.containsKey(src)) {
|
||||||
pathToStatus.put(dst, pathToStatus.remove(src));
|
pathToStatus.put(dst, pathToStatus.remove(src));
|
||||||
} else {
|
} else {
|
||||||
|
@ -413,7 +430,7 @@ public class PutHDFSTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean delete(final Path f, final boolean recursive) throws IOException {
|
public boolean delete(final Path f, final boolean recursive) {
|
||||||
if (pathToStatus.containsKey(f)) {
|
if (pathToStatus.containsKey(f)) {
|
||||||
pathToStatus.remove(f);
|
pathToStatus.remove(f);
|
||||||
} else {
|
} else {
|
||||||
|
@ -423,7 +440,7 @@ public class PutHDFSTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
|
public FileStatus[] listStatus(final Path f) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -438,12 +455,12 @@ public class PutHDFSTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
|
public boolean mkdirs(final Path f, final FsPermission permission) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean mkdirs(Path f) throws IOException {
|
public boolean mkdirs(Path f) {
|
||||||
pathToStatus.put(f, newDir(f));
|
pathToStatus.put(f, newDir(f));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -456,7 +473,7 @@ public class PutHDFSTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean exists(Path f) throws IOException {
|
public boolean exists(Path f) {
|
||||||
return pathToStatus.containsKey(f);
|
return pathToStatus.containsKey(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue