mirror of https://github.com/apache/nifi.git
NIFI-2763: S3 processors do not work with older S3-compatible object stores
This closes #1076. Signed-off-by: James Wing <jvwing@gmail.com>
This commit is contained in:
parent
d4948a3778
commit
cfc738ec19
|
@ -21,6 +21,7 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
@ -114,7 +115,16 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
||||||
.expressionLanguageSupported(true)
|
.expressionLanguageSupported(true)
|
||||||
.defaultValue("${filename}")
|
.defaultValue("${filename}")
|
||||||
.build();
|
.build();
|
||||||
|
public static final PropertyDescriptor SIGNER_OVERRIDE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Signer Override")
|
||||||
|
.description("The AWS libraries use the default signer but this property allows you to specify a custom signer to support older S3-compatible services.")
|
||||||
|
.required(false)
|
||||||
|
.allowableValues(
|
||||||
|
new AllowableValue("Default Signature", "Default Signature"),
|
||||||
|
new AllowableValue("AWSS3V4Signer", "Signature v4"),
|
||||||
|
new AllowableValue("S3SignerType", "Signature v2"))
|
||||||
|
.defaultValue("Default Signature")
|
||||||
|
.build();
|
||||||
/**
|
/**
|
||||||
* Create client using credentials provider. This is the preferred way for creating clients
|
* Create client using credentials provider. This is the preferred way for creating clients
|
||||||
*/
|
*/
|
||||||
|
@ -122,6 +132,8 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
||||||
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
|
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
|
||||||
getLogger().info("Creating client with credentials provider");
|
getLogger().info("Creating client with credentials provider");
|
||||||
|
|
||||||
|
initializeSignerOverride(context, config);
|
||||||
|
|
||||||
final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, config);
|
final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, config);
|
||||||
|
|
||||||
initalizeEndpointOverride(context, s3);
|
initalizeEndpointOverride(context, s3);
|
||||||
|
@ -138,6 +150,14 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void initializeSignerOverride(final ProcessContext context, final ClientConfiguration config) {
|
||||||
|
String signer = context.getProperty(SIGNER_OVERRIDE).getValue();
|
||||||
|
|
||||||
|
if (signer != null && !signer.equals(SIGNER_OVERRIDE.getDefaultValue())) {
|
||||||
|
config.setSignerOverride(signer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create client using AWSCredentials
|
* Create client using AWSCredentials
|
||||||
*
|
*
|
||||||
|
@ -147,6 +167,8 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
||||||
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
|
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
|
||||||
getLogger().info("Creating client with AWS credentials");
|
getLogger().info("Creating client with AWS credentials");
|
||||||
|
|
||||||
|
initializeSignerOverride(context, config);
|
||||||
|
|
||||||
final AmazonS3Client s3 = new AmazonS3Client(credentials, config);
|
final AmazonS3Client s3 = new AmazonS3Client(credentials, config);
|
||||||
|
|
||||||
initalizeEndpointOverride(context, s3);
|
initalizeEndpointOverride(context, s3);
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class DeleteS3Object extends AbstractS3Processor {
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID,
|
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID,
|
||||||
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
|
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
|
||||||
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT));
|
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
|
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
|
||||||
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT));
|
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
|
|
@ -111,7 +111,7 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
|
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
|
||||||
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
|
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
|
||||||
PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS));
|
SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS));
|
||||||
|
|
||||||
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
||||||
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
|
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
|
||||||
|
|
|
@ -207,8 +207,8 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
|
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
|
||||||
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE,
|
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE,
|
||||||
ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION,
|
ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE,
|
||||||
PROXY_HOST, PROXY_HOST_PORT));
|
SERVER_SIDE_ENCRYPTION, PROXY_HOST, PROXY_HOST_PORT));
|
||||||
|
|
||||||
final static String S3_BUCKET_KEY = "s3.bucket";
|
final static String S3_BUCKET_KEY = "s3.bucket";
|
||||||
final static String S3_OBJECT_KEY = "s3.key";
|
final static String S3_OBJECT_KEY = "s3.key";
|
||||||
|
|
|
@ -146,7 +146,7 @@ public class ITDeleteS3Object extends AbstractS3IT {
|
||||||
public void testGetPropertyDescriptors() throws Exception {
|
public void testGetPropertyDescriptors() throws Exception {
|
||||||
DeleteS3Object processor = new DeleteS3Object();
|
DeleteS3Object processor = new DeleteS3Object();
|
||||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
assertEquals("size should be eq", 19, pd.size());
|
assertEquals("size should be eq", 20, pd.size());
|
||||||
assertTrue(pd.contains(processor.ACCESS_KEY));
|
assertTrue(pd.contains(processor.ACCESS_KEY));
|
||||||
assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
assertTrue(pd.contains(processor.BUCKET));
|
assertTrue(pd.contains(processor.BUCKET));
|
||||||
|
@ -159,6 +159,7 @@ public class ITDeleteS3Object extends AbstractS3IT {
|
||||||
assertTrue(pd.contains(processor.READ_USER_LIST));
|
assertTrue(pd.contains(processor.READ_USER_LIST));
|
||||||
assertTrue(pd.contains(processor.REGION));
|
assertTrue(pd.contains(processor.REGION));
|
||||||
assertTrue(pd.contains(processor.SECRET_KEY));
|
assertTrue(pd.contains(processor.SECRET_KEY));
|
||||||
|
assertTrue(pd.contains(processor.SIGNER_OVERRIDE));
|
||||||
assertTrue(pd.contains(processor.SSL_CONTEXT_SERVICE));
|
assertTrue(pd.contains(processor.SSL_CONTEXT_SERVICE));
|
||||||
assertTrue(pd.contains(processor.TIMEOUT));
|
assertTrue(pd.contains(processor.TIMEOUT));
|
||||||
assertTrue(pd.contains(processor.VERSION_ID));
|
assertTrue(pd.contains(processor.VERSION_ID));
|
||||||
|
|
|
@ -164,7 +164,7 @@ public class ITFetchS3Object extends AbstractS3IT {
|
||||||
public void testGetPropertyDescriptors() throws Exception {
|
public void testGetPropertyDescriptors() throws Exception {
|
||||||
FetchS3Object processor = new FetchS3Object();
|
FetchS3Object processor = new FetchS3Object();
|
||||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
assertEquals("size should be eq", 13, pd.size());
|
assertEquals("size should be eq", 14, pd.size());
|
||||||
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
|
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
|
||||||
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
assertTrue(pd.contains(FetchS3Object.BUCKET));
|
assertTrue(pd.contains(FetchS3Object.BUCKET));
|
||||||
|
@ -173,6 +173,7 @@ public class ITFetchS3Object extends AbstractS3IT {
|
||||||
assertTrue(pd.contains(FetchS3Object.KEY));
|
assertTrue(pd.contains(FetchS3Object.KEY));
|
||||||
assertTrue(pd.contains(FetchS3Object.REGION));
|
assertTrue(pd.contains(FetchS3Object.REGION));
|
||||||
assertTrue(pd.contains(FetchS3Object.SECRET_KEY));
|
assertTrue(pd.contains(FetchS3Object.SECRET_KEY));
|
||||||
|
assertTrue(pd.contains(FetchS3Object.SIGNER_OVERRIDE));
|
||||||
assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE));
|
assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE));
|
||||||
assertTrue(pd.contains(FetchS3Object.TIMEOUT));
|
assertTrue(pd.contains(FetchS3Object.TIMEOUT));
|
||||||
assertTrue(pd.contains(FetchS3Object.VERSION_ID));
|
assertTrue(pd.contains(FetchS3Object.VERSION_ID));
|
||||||
|
|
|
@ -149,7 +149,7 @@ public class ITListS3 extends AbstractS3IT {
|
||||||
public void testGetPropertyDescriptors() throws Exception {
|
public void testGetPropertyDescriptors() throws Exception {
|
||||||
ListS3 processor = new ListS3();
|
ListS3 processor = new ListS3();
|
||||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
assertEquals("size should be eq", 14, pd.size());
|
assertEquals("size should be eq", 15, pd.size());
|
||||||
assertTrue(pd.contains(ListS3.ACCESS_KEY));
|
assertTrue(pd.contains(ListS3.ACCESS_KEY));
|
||||||
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
assertTrue(pd.contains(ListS3.BUCKET));
|
assertTrue(pd.contains(ListS3.BUCKET));
|
||||||
|
@ -157,6 +157,7 @@ public class ITListS3 extends AbstractS3IT {
|
||||||
assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
|
assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
|
||||||
assertTrue(pd.contains(ListS3.REGION));
|
assertTrue(pd.contains(ListS3.REGION));
|
||||||
assertTrue(pd.contains(ListS3.SECRET_KEY));
|
assertTrue(pd.contains(ListS3.SECRET_KEY));
|
||||||
|
assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
|
||||||
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
|
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
|
||||||
assertTrue(pd.contains(ListS3.TIMEOUT));
|
assertTrue(pd.contains(ListS3.TIMEOUT));
|
||||||
assertTrue(pd.contains(ListS3.PROXY_HOST));
|
assertTrue(pd.contains(ListS3.PROXY_HOST));
|
||||||
|
|
|
@ -334,7 +334,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
public void testGetPropertyDescriptors() throws Exception {
|
public void testGetPropertyDescriptors() throws Exception {
|
||||||
PutS3Object processor = new PutS3Object();
|
PutS3Object processor = new PutS3Object();
|
||||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
assertEquals("size should be eq", 26, pd.size());
|
assertEquals("size should be eq", 27, pd.size());
|
||||||
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
||||||
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
assertTrue(pd.contains(PutS3Object.BUCKET));
|
assertTrue(pd.contains(PutS3Object.BUCKET));
|
||||||
|
@ -348,6 +348,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
assertTrue(pd.contains(PutS3Object.READ_USER_LIST));
|
assertTrue(pd.contains(PutS3Object.READ_USER_LIST));
|
||||||
assertTrue(pd.contains(PutS3Object.REGION));
|
assertTrue(pd.contains(PutS3Object.REGION));
|
||||||
assertTrue(pd.contains(PutS3Object.SECRET_KEY));
|
assertTrue(pd.contains(PutS3Object.SECRET_KEY));
|
||||||
|
assertTrue(pd.contains(PutS3Object.SIGNER_OVERRIDE));
|
||||||
assertTrue(pd.contains(PutS3Object.SSL_CONTEXT_SERVICE));
|
assertTrue(pd.contains(PutS3Object.SSL_CONTEXT_SERVICE));
|
||||||
assertTrue(pd.contains(PutS3Object.TIMEOUT));
|
assertTrue(pd.contains(PutS3Object.TIMEOUT));
|
||||||
assertTrue(pd.contains(PutS3Object.EXPIRATION_RULE_ID));
|
assertTrue(pd.contains(PutS3Object.EXPIRATION_RULE_ID));
|
||||||
|
|
Loading…
Reference in New Issue