NIFI-12880 Added DeleteFile Processor

This closes #8489

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
EndzeitBegins 2024-03-09 16:44:59 +01:00 committed by exceptionfactory
parent 22f700f476
commit 57e07c080f
No known key found for this signature in database
3 changed files with 347 additions and 0 deletions

View File

@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"file", "remove", "delete", "local", "files", "filesystem"})
@CapabilityDescription("Deletes a file from the filesystem.")
@UseCase(
description = "Delete source file only after its processing completed",
configuration = """
Retrieve a file from the filesystem, e.g. using 'ListFile' and 'FetchFile'.
Process the file using any combination of processors.
Store the resulting file to a destination, e.g. using 'PutSFTP'.
Using 'DeleteFile', delete the file from the filesystem only after the result has been stored.
"""
)
@WritesAttributes({
@WritesAttribute(
attribute = DeleteFile.ATTRIBUTE_FAILURE_REASON,
description = "Human-readable reason of failure. Only available if FlowFile is routed to relationship 'failure'."),
@WritesAttribute(
attribute = DeleteFile.ATTRIBUTE_EXCEPTION_CLASS,
description = "The class name of the exception thrown during processor execution. Only available if an exception caused the FlowFile to be routed to relationship 'failure'."),
@WritesAttribute(
attribute = DeleteFile.ATTRIBUTE_EXCEPTION_MESSAGE,
description = "The message of the exception thrown during processor execution. Only available if an exception caused the FlowFile to be routed to relationship 'failure'.")
})
@Restricted(
restrictions = {
@Restriction(
requiredPermission = RequiredPermission.READ_FILESYSTEM,
explanation = "Provides operator the ability to read from any file that NiFi has access to."),
@Restriction(
requiredPermission = RequiredPermission.WRITE_FILESYSTEM,
explanation = "Provides operator the ability to delete any file that NiFi has access to.")
}
)
public class DeleteFile extends AbstractProcessor {
public static final String ATTRIBUTE_FAILURE_REASON = "DeleteFile.failure.reason";
public static final String ATTRIBUTE_EXCEPTION_CLASS = "DeleteFile.failure.exception.class";
public static final String ATTRIBUTE_EXCEPTION_MESSAGE = "DeleteFile.failure.exception.message";
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles, for which an existing file has been deleted, are routed to this relationship")
.build();
public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
.name("not found")
.description("All FlowFiles, for which the file to delete did not exist, are routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("All FlowFiles, for which an existing file could not be deleted, are routed to this relationship")
.build();
private final static Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_NOT_FOUND, REL_FAILURE);
public static final PropertyDescriptor DIRECTORY_PATH = new PropertyDescriptor.Builder()
.name("Directory Path")
.description("The path to the directory the file to delete is located in.")
.required(true)
.defaultValue("${" + CoreAttributes.ABSOLUTE_PATH.key() + "}")
.addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
.name("Filename")
.description("The name of the file to delete.")
.required(true)
.defaultValue("${" + CoreAttributes.FILENAME.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
private final static List<PropertyDescriptor> properties = List.of(DIRECTORY_PATH, FILENAME);
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
final String directoryPathProperty = context.getProperty(DIRECTORY_PATH).evaluateAttributeExpressions(flowFile).getValue();
final String filename = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
try {
final Path directoryPath = Paths.get(directoryPathProperty).toRealPath();
final Path filePath = directoryPath.resolve(filename).toRealPath();
if (!directoryPath.equals(filePath.getParent())) {
final String errorMessage = "Attempting to delete file at path '%s' which is not a direct child of the directory '%s'"
.formatted(filePath, directoryPath);
handleFailure(session, flowFile, errorMessage, null);
return;
}
Files.delete(filePath);
session.transfer(flowFile, REL_SUCCESS);
final String transitUri = "file://%s".formatted(filePath);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().debug("Successfully deleted file at path {} in {} millis; routing to success", flowFile, transferMillis);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUri, "Object deleted");
} catch (final NoSuchFileException noSuchFileException) {
session.transfer(flowFile, REL_NOT_FOUND);
} catch (final IOException ioException) {
final String errorMessage = "Failed to delete file '%s' in directory '%s'"
.formatted(filename, directoryPathProperty);
handleFailure(session, flowFile, errorMessage, ioException);
}
}
private void handleFailure(ProcessSession session, FlowFile flowFile, String errorMessage, Throwable throwable) {
getLogger().error(errorMessage, throwable);
session.putAttribute(flowFile, ATTRIBUTE_FAILURE_REASON, errorMessage);
if (throwable != null) {
session.putAttribute(flowFile, ATTRIBUTE_EXCEPTION_CLASS, throwable.getClass().toString());
session.putAttribute(flowFile, ATTRIBUTE_EXCEPTION_MESSAGE, throwable.getMessage());
}
session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}

View File

