NIFI-4544: Improve HDFS processors provenance transit URL

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2238.
Koji Kawamura authored on 2017-10-30 11:35:13 +09:00; committed by Pierre Villard
parent 6413337918
commit 77a51e1a9e
9 changed files with 41 additions and 19 deletions
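
The change is the same in every processor: instead of hand-building the provenance transit URI by string concatenation ("hdfs:/" or "hdfs://" plus the path), the destination Path is qualified against the FileSystem's URI and working directory, so the reported URI carries the real scheme and authority. A minimal, standalone sketch contrasting the two approaches (the class name and example path below are illustrative only, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TransitUriSketch {
    public static void main(String[] args) throws Exception {
        // Assumes fs.defaultFS points at a cluster such as hdfs://namenode:8020;
        // with a default Configuration this resolves to the local filesystem
        // and the qualified path starts with "file:" instead of "hdfs:".
        final FileSystem fs = FileSystem.get(new Configuration());
        final Path destFile = new Path("/data/out/part-0001.txt");

        // Old approach: naive concatenation. For an absolute path this yields
        // "hdfs://data/out/part-0001.txt", where "data" is misread as the URI
        // authority and the namenode host:port is lost entirely.
        final String outputPath = destFile.toString();
        final String oldTransitUri = outputPath.startsWith("/") ? "hdfs:/" + outputPath : "hdfs://" + outputPath;

        // New approach: qualify the path against the filesystem, yielding e.g.
        // "hdfs://namenode:8020/data/out/part-0001.txt".
        final Path qualifiedPath = destFile.makeQualified(fs.getUri(), fs.getWorkingDirectory());
        final String newTransitUri = qualifiedPath.toString();

        System.out.println("old: " + oldTransitUri);
        System.out.println("new: " + newTransitUri);
    }
}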

AbstractFetchHDFSRecord.java

@@ -20,7 +20,6 @@ import java.io.BufferedOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.URI;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -232,9 +231,10 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
 attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
 successFlowFile = session.putAllAttributes(successFlowFile, attributes);
-final URI uri = path.toUri();
-getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
-session.getProvenanceReporter().fetch(successFlowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {qualifiedPath, successFlowFile, stopWatch.getDuration()});
+session.getProvenanceReporter().fetch(successFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
 session.transfer(successFlowFile, REL_SUCCESS);
 session.remove(originalFlowFile);
 return null;

AbstractPutHDFSRecord.java

@@ -349,7 +349,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
 putFlowFile = postProcess(context, session, putFlowFile, destFile);
-final String outputPath = destFile.toString();
 final String newFilename = destFile.getName();
 final String hdfsPath = destFile.getParent().toString();
@@ -361,8 +360,8 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
 putFlowFile = session.putAllAttributes(putFlowFile, attributes);
 // Send a provenance event and transfer to success
-final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
-session.getProvenanceReporter().send(putFlowFile, transitUri);
+final Path qualifiedPath = destFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
 session.transfer(putFlowFile, REL_SUCCESS);
 } catch (IOException | FlowFileAccessException e) {

FetchHDFS.java

@@ -46,7 +46,6 @@ import org.apache.nifi.util.StopWatch;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URI;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -128,7 +127,6 @@ public class FetchHDFS extends AbstractHadoopProcessor {
 return;
 }
-final URI uri = path.toUri();
 final StopWatch stopWatch = new StopWatch(true);
 final FlowFile finalFlowFile = flowFile;
@@ -149,6 +147,7 @@
 }
 FlowFile flowFile = finalFlowFile;
+final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
 try {
 final String outputFilename;
 final String originalFilename = path.getName();
@@ -166,16 +165,16 @@
 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
 stopWatch.stop();
-getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
-session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
+session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
 session.transfer(flowFile, REL_SUCCESS);
 } catch (final FileNotFoundException | AccessControlException e) {
-getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
+getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
 flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
 flowFile = session.penalize(flowFile);
 session.transfer(flowFile, REL_FAILURE);
 } catch (final IOException e) {
-getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
+getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e});
 flowFile = session.penalize(flowFile);
 session.transfer(flowFile, REL_COMMS_FAILURE);
 } finally {

GetHDFS.java

@@ -381,8 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
 continue;
 }
-final String transitUri = (originalFilename.startsWith("/")) ? "hdfs:/" + originalFilename : "hdfs://" + originalFilename;
-session.getProvenanceReporter().receive(flowFile, transitUri);
+session.getProvenanceReporter().receive(flowFile, file.toString());
 session.transfer(flowFile, REL_SUCCESS);
 getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
 new Object[]{flowFile, file, millis, dataRate});

PutHDFS.java

@@ -357,13 +357,12 @@ public class PutHDFS extends AbstractHadoopProcessor {
 getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
 new Object[]{putFlowFile, copyFile, millis, dataRate});
-final String outputPath = copyFile.toString();
 final String newFilename = copyFile.getName();
 final String hdfsPath = copyFile.getParent().toString();
 putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
 putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
-session.getProvenanceReporter().send(putFlowFile, transitUri);
+final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
 session.transfer(putFlowFile, REL_SUCCESS);

GetHDFSTest.java

@@ -21,6 +21,8 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.NiFiProperties;
@@ -214,6 +216,12 @@ public class GetHDFSTest {
 assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
 InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
 flowFile.assertContentEquals(expected);
+final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+assertEquals(1, provenanceEvents.size());
+final ProvenanceEventRecord receiveEvent = provenanceEvents.get(0);
+assertEquals(ProvenanceEventType.RECEIVE, receiveEvent.getEventType());
+// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+assertTrue(receiveEvent.getTransitUri().endsWith("13545423550275052.zip"));
 }
 @Test

PutHDFSTest.java

@@ -27,6 +27,8 @@ import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.NiFiProperties;
@@ -220,6 +222,13 @@ public class PutHDFSTest {
 assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
 assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
 assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+assertEquals(1, provenanceEvents.size());
+final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
+// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
 }
 @Test

TestFetchHDFS.java

@@ -18,6 +18,8 @@ package org.apache.nifi.processors.hadoop;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
@@ -61,6 +63,12 @@ public class TestFetchHDFS {
 runner.enqueue(new String("trigger flow file"));
 runner.run();
 runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+assertEquals(1, provenanceEvents.size());
+final ProvenanceEventRecord fetchEvent = provenanceEvents.get(0);
+assertEquals(ProvenanceEventType.FETCH, fetchEvent.getEventType());
+// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+assertTrue(fetchEvent.getTransitUri().endsWith(file));
 }
 @Test

PutParquetTest.java

@@ -143,7 +143,8 @@ public class PutParquetTest {
 // verify it was a SEND event with the correct URI
 final ProvenanceEventRecord provEvent = provEvents.get(0);
 Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
-Assert.assertEquals("hdfs://" + avroParquetFile.toString(), provEvent.getTransitUri());
+// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + filename));
 // verify the content of the parquet file by reading it back in
 verifyAvroParquetUsers(avroParquetFile, 100);