diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 3f08da014e..f1e8661366 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -429,7 +429,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
         final FileSystem hdfs = getFileSystem();
         final Path directoryPath = getNormalizedPath(context, DIRECTORY);
 
-        if (!hdfs.exists(directoryPath)) {
+        final boolean directoryExists = getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(directoryPath));
+        if (!directoryExists) {
             context.yield();
             getLogger().warn("The directory {} does not exist.", new Object[]{directoryPath});
         } else {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index fcba96ed58..91781a1200 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -17,7 +17,10 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.commons.lang3.SystemUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
@@ -34,17 +37,24 @@ import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class GetHDFSTest {
@@ -265,6 +275,62 @@ public class GetHDFSTest {
         runner.assertNotValid();
     }
 
+    @Test
+    public void testDirectoryCheckWrappedInUGICallWhenDirectoryExists() throws IOException, InterruptedException {
+        // GIVEN, WHEN
+        boolean directoryExists = true;
+
+        // THEN
+        directoryExistsWrappedInUGICall(directoryExists);
+    }
+
+    @Test
+    public void testDirectoryCheckWrappedInUGICallWhenDirectoryDoesNotExist() throws IOException, InterruptedException {
+        // GIVEN, WHEN
+        boolean directoryExists = false;
+
+        // THEN
+        directoryExistsWrappedInUGICall(directoryExists);
+    }
+
+    private void directoryExistsWrappedInUGICall(boolean directoryExists) throws IOException, InterruptedException {
+        // GIVEN
+        FileSystem mockFileSystem = mock(FileSystem.class);
+        UserGroupInformation mockUserGroupInformation = mock(UserGroupInformation.class);
+
+        GetHDFS testSubject = new TestableGetHDFSForUGI(kerberosProperties, mockFileSystem, mockUserGroupInformation);
+        TestRunner runner = TestRunners.newTestRunner(testSubject);
+        runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
+
+        // WHEN
+        Answer answer = new Answer() {
+            private int callCounter = 0;
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                final Object result;
+                if (callCounter == 0) {
+                    when(mockFileSystem.exists(any(Path.class))).thenReturn(directoryExists);
+                    result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
+                    verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
+                    verify(mockFileSystem).exists(any(Path.class));
+                } else {
+                    when(mockFileSystem.listStatus(any(Path.class))).thenReturn(new FileStatus[0]);
+                    result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
+                    verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
+                    verify(mockFileSystem).listStatus(any(Path.class));
+                }
+                ++callCounter;
+                return result;
+            }
+        };
+        when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenAnswer(answer);
+        runner.run();
+
+        // THEN
+        verify(mockFileSystem).getUri();
+        verifyNoMoreInteractions(mockFileSystem, mockUserGroupInformation);
+    }
+
     private static class TestableGetHDFS extends GetHDFS {
 
         private final KerberosProperties testKerberosProperties;
@@ -279,4 +345,24 @@ public class GetHDFSTest {
         }
     }
 
+    private static class TestableGetHDFSForUGI extends TestableGetHDFS {
+        private FileSystem mockFileSystem;
+        private UserGroupInformation mockUserGroupInformation;
+
+        public TestableGetHDFSForUGI(KerberosProperties testKerberosProperties, FileSystem mockFileSystem, UserGroupInformation mockUserGroupInformation) {
+            super(testKerberosProperties);
+            this.mockFileSystem = mockFileSystem;
+            this.mockUserGroupInformation = mockUserGroupInformation;
+        }
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return mockFileSystem;
+        }
+
+        @Override
+        protected UserGroupInformation getUserGroupInformation() {
+            return mockUserGroupInformation;
+        }
+    }
 }
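
For context, the production change routes the hdfs.exists() call through getUserGroupInformation().doAs(), so the directory-existence check runs under the same authenticated user as the listing call the new unit test also exercises (GetHDFS inherits getFileSystem() and getUserGroupInformation() from AbstractHadoopProcessor, which is why the test can override both). The following is a minimal standalone sketch of that UserGroupInformation.doAs pattern; the principal, keytab path, and directory are hypothetical placeholders, and this is not how the processor itself obtains its FileSystem or UGI.

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiDirectoryCheckSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical Kerberos principal and keytab; replace with real values.
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "nifi@EXAMPLE.COM", "/etc/security/keytabs/nifi.headless.keytab");

        Configuration conf = new Configuration();
        Path directoryPath = new Path("/data/incoming"); // hypothetical directory

        // Same pattern as the patch: run exists() inside doAs() so the HDFS call
        // executes with the logged-in user's credentials rather than whatever
        // security context the calling thread happens to carry.
        boolean directoryExists = ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {
            try (FileSystem fs = FileSystem.newInstance(conf)) {
                return fs.exists(directoryPath);
            }
        });

        System.out.println("Directory exists: " + directoryExists);
    }
}

Without the doAs wrapper, the check may execute as the JVM's current login user, which on a kerberized cluster can fail or see a different view of the namespace than the configured principal.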