mirror of https://github.com/apache/nifi.git
NIFI-8787: Wrapped hdfs.exists() call in UGI.doAs() in GetHDFS processor (#5217)
* NIFI-8787: Wrapped hdfs.exists() call in UGI.doAs() in GetHDFS processor * NIFI-8787: Unit tests updated
This commit is contained in:
parent
00f385b51b
commit
4477921665
|
@ -429,7 +429,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
|||
final FileSystem hdfs = getFileSystem();
|
||||
final Path directoryPath = getNormalizedPath(context, DIRECTORY);
|
||||
|
||||
if (!hdfs.exists(directoryPath)) {
|
||||
final boolean directoryExists = getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(directoryPath));
|
||||
if (!directoryExists) {
|
||||
context.yield();
|
||||
getLogger().warn("The directory {} does not exist.", new Object[]{directoryPath});
|
||||
} else {
|
||||
|
|
|
@ -17,7 +17,10 @@
|
|||
package org.apache.nifi.processors.hadoop;
|
||||
|
||||
import org.apache.commons.lang3.SystemUtils;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.hadoop.KerberosProperties;
|
||||
|
@ -34,17 +37,24 @@ import org.junit.Assume;
|
|||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class GetHDFSTest {
|
||||
|
@ -265,6 +275,62 @@ public class GetHDFSTest {
|
|||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDirectoryCheckWrappedInUGICallWhenDirectoryExists() throws IOException, InterruptedException {
|
||||
// GIVEN, WHEN
|
||||
boolean directoryExists = true;
|
||||
|
||||
// THEN
|
||||
directoryExistsWrappedInUGICall(directoryExists);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDirectoryCheckWrappedInUGICallWhenDirectoryDoesNotExist() throws IOException, InterruptedException {
|
||||
// GIVEN, WHEN
|
||||
boolean directoryExists = false;
|
||||
|
||||
// THEN
|
||||
directoryExistsWrappedInUGICall(directoryExists);
|
||||
}
|
||||
|
||||
private void directoryExistsWrappedInUGICall(boolean directoryExists) throws IOException, InterruptedException {
|
||||
// GIVEN
|
||||
FileSystem mockFileSystem = mock(FileSystem.class);
|
||||
UserGroupInformation mockUserGroupInformation = mock(UserGroupInformation.class);
|
||||
|
||||
GetHDFS testSubject = new TestableGetHDFSForUGI(kerberosProperties, mockFileSystem, mockUserGroupInformation);
|
||||
TestRunner runner = TestRunners.newTestRunner(testSubject);
|
||||
runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
|
||||
|
||||
// WHEN
|
||||
Answer<?> answer = new Answer<Object>() {
|
||||
private int callCounter = 0;
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
final Object result;
|
||||
if (callCounter == 0) {
|
||||
when(mockFileSystem.exists(any(Path.class))).thenReturn(directoryExists);
|
||||
result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
|
||||
verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
|
||||
verify(mockFileSystem).exists(any(Path.class));
|
||||
} else {
|
||||
when(mockFileSystem.listStatus(any(Path.class))).thenReturn(new FileStatus[0]);
|
||||
result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
|
||||
verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
|
||||
verify(mockFileSystem).listStatus(any(Path.class));
|
||||
}
|
||||
++callCounter;
|
||||
return result;
|
||||
}
|
||||
};
|
||||
when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenAnswer(answer);
|
||||
runner.run();
|
||||
|
||||
// THEN
|
||||
verify(mockFileSystem).getUri();
|
||||
verifyNoMoreInteractions(mockFileSystem, mockUserGroupInformation);
|
||||
}
|
||||
|
||||
private static class TestableGetHDFS extends GetHDFS {
|
||||
|
||||
private final KerberosProperties testKerberosProperties;
|
||||
|
@ -279,4 +345,24 @@ public class GetHDFSTest {
|
|||
}
|
||||
}
|
||||
|
||||
private static class TestableGetHDFSForUGI extends TestableGetHDFS {
|
||||
private FileSystem mockFileSystem;
|
||||
private UserGroupInformation mockUserGroupInformation;
|
||||
|
||||
public TestableGetHDFSForUGI(KerberosProperties testKerberosProperties, FileSystem mockFileSystem, UserGroupInformation mockUserGroupInformation) {
|
||||
super(testKerberosProperties);
|
||||
this.mockFileSystem = mockFileSystem;
|
||||
this.mockUserGroupInformation = mockUserGroupInformation;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FileSystem getFileSystem() {
|
||||
return mockFileSystem;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected UserGroupInformation getUserGroupInformation() {
|
||||
return mockUserGroupInformation;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue