
NIFI-12801 Add local file upload option in PutHDFS processor

This closes .

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Authored by shubhamsharma on 2024-02-15 10:31:09 -08:00; committed by Peter Turcsanyi
parent 30a44464f7
commit a21c2544ad
GPG Key ID: 55A813F1C3E553DC (no known key found for this signature in database)
3 changed files with 91 additions and 2 deletions
nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
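
In short, this change lets PutHDFS pull the bytes it uploads either from the incoming FlowFile content (the default) or from a local file resolved through a FileResourceService, replacing the previous session.read(putFlowFile, ...) callback with a try-with-resources block. A minimal sketch of the selection logic introduced in PutHDFS.java (the surrounding onTrigger scaffolding and the HDFS write loop are omitted; names follow the diff below):

    // RESOURCE_TRANSFER_SOURCE decides whether the bytes come from the FlowFile content
    // or from a local file handed out by the configured FileResourceService.
    final ResourceTransferSource resourceTransferSource =
            context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
    try (final InputStream in = getFileResource(resourceTransferSource, context, flowFile.getAttributes())
            .map(FileResource::getInputStream)
            .orElseGet(() -> session.read(flowFile))) {
        // the existing code writes "in" to the temporary HDFS file exactly as before
    }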

@@ -112,6 +112,21 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-resource-transfer</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-resource-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-resource-service</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>

@@ -48,6 +48,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
@@ -80,6 +81,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.io.InputStream;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
/**
* This processor copies FlowFiles to HDFS.
@@ -260,6 +266,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
props.add(REMOTE_GROUP);
props.add(COMPRESSION_CODEC);
props.add(IGNORE_LOCALITY);
props.add(RESOURCE_TRANSFER_SOURCE);
props.add(FILE_RESOURCE_SERVICE);
return props;
}
@@ -402,7 +410,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
// Write FlowFile to temp file on HDFS
final StopWatch stopWatch = new StopWatch(true);
session.read(putFlowFile, in -> {
final ResourceTransferSource resourceTransferSource = context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
try (final InputStream in = getFileResource(resourceTransferSource, context, flowFile.getAttributes())
.map(FileResource::getInputStream).orElseGet(() -> session.read(flowFile))) {
OutputStream fos = null;
Path createdFile = null;
try {
@@ -463,7 +473,7 @@
}
fos = null;
}
});
}
stopWatch.stop();
final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);

@@ -25,7 +25,11 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.fileresource.service.StandardFileResourceService;
import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.transfer.ResourceTransferProperties;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
@@ -46,12 +50,17 @@ import javax.security.sasl.SaslException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Set;
import java.util.Collections;
import static org.apache.nifi.processors.hadoop.CompressionType.GZIP;
import static org.apache.nifi.processors.hadoop.CompressionType.NONE;
@@ -703,6 +712,61 @@ public class PutHDFSTest {
mockFileSystem.delete(p, true);
}
@Test
public void testPutFileFromLocalFile() throws Exception {
final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);
runner.setProperty(PutHDFS.WRITING_STRATEGY, PutHDFS.SIMPLE_WRITE);
//Adding StandardFileResourceService controller service
String attributeName = "file.path";
String serviceId = FileResourceService.class.getSimpleName();
FileResourceService service = new StandardFileResourceService();
byte[] FILE_DATA = "0123456789".getBytes(StandardCharsets.UTF_8);
byte[] EMPTY_CONTENT = new byte[0];
runner.addControllerService(serviceId, service);
runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName));
runner.enableControllerService(service);
runner.setProperty(ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
runner.setProperty(ResourceTransferProperties.FILE_RESOURCE_SERVICE, serviceId);
java.nio.file.Path tempFilePath = Files.createTempFile("PutHDFS_testPutFileFromLocalFile_", "");
Files.write(tempFilePath, FILE_DATA);
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
attributes.put(attributeName, tempFilePath.toString());
runner.enqueue(EMPTY_CONTENT, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHDFS.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).get(0);
flowFile.assertContentEquals(EMPTY_CONTENT);
//assert HDFS File and Directory structures
assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
verify(spyFileSystem, Mockito.never()).rename(any(Path.class), any(Path.class));
//assert Provenance events
Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(ProvenanceEventType.SEND);
Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
.map(ProvenanceEventRecord::getEventType)
.collect(Collectors.toSet());
assertEquals(expectedEventTypes, actualEventTypes);
}
@Test
public void testPutFileWithCreateException() throws IOException {
mockFileSystem = new MockFileSystem();