diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java index 62e7231e91..5ee54e3c49 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java @@ -283,11 +283,11 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { final Path tempFile = new Path(directoryPath, "." + filenameValue); final Path destFile = new Path(directoryPath, filenameValue); - final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile); + final boolean destinationOrTempExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile); final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean(); // if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure - if (destinationExists && !shouldOverwrite) { + if (destinationOrTempExists && !shouldOverwrite) { session.transfer(session.penalize(putFlowFile), REL_FAILURE); getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile}); return null; @@ -339,6 +339,16 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { throw exceptionHolder.get(); } + final boolean destinationExists = fileSystem.exists(destFile); + + // If destination file already exists, resolve that based on processor configuration + if (destinationExists && shouldOverwrite) { + if (fileSystem.delete(destFile, false)) { + getLogger().info("deleted {} in order to replace with the contents of {}", + new Object[]{destFile, putFlowFile}); + } + } + // Attempt to rename from the tempFile to destFile, and change owner if successfully renamed rename(fileSystem, tempFile, destFile); changeOwner(fileSystem, destFile, remoteOwner, remoteGroup); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java index 20e30cc7d1..c063ea1ca4 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java @@ -140,6 +140,34 @@ public class PutORCTest { testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory"); } + @Test + public void testOverwriteFile() throws InitializationException { + configure(proc, 1); + + final String filename = "testORCWithDefaults-" + System.currentTimeMillis(); + + final Map flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.setProperty(PutORC.OVERWRITE, "true"); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1); + + MockRecordParser readerFactory = (MockRecordParser) testRunner.getControllerService("mock-reader-factory"); + readerFactory.addRecord("name", 1, "blue", 10.0); + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 2); + + testRunner.setProperty(PutORC.OVERWRITE, "false"); + readerFactory.addRecord("name", 1, "blue", 10.0); + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertTransferCount(PutORC.REL_FAILURE, 1); + } + @Test public void testWriteORCWithDefaults() throws IOException, InitializationException { configure(proc, 100); @@ -454,4 +482,4 @@ public class PutORCTest { assertEquals(numExpectedUsers, currUser); } -} \ No newline at end of file +}