NIFI-13203: Include s3.url attribute in FetchS3Object, PutS3Object

This closes #8795

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2024-05-09 17:02:18 -04:00 committed by Joseph Witt
parent 7584a5c6f5
commit db7e073368
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
3 changed files with 12 additions and 13 deletions

View File

@@ -70,6 +70,7 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
 @Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
 @CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
 @WritesAttributes({
+@WritesAttribute(attribute = "s3.url", description = "The URL that can be used to access the S3 object"),
 @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
 @WritesAttribute(attribute = "path", description = "The path of the file"),
 @WritesAttribute(attribute = "absolute.path", description = "The path of the file"),
@@ -439,12 +440,11 @@ public class FetchS3Object extends AbstractS3Processor {
 throw ffae;
 }
-if (!attributes.isEmpty()) {
-flowFile = session.putAllAttributes(flowFile, attributes);
-}
+final String url = client.getResourceUrl(bucket, key);
+attributes.put("s3.url", url);
+flowFile = session.putAllAttributes(flowFile, attributes);
 session.transfer(flowFile, REL_SUCCESS);
-final String url = client.getResourceUrl(bucket, key);
 final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
 getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
 session.getProvenanceReporter().fetch(flowFile, url, transferMillis);

View File

@@ -105,6 +105,7 @@ import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileR
 expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 @ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
 @WritesAttributes({
+@WritesAttribute(attribute = "s3.url", description = "The URL that can be used to access the S3 object"),
 @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
 @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
 @WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"),
@@ -857,12 +858,11 @@ public class PutS3Object extends AbstractS3Processor {
 throw e;
 }
-if (!attributes.isEmpty()) {
-flowFile = session.putAllAttributes(flowFile, attributes);
-}
+final String url = s3.getResourceUrl(bucket, key);
+attributes.put("s3.url", url);
+flowFile = session.putAllAttributes(flowFile, attributes);
 session.transfer(flowFile, REL_SUCCESS);
-final String url = s3.getResourceUrl(bucket, key);
 final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
 session.getProvenanceReporter().send(flowFile, url, millis);

View File

@@ -449,9 +449,9 @@ public class ITPutS3Object extends AbstractS3IT {
 runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE, BUCKET_NAME);
 runner.setProperty(PutS3Object.KEY, "${filename}");
-Map<String, String> attribs = new HashMap<>();
-attribs.put(CoreAttributes.FILENAME.key(), PROV1_FILE);
-runner.enqueue("prov1 contents".getBytes(), attribs);
+Map<String, String> attributes = new HashMap<>();
+attributes.put(CoreAttributes.FILENAME.key(), PROV1_FILE);
+runner.enqueue("prov1 contents".getBytes(), attributes);
 runner.assertValid();
 runner.run();
@@ -461,12 +461,11 @@ public class ITPutS3Object extends AbstractS3IT {
 final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
 assertEquals(1, provenanceEvents.size());
-ProvenanceEventRecord provRec1 = provenanceEvents.get(0);
+ProvenanceEventRecord provRec1 = provenanceEvents.getFirst();
 assertEquals(ProvenanceEventType.SEND, provRec1.getEventType());
 assertEquals(runner.getProcessor().getIdentifier(), provRec1.getComponentId());
 String targetUri = getClient().getUrl(BUCKET_NAME, PROV1_FILE).toString();
 assertEquals(targetUri, provRec1.getTransitUri());
-assertEquals(8, provRec1.getUpdatedAttributes().size());
 assertEquals(BUCKET_NAME, provRec1.getUpdatedAttributes().get(PutS3Object.S3_BUCKET_KEY));
 }