@ -23,6 +23,7 @@ org.apache.nifi.processors.standard.ConvertRecord
org.apache.nifi.processors.standard.CountText
org.apache.nifi.processors.standard.CryptographicHashContent
org.apache.nifi.processors.standard.DebugFlow
org.apache.nifi.processors.standard.DeleteFile
org.apache.nifi.processors.standard.DetectDuplicate
org.apache.nifi.processors.standard.DeduplicateRecord
org.apache.nifi.processors.standard.DistributeLoad

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
class TestDeleteFile {
private final TestRunner runner = TestRunners.newTestRunner(DeleteFile.class);
@TempDir
private Path testDirectory;
@Test
void deletesExistingFile() throws IOException {
final Path directoryPath = testDirectory.toAbsolutePath();
final String filename = "test.txt";
final MockFlowFile enqueuedFlowFile = enqueue(directoryPath.toString(), filename);
final Path fileToDelete = Files.writeString(testDirectory.resolve(filename), "some text");
assertExists(fileToDelete);
runner.run();
assertNotExists(fileToDelete);
runner.assertAllFlowFilesTransferred(DeleteFile.REL_SUCCESS, 1);
runner.assertAllFlowFiles(
DeleteFile.REL_SUCCESS,
flowFileInRelationship -> assertEquals(enqueuedFlowFile, flowFileInRelationship)
);
runner.assertProvenanceEvent(ProvenanceEventType.REMOTE_INVOCATION);
}
@Test
void deletesExistingEmptyDirectory() throws IOException {
final Path directoryPath = testDirectory.toAbsolutePath();
final String filename = "test-directory";
enqueue(directoryPath.toString(), filename);
final Path fileToDelete = Files.createDirectory(testDirectory.resolve(filename));
assertExists(fileToDelete);
runner.run();
assertNotExists(fileToDelete);
runner.assertAllFlowFilesTransferred(DeleteFile.REL_SUCCESS);
}
@Test
void sendsFlowFileToNotFoundWhenFileDoesNotExist() {
final Path directoryPath = testDirectory.toAbsolutePath();
final String filename = "test.txt";
enqueue(directoryPath.toString(), filename);
final Path fileToDelete = testDirectory.resolve(filename);
assertNotExists(fileToDelete);
runner.run();
assertNotExists(fileToDelete);
runner.assertAllFlowFilesTransferred(DeleteFile.REL_NOT_FOUND);
}
@Test
void sendsFlowFileToNotFoundWhenDirectoryDoesNotExist() {
final Path directoryPath = testDirectory.resolve("non-existing-directory").toAbsolutePath();
final String filename = "test.txt";
enqueue(directoryPath.toString(), filename);
final Path fileToDelete = testDirectory.resolve(filename);
assertNotExists(fileToDelete);
runner.run();
assertNotExists(fileToDelete);
runner.assertAllFlowFilesTransferred(DeleteFile.REL_NOT_FOUND);
}
@Test
void sendsFlowFileToFailureWhenTargetIsAnNonEmptyDirectory() throws IOException {
final Path directoryPath = testDirectory.toAbsolutePath();
final String filename = "test-directory";
enqueue(directoryPath.toString(), filename);
final Path fileToDelete = Files.createDirectory(testDirectory.resolve(filename));
Files.writeString(testDirectory.resolve(filename).resolve("disturbance"), "not empty");
assertExists(fileToDelete);
runner.run();
assertExists(fileToDelete);
runner.assertAllFlowFilesTransferred(DeleteFile.REL_FAILURE);
runner.assertPenalizeCount(1);
final MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(DeleteFile.REL_FAILURE).getFirst();
resultFlowFile.assertAttributeExists(DeleteFile.ATTRIBUTE_FAILURE_REASON);
resultFlowFile.assertAttributeExists(DeleteFile.ATTRIBUTE_EXCEPTION_CLASS);
resultFlowFile.assertAttributeExists(DeleteFile.ATTRIBUTE_EXCEPTION_MESSAGE);
}
@Test
void sendsFlowFileToFailureWhenFileIsNotADirectChildOfTheDirectory() throws IOException {
final Path directory = Files.createDirectory(testDirectory.resolve("test-directory")).toAbsolutePath();
final String filename = "../sibling.txt";
enqueue(directory.toString(), filename);
final Path fileToDelete = Files.writeString(directory.resolve(filename), "sibling content");
assertExists(fileToDelete);
runner.run();
assertExists(fileToDelete);
runner.assertAllFlowFilesTransferred(DeleteFile.REL_FAILURE, 1);
runner.assertPenalizeCount(1);
final MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(DeleteFile.REL_FAILURE).getFirst();
resultFlowFile.assertAttributeExists(DeleteFile.ATTRIBUTE_FAILURE_REASON);
resultFlowFile.assertAttributeNotExists(DeleteFile.ATTRIBUTE_EXCEPTION_CLASS);
resultFlowFile.assertAttributeNotExists(DeleteFile.ATTRIBUTE_EXCEPTION_MESSAGE);
}
private MockFlowFile enqueue(String directoryPath, String filename) {
final Map<String, String> attributes = Map.of(
CoreAttributes.ABSOLUTE_PATH.key(), directoryPath,
CoreAttributes.FILENAME.key(), filename
);
return runner.enqueue("data", attributes);
}
private static void assertNotExists(Path filePath) {
assertTrue(Files.notExists(filePath), () -> "File " + filePath + "still exists");
}
private static void assertExists(Path filePath) {
assertTrue(Files.exists(filePath), () -> "File " + filePath + "does not exist");
}
}