NIFI-4542 - add target.dir.created to indicate whether the target directory was created

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5397.
This commit is contained in:
hondawei 2021-09-19 21:29:52 +08:00 committed by Pierre Villard
parent 2d7e9c1c9a
commit 30cf49db7e
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 44 additions and 2 deletions

View File

@@ -153,6 +153,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
protected static final String TARGET_HDFS_DIR_CREATED_ATTRIBUTE = "target.dir.created";
private static final Object RESOURCES_LOCK = new Object();
private static final HdfsResources EMPTY_HDFS_RESOURCES = new HdfsResources(null, null, null, null);

View File

@@ -79,7 +79,8 @@ import java.util.stream.Stream;
@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute."),
@WritesAttribute(attribute = "target.dir.created", description = "The result(true/false) indicates if the folder is created by the processor.")
})
@SeeAlso(GetHDFS.class)
@Restricted(restrictions = {
@@ -269,12 +270,14 @@ public class PutHDFS extends AbstractHadoopProcessor {
final Path copyFile = new Path(dirPath, filename);
// Create destination directory if it does not exist
boolean targetDirCreated = false;
try {
if (!hdfs.getFileStatus(dirPath).isDirectory()) {
throw new IOException(dirPath.toString() + " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
if (!hdfs.mkdirs(dirPath)) {
targetDirCreated = hdfs.mkdirs(dirPath);
if (!targetDirCreated) {
throw new IOException(dirPath.toString() + " could not be created");
}
changeOwner(context, hdfs, dirPath, flowFile);
@@ -388,6 +391,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
final String hdfsPath = copyFile.getParent().toString();
putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
putFlowFile = session.putAttribute(putFlowFile, TARGET_HDFS_DIR_CREATED_ATTRIBUTE, String.valueOf(targetDirCreated));
final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());

View File

@@ -210,6 +210,42 @@ public class PutHDFSTest {
assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
}
@Test
public void testPutFileWhenTargetDirExists() throws IOException {
String targetDir = "target/test-classes";
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
proc.getFileSystem().mkdirs(new Path(targetDir));
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, targetDir);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
runner.enqueue(fis, attributes);
runner.run();
}
List<MockFlowFile> failedFlowFiles = runner
.getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
assertTrue(failedFlowFiles.isEmpty());
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
assertEquals("false", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());