NIFI-9265 Fixing path handling for HDFS processors when there are multiplied separators in the path

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5437.
This commit is contained in:
Bence Simon 2021-10-05 16:54:47 +02:00 committed by Pierre Villard
parent 60d6d469bf
commit 75d1d7f6e7
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 48 additions and 28 deletions

View File

@ -61,6 +61,7 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -85,6 +86,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
private static final Pattern LOCAL_FILE_SYSTEM_URI = Pattern.compile("^file:.*"); private static final Pattern LOCAL_FILE_SYSTEM_URI = Pattern.compile("^file:.*");
private static final String NORMALIZE_ERROR_WITH_PROPERTY = "The filesystem component of the URI configured in the '{}' property ({}) does not match " +
"the filesystem URI from the Hadoop configuration file ({}) and will be ignored.";
private static final String NORMALIZE_ERROR_WITHOUT_PROPERTY = "The filesystem component of the URI configured ({}) does not match the filesystem URI from " +
"the Hadoop configuration file ({}) and will be ignored.";
// properties // properties
public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder() public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
.name("Hadoop Configuration Resources") .name("Hadoop Configuration Resources")
@ -674,39 +681,33 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
} }
protected Path getNormalizedPath(final String rawPath) { protected Path getNormalizedPath(final String rawPath) {
final Path path = new Path(rawPath); return getNormalizedPath(rawPath, Optional.empty());
final URI uri = path.toUri();
final URI fileSystemUri = getFileSystem().getUri();
if (uri.getScheme() != null) {
if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) {
getLogger().warn("The filesystem component of the URI configured ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " +
"and will be ignored.", uri, fileSystemUri);
}
return new Path(uri.getPath());
} else {
return path;
}
} }
protected Path getNormalizedPath(final ProcessContext context, final PropertyDescriptor property, final FlowFile flowFile) { protected Path getNormalizedPath(final ProcessContext context, final PropertyDescriptor property, final FlowFile flowFile) {
final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue(); final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
final Path path = new Path(propertyValue); return getNormalizedPath(propertyValue, Optional.of(property.getDisplayName()));
final URI uri = path.toUri(); }
private Path getNormalizedPath(final String rawPath, final Optional<String> propertyName) {
final URI uri = new Path(rawPath).toUri();
final URI fileSystemUri = getFileSystem().getUri(); final URI fileSystemUri = getFileSystem().getUri();
final String path;
if (uri.getScheme() != null) { if (uri.getScheme() != null) {
if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) { if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) {
getLogger().warn("The filesystem component of the URI configured in the '{}' property ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " + if (propertyName.isPresent()) {
"and will be ignored.", property.getDisplayName(), uri, fileSystemUri); getLogger().warn(NORMALIZE_ERROR_WITH_PROPERTY, propertyName, uri, fileSystemUri);
} else {
getLogger().warn(NORMALIZE_ERROR_WITHOUT_PROPERTY, uri, fileSystemUri);
}
} }
return new Path(uri.getPath()); path = uri.getPath();
} else { } else {
return path; path = rawPath;
} }
return new Path(path.replaceAll("/+", "/"));
} }
} }

View File

@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -57,9 +58,10 @@ public class TestFetchHDFS {
} }
@Test @Test
public void testFetchStaticFileThatExists() throws IOException { public void testFetchStaticFileThatExists() {
final String file = "src/test/resources/testdata/randombytes-1"; final String file = "src/test/resources/testdata/randombytes-1";
runner.setProperty(FetchHDFS.FILENAME, file); final String fileWithMultipliedSeparators = "src/test////resources//testdata/randombytes-1";
runner.setProperty(FetchHDFS.FILENAME, fileWithMultipliedSeparators);
runner.enqueue(new String("trigger flow file")); runner.enqueue(new String("trigger flow file"));
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
@ -72,7 +74,24 @@ public class TestFetchHDFS {
} }
@Test @Test
public void testFetchStaticFileThatDoesNotExist() throws IOException { public void testFetchStaticFileThatExistsWithAbsolutePath() {
final File destination = new File("src/test/resources/testdata/randombytes-1");
final String file = destination.getAbsolutePath();
final String fileWithMultipliedSeparators = "/" + file;
runner.setProperty(FetchHDFS.FILENAME, fileWithMultipliedSeparators);
runner.enqueue(new String("trigger flow file"));
runner.run();
runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord fetchEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.FETCH, fetchEvent.getEventType());
// As absolute path call results a different format under Windows, the assertion directly looks for duplication.
assertFalse(fetchEvent.getTransitUri().contains(File.separator + File.separator));
}
@Test
public void testFetchStaticFileThatDoesNotExist() {
final String file = "src/test/resources/testdata/doesnotexist"; final String file = "src/test/resources/testdata/doesnotexist";
runner.setProperty(FetchHDFS.FILENAME, file); runner.setProperty(FetchHDFS.FILENAME, file);
runner.enqueue(new String("trigger flow file")); runner.enqueue(new String("trigger flow file"));
@ -81,7 +100,7 @@ public class TestFetchHDFS {
} }
@Test @Test
public void testFetchFileThatExistsFromIncomingFlowFile() throws IOException { public void testFetchFileThatExistsFromIncomingFlowFile() {
final String file = "src/test/resources/testdata/randombytes-1"; final String file = "src/test/resources/testdata/randombytes-1";
runner.setProperty(FetchHDFS.FILENAME, "${my.file}"); runner.setProperty(FetchHDFS.FILENAME, "${my.file}");
@ -94,7 +113,7 @@ public class TestFetchHDFS {
} }
@Test @Test
public void testFilenameWithValidEL() throws IOException { public void testFilenameWithValidEL() {
final String file = "src/test/resources/testdata/${literal('randombytes-1')}"; final String file = "src/test/resources/testdata/${literal('randombytes-1')}";
runner.setProperty(FetchHDFS.FILENAME, file); runner.setProperty(FetchHDFS.FILENAME, file);
runner.enqueue(new String("trigger flow file")); runner.enqueue(new String("trigger flow file"));
@ -103,14 +122,14 @@ public class TestFetchHDFS {
} }
@Test @Test
public void testFilenameWithInvalidEL() throws IOException { public void testFilenameWithInvalidEL() {
final String file = "src/test/resources/testdata/${literal('randombytes-1'):foo()}"; final String file = "src/test/resources/testdata/${literal('randombytes-1'):foo()}";
runner.setProperty(FetchHDFS.FILENAME, file); runner.setProperty(FetchHDFS.FILENAME, file);
runner.assertNotValid(); runner.assertNotValid();
} }
@Test @Test
public void testFilenameWithUnrecognizedEL() throws IOException { public void testFilenameWithUnrecognizedEL() {
final String file = "data_${literal('testing'):substring(0,4)%7D"; final String file = "data_${literal('testing'):substring(0,4)%7D";
runner.setProperty(FetchHDFS.FILENAME, file); runner.setProperty(FetchHDFS.FILENAME, file);
runner.enqueue(new String("trigger flow file")); runner.enqueue(new String("trigger flow file"));