NIFI-9260 Making the 'write and rename' behaviour optional for PutHDFS

This closes #5423.

Signed-off-by: Peter Turcsanyi <>
This commit is contained in:
Bence Simon 2021-09-30 15:57:41 +02:00 committed by Peter Turcsanyi
parent 705c65c86c
commit 9b724df6d9
2 changed files with 90 additions and 19 deletions

View File

@ -125,6 +125,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
protected static final String FAIL_RESOLUTION = "fail";
protected static final String APPEND_RESOLUTION = "append";
protected static final String WRITE_AND_RENAME = "writeAndRename";
protected static final String SIMPLE_WRITE = "simpleWrite";
protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
REPLACE_RESOLUTION, "Replaces the existing file if any.");
protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
@ -134,6 +137,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
"Appends to the existing file if any, creates a new file otherwise.");
protected static final AllowableValue WRITE_AND_RENAME_AV = new AllowableValue(WRITE_AND_RENAME, "Write and rename",
"The processor writes FlowFile data into a temporary file and renames it after completion. This prevents other processes from reading partially written files.");
protected static final AllowableValue SIMPLE_WRITE_AV = new AllowableValue(SIMPLE_WRITE, "Simple write",
"The processor writes FlowFile data directly to the destination file. In some cases this might cause reading partially written files.");
protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Conflict Resolution Strategy")
.description("Indicates what should happen when a file with the same name already exists in the output directory")
@ -142,6 +150,15 @@ public class PutHDFS extends AbstractHadoopProcessor {
protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder()
.displayName("Writing Strategy")
.description("Defines the approach for writing the FlowFile data.")
public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
.name("Block Size")
.description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
@ -219,6 +236,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
.description("The parent HDFS directory to which files should be written. The directory will be created if it doesn't exist.")
@ -280,6 +298,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
Path tempDotCopyFile = null;
FlowFile putFlowFile = flowFile;
try {
final String writingStrategy = context.getProperty(WRITING_STRATEGY).getValue();
final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
@ -295,6 +314,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
final Path tempCopyFile = new Path(dirPath, "." + filename);
final Path copyFile = new Path(dirPath, filename);
// Depending on the writing strategy, we might need a temporary file
final Path actualCopyFile = (writingStrategy.equals(WRITE_AND_RENAME))
? tempCopyFile
: copyFile;
// Create destination directory if it does not exist
boolean targetDirCreated = false;
try {
@ -361,7 +385,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
null, null);
@ -369,7 +393,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
if (codec != null) {
fos = codec.createOutputStream(fos);
createdFile = tempCopyFile;
createdFile = actualCopyFile;
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);
bis = null;
@ -399,9 +423,12 @@ public class PutHDFS extends AbstractHadoopProcessor {
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
tempDotCopyFile = tempCopyFile;
if (!conflictResponse.equals(APPEND_RESOLUTION)
|| (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
if (
&& (!conflictResponse.equals(APPEND_RESOLUTION) || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists))
) {
boolean renamed = false;
for (int i = 0; i < 10; i++) { // try to rename multiple times.
if (hdfs.rename(tempCopyFile, copyFile)) {
renamed = true;

View File

@ -71,6 +71,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class PutHDFSTest {
private final static String TARGET_DIRECTORY = "target/test-classes";
private final static String SOURCE_DIRECTORY = "src/test/resources/testdata";
private final static String FILE_NAME = "randombytes-1";
private KerberosProperties kerberosProperties;
private FileSystem mockFileSystem;
@ -197,27 +200,32 @@ public class PutHDFSTest {
public void testPutFile() throws IOException {
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
// given
final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
final TestRunner runner = TestRunners.newTestRunner(proc);
// when
try (final FileInputStream fis = new FileInputStream(SOURCE_DIRECTORY + "/" + FILE_NAME)) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
runner.enqueue(fis, attributes);;
List<MockFlowFile> failedFlowFiles = runner
.getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
// then
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
final MockFlowFile flowFile = flowFiles.get(0);
assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
@ -225,7 +233,43 @@ public class PutHDFSTest {
final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
assertTrue(sendEvent.getTransitUri().endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
Mockito.verify(spyFileSystem, Mockito.times(1)).rename(Mockito.any(Path.class), Mockito.any(Path.class));
public void testPutFileWithSimpleWrite() throws IOException {
// given
final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
final TestRunner runner = TestRunners.newTestRunner(proc);
// when
try (final FileInputStream fis = new FileInputStream(SOURCE_DIRECTORY + "/" + FILE_NAME)) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
runner.enqueue(fis, attributes);;
// then
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
final MockFlowFile flowFile = flowFiles.get(0);
assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
Mockito.verify(spyFileSystem, Mockito.never()).rename(Mockito.any(Path.class), Mockito.any(Path.class));