NIFI-9360 Update PutHDFS to handle filesystems which do not support getAclStatus()

This closes #5505

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Author: Paul Grey, 2021-11-02 17:18:08 -04:00 (committed by exceptionfactory)
parent f01fb17555
commit 362a243e0f
2 changed files with 12 additions and 4 deletions
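
The fix guards each call to FileSystem.getAclStatus() behind FileStatus.hasAcl(): the FileSystem base class, and implementations that never override it (object stores, for example), throw UnsupportedOperationException from getAclStatus(), so PutHDFS now asks the FileStatus whether the path actually carries ACLs before querying them. A minimal standalone sketch of that probe, assuming a Hadoop 3.x client on the classpath; the AclProbe class and its argument handling are illustrative, not part of this commit:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclStatus;

// Hypothetical demo, not NiFi code: probe a directory the way the patched
// PutHDFS does, calling getAclStatus() only when FileStatus reports ACLs.
public class AclProbe {
    public static void main(final String[] args) throws IOException {
        final Configuration conf = new Configuration();
        final Path dirPath = new Path(args[0]);
        try (FileSystem fs = FileSystem.newInstance(dirPath.toUri(), conf)) {
            final FileStatus status = fs.getFileStatus(dirPath);
            if (status.hasAcl()) {
                // Safe: this filesystem reported ACLs for the path.
                final AclStatus acl = fs.getAclStatus(dirPath);
                System.out.println("ACL entries: " + acl.getEntries());
            } else {
                // Without this check, filesystems that do not implement
                // getAclStatus() throw UnsupportedOperationException here.
                System.out.println("No ACL reported; skipping getAclStatus()");
            }
        }
    }
}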

PutHDFS.java

@@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.base.Throwables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntryScope;
@@ -280,7 +281,6 @@ public class PutHDFS extends AbstractHadoopProcessor {
         FlowFile putFlowFile = flowFile;
         try {
             final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
-            checkAclStatus(getAclStatus(dirPath));
             final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
             final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
             final int bufferSize = getBufferSize(context, session, putFlowFile);
@@ -298,14 +298,22 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 // Create destination directory if it does not exist
                 boolean targetDirCreated = false;
                 try {
-                    if (!hdfs.getFileStatus(dirPath).isDirectory()) {
+                    final FileStatus fileStatus = hdfs.getFileStatus(dirPath);
+                    if (!fileStatus.isDirectory()) {
                         throw new IOException(dirPath.toString() + " already exists and is not a directory");
                     }
+                    if (fileStatus.hasAcl()) {
+                        checkAclStatus(getAclStatus(dirPath));
+                    }
                 } catch (FileNotFoundException fe) {
                     targetDirCreated = hdfs.mkdirs(dirPath);
                     if (!targetDirCreated) {
                         throw new IOException(dirPath.toString() + " could not be created");
                     }
+                    final FileStatus fileStatus = hdfs.getFileStatus(dirPath);
+                    if (fileStatus.hasAcl()) {
+                        checkAclStatus(getAclStatus(dirPath));
+                    }
                     changeOwner(context, hdfs, dirPath, flowFile);
                 }
@@ -463,7 +471,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
         return aclCache.get(dirPath, fn -> {
             try {
                 return hdfs.getAclStatus(dirPath);
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 throw new UncheckedIOException(String.format("Unable to query ACL for directory [%s]", dirPath), e);
             }
         });
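
For reference, the getAclStatus(dirPath) helper shown above resolves through a Caffeine cache, so each directory's ACL is fetched from the filesystem at most once per cache entry, with the checked IOException tunneled out as UncheckedIOException. A self-contained sketch of that pattern, assuming the same Caffeine and Hadoop APIs; the class name and the maximumSize(20) setting are assumptions, not values from the NiFi source:

import java.io.IOException;
import java.io.UncheckedIOException;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclStatus;

// Sketch of the per-directory ACL cache pattern seen in the hunk above.
public class AclCacheSketch {
    // Cache bounds are illustrative; the processor configures its own cache.
    private final Cache<Path, AclStatus> aclCache = Caffeine.newBuilder()
            .maximumSize(20)
            .build();

    public AclStatus getAclStatus(final FileSystem hdfs, final Path dirPath) {
        // Cache.get(key, mappingFunction) computes the value at most once
        // per key; lambdas cannot throw checked exceptions, hence the wrap.
        return aclCache.get(dirPath, fn -> {
            try {
                return hdfs.getAclStatus(dirPath);
            } catch (final IOException e) {
                throw new UncheckedIOException(String.format("Unable to query ACL for directory [%s]", dirPath), e);
            }
        });
    }
}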

PutHDFSTest.java

@@ -730,7 +730,7 @@ public class PutHDFSTest {
     }
 
     private FileStatus newDir(Path p) {
-        return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", p);
+        return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, false, false);
     }
 
     @Override
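
The test change switches newDir() to the long Hadoop 3 FileStatus constructor, whose trailing boolean flags (hasAcl, isEncrypted, isErasureCoded) let the stubbed directory report hasAcl() == true; the shorter constructor defaults them all to false, which would silently bypass the new guard. An illustrative snippet, with the class name and path made up for the demo:

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

// Hypothetical demo of the extended FileStatus constructor used in the test.
public class FileStatusAclDemo {
    public static void main(final String[] args) {
        final FileStatus dir = new FileStatus(
                1L,                     // length
                true,                   // isDirectory
                3,                      // block replication
                128 * 1024 * 1024,      // block size
                1523456000000L,         // modification time
                1523457000000L,         // access time
                new FsPermission((short) 0755),
                "owner", "group",
                null,                   // symlink target
                new Path("/tmp/dir"),   // path (made up for the demo)
                true,                   // hasAcl: exercises the new guard
                false,                  // isEncrypted
                false);                 // isErasureCoded
        System.out.println("hasAcl: " + dir.hasAcl()); // prints hasAcl: true
    }
}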