NIFI-11124: Add hadoop.file.url attribute to HDFS processors

This closes #6916.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Authored by Mark Bathori on 2023-02-01 20:21:51 +01:00; committed by Peter Turcsanyi
parent efe7b7d30f
commit b375f4b138
16 changed files with 77 additions and 36 deletions
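
Across the touched processors the change follows one pattern: after the HDFS operation succeeds, the target Path is qualified against the FileSystem's URI and working directory, and the resulting URL is written to the new hadoop.file.url attribute next to the existing provenance call that already uses the same qualified path as the transit URI. A minimal sketch of that pattern, condensed from the hunks below (the class and method names here are illustrative, not part of the commit):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;

// Illustrative helper showing the pattern repeated in each processor's onTrigger;
// the class and method names are made up for this sketch.
class HadoopFileUrlSketch {

    // Constant added to AbstractHadoopProcessor by this commit.
    static final String HADOOP_FILE_URL_ATTRIBUTE = "hadoop.file.url";

    static FlowFile addHadoopFileUrl(final ProcessSession session, final FlowFile flowFile,
                                     final FileSystem fileSystem, final Path path) {
        // Resolve scheme, authority and working directory,
        // e.g. hdfs://namenode:8020/some/path/file.txt (or file:///... on a local FileSystem).
        final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
        // Same string the processors already report as the provenance transit URI.
        return session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
    }
}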


@ -158,6 +158,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
public static final String HADOOP_FILE_URL_ATTRIBUTE = "hadoop.file.url";
protected static final String TARGET_HDFS_DIR_CREATED_ATTRIBUTE = "target.dir.created";


@ -16,21 +16,6 @@
*/
package org.apache.nifi.processors.hadoop;
import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -57,6 +42,21 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StopWatch;
import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Base processor for reading a data from HDFS that can be fetched into records.
*/
@ -234,6 +234,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
successFlowFile = session.putAttribute(successFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {qualifiedPath, successFlowFile, stopWatch.getDuration()});
session.getProvenanceReporter().fetch(successFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(successFlowFile, REL_SUCCESS);


@ -378,6 +378,7 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
// Send a provenance event and transfer to success
final Path qualifiedPath = destFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
putFlowFile = session.putAttribute(putFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
session.transfer(putFlowFile, REL_SUCCESS);


@ -72,6 +72,7 @@ import java.util.regex.Pattern;
+ "If multiple files are deleted, then only the last filename is set."),
@WritesAttribute(attribute="hdfs.path", description="HDFS Path specified in the delete request. "
+ "If multiple paths are deleted, then only the last path is set."),
@WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop url for the file to be deleted."),
@WritesAttribute(attribute="hdfs.error.message", description="HDFS error message related to the hdfs.error.code")
})
@SeeAlso({ListHDFS.class, PutHDFS.class})
@ -176,6 +177,7 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
fileSystem.delete(path, isRecursive(context, session));
getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
} catch (IOException ioe) {
// One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible


@ -31,6 +31,7 @@ import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -61,8 +62,11 @@ import java.util.concurrent.TimeUnit;
@Tags({"hadoop", "hcfs", "hdfs", "get", "ingest", "fetch", "source"})
@CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. "
+ "The file in HDFS is left intact without any changes being made to it.")
@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could "
+ "not be fetched from HDFS")
@WritesAttributes({
@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could "
+ "not be fetched from HDFS"),
@WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop url for the file is stored in this attribute.")
})
@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
@Restricted(restrictions = {
@Restriction(
@ -173,6 +177,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
stopWatch.stop();
getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, getSuccessRelationship());
} catch (final FileNotFoundException | AccessControlException e) {


@ -75,7 +75,9 @@ import java.util.regex.Pattern;
@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")})
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute."),
@WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop url for the file is stored in this attribute.")
})
@SeeAlso({PutHDFS.class, GetHDFS.class})
@Restricted(restrictions = {
@Restriction(
@ -426,6 +428,8 @@ public class MoveHDFS extends AbstractHadoopProcessor {
final String hdfsPath = newFile.getParent().toString();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
final Path qualifiedPath = newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath, "/");
session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);


@ -89,6 +89,7 @@ import java.util.stream.Stream;
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute."),
@WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop url for the file is stored in this attribute."),
@WritesAttribute(attribute = "target.dir.created", description = "The result(true/false) indicates if the folder is created by the processor.")
})
@SeeAlso(GetHDFS.class)
@ -456,6 +457,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
putFlowFile = session.putAttribute(putFlowFile, TARGET_HDFS_DIR_CREATED_ATTRIBUTE, String.valueOf(targetDirCreated));
final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
putFlowFile = session.putAttribute(putFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
session.transfer(putFlowFile, getSuccessRelationship());


@ -233,6 +233,7 @@ public class PutHDFSTest {
assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
assertTrue(sendEvent.getTransitUri().endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
verify(spyFileSystem, times(1)).rename(any(Path.class), any(Path.class));
}
@ -267,6 +268,8 @@ public class PutHDFSTest {
assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
verify(spyFileSystem, Mockito.never()).rename(any(Path.class), any(Path.class));
}
@ -304,6 +307,7 @@ public class PutHDFSTest {
assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith("target/test-classes/randombytes-1"));
}
@Test
@ -330,6 +334,7 @@ public class PutHDFSTest {
assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1.gz")));
assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY + "/" + FILE_NAME + ".gz"));
}
@Test
@ -423,6 +428,7 @@ public class PutHDFSTest {
assertTrue(mockFileSystem.exists(new Path("target/data_test/randombytes-1")));
assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("target/data_test", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith("target/data_test/" + FILE_NAME));
}
@Test


@ -36,6 +36,7 @@ import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -77,6 +78,10 @@ public class TestDeleteHDFS {
assertEquals(1, provenanceEvents.size());
assertEquals(ProvenanceEventType.REMOTE_INVOCATION, provenanceEvents.get(0).getEventType());
assertEquals("hdfs://0.example.com:8020/some/path/to/file.txt", provenanceEvents.get(0).getTransitUri());
MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
assertEquals("hdfs://0.example.com:8020/some/path/to/file.txt", flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE));
}
@Test


@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -71,6 +72,9 @@ public class TestFetchHDFS {
assertEquals(ProvenanceEventType.FETCH, fetchEvent.getEventType());
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
assertTrue(fetchEvent.getTransitUri().endsWith(file));
MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(file));
}
@Test


@ -60,6 +60,7 @@ import java.util.List;
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file is stored in this attribute."),
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file is stored in this attribute."),
@WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop url for the file is stored in this attribute."),
@WritesAttribute(attribute = "record.count", description = "The number of records written to the ORC file"),
@WritesAttribute(attribute = "hive.ddl", description = "Creates a partial Hive DDL statement for creating an external table in Hive from the destination folder. "
+ "This can be used in ReplaceText for setting the content to the DDL. To make it valid DDL, add \"LOCATION '<path_to_orc_file_in_hdfs>'\", where "


@ -80,6 +80,7 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.function.BiFunction;
import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -191,6 +192,7 @@ public class PutORCTest {
mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "100");
mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
"CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS ORC");
assertTrue(mockFlowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + filename));
// verify we generated a provenance event
final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
@ -260,6 +262,7 @@ public class PutORCTest {
// DDL will be created with field names normalized (lowercased, e.g.) for Hive by default
mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
"CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, `timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DECIMAL) STORED AS ORC");
assertTrue(mockFlowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + filename));
// verify we generated a provenance event
final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();


@ -48,7 +48,8 @@ import java.io.IOException;
@WritesAttributes({
@WritesAttribute(attribute="fetch.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added " +
"indicating why the file could not be fetched from the given filesystem."),
@WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file")
@WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file"),
@WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop url for the file is stored in this attribute.")
})
@SeeAlso({PutParquet.class})
@Restricted(restrictions = {


@ -66,6 +66,7 @@ import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file is stored in this attribute."),
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file is stored in this attribute."),
@WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop url for the file is stored in this attribute."),
@WritesAttribute(attribute = "record.count", description = "The number of records written to the Parquet file")
})
@Restricted(restrictions = {


@ -58,6 +58,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -131,6 +132,7 @@ public class FetchParquetTest {
final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, String.valueOf(USERS));
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + parquetFile.getName()));
// the mock record writer will write the header for each record so replace those to get down to just the records
String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);


@ -16,22 +16,6 @@
*/
package org.apache.nifi.processors.parquet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
@ -42,10 +26,10 @@ import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@ -70,6 +54,23 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.mockito.Mockito;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@DisabledOnOs(OS.WINDOWS)
public class PutParquetTest {
@ -135,6 +136,7 @@ public class PutParquetTest {
mockFlowFile.assertAttributeEquals(PutParquet.ABSOLUTE_HDFS_PATH_ATTRIBUTE, avroParquetFile.getParent().toString());
mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
mockFlowFile.assertAttributeEquals(PutParquet.RECORD_COUNT_ATTR, "100");
assertTrue(mockFlowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY + "/" + filename));
// verify we generated a provenance event
final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
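
Downstream components can consume the new attribute like any other FlowFile attribute, either through the Expression Language (${hadoop.file.url}) or directly in code; a minimal sketch, with a helper name that is illustrative and not part of this change:

import org.apache.nifi.flowfile.FlowFile;

// Illustrative helper, not part of this commit: read the URL written by the processors above.
final class HadoopFileUrlReader {
    static String hadoopFileUrlOf(final FlowFile flowFile) {
        // hdfs://namenode:8020/... against a real cluster, file:///... when backed by a local FileSystem
        return flowFile.getAttribute("hadoop.file.url");
    }
}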