mirror of https://github.com/apache/nifi.git
NIFI-1929: Improvements for PutHDFS attribute handling
This closes #486.
This commit is contained in:
parent
8d8a9cba79
commit
b32c70c419
|
@ -24,7 +24,9 @@ import org.apache.hadoop.io.compress.CompressionCodec;
|
|||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.ReadsAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
|
@ -64,7 +66,11 @@ import java.util.concurrent.TimeUnit;
|
|||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@Tags({"hadoop", "HDFS", "put", "copy", "filesystem"})
|
||||
@CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")
|
||||
@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
|
||||
@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
|
||||
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")
|
||||
})
|
||||
@SeeAlso(GetHDFS.class)
|
||||
public class PutHDFS extends AbstractHadoopProcessor {
|
||||
|
||||
|
@ -75,6 +81,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
|||
public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
|
||||
public static final int BUFFER_SIZE_DEFAULT = 4096;
|
||||
|
||||
public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
|
||||
|
||||
// relationships
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
|
@ -329,8 +337,13 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
|||
new Object[]{flowFile, copyFile, millis, dataRate});
|
||||
|
||||
final String outputPath = copyFile.toString();
|
||||
final String newFilename = copyFile.getName();
|
||||
final String hdfsPath = copyFile.getParent().toString();
|
||||
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
|
||||
flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
|
||||
final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
|
||||
session.getProvenanceReporter().send(flowFile, transitUri);
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
|
||||
} catch (final Throwable t) {
|
||||
|
|
|
@ -214,7 +214,45 @@ public class PutHDFSTest {
|
|||
.getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
|
||||
assertTrue(failedFlowFiles.isEmpty());
|
||||
|
||||
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
|
||||
assertEquals(1, flowFiles.size());
|
||||
MockFlowFile flowFile = flowFiles.get(0);
|
||||
assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
|
||||
assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
|
||||
assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutFileWithCompression() throws IOException {
|
||||
// Refer to comment in the BeforeClass method for an explanation
|
||||
assumeTrue(isNotWindows());
|
||||
|
||||
PutHDFS proc = new TestablePutHDFS(kerberosProperties);
|
||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
|
||||
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
||||
runner.setProperty(PutHDFS.COMPRESSION_CODEC, "GZIP");
|
||||
runner.setValidateExpressionUsage(false);
|
||||
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
|
||||
Map<String, String> attributes = new HashMap<String, String>();
|
||||
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
||||
runner.enqueue(fis, attributes);
|
||||
runner.run();
|
||||
}
|
||||
|
||||
Configuration config = new Configuration();
|
||||
FileSystem fs = FileSystem.get(config);
|
||||
|
||||
List<MockFlowFile> failedFlowFiles = runner
|
||||
.getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
|
||||
assertTrue(failedFlowFiles.isEmpty());
|
||||
|
||||
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
|
||||
assertEquals(1, flowFiles.size());
|
||||
MockFlowFile flowFile = flowFiles.get(0);
|
||||
assertTrue(fs.exists(new Path("target/test-classes/randombytes-1.gz")));
|
||||
assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
|
||||
assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue