NIFI-8737: Fixed incorrect provenance data in HDFS processors when the Directory property is inconsistent with core-site.xml

Peter Turcsanyi 2021-06-23 17:04:08 +02:00 committed by Pierre Villard
parent 726082ffa6
commit 39ffa4a8ac
12 changed files with 117 additions and 43 deletions

AbstractHadoopProcessor.java

@@ -34,6 +34,7 @@ import org.apache.nifi.components.resource.ResourceCardinality;
 import org.apache.nifi.components.resource.ResourceReferences;
 import org.apache.nifi.components.resource.ResourceType;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
@@ -606,4 +607,26 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         }
     }
+
+    protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property) {
+        return getNormalizedPath(context, property, null);
+    }
+
+    protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property, FlowFile flowFile) {
+        final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
+        final Path path = new Path(propertyValue);
+        final URI uri = path.toUri();
+
+        final URI fileSystemUri = getFileSystem().getUri();
+
+        if (uri.getScheme() != null) {
+            if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) {
+                getLogger().warn("The filesystem component of the URI configured in the '{}' property ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " +
+                        "and will be ignored.", property.getDisplayName(), uri, fileSystemUri);
+            }
+
+            return new Path(uri.getPath());
+        } else {
+            return path;
+        }
+    }
 }
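For reference, the effect of the new getNormalizedPath() helper: when the configured value carries its own scheme and authority, only the path component is kept, so the default filesystem from core-site.xml is the one actually accessed, and a mismatch is merely logged as a warning. A minimal standalone sketch of the same rule, using assumed host names and paths (not code from this commit):

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    public class NormalizedPathSketch {
        public static void main(String[] args) {
            // Assumed default filesystem URI, as it would come from core-site.xml
            final URI fileSystemUri = URI.create("hdfs://namenode:8020");
            // Assumed Directory property value pointing at a different authority
            final Path configured = new Path("hdfs://other-host:8020/data/in");

            final URI uri = configured.toUri();
            // Same rule as getNormalizedPath(): drop scheme/authority when one is present
            final Path normalized = uri.getScheme() != null ? new Path(uri.getPath()) : configured;

            // other-host:8020 differs from namenode:8020, so the processor would log a warning
            System.out.println(normalized);   // prints /data/in
        }
    }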

AbstractPutHDFS.java

@@ -100,13 +100,12 @@ public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
         Path tempDotCopyFile = null;
         FlowFile putFlowFile = flowFile;
         try {
-            final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
-            final Path configuredRootDirPath = new Path(dirValue);
+            final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);

             final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
-            final long blockSize = getBlockSize(context, session, putFlowFile);
+            final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
             final int bufferSize = getBufferSize(context, session, putFlowFile);
-            final short replication = getReplication(context, session, putFlowFile);
+            final short replication = getReplication(context, session, putFlowFile, dirPath);

             final CompressionCodec codec = getCompressionCodec(context, configuration);
@@ -114,19 +113,19 @@ public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
                     ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
                     : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());

-            final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
-            final Path copyFile = new Path(configuredRootDirPath, filename);
+            final Path tempCopyFile = new Path(dirPath, "." + filename);
+            final Path copyFile = new Path(dirPath, filename);

             // Create destination directory if it does not exist
             try {
-                if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
-                    throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
+                if (!hdfs.getFileStatus(dirPath).isDirectory()) {
+                    throw new IOException(dirPath.toString() + " already exists and is not a directory");
                 }
             } catch (FileNotFoundException fe) {
-                if (!hdfs.mkdirs(configuredRootDirPath)) {
-                    throw new IOException(configuredRootDirPath.toString() + " could not be created");
+                if (!hdfs.mkdirs(dirPath)) {
+                    throw new IOException(dirPath.toString() + " could not be created");
                 }
-                changeOwner(context, hdfs, configuredRootDirPath, flowFile);
+                changeOwner(context, hdfs, dirPath, flowFile);
             }

             final boolean destinationExists = hdfs.exists(copyFile);
@@ -274,7 +273,7 @@ public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
     /**
      * Returns with the expected block size.
      */
-    protected abstract long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
+    protected abstract long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath);

     /**
      * Returns with the expected buffer size.
@@ -284,7 +283,7 @@ public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
     /**
      * Returns with the expected replication factor.
      */
-    protected abstract short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
+    protected abstract short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath);

     /**
      * Returns if file system should ignore locality.

AbstractFetchHDFSRecord.java

@@ -179,7 +179,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
             FlowFile child = null;
             final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
             try {
-                final Path path = new Path(filenameValue);
+                final Path path = getNormalizedPath(context, FILENAME, originalFlowFile);
                 final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
                 final AtomicReference<WriteResult> writeResult = new AtomicReference<>();

AbstractPutHDFSRecord.java

@@ -273,10 +273,9 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
             FlowFile putFlowFile = flowFile;
             try {
                 final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
-                final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();

                 // create the directory if it doesn't exist
-                final Path directoryPath = new Path(directoryValue);
+                final Path directoryPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
                 createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);

                 // write to tempFile first and on success rename to destFile

DeleteHDFS.java

@@ -145,7 +145,7 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
         // We need a FlowFile to report provenance correctly.
         final FlowFile finalFlowFile = originalFlowFile != null ? originalFlowFile : session.create();

-        final String fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(finalFlowFile).getValue();
+        final String fileOrDirectoryName = getNormalizedPath(context, FILE_OR_DIRECTORY, finalFlowFile).toString();

         final FileSystem fileSystem = getFileSystem();
         final UserGroupInformation ugi = getUserGroupInformation();

FetchHDFS.java

@@ -125,7 +125,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
         final Path path;
         try {
-            path = new Path(filenameValue);
+            path = getNormalizedPath(context, FILENAME, flowFile);
         } catch (IllegalArgumentException e) {
             getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, flowFile, e});
             flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());

GetHDFS.java

@@ -345,7 +345,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
                 BUFFER_SIZE_DEFAULT);
-        final Path rootDir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+        final Path rootDir = getNormalizedPath(context, DIRECTORY);

         final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
         final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
@@ -427,7 +427,7 @@
         if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
             try {
                 final FileSystem hdfs = getFileSystem();
-                final Path directoryPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+                final Path directoryPath = getNormalizedPath(context, DIRECTORY);

                 if (!hdfs.exists(directoryPath)) {
                     context.yield();

GetHDFSFileInfo.java

@@ -582,7 +582,8 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
     */
    protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context, FlowFile ff) {
        HDFSFileInfoRequest req = new HDFSFileInfoRequest();
-       req.setFullPath(context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue());
+       String fullPath = getNormalizedPath(context, FULL_PATH, ff).toString();
+       req.setFullPath(fullPath);
        req.setRecursive(context.getProperty(RECURSE_SUBDIRS).asBoolean());

        PropertyValue pv;

ListHDFS.java

@@ -402,8 +402,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
         lastRunTimestamp = now;

-        final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
             final StateMap stateMap = session.getState(Scope.CLUSTER);
@@ -443,7 +441,7 @@
         final Set<FileStatus> statuses;
         try {
-            final Path rootPath = new Path(directory);
+            final Path rootPath = getNormalizedPath(context, DIRECTORY);
             statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
             getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
         } catch (final IOException | IllegalArgumentException e) {

MoveHDFS.java

@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.hadoop;

 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -245,7 +246,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
         Path inputPath;
         try {
-            inputPath = new Path(filenameValue);
+            inputPath = getNormalizedPath(context, INPUT_DIRECTORY_OR_FILE, flowFile);
             if (!hdfs.exists(inputPath)) {
                 throw new IOException("Input Directory or File does not exist in HDFS");
             }
@@ -348,9 +349,8 @@
         FlowFile flowFile = session.create(parentFlowFile);
         try {
             final String originalFilename = file.getName();
-            final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions(parentFlowFile).getValue();
-            final Path configuredRootOutputDirPath = new Path(outputDirValue);
-            final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
+            final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
+            final Path newFile = new Path(outputDirPath, originalFilename);
             final boolean destinationExists = hdfs.exists(newFile);
             // If destination file already exists, resolve that
             // based on processor configuration
@@ -382,15 +382,15 @@
             // Create destination directory if it does not exist
             try {
-                if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
-                    throw new IOException(configuredRootOutputDirPath.toString()
+                if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
+                    throw new IOException(outputDirPath.toString()
                             + " already exists and is not a directory");
                 }
             } catch (FileNotFoundException fe) {
-                if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
-                    throw new IOException(configuredRootOutputDirPath.toString() + " could not be created");
+                if (!hdfs.mkdirs(outputDirPath)) {
+                    throw new IOException(outputDirPath.toString() + " could not be created");
                 }
-                changeOwner(context, hdfs, configuredRootOutputDirPath);
+                changeOwner(context, hdfs, outputDirPath);
             }

             boolean moved = false;
@@ -419,8 +419,7 @@
                 final String hdfsPath = newFile.getParent().toString();
                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
                 flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-                final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath
-                        : "hdfs://" + outputPath;
+                final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath, "/");
                 session.getProvenanceReporter().send(flowFile, transitUri);
                 session.transfer(flowFile, REL_SUCCESS);
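The transit URI reported to provenance is also fixed here: the old code prepended a literal "hdfs://" to a path that already starts with "/", so the first path segment was parsed as the host and the real namenode authority was lost (and a non-HDFS default filesystem would have been reported with an hdfs:// scheme). Building the URI from hdfs.getUri() keeps the actual scheme and authority. A small before/after sketch with assumed values:

    import org.apache.commons.lang3.StringUtils;

    public class TransitUriSketch {
        public static void main(String[] args) {
            final String outputPath = "/user/nifi/out/file.txt";   // assumed destination path
            final String fileSystemUri = "hdfs://namenode:8020";    // assumed hdfs.getUri() value

            // Old construction: "hdfs:/" + "/user/..." -> "hdfs://user/..." ("user" is parsed as a host)
            final String oldTransitUri = outputPath.startsWith("/") ? "hdfs:/" + outputPath : "hdfs://" + outputPath;

            // New construction: scheme and authority come from the filesystem itself
            final String newTransitUri = fileSystemUri + StringUtils.prependIfMissing(outputPath, "/");

            System.out.println(oldTransitUri);   // hdfs://user/nifi/out/file.txt
            System.out.println(newTransitUri);   // hdfs://namenode:8020/user/nifi/out/file.txt
        }
    }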

PutHDFS.java

@@ -189,11 +189,9 @@ public class PutHDFS extends AbstractPutHDFS {
     }

     @Override
-    protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
-        final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-        final Path configuredRootDirPath = new Path(dirValue);
+    protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) {
         final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
-        return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(configuredRootDirPath);
+        return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(dirPath);
     }

     @Override
@@ -203,12 +201,10 @@ public class PutHDFS extends AbstractPutHDFS {
     }

     @Override
-    protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
-        final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-        final Path configuredRootDirPath = new Path(dirValue);
+    protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) {
         final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
         return replicationProp != null ? replicationProp.shortValue() : getFileSystem()
-                .getDefaultReplication(configuredRootDirPath);
+                .getDefaultReplication(dirPath);
     }

     @Override

AbstractHadoopTest.java

@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.processors.hadoop;

+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.resource.FileResourceReference;
@@ -39,11 +41,14 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;

+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -207,4 +212,58 @@ public class AbstractHadoopTest {
         runner.setProperty(kerberosProperties.getKerberosKeytab(), temporaryFile.getAbsolutePath());
         runner.assertValid();
     }
+
+    @Test
+    public void testGetNormalizedPathWithoutFileSystem() throws URISyntaxException {
+        AbstractHadoopProcessor processor = initProcessorForTestGetNormalizedPath("abfs://container1@storageaccount1");
+        TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor, "/dir1");
+
+        Path path = processor.getNormalizedPath(runner.getProcessContext(), AbstractHadoopProcessor.DIRECTORY);
+
+        assertEquals("/dir1", path.toString());
+        assertTrue(runner.getLogger().getWarnMessages().isEmpty());
+    }
+
+    @Test
+    public void testGetNormalizedPathWithCorrectFileSystem() throws URISyntaxException {
+        AbstractHadoopProcessor processor = initProcessorForTestGetNormalizedPath("abfs://container2@storageaccount2");
+        TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor, "abfs://container2@storageaccount2/dir2");
+
+        Path path = processor.getNormalizedPath(runner.getProcessContext(), AbstractHadoopProcessor.DIRECTORY);
+
+        assertEquals("/dir2", path.toString());
+        assertTrue(runner.getLogger().getWarnMessages().isEmpty());
+    }
+
+    @Test
+    public void testGetNormalizedPathWithIncorrectFileSystem() throws URISyntaxException {
+        AbstractHadoopProcessor processor = initProcessorForTestGetNormalizedPath("abfs://container3@storageaccount3");
+        TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor, "abfs://container*@storageaccount*/dir3");
+
+        Path path = processor.getNormalizedPath(runner.getProcessContext(), AbstractHadoopProcessor.DIRECTORY);
+
+        assertEquals("/dir3", path.toString());
+        assertFalse(runner.getLogger().getWarnMessages().isEmpty());
+    }
+
+    private AbstractHadoopProcessor initProcessorForTestGetNormalizedPath(String fileSystemUri) throws URISyntaxException {
+        final FileSystem fileSystem = mock(FileSystem.class);
+        when(fileSystem.getUri()).thenReturn(new URI(fileSystemUri));
+
+        final PutHDFS processor = new PutHDFS() {
+            @Override
+            protected FileSystem getFileSystem() {
+                return fileSystem;
+            }
+        };
+
+        return processor;
+    }
+
+    private TestRunner initTestRunnerForTestGetNormalizedPath(AbstractHadoopProcessor processor, String directory) throws URISyntaxException {
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(AbstractHadoopProcessor.DIRECTORY, directory);
+
+        return runner;
+    }
 }