mirror of https://github.com/apache/nifi.git
NIFI-797 Corrected URL for PutS3 to include correct protocol and endpoint by region.
- corrected a missed 'final' on org.apache.nifi.processors.aws.AbstractAWSProcessor.relationships - added protected method org.apache.nifi.processors.aws.AbstractAWSProcessor.getRegion() - added protected method org.apache.nifi.processors.aws.s3.AbstractS3Processor.getUrlForObject(String, String) - explicitly set AWS client protocol to HTTPS, and created a static final field with comments if other protocols may be considered - added a static final field for the UserAgent Reviewed by Aldrin Piri <aldrin@apache.org>
This commit is contained in:
parent
b75af5b344
commit
2a90bd501b
|
@ -40,6 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
|
||||
import com.amazonaws.AmazonWebServiceClient;
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.auth.AWSCredentials;
|
||||
import com.amazonaws.auth.AnonymousAWSCredentials;
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
|
@ -54,7 +55,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
|||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||
.description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build();
|
||||
|
||||
public static Set<Relationship> relationships = Collections.unmodifiableSet(
|
||||
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
||||
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
||||
|
||||
public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder()
|
||||
|
@ -92,6 +93,11 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
|||
.build();
|
||||
|
||||
private volatile ClientType client;
|
||||
private volatile Region region;
|
||||
|
||||
// If protocol is changed to be a property, ensure other uses are also changed
|
||||
protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
|
||||
protected static final String DEFAULT_USER_AGENT = "NiFi";
|
||||
|
||||
private static AllowableValue createAllowableValue(final Regions regions) {
|
||||
return new AllowableValue(regions.getName(), regions.getName(), regions.getName());
|
||||
|
@ -133,8 +139,9 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
|||
final ClientConfiguration config = new ClientConfiguration();
|
||||
config.setMaxConnections(context.getMaxConcurrentTasks());
|
||||
config.setMaxErrorRetry(0);
|
||||
config.setUserAgent("NiFi");
|
||||
|
||||
config.setUserAgent(DEFAULT_USER_AGENT);
|
||||
// If this is changed to be a property, ensure other uses are also changed
|
||||
config.setProtocol(DEFAULT_PROTOCOL);
|
||||
final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||
config.setConnectionTimeout(commsTimeout);
|
||||
config.setSocketTimeout(commsTimeout);
|
||||
|
@ -151,7 +158,10 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
|||
if (getSupportedPropertyDescriptors().contains(REGION)) {
|
||||
final String region = context.getProperty(REGION).getValue();
|
||||
if (region != null) {
|
||||
client.setRegion(Region.getRegion(Regions.fromName(region)));
|
||||
this.region = Region.getRegion(Regions.fromName(region));
|
||||
client.setRegion(this.region);
|
||||
} else{
|
||||
this.region = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -162,6 +172,10 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
|||
return client;
|
||||
}
|
||||
|
||||
protected Region getRegion() {
|
||||
return region;
|
||||
}
|
||||
|
||||
protected AWSCredentials getCredentials(final ProcessContext context) {
|
||||
final String accessKey = context.getProperty(ACCESS_KEY).getValue();
|
||||
final String secretKey = context.getProperty(SECRET_KEY).getValue();
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.nifi.processors.aws.AbstractAWSProcessor;
|
|||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentials;
|
||||
import com.amazonaws.regions.Region;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.model.AccessControlList;
|
||||
import com.amazonaws.services.s3.model.CanonicalGrantee;
|
||||
|
@ -134,6 +135,17 @@ public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3C
|
|||
return grantees;
|
||||
}
|
||||
|
||||
protected String getUrlForObject(final String bucket, final String key) {
|
||||
Region region = getRegion();
|
||||
|
||||
if (region == null) {
|
||||
return DEFAULT_PROTOCOL.toString() + "://s3.amazonaws.com/" + bucket + "/" + key;
|
||||
} else {
|
||||
final String endpoint = region.getServiceEndpoint("s3");
|
||||
return DEFAULT_PROTOCOL.toString() + "://" + endpoint + "/" + bucket + "/" + key;
|
||||
}
|
||||
}
|
||||
|
||||
protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) {
|
||||
final AccessControlList acl = new AccessControlList();
|
||||
|
||||
|
|
|
@ -170,7 +170,7 @@ public class PutS3Object extends AbstractS3Processor {
|
|||
}
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
|
||||
final String url = "http://" + bucket + ".s3.amazonaws.com/" + key;
|
||||
final String url = getUrlForObject(bucket, key);
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
session.getProvenanceReporter().send(flowFile, url, millis);
|
||||
|
||||
|
|
Loading…
Reference in New Issue