NIFI-7984 Use UGI doAs in DeleteHDFS (#4622)

Bryan Bende 2020-10-27 11:24:48 -04:00 committed by GitHub
parent 3612973a47
commit 77b4abc21e
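The change below wraps DeleteHDFS's filesystem work in UserGroupInformation.doAs(), so the glob, exists, and delete calls run as the processor's authenticated Hadoop user (for example a Kerberos principal) rather than whatever login the JVM happens to hold; inside NiFi the UGI comes from the getUserGroupInformation() call visible in the diff. As a rough illustration of the pattern only, here is a minimal, self-contained sketch; the configuration, target path, and the way the UGI is obtained are placeholders, not the processor's actual setup:

```java
import java.security.PrivilegedAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class DoAsDeleteSketch {
    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();

        // Placeholder: obtain a UGI for the principal the work should run as.
        // (The NiFi processor gets its UGI from its Hadoop base class instead.)
        final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

        // PrivilegedAction#run cannot throw checked exceptions, so the IOException
        // is handled inside the lambda and a dummy value is returned -- the same
        // shape the commit uses.
        ugi.doAs((PrivilegedAction<Object>) () -> {
            try {
                final FileSystem fs = FileSystem.get(conf);
                final Path target = new Path("/tmp/example-to-delete"); // hypothetical path
                if (fs.exists(target)) {
                    fs.delete(target, true /* recursive */);
                }
            } catch (java.io.IOException e) {
                // Handle or log; checked exceptions cannot escape a PrivilegedAction.
                e.printStackTrace();
            }
            return null;
        });
    }
}
```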


@@ -21,6 +21,7 @@ import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Restriction;
@@ -41,6 +42,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -141,63 +143,70 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
         }
 
         // We need a FlowFile to report provenance correctly.
-        FlowFile flowFile = originalFlowFile != null ? originalFlowFile : session.create();
+        final FlowFile finalFlowFile = originalFlowFile != null ? originalFlowFile : session.create();
 
-        final String fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        final String fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(finalFlowFile).getValue();
 
         final FileSystem fileSystem = getFileSystem();
-        try {
-            // Check if the user has supplied a file or directory pattern
-            List<Path> pathList = Lists.newArrayList();
-            if (GLOB_MATCHER.reset(fileOrDirectoryName).find()) {
-                FileStatus[] fileStatuses = fileSystem.globStatus(new Path(fileOrDirectoryName));
-                if (fileStatuses != null) {
-                    for (FileStatus fileStatus : fileStatuses) {
-                        pathList.add(fileStatus.getPath());
-                    }
-                }
-            } else {
-                pathList.add(new Path(fileOrDirectoryName));
-            }
-            int failedPath = 0;
-            for (Path path : pathList) {
-                if (fileSystem.exists(path)) {
-                    try {
-                        Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
-                        attributes.put("hdfs.filename", path.getName());
-                        attributes.put("hdfs.path", path.getParent().toString());
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-                        fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
-                        getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
-                        final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
-                        session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
-                    } catch (IOException ioe) {
-                        // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
-                        // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
-                        getLogger().warn("Failed to delete file or directory", ioe);
-                        Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
-                        // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
-                        attributes.put("hdfs.error.message", ioe.getMessage());
-                        session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), REL_FAILURE);
-                        failedPath++;
-                    }
-                }
-            }
-            if (failedPath == 0) {
-                session.transfer(flowFile, DeleteHDFS.REL_SUCCESS);
-            } else {
-                // If any path has been failed to be deleted, remove the FlowFile as it's been cloned and sent to failure.
-                session.remove(flowFile);
-            }
-        } catch (IOException e) {
-            getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
-            session.transfer(flowFile, DeleteHDFS.REL_FAILURE);
-        }
+        final UserGroupInformation ugi = getUserGroupInformation();
+        ugi.doAs((PrivilegedAction<Object>)() -> {
+            FlowFile flowFile = finalFlowFile;
+            try {
+                // Check if the user has supplied a file or directory pattern
+                List<Path> pathList = Lists.newArrayList();
+                if (GLOB_MATCHER.reset(fileOrDirectoryName).find()) {
+                    FileStatus[] fileStatuses = fileSystem.globStatus(new Path(fileOrDirectoryName));
+                    if (fileStatuses != null) {
+                        for (FileStatus fileStatus : fileStatuses) {
+                            pathList.add(fileStatus.getPath());
+                        }
+                    }
+                } else {
+                    pathList.add(new Path(fileOrDirectoryName));
+                }
+                int failedPath = 0;
+                for (Path path : pathList) {
+                    if (fileSystem.exists(path)) {
+                        try {
+                            Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
+                            attributes.put("hdfs.filename", path.getName());
+                            attributes.put("hdfs.path", path.getParent().toString());
+                            flowFile = session.putAllAttributes(flowFile, attributes);
+                            fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
+                            getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
+                            final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                            session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
+                        } catch (IOException ioe) {
+                            // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
+                            // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
+                            getLogger().warn("Failed to delete file or directory", ioe);
+                            Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
+                            // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
+                            attributes.put("hdfs.error.message", ioe.getMessage());
+                            session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), REL_FAILURE);
+                            failedPath++;
+                        }
+                    }
+                }
+                if (failedPath == 0) {
+                    session.transfer(flowFile, DeleteHDFS.REL_SUCCESS);
+                } else {
+                    // If any path has been failed to be deleted, remove the FlowFile as it's been cloned and sent to failure.
+                    session.remove(flowFile);
+                }
+            } catch (IOException e) {
+                getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
+                session.transfer(flowFile, DeleteHDFS.REL_FAILURE);
+            }
+            return null;
+        });
     }
 }
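A note on the shape of the new code: PrivilegedAction#run cannot throw checked exceptions, which is why the existing IOException handling stays inside the lambda and run() simply ends with return null. The FlowFile reference is also split in two, a final finalFlowFile captured by the lambda and a local flowFile reassigned inside it, because Java lambdas may only capture local variables that are effectively final.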