mirror of https://github.com/apache/nifi.git
NIFI-6480: PutORC/PutParquet can't overwrite file even if set 'Overwrite Files' to true
This closes #3599. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
7623e6f5a1
commit
8a8852e73d
|
@ -283,11 +283,11 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
|
||||||
final Path tempFile = new Path(directoryPath, "." + filenameValue);
|
final Path tempFile = new Path(directoryPath, "." + filenameValue);
|
||||||
final Path destFile = new Path(directoryPath, filenameValue);
|
final Path destFile = new Path(directoryPath, filenameValue);
|
||||||
|
|
||||||
final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
|
final boolean destinationOrTempExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
|
||||||
final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
|
final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
|
||||||
|
|
||||||
// if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
|
// if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
|
||||||
if (destinationExists && !shouldOverwrite) {
|
if (destinationOrTempExists && !shouldOverwrite) {
|
||||||
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
|
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
|
||||||
getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
|
getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
|
||||||
return null;
|
return null;
|
||||||
|
@ -339,6 +339,16 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
|
||||||
throw exceptionHolder.get();
|
throw exceptionHolder.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final boolean destinationExists = fileSystem.exists(destFile);
|
||||||
|
|
||||||
|
// If destination file already exists, resolve that based on processor configuration
|
||||||
|
if (destinationExists && shouldOverwrite) {
|
||||||
|
if (fileSystem.delete(destFile, false)) {
|
||||||
|
getLogger().info("deleted {} in order to replace with the contents of {}",
|
||||||
|
new Object[]{destFile, putFlowFile});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Attempt to rename from the tempFile to destFile, and change owner if successfully renamed
|
// Attempt to rename from the tempFile to destFile, and change owner if successfully renamed
|
||||||
rename(fileSystem, tempFile, destFile);
|
rename(fileSystem, tempFile, destFile);
|
||||||
changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
|
changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
|
||||||
|
|
|
@ -140,6 +140,34 @@ public class PutORCTest {
|
||||||
testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
|
testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOverwriteFile() throws InitializationException {
|
||||||
|
configure(proc, 1);
|
||||||
|
|
||||||
|
final String filename = "testORCWithDefaults-" + System.currentTimeMillis();
|
||||||
|
|
||||||
|
final Map<String, String> flowFileAttributes = new HashMap<>();
|
||||||
|
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
|
||||||
|
|
||||||
|
testRunner.setProperty(PutORC.OVERWRITE, "true");
|
||||||
|
|
||||||
|
testRunner.enqueue("trigger", flowFileAttributes);
|
||||||
|
testRunner.run();
|
||||||
|
testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
MockRecordParser readerFactory = (MockRecordParser) testRunner.getControllerService("mock-reader-factory");
|
||||||
|
readerFactory.addRecord("name", 1, "blue", 10.0);
|
||||||
|
testRunner.enqueue("trigger", flowFileAttributes);
|
||||||
|
testRunner.run();
|
||||||
|
testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 2);
|
||||||
|
|
||||||
|
testRunner.setProperty(PutORC.OVERWRITE, "false");
|
||||||
|
readerFactory.addRecord("name", 1, "blue", 10.0);
|
||||||
|
testRunner.enqueue("trigger", flowFileAttributes);
|
||||||
|
testRunner.run();
|
||||||
|
testRunner.assertTransferCount(PutORC.REL_FAILURE, 1);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteORCWithDefaults() throws IOException, InitializationException {
|
public void testWriteORCWithDefaults() throws IOException, InitializationException {
|
||||||
configure(proc, 100);
|
configure(proc, 100);
|
||||||
|
@ -454,4 +482,4 @@ public class PutORCTest {
|
||||||
assertEquals(numExpectedUsers, currUser);
|
assertEquals(numExpectedUsers, currUser);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue