mirror of https://github.com/apache/nifi.git
NIFI-10969: Created extension point for Signer Override in AWS S3 processors
Also added Signer Override property in AWSCredentialsProviderControllerService with Custom Signer extension point. Made Assume Role related properties dependent on Assume Role ARN property in AWSCredentialsProviderControllerService. Fixed S3 IT tests. This closes #6777. Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
parent
78be613a0f
commit
1e23e5146f
|
@ -29,6 +29,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
@ -397,8 +398,8 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
|
|||
|
||||
/**
|
||||
* Stores allowable values from an enum class.
|
||||
* @param enumClass an enum class that implements the Allowable interface and contains a set of values
|
||||
* @param <E> generic parameter for an enum class that implements the Allowable interface
|
||||
* @param enumClass an enum class that implements the DescribedValue interface and contains a set of values
|
||||
* @param <E> generic parameter for an enum class that implements the DescribedValue interface
|
||||
* @return the builder
|
||||
*/
|
||||
public <E extends Enum<E> & DescribedValue> Builder allowableValues(final Class<E> enumClass) {
|
||||
|
@ -409,6 +410,20 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores allowable values from a set of enum values.
|
||||
* @param enumValues a set of enum values that implements the DescribedValue interface
|
||||
* @param <E> generic parameter for the enum values' class that implements the DescribedValue interface
|
||||
* @return the builder
|
||||
*/
|
||||
public <E extends Enum<E> & DescribedValue> Builder allowableValues(final EnumSet<E> enumValues) {
|
||||
this.allowableValues = new ArrayList<>();
|
||||
for (E enumValue : enumValues) {
|
||||
this.allowableValues.add(new AllowableValue(enumValue.getValue(), enumValue.getDisplayName(), enumValue.getDescription()));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param values constrained set of values
|
||||
* @return the builder
|
||||
|
|
|
@ -40,6 +40,18 @@ public enum EnumAllowableValue implements DescribedValue {
|
|||
public String getDescription() {
|
||||
return "RedDescription";
|
||||
}
|
||||
},
|
||||
|
||||
BLUE {
|
||||
@Override
|
||||
public String getDisplayName() {
|
||||
return "BlueDisplayName";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "BlueDescription";
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,10 +25,12 @@ import org.junit.jupiter.api.Test;
|
|||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
@ -67,7 +69,7 @@ public class TestPropertyDescriptor {
|
|||
}
|
||||
|
||||
@Test
|
||||
void testAllowableValuesWithEnumValue() {
|
||||
void testAllowableValuesWithEnumClass() {
|
||||
final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder()
|
||||
.name("enumAllowableValueDescriptor")
|
||||
.allowableValues(EnumAllowableValue.class)
|
||||
|
@ -81,6 +83,24 @@ public class TestPropertyDescriptor {
|
|||
assertEquals(expectedAllowableValues, propertyDescriptor.getAllowableValues());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAllowableValuesWithEnumSet() {
|
||||
final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder()
|
||||
.name("enumAllowableValueDescriptor")
|
||||
.allowableValues(EnumSet.of(
|
||||
EnumAllowableValue.GREEN,
|
||||
EnumAllowableValue.BLUE
|
||||
))
|
||||
.build();
|
||||
|
||||
assertNotNull(propertyDescriptor);
|
||||
|
||||
final List<AllowableValue> expectedAllowableValues = Stream.of(EnumAllowableValue.GREEN, EnumAllowableValue.BLUE)
|
||||
.map(enumValue -> new AllowableValue(enumValue.getValue(), enumValue.getDisplayName(), enumValue.getDescription()))
|
||||
.collect(Collectors.toList());
|
||||
assertEquals(expectedAllowableValues, propertyDescriptor.getAllowableValues());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDependsOnWithEnumValue() {
|
||||
final PropertyDescriptor dependentPropertyDescriptor = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -343,8 +343,11 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
|||
// e.g. in case of https://vpce-***-***.sqs.{region}.vpce.amazonaws.com
|
||||
String regionValue = parseRegionForVPCE(urlstr, region.getName());
|
||||
client.setEndpoint(urlstr, client.getServiceName(), regionValue);
|
||||
} else if (isCustomSignerConfigured(context)) {
|
||||
// handling endpoints with a user defined custom signer
|
||||
client.setEndpoint(urlstr, client.getServiceName(), region.getName());
|
||||
} else {
|
||||
// handling non-vpce custom endpoints where the AWS library can parse the region out
|
||||
// handling other (non-vpce, no custom signer) custom endpoints where the AWS library can parse the region out
|
||||
// e.g. https://sqs.{region}.***.***.***.gov
|
||||
client.setEndpoint(urlstr);
|
||||
}
|
||||
|
@ -353,6 +356,10 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
|||
return region;
|
||||
}
|
||||
|
||||
protected boolean isCustomSignerConfigured(final ProcessContext context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
Note to developer(s):
|
||||
When setting an endpoint for an AWS Client i.e. client.setEndpoint(endpointUrl),
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws;
|
||||
|
||||
import com.amazonaws.auth.Signer;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.resource.ResourceCardinality;
|
||||
import org.apache.nifi.components.resource.ResourceType;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
public final class AwsPropertyDescriptors {
|
||||
|
||||
private AwsPropertyDescriptors() {
|
||||
// constant class' constructor
|
||||
}
|
||||
|
||||
public static final PropertyDescriptor CUSTOM_SIGNER_CLASS_NAME = new PropertyDescriptor.Builder()
|
||||
.name("custom-signer-class-name")
|
||||
.displayName("Custom Signer Class Name")
|
||||
.description(String.format("Fully qualified class name of the custom signer class. The signer must implement %s interface.", Signer.class.getName()))
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CUSTOM_SIGNER_MODULE_LOCATION = new PropertyDescriptor.Builder()
|
||||
.name("custom-signer-module-location")
|
||||
.displayName("Custom Signer Module Location")
|
||||
.description("Comma-separated list of paths to files and/or directories which contain the custom signer's JAR file and its dependencies (if any).")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
|
||||
.dynamicallyModifiesClasspath(true)
|
||||
.build();
|
||||
|
||||
}
|
|
@ -22,11 +22,17 @@ import org.apache.nifi.components.resource.ResourceCardinality;
|
|||
import org.apache.nifi.components.resource.ResourceType;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.aws.AwsPropertyDescriptors;
|
||||
import software.amazon.awssdk.regions.Region;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_V4_SIGNER;
|
||||
import static org.apache.nifi.processors.aws.signer.AwsSignerType.CUSTOM_SIGNER;
|
||||
import static org.apache.nifi.processors.aws.signer.AwsSignerType.DEFAULT_SIGNER;
|
||||
|
||||
/**
|
||||
* Shared definitions of properties that specify various AWS credentials.
|
||||
*
|
||||
|
@ -122,7 +128,7 @@ public class CredentialPropertyDescriptors {
|
|||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(false)
|
||||
.description("The AWS Role ARN for cross account access. This is used in conjunction with role name and session timeout")
|
||||
.description("The AWS Role ARN for cross account access. This is used in conjunction with Assume Role Session Name and other Assume Role properties.")
|
||||
.build();
|
||||
|
||||
/**
|
||||
|
@ -132,10 +138,11 @@ public class CredentialPropertyDescriptors {
|
|||
.name("Assume Role Session Name")
|
||||
.displayName("Assume Role Session Name")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.required(false)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(false)
|
||||
.description("The AWS Role Name for cross account access. This is used in conjunction with role ARN and session time out")
|
||||
.description("The AWS Role Session Name for cross account access. This is used in conjunction with Assume Role ARN.")
|
||||
.dependsOn(ASSUME_ROLE_ARN)
|
||||
.build();
|
||||
|
||||
/**
|
||||
|
@ -143,11 +150,13 @@ public class CredentialPropertyDescriptors {
|
|||
*/
|
||||
public static final PropertyDescriptor MAX_SESSION_TIME = new PropertyDescriptor.Builder()
|
||||
.name("Session Time")
|
||||
.description("Session time for role based session (between 900 and 3600 seconds). This is used in conjunction with role ARN and name")
|
||||
.displayName("Assume Role Session Time")
|
||||
.description("Session time for role based session (between 900 and 3600 seconds). This is used in conjunction with Assume Role ARN.")
|
||||
.defaultValue("3600")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.sensitive(false)
|
||||
.dependsOn(ASSUME_ROLE_ARN)
|
||||
.build();
|
||||
|
||||
/**
|
||||
|
@ -160,8 +169,8 @@ public class CredentialPropertyDescriptors {
|
|||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(false)
|
||||
.description("External ID for cross-account access. This is used in conjunction with role arn, " +
|
||||
"role name, and optional session time out")
|
||||
.description("External ID for cross-account access. This is used in conjunction with Assume Role ARN.")
|
||||
.dependsOn(ASSUME_ROLE_ARN)
|
||||
.build();
|
||||
|
||||
/**
|
||||
|
@ -174,7 +183,8 @@ public class CredentialPropertyDescriptors {
|
|||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(false)
|
||||
.description("Proxy host for cross-account access, if needed within your environment. This will configure a proxy to request for temporary access keys into another AWS account")
|
||||
.description("Proxy host for cross-account access, if needed within your environment. This will configure a proxy to request for temporary access keys into another AWS account.")
|
||||
.dependsOn(ASSUME_ROLE_ARN)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ASSUME_ROLE_PROXY_PORT = new PropertyDescriptor.Builder()
|
||||
|
@ -184,12 +194,13 @@ public class CredentialPropertyDescriptors {
|
|||
.required(false)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.sensitive(false)
|
||||
.description("Proxy port for cross-account access, if needed within your environment. This will configure a proxy to request for temporary access keys into another AWS account")
|
||||
.description("Proxy port for cross-account access, if needed within your environment. This will configure a proxy to request for temporary access keys into another AWS account.")
|
||||
.dependsOn(ASSUME_ROLE_ARN)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ASSUME_ROLE_STS_ENDPOINT = new PropertyDescriptor.Builder()
|
||||
.name("assume-role-sts-endpoint")
|
||||
.displayName("Assume Role STS Endpoint")
|
||||
.displayName("Assume Role STS Endpoint Override")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
|
@ -198,17 +209,41 @@ public class CredentialPropertyDescriptors {
|
|||
"all accounts that are not for China (Beijing) region or GovCloud. You only need to set " +
|
||||
"this property to \"sts.cn-north-1.amazonaws.com.cn\" when you are requesting session credentials " +
|
||||
"for services in China(Beijing) region or to \"sts.us-gov-west-1.amazonaws.com\" for GovCloud.")
|
||||
.dependsOn(ASSUME_ROLE_ARN)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ASSUME_ROLE_REGION = new PropertyDescriptor.Builder()
|
||||
public static final PropertyDescriptor ASSUME_ROLE_STS_REGION = new PropertyDescriptor.Builder()
|
||||
.name("assume-role-sts-region")
|
||||
.displayName("Region")
|
||||
.displayName("Assume Role STS Region")
|
||||
.description("The AWS Security Token Service (STS) region")
|
||||
.dependsOn(ASSUME_ROLE_ARN)
|
||||
.allowableValues(getAvailableRegions())
|
||||
.defaultValue(createAllowableValue(Region.US_WEST_2).getValue())
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ASSUME_ROLE_STS_SIGNER_OVERRIDE = new PropertyDescriptor.Builder()
|
||||
.name("assume-role-sts-signer-override")
|
||||
.displayName("Assume Role STS Signer Override")
|
||||
.description("The AWS STS library uses Signature Version 4 by default. This property allows you to plug in your own custom signer implementation.")
|
||||
.required(false)
|
||||
.allowableValues(EnumSet.of(
|
||||
DEFAULT_SIGNER,
|
||||
AWS_V4_SIGNER,
|
||||
CUSTOM_SIGNER))
|
||||
.defaultValue(DEFAULT_SIGNER.getValue())
|
||||
.dependsOn(ASSUME_ROLE_ARN)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ASSUME_ROLE_STS_CUSTOM_SIGNER_CLASS_NAME = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(AwsPropertyDescriptors.CUSTOM_SIGNER_CLASS_NAME)
|
||||
.dependsOn(ASSUME_ROLE_STS_SIGNER_OVERRIDE, CUSTOM_SIGNER)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ASSUME_ROLE_STS_CUSTOM_SIGNER_MODULE_LOCATION = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(AwsPropertyDescriptors.CUSTOM_SIGNER_MODULE_LOCATION)
|
||||
.dependsOn(ASSUME_ROLE_STS_SIGNER_OVERRIDE, CUSTOM_SIGNER)
|
||||
.build();
|
||||
|
||||
public static AllowableValue createAllowableValue(final Region region) {
|
||||
return new AllowableValue(region.id(), region.metadata().description(), "AWS Region Code : " + region.id());
|
||||
}
|
||||
|
|
|
@ -33,7 +33,6 @@ import com.amazonaws.services.s3.model.Grantee;
|
|||
import com.amazonaws.services.s3.model.Owner;
|
||||
import com.amazonaws.services.s3.model.Permission;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
|
@ -41,11 +40,20 @@ import org.apache.nifi.processor.ProcessContext;
|
|||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
|
||||
import org.apache.nifi.processors.aws.AwsPropertyDescriptors;
|
||||
import org.apache.nifi.processors.aws.signer.AwsCustomSignerUtil;
|
||||
import org.apache.nifi.processors.aws.signer.AwsSignerType;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V2_SIGNER;
|
||||
import static org.apache.nifi.processors.aws.signer.AwsSignerType.AWS_S3_V4_SIGNER;
|
||||
import static org.apache.nifi.processors.aws.signer.AwsSignerType.CUSTOM_SIGNER;
|
||||
import static org.apache.nifi.processors.aws.signer.AwsSignerType.DEFAULT_SIGNER;
|
||||
|
||||
public abstract class AbstractS3Processor extends AbstractAWSCredentialsProviderProcessor<AmazonS3Client> {
|
||||
|
||||
public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new PropertyDescriptor.Builder()
|
||||
|
@ -121,14 +129,27 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
|||
.build();
|
||||
public static final PropertyDescriptor SIGNER_OVERRIDE = new PropertyDescriptor.Builder()
|
||||
.name("Signer Override")
|
||||
.description("The AWS libraries use the default signer but this property allows you to specify a custom signer to support older S3-compatible services.")
|
||||
.description("The AWS S3 library uses Signature Version 4 by default but this property allows you to specify the Version 2 signer to support older S3-compatible services" +
|
||||
" or even to plug in your own custom signer implementation.")
|
||||
.required(false)
|
||||
.allowableValues(
|
||||
new AllowableValue("Default Signature", "Default Signature"),
|
||||
new AllowableValue("AWSS3V4SignerType", "Signature v4"),
|
||||
new AllowableValue("S3SignerType", "Signature v2"))
|
||||
.defaultValue("Default Signature")
|
||||
.allowableValues(EnumSet.of(
|
||||
DEFAULT_SIGNER,
|
||||
AWS_S3_V4_SIGNER,
|
||||
AWS_S3_V2_SIGNER,
|
||||
CUSTOM_SIGNER))
|
||||
.defaultValue(DEFAULT_SIGNER.getValue())
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor S3_CUSTOM_SIGNER_CLASS_NAME = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(AwsPropertyDescriptors.CUSTOM_SIGNER_CLASS_NAME)
|
||||
.dependsOn(SIGNER_OVERRIDE, CUSTOM_SIGNER)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor S3_CUSTOM_SIGNER_MODULE_LOCATION = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(AwsPropertyDescriptors.CUSTOM_SIGNER_MODULE_LOCATION)
|
||||
.dependsOn(SIGNER_OVERRIDE, CUSTOM_SIGNER)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("encryption-service")
|
||||
.displayName("Encryption Service")
|
||||
|
@ -201,13 +222,25 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
|||
}
|
||||
|
||||
private void initializeSignerOverride(final ProcessContext context, final ClientConfiguration config) {
|
||||
String signer = context.getProperty(SIGNER_OVERRIDE).getValue();
|
||||
final String signer = context.getProperty(SIGNER_OVERRIDE).getValue();
|
||||
final AwsSignerType signerType = AwsSignerType.forValue(signer);
|
||||
|
||||
if (signer != null && !signer.equals(SIGNER_OVERRIDE.getDefaultValue())) {
|
||||
if (signerType == CUSTOM_SIGNER) {
|
||||
final String signerClassName = context.getProperty(S3_CUSTOM_SIGNER_CLASS_NAME).evaluateAttributeExpressions().getValue();
|
||||
|
||||
config.setSignerOverride(AwsCustomSignerUtil.registerCustomSigner(signerClassName));
|
||||
} else if (signerType != DEFAULT_SIGNER) {
|
||||
config.setSignerOverride(signer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCustomSignerConfigured(final ProcessContext context) {
|
||||
final String signer = context.getProperty(SIGNER_OVERRIDE).getValue();
|
||||
final AwsSignerType signerType = AwsSignerType.forValue(signer);
|
||||
return signerType == CUSTOM_SIGNER;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create client using AWSCredentials
|
||||
*
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.signer;
|
||||
|
||||
import com.amazonaws.auth.Signer;
|
||||
import com.amazonaws.auth.SignerFactory;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
|
||||
public final class AwsCustomSignerUtil {
|
||||
|
||||
private AwsCustomSignerUtil() {
|
||||
// util class' constructor
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static String registerCustomSigner(final String className) {
|
||||
final Class<? extends Signer> signerClass;
|
||||
|
||||
try {
|
||||
final Class<?> clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
|
||||
|
||||
if (Signer.class.isAssignableFrom(clazz)) {
|
||||
signerClass = (Class<? extends Signer>) clazz;
|
||||
} else {
|
||||
throw new ProcessException(String.format("Cannot create signer from class %s because it does not implement %s", className, Signer.class.getName()));
|
||||
}
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new ProcessException("Signer class not found: " + className);
|
||||
} catch (Exception e) {
|
||||
throw new ProcessException("Error while creating signer from class: " + className);
|
||||
}
|
||||
|
||||
String signerName = signerClass.getName();
|
||||
|
||||
SignerFactory.registerSigner(signerName, signerClass);
|
||||
|
||||
return signerName;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.signer;
|
||||
|
||||
import com.amazonaws.auth.SignerFactory;
|
||||
import org.apache.nifi.components.DescribedValue;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public enum AwsSignerType implements DescribedValue {
|
||||
|
||||
// AWS_***_SIGNERs must follow the names in com.amazonaws.auth.SignerFactory and com.amazonaws.services.s3.AmazonS3Client
|
||||
DEFAULT_SIGNER("Default Signature", "Default Signature"),
|
||||
AWS_V4_SIGNER(SignerFactory.VERSION_FOUR_SIGNER, "Signature Version 4"),
|
||||
AWS_S3_V4_SIGNER("AWSS3V4SignerType", "Signature Version 4"), // AmazonS3Client.S3_V4_SIGNER
|
||||
AWS_S3_V2_SIGNER("S3SignerType", "Signature Version 2"), // AmazonS3Client.S3_SIGNER
|
||||
CUSTOM_SIGNER("CustomSignerType", "Custom Signature");
|
||||
|
||||
private static final Map<String, AwsSignerType> LOOKUP_MAP = new HashMap<>();
|
||||
|
||||
static {
|
||||
for (AwsSignerType signerType : AwsSignerType.values()) {
|
||||
LOOKUP_MAP.put(signerType.getValue(), signerType);
|
||||
}
|
||||
}
|
||||
|
||||
private final String value;
|
||||
private final String displayName;
|
||||
private final String description;
|
||||
|
||||
AwsSignerType(String value, String displayName) {
|
||||
this(value, displayName, null);
|
||||
}
|
||||
|
||||
AwsSignerType(String value, String displayName, String description) {
|
||||
this.value = value;
|
||||
this.displayName = displayName;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDisplayName() {
|
||||
return displayName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
public static AwsSignerType forValue(final String value) {
|
||||
return LOOKUP_MAP.get(value);
|
||||
}
|
||||
}
|
|
@ -19,12 +19,13 @@ package org.apache.nifi.processors.aws.credentials.provider.factory.strategies;
|
|||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
|
||||
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
|
||||
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialsStrategy;
|
||||
import org.apache.nifi.processors.aws.signer.AwsCustomSignerUtil;
|
||||
import org.apache.nifi.processors.aws.signer.AwsSignerType;
|
||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
||||
import software.amazon.awssdk.http.apache.ApacheHttpClient;
|
||||
import software.amazon.awssdk.regions.Region;
|
||||
|
@ -44,9 +45,13 @@ import static org.apache.nifi.processors.aws.credentials.provider.factory.Creden
|
|||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_NAME;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_HOST;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_PORT;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_CUSTOM_SIGNER_CLASS_NAME;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_ENDPOINT;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_REGION;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_SIGNER_OVERRIDE;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.MAX_SESSION_TIME;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_REGION;
|
||||
import static org.apache.nifi.processors.aws.signer.AwsSignerType.CUSTOM_SIGNER;
|
||||
import static org.apache.nifi.processors.aws.signer.AwsSignerType.DEFAULT_SIGNER;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -95,50 +100,30 @@ public class AssumeRoleCredentialsStrategy extends AbstractCredentialsStrategy {
|
|||
@Override
|
||||
public Collection<ValidationResult> validate(final ValidationContext validationContext,
|
||||
final CredentialsStrategy primaryStrategy) {
|
||||
final Collection<ValidationResult> validationFailureResults = new ArrayList<>();
|
||||
|
||||
final boolean assumeRoleArnIsSet = validationContext.getProperty(ASSUME_ROLE_ARN).isSet();
|
||||
final boolean assumeRoleNameIsSet = validationContext.getProperty(ASSUME_ROLE_NAME).isSet();
|
||||
final Integer maxSessionTime = validationContext.getProperty(MAX_SESSION_TIME).asInteger();
|
||||
final boolean assumeRoleExternalIdIsSet = validationContext.getProperty(ASSUME_ROLE_EXTERNAL_ID).isSet();
|
||||
final boolean assumeRoleProxyHostIsSet = validationContext.getProperty(ASSUME_ROLE_PROXY_HOST).isSet();
|
||||
final boolean assumeRoleProxyPortIsSet = validationContext.getProperty(ASSUME_ROLE_PROXY_PORT).isSet();
|
||||
final boolean assumeRoleSTSEndpointIsSet = validationContext.getProperty(ASSUME_ROLE_STS_ENDPOINT).isSet();
|
||||
|
||||
final Collection<ValidationResult> validationFailureResults = new ArrayList<ValidationResult>();
|
||||
if (assumeRoleArnIsSet) {
|
||||
final Integer maxSessionTime = validationContext.getProperty(MAX_SESSION_TIME).asInteger();
|
||||
|
||||
// Both role and arn name are req if present
|
||||
if (assumeRoleArnIsSet ^ assumeRoleNameIsSet ) {
|
||||
validationFailureResults.add(new ValidationResult.Builder().input("Assume Role Arn and Name")
|
||||
.valid(false).explanation("Assume role requires both arn and name to be set").build());
|
||||
}
|
||||
// Session time only b/w 900 to 3600 sec (see com.amazonaws.services.securitytoken.model.AssumeRoleRequest#withDurationSeconds)
|
||||
if (maxSessionTime < 900 || maxSessionTime > 3600) {
|
||||
validationFailureResults.add(new ValidationResult.Builder().valid(false).input(maxSessionTime + "")
|
||||
.explanation(MAX_SESSION_TIME.getDisplayName() +
|
||||
" must be between 900 and 3600 seconds").build());
|
||||
}
|
||||
|
||||
// Session time only b/w 900 to 3600 sec (see sts session class)
|
||||
if ( maxSessionTime < 900 || maxSessionTime > 3600 )
|
||||
validationFailureResults.add(new ValidationResult.Builder().valid(false).input(maxSessionTime + "")
|
||||
.explanation(MAX_SESSION_TIME.getDisplayName() +
|
||||
" must be between 900 and 3600 seconds").build());
|
||||
final boolean assumeRoleProxyHostIsSet = validationContext.getProperty(ASSUME_ROLE_PROXY_HOST).isSet();
|
||||
final boolean assumeRoleProxyPortIsSet = validationContext.getProperty(ASSUME_ROLE_PROXY_PORT).isSet();
|
||||
|
||||
// External ID should only be provided with viable Assume Role ARN and Name
|
||||
if (assumeRoleExternalIdIsSet && (!assumeRoleArnIsSet || !assumeRoleNameIsSet)) {
|
||||
validationFailureResults.add(new ValidationResult.Builder().input("Assume Role External ID")
|
||||
.valid(false)
|
||||
.explanation("Assume role requires both arn and name to be set with External ID")
|
||||
.build());
|
||||
}
|
||||
|
||||
// STS Endpoint should only be provided with viable Assume Role ARN and Name
|
||||
if (assumeRoleSTSEndpointIsSet && (!assumeRoleArnIsSet || !assumeRoleNameIsSet)) {
|
||||
validationFailureResults.add(new ValidationResult.Builder().input("Assume Role STS Endpoint")
|
||||
.valid(false)
|
||||
.explanation("Assume role requires both arn and name to be set with STS Endpoint")
|
||||
.build());
|
||||
}
|
||||
|
||||
// Both proxy host and proxy port are required if present
|
||||
if (assumeRoleProxyHostIsSet ^ assumeRoleProxyPortIsSet){
|
||||
validationFailureResults.add(new ValidationResult.Builder().input("Assume Role Proxy Host and Port")
|
||||
.valid(false)
|
||||
.explanation("Assume role with proxy requires both host and port for the proxy to be set")
|
||||
.build());
|
||||
// Both proxy host and proxy port are required if present
|
||||
if (assumeRoleProxyHostIsSet ^ assumeRoleProxyPortIsSet) {
|
||||
validationFailureResults.add(new ValidationResult.Builder().input("Assume Role Proxy Host and Port")
|
||||
.valid(false)
|
||||
.explanation("Assume role with proxy requires both host and port for the proxy to be set")
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
return validationFailureResults;
|
||||
|
@ -158,7 +143,9 @@ public class AssumeRoleCredentialsStrategy extends AbstractCredentialsStrategy {
|
|||
rawMaxSessionTime = rawMaxSessionTime == null ? MAX_SESSION_TIME.getDefaultValue() : rawMaxSessionTime;
|
||||
final Integer maxSessionTime = Integer.parseInt(rawMaxSessionTime.trim());
|
||||
final String assumeRoleExternalId = properties.get(ASSUME_ROLE_EXTERNAL_ID);
|
||||
final String assumeRoleSTSRegion = properties.get(ASSUME_ROLE_STS_REGION);
|
||||
final String assumeRoleSTSEndpoint = properties.get(ASSUME_ROLE_STS_ENDPOINT);
|
||||
final String assumeRoleSTSSigner = properties.get(ASSUME_ROLE_STS_SIGNER_OVERRIDE);
|
||||
STSAssumeRoleSessionCredentialsProvider.Builder builder;
|
||||
ClientConfiguration config = new ClientConfiguration();
|
||||
|
||||
|
@ -170,10 +157,24 @@ public class AssumeRoleCredentialsStrategy extends AbstractCredentialsStrategy {
|
|||
config.withProxyPort(assumeRoleProxyPort);
|
||||
}
|
||||
|
||||
AWSSecurityTokenService securityTokenService = new AWSSecurityTokenServiceClient(primaryCredentialsProvider, config);
|
||||
if (assumeRoleSTSEndpoint != null && !assumeRoleSTSEndpoint.isEmpty()) {
|
||||
securityTokenService.setEndpoint(assumeRoleSTSEndpoint);
|
||||
final AwsSignerType assumeRoleSTSSignerType = AwsSignerType.forValue(assumeRoleSTSSigner);
|
||||
if (assumeRoleSTSSignerType == CUSTOM_SIGNER) {
|
||||
final String signerClassName = properties.get(ASSUME_ROLE_STS_CUSTOM_SIGNER_CLASS_NAME);
|
||||
|
||||
config.withSignerOverride(AwsCustomSignerUtil.registerCustomSigner(signerClassName));
|
||||
} else if (assumeRoleSTSSignerType != DEFAULT_SIGNER) {
|
||||
config.withSignerOverride(assumeRoleSTSSigner);
|
||||
}
|
||||
|
||||
AWSSecurityTokenServiceClient securityTokenService = new AWSSecurityTokenServiceClient(primaryCredentialsProvider, config);
|
||||
if (assumeRoleSTSEndpoint != null && !assumeRoleSTSEndpoint.isEmpty()) {
|
||||
if (assumeRoleSTSSignerType == CUSTOM_SIGNER) {
|
||||
securityTokenService.setEndpoint(assumeRoleSTSEndpoint, securityTokenService.getServiceName(), assumeRoleSTSRegion);
|
||||
} else {
|
||||
securityTokenService.setEndpoint(assumeRoleSTSEndpoint);
|
||||
}
|
||||
}
|
||||
|
||||
builder = new STSAssumeRoleSessionCredentialsProvider
|
||||
.Builder(assumeRoleArn, assumeRoleName)
|
||||
.withStsClient(securityTokenService)
|
||||
|
@ -203,7 +204,7 @@ public class AssumeRoleCredentialsStrategy extends AbstractCredentialsStrategy {
|
|||
final Integer maxSessionTime = Integer.parseInt(rawMaxSessionTime.trim());
|
||||
final String assumeRoleExternalId = properties.get(ASSUME_ROLE_EXTERNAL_ID);
|
||||
final String assumeRoleSTSEndpoint = properties.get(ASSUME_ROLE_STS_ENDPOINT);
|
||||
final String stsRegion = properties.get(ASSUME_ROLE_REGION);
|
||||
final String stsRegion = properties.get(ASSUME_ROLE_STS_REGION);
|
||||
|
||||
final StsAssumeRoleCredentialsProvider.Builder builder = StsAssumeRoleCredentialsProvider.builder();
|
||||
|
||||
|
|
|
@ -45,7 +45,10 @@ import static org.apache.nifi.processors.aws.credentials.provider.factory.Creden
|
|||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_HOST;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_PORT;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_ENDPOINT;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_SIGNER_OVERRIDE;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.CREDENTIALS_FILE;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_CUSTOM_SIGNER_CLASS_NAME;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_CUSTOM_SIGNER_MODULE_LOCATION;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.PROFILE_NAME;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.SECRET_KEY;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.USE_ANONYMOUS_CREDENTIALS;
|
||||
|
@ -74,7 +77,7 @@ public class AWSCredentialsProviderControllerService extends AbstractControllerS
|
|||
public static final PropertyDescriptor ASSUME_ROLE_ARN = CredentialPropertyDescriptors.ASSUME_ROLE_ARN;
|
||||
public static final PropertyDescriptor ASSUME_ROLE_NAME = CredentialPropertyDescriptors.ASSUME_ROLE_NAME;
|
||||
public static final PropertyDescriptor MAX_SESSION_TIME = CredentialPropertyDescriptors.MAX_SESSION_TIME;
|
||||
public static final PropertyDescriptor ASSUME_ROLE_REGION = CredentialPropertyDescriptors.ASSUME_ROLE_REGION;
|
||||
public static final PropertyDescriptor ASSUME_ROLE_STS_REGION = CredentialPropertyDescriptors.ASSUME_ROLE_STS_REGION;
|
||||
|
||||
private static final List<PropertyDescriptor> properties;
|
||||
|
||||
|
@ -92,8 +95,11 @@ public class AWSCredentialsProviderControllerService extends AbstractControllerS
|
|||
props.add(ASSUME_ROLE_EXTERNAL_ID);
|
||||
props.add(ASSUME_ROLE_PROXY_HOST);
|
||||
props.add(ASSUME_ROLE_PROXY_PORT);
|
||||
props.add(ASSUME_ROLE_STS_REGION);
|
||||
props.add(ASSUME_ROLE_STS_ENDPOINT);
|
||||
props.add(ASSUME_ROLE_REGION);
|
||||
props.add(ASSUME_ROLE_STS_SIGNER_OVERRIDE);
|
||||
props.add(ASSUME_ROLE_STS_CUSTOM_SIGNER_CLASS_NAME);
|
||||
props.add(ASSUME_ROLE_STS_CUSTOM_SIGNER_MODULE_LOCATION);
|
||||
properties = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
||||
|
|
|
@ -64,10 +64,32 @@ public class DeleteS3Object extends AbstractS3Processor {
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID,
|
||||
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
|
||||
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
|
||||
KEY,
|
||||
BUCKET,
|
||||
ACCESS_KEY,
|
||||
SECRET_KEY,
|
||||
CREDENTIALS_FILE,
|
||||
AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||
REGION,
|
||||
TIMEOUT,
|
||||
VERSION_ID,
|
||||
FULL_CONTROL_USER_LIST,
|
||||
READ_USER_LIST,
|
||||
WRITE_USER_LIST,
|
||||
READ_ACL_LIST,
|
||||
WRITE_ACL_LIST,
|
||||
OWNER,
|
||||
SSL_CONTEXT_SERVICE,
|
||||
ENDPOINT_OVERRIDE,
|
||||
SIGNER_OVERRIDE,
|
||||
S3_CUSTOM_SIGNER_CLASS_NAME,
|
||||
S3_CUSTOM_SIGNER_MODULE_LOCATION,
|
||||
PROXY_CONFIGURATION_SERVICE,
|
||||
PROXY_HOST,
|
||||
PROXY_HOST_PORT,
|
||||
PROXY_USERNAME,
|
||||
PROXY_PASSWORD));
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
|
|
|
@ -127,10 +127,30 @@ public class FetchS3Object extends AbstractS3Processor {
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
|
||||
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, ENCRYPTION_SERVICE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST,
|
||||
PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, REQUESTER_PAYS, RANGE_START, RANGE_LENGTH));
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
|
||||
BUCKET,
|
||||
KEY,
|
||||
REGION,
|
||||
ACCESS_KEY,
|
||||
SECRET_KEY,
|
||||
CREDENTIALS_FILE,
|
||||
AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||
TIMEOUT,
|
||||
VERSION_ID,
|
||||
SSL_CONTEXT_SERVICE,
|
||||
ENDPOINT_OVERRIDE,
|
||||
SIGNER_OVERRIDE,
|
||||
S3_CUSTOM_SIGNER_CLASS_NAME,
|
||||
S3_CUSTOM_SIGNER_MODULE_LOCATION,
|
||||
ENCRYPTION_SERVICE,
|
||||
PROXY_CONFIGURATION_SERVICE,
|
||||
PROXY_HOST,
|
||||
PROXY_HOST_PORT,
|
||||
PROXY_USERNAME,
|
||||
PROXY_PASSWORD,
|
||||
REQUESTER_PAYS,
|
||||
RANGE_START,
|
||||
RANGE_LENGTH));
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
|
|
|
@ -284,36 +284,38 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
|
|||
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
|
||||
LISTING_STRATEGY,
|
||||
TRACKING_STATE_CACHE,
|
||||
INITIAL_LISTING_TARGET,
|
||||
TRACKING_TIME_WINDOW,
|
||||
BUCKET,
|
||||
REGION,
|
||||
ACCESS_KEY,
|
||||
SECRET_KEY,
|
||||
RECORD_WRITER,
|
||||
MIN_AGE,
|
||||
MAX_AGE,
|
||||
BATCH_SIZE,
|
||||
WRITE_OBJECT_TAGS,
|
||||
WRITE_USER_METADATA,
|
||||
CREDENTIALS_FILE,
|
||||
AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||
TIMEOUT,
|
||||
SSL_CONTEXT_SERVICE,
|
||||
ENDPOINT_OVERRIDE,
|
||||
SIGNER_OVERRIDE,
|
||||
PROXY_CONFIGURATION_SERVICE,
|
||||
PROXY_HOST,
|
||||
PROXY_HOST_PORT,
|
||||
PROXY_USERNAME,
|
||||
PROXY_PASSWORD,
|
||||
DELIMITER,
|
||||
PREFIX,
|
||||
USE_VERSIONS,
|
||||
LIST_TYPE,
|
||||
REQUESTER_PAYS));
|
||||
LISTING_STRATEGY,
|
||||
TRACKING_STATE_CACHE,
|
||||
INITIAL_LISTING_TARGET,
|
||||
TRACKING_TIME_WINDOW,
|
||||
BUCKET,
|
||||
REGION,
|
||||
ACCESS_KEY,
|
||||
SECRET_KEY,
|
||||
RECORD_WRITER,
|
||||
MIN_AGE,
|
||||
MAX_AGE,
|
||||
BATCH_SIZE,
|
||||
WRITE_OBJECT_TAGS,
|
||||
WRITE_USER_METADATA,
|
||||
CREDENTIALS_FILE,
|
||||
AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||
TIMEOUT,
|
||||
SSL_CONTEXT_SERVICE,
|
||||
ENDPOINT_OVERRIDE,
|
||||
SIGNER_OVERRIDE,
|
||||
S3_CUSTOM_SIGNER_CLASS_NAME,
|
||||
S3_CUSTOM_SIGNER_MODULE_LOCATION,
|
||||
PROXY_CONFIGURATION_SERVICE,
|
||||
PROXY_HOST,
|
||||
PROXY_HOST_PORT,
|
||||
PROXY_USERNAME,
|
||||
PROXY_PASSWORD,
|
||||
DELIMITER,
|
||||
PREFIX,
|
||||
USE_VERSIONS,
|
||||
LIST_TYPE,
|
||||
REQUESTER_PAYS));
|
||||
|
||||
public static final Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);
|
||||
|
||||
|
|
|
@ -276,12 +276,48 @@ public class PutS3Object extends AbstractS3Processor {
|
|||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CONTENT_DISPOSITION, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||
OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST,
|
||||
READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE,
|
||||
MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, MULTIPART_TEMP_DIR, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING,
|
||||
USE_PATH_STYLE_ACCESS, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
|
||||
KEY,
|
||||
BUCKET,
|
||||
CONTENT_TYPE,
|
||||
CONTENT_DISPOSITION,
|
||||
CACHE_CONTROL,
|
||||
ACCESS_KEY,
|
||||
SECRET_KEY,
|
||||
CREDENTIALS_FILE,
|
||||
AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||
OBJECT_TAGS_PREFIX,
|
||||
REMOVE_TAG_PREFIX,
|
||||
STORAGE_CLASS,
|
||||
REGION,
|
||||
TIMEOUT,
|
||||
EXPIRATION_RULE_ID,
|
||||
FULL_CONTROL_USER_LIST,
|
||||
READ_USER_LIST,
|
||||
WRITE_USER_LIST,
|
||||
READ_ACL_LIST,
|
||||
WRITE_ACL_LIST,
|
||||
OWNER,
|
||||
CANNED_ACL,
|
||||
SSL_CONTEXT_SERVICE,
|
||||
ENDPOINT_OVERRIDE,
|
||||
SIGNER_OVERRIDE,
|
||||
S3_CUSTOM_SIGNER_CLASS_NAME,
|
||||
S3_CUSTOM_SIGNER_MODULE_LOCATION,
|
||||
MULTIPART_THRESHOLD,
|
||||
MULTIPART_PART_SIZE,
|
||||
MULTIPART_S3_AGEOFF_INTERVAL,
|
||||
MULTIPART_S3_MAX_AGE,
|
||||
MULTIPART_TEMP_DIR,
|
||||
SERVER_SIDE_ENCRYPTION,
|
||||
ENCRYPTION_SERVICE,
|
||||
USE_CHUNKED_ENCODING,
|
||||
USE_PATH_STYLE_ACCESS,
|
||||
PROXY_CONFIGURATION_SERVICE,
|
||||
PROXY_HOST,
|
||||
PROXY_HOST_PORT,
|
||||
PROXY_USERNAME,
|
||||
PROXY_PASSWORD));
|
||||
|
||||
final static String S3_BUCKET_KEY = "s3.bucket";
|
||||
final static String S3_OBJECT_KEY = "s3.key";
|
||||
|
|
|
@ -106,11 +106,29 @@ public class TagS3Object extends AbstractS3Processor {
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(KEY, BUCKET, VERSION_ID, TAG_KEY, TAG_VALUE, APPEND_TAG, ACCESS_KEY, SECRET_KEY,
|
||||
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, SSL_CONTEXT_SERVICE,
|
||||
ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT,
|
||||
PROXY_USERNAME, PROXY_PASSWORD));
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
|
||||
KEY,
|
||||
BUCKET,
|
||||
VERSION_ID,
|
||||
TAG_KEY,
|
||||
TAG_VALUE,
|
||||
APPEND_TAG,
|
||||
ACCESS_KEY,
|
||||
SECRET_KEY,
|
||||
CREDENTIALS_FILE,
|
||||
AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||
REGION,
|
||||
TIMEOUT,
|
||||
SSL_CONTEXT_SERVICE,
|
||||
ENDPOINT_OVERRIDE,
|
||||
SIGNER_OVERRIDE,
|
||||
S3_CUSTOM_SIGNER_CLASS_NAME,
|
||||
S3_CUSTOM_SIGNER_MODULE_LOCATION,
|
||||
PROXY_CONFIGURATION_SERVICE,
|
||||
PROXY_HOST,
|
||||
PROXY_HOST_PORT,
|
||||
PROXY_USERNAME,
|
||||
PROXY_PASSWORD));
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
|
|
|
@ -36,8 +36,11 @@ import static org.apache.nifi.processors.aws.credentials.provider.factory.Creden
|
|||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_NAME;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_HOST;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_PORT;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_REGION;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_CUSTOM_SIGNER_CLASS_NAME;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_CUSTOM_SIGNER_MODULE_LOCATION;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_REGION;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_ENDPOINT;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.ASSUME_ROLE_STS_SIGNER_OVERRIDE;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.MAX_SESSION_TIME;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.PROFILE_NAME;
|
||||
import static org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors.USE_ANONYMOUS_CREDENTIALS;
|
||||
|
@ -62,8 +65,11 @@ public class MockAWSProcessor extends AbstractAWSCredentialsProviderProcessor<Am
|
|||
ASSUME_ROLE_EXTERNAL_ID,
|
||||
ASSUME_ROLE_PROXY_HOST,
|
||||
ASSUME_ROLE_PROXY_PORT,
|
||||
ASSUME_ROLE_STS_REGION,
|
||||
ASSUME_ROLE_STS_ENDPOINT,
|
||||
ASSUME_ROLE_REGION
|
||||
ASSUME_ROLE_STS_SIGNER_OVERRIDE,
|
||||
ASSUME_ROLE_STS_CUSTOM_SIGNER_CLASS_NAME,
|
||||
ASSUME_ROLE_STS_CUSTOM_SIGNER_MODULE_LOCATION
|
||||
);
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,17 +16,21 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.aws.credentials.provider.factory;
|
||||
|
||||
import com.amazonaws.SignableRequest;
|
||||
import com.amazonaws.auth.AWS4Signer;
|
||||
import com.amazonaws.auth.AWSCredentials;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.auth.AnonymousAWSCredentials;
|
||||
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
|
||||
import com.amazonaws.auth.PropertiesFileCredentialsProvider;
|
||||
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
|
||||
import com.amazonaws.auth.Signer;
|
||||
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
|
||||
import com.amazonaws.internal.StaticCredentialsProvider;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider;
|
||||
import org.apache.nifi.processors.aws.s3.FetchS3Object;
|
||||
import org.apache.nifi.processors.aws.signer.AwsSignerType;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -41,6 +45,9 @@ import java.util.Map;
|
|||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
/**
|
||||
* Tests of the validation and credentials provider capabilities of CredentialsProviderFactory.
|
||||
|
@ -52,7 +59,7 @@ public class TestCredentialsProviderFactory {
|
|||
final TestRunner runner = TestRunners.newTestRunner(MockAWSProcessor.class);
|
||||
runner.assertValid();
|
||||
|
||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final CredentialsProviderFactory factory = new CredentialsProviderFactory();
|
||||
final AWSCredentialsProvider credentialsProvider = factory.getCredentialsProvider(properties);
|
||||
assertNotNull(credentialsProvider);
|
||||
|
@ -71,7 +78,7 @@ public class TestCredentialsProviderFactory {
|
|||
runner.setProperty(CredentialPropertyDescriptors.USE_DEFAULT_CREDENTIALS, "true");
|
||||
runner.assertValid();
|
||||
|
||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final CredentialsProviderFactory factory = new CredentialsProviderFactory();
|
||||
final AWSCredentialsProvider credentialsProvider = factory.getCredentialsProvider(properties);
|
||||
assertNotNull(credentialsProvider);
|
||||
|
@ -100,7 +107,7 @@ public class TestCredentialsProviderFactory {
|
|||
runner.setProperty(CredentialPropertyDescriptors.SECRET_KEY, "BogusSecretKey");
|
||||
runner.assertValid();
|
||||
|
||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final CredentialsProviderFactory factory = new CredentialsProviderFactory();
|
||||
final AWSCredentialsProvider credentialsProvider = factory.getCredentialsProvider(properties);
|
||||
assertNotNull(credentialsProvider);
|
||||
|
@ -133,7 +140,7 @@ public class TestCredentialsProviderFactory {
|
|||
runner.setProperty(CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
|
||||
runner.assertValid();
|
||||
|
||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final CredentialsProviderFactory factory = new CredentialsProviderFactory();
|
||||
final AWSCredentialsProvider credentialsProvider = factory.getCredentialsProvider(properties);
|
||||
assertNotNull(credentialsProvider);
|
||||
|
@ -163,7 +170,7 @@ public class TestCredentialsProviderFactory {
|
|||
|
||||
assertThrows(IllegalStateException.class, () -> factory.getAwsCredentialsProvider(properties));
|
||||
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_STS_REGION, Region.US_WEST_1.id());
|
||||
final Map<PropertyDescriptor, String> properties2 = runner.getProcessContext().getProperties();
|
||||
final AwsCredentialsProvider credentialsProviderV2 = factory.getAwsCredentialsProvider(properties2);
|
||||
assertNotNull(credentialsProviderV2);
|
||||
|
@ -171,14 +178,6 @@ public class TestCredentialsProviderFactory {
|
|||
credentialsProviderV2.getClass(), "credentials provider should be equal");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssumeRoleCredentialsMissingARN() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(MockAWSProcessor.class);
|
||||
runner.setProperty(CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_NAME, "BogusSession");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssumeRoleCredentialsInvalidSessionTime() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(MockAWSProcessor.class);
|
||||
|
@ -189,29 +188,13 @@ public class TestCredentialsProviderFactory {
|
|||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssumeRoleExternalIdMissingArnAndName() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(MockAWSProcessor.class);
|
||||
runner.setProperty(CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_EXTERNAL_ID, "BogusExternalId");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssumeRoleSTSEndpointMissingArnAndName() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(MockAWSProcessor.class);
|
||||
runner.setProperty(CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_STS_ENDPOINT, "BogusSTSEndpoint");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAnonymousCredentials() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(MockAWSProcessor.class);
|
||||
runner.setProperty(CredentialPropertyDescriptors.USE_ANONYMOUS_CREDENTIALS, "true");
|
||||
runner.assertValid();
|
||||
|
||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final CredentialsProviderFactory factory = new CredentialsProviderFactory();
|
||||
final AWSCredentialsProvider credentialsProvider = factory.getCredentialsProvider(properties);
|
||||
assertNotNull(credentialsProvider);
|
||||
|
@ -239,7 +222,7 @@ public class TestCredentialsProviderFactory {
|
|||
runner.setProperty(CredentialPropertyDescriptors.PROFILE_NAME, "BogusProfile");
|
||||
runner.assertValid();
|
||||
|
||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final CredentialsProviderFactory factory = new CredentialsProviderFactory();
|
||||
final AWSCredentialsProvider credentialsProvider = factory.getCredentialsProvider(properties);
|
||||
assertNotNull(credentialsProvider);
|
||||
|
@ -258,12 +241,12 @@ public class TestCredentialsProviderFactory {
|
|||
runner.setProperty(CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_ARN, "BogusArn");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_NAME, "BogusSession");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_REGION, Region.US_WEST_2.id());
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_STS_REGION, Region.US_WEST_2.id());
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_HOST, "proxy.company.com");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_PORT, "8080");
|
||||
runner.assertValid();
|
||||
|
||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final CredentialsProviderFactory factory = new CredentialsProviderFactory();
|
||||
final AWSCredentialsProvider credentialsProvider = factory.getCredentialsProvider(properties);
|
||||
assertNotNull(credentialsProvider);
|
||||
|
@ -280,6 +263,8 @@ public class TestCredentialsProviderFactory {
|
|||
public void testAssumeRoleMissingProxyHost() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(MockAWSProcessor.class);
|
||||
runner.setProperty(CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_ARN, "BogusArn");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_NAME, "BogusSession");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_PORT, "8080");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
@ -288,6 +273,8 @@ public class TestCredentialsProviderFactory {
|
|||
public void testAssumeRoleMissingProxyPort() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(MockAWSProcessor.class);
|
||||
runner.setProperty(CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_ARN, "BogusArn");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_NAME, "BogusSession");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_HOST, "proxy.company.com");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
@ -296,8 +283,51 @@ public class TestCredentialsProviderFactory {
|
|||
public void testAssumeRoleInvalidProxyPort() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(MockAWSProcessor.class);
|
||||
runner.setProperty(CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_ARN, "BogusArn");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_NAME, "BogusSession");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_HOST, "proxy.company.com");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_PROXY_PORT, "notIntPort");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssumeRoleCredentialsWithCustomSigner() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(MockAWSProcessor.class);
|
||||
runner.setProperty(CredentialPropertyDescriptors.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_ARN, "BogusArn");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_NAME, "BogusSession");
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_STS_SIGNER_OVERRIDE, AwsSignerType.CUSTOM_SIGNER.getValue());
|
||||
runner.setProperty(CredentialPropertyDescriptors.ASSUME_ROLE_STS_CUSTOM_SIGNER_CLASS_NAME, CustomSTSSigner.class.getName());
|
||||
runner.assertValid();
|
||||
|
||||
final Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||
final CredentialsProviderFactory factory = new CredentialsProviderFactory();
|
||||
|
||||
final Signer signerChecker = mock(Signer.class);
|
||||
CustomSTSSigner.setSignerChecker(signerChecker);
|
||||
|
||||
final AWSCredentialsProvider credentialsProvider = factory.getCredentialsProvider(properties);
|
||||
|
||||
try {
|
||||
credentialsProvider.getCredentials();
|
||||
} catch (Exception e) {
|
||||
// Expected to fail, we are only interested in the Signer
|
||||
}
|
||||
|
||||
verify(signerChecker).sign(any(), any());
|
||||
}
|
||||
|
||||
public static class CustomSTSSigner extends AWS4Signer {
|
||||
|
||||
private static final ThreadLocal<Signer> SIGNER_CHECKER = new ThreadLocal<>();
|
||||
|
||||
public static void setSignerChecker(Signer signerChecker) {
|
||||
SIGNER_CHECKER.set(signerChecker);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
|
||||
SIGNER_CHECKER.get().sign(request, credentials);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ public class AWSCredentialsProviderControllerServiceTest {
|
|||
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey");
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_STS_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
|
||||
runner.enableControllerService(serviceImpl);
|
||||
|
@ -102,7 +102,7 @@ public class AWSCredentialsProviderControllerServiceTest {
|
|||
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey");
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_STS_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "1000");
|
||||
|
@ -125,7 +125,7 @@ public class AWSCredentialsProviderControllerServiceTest {
|
|||
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey");
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_STS_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "900");
|
||||
|
@ -141,7 +141,7 @@ public class AWSCredentialsProviderControllerServiceTest {
|
|||
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey");
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_STS_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "900");
|
||||
|
@ -188,18 +188,6 @@ public class AWSCredentialsProviderControllerServiceTest {
|
|||
runner.assertNotValid(serviceImpl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKeysCredentialsProviderWithRoleNameOnlyInvalid() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class);
|
||||
final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();
|
||||
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey");
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
|
||||
|
||||
runner.assertNotValid(serviceImpl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFileCredentialsProviderWithRole() throws Throwable {
|
||||
final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class);
|
||||
|
@ -207,7 +195,7 @@ public class AWSCredentialsProviderControllerServiceTest {
|
|||
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE,
|
||||
"src/test/resources/mock-aws-credentials.properties");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_STS_REGION, Region.US_WEST_1.id());
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
|
||||
runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
|
||||
runner.enableControllerService(serviceImpl);
|
||||
|
|
|
@ -41,6 +41,7 @@ public class ITListS3 extends AbstractS3IT {
|
|||
putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
waitForFilesAvailable();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||
|
||||
|
@ -62,6 +63,7 @@ public class ITListS3 extends AbstractS3IT {
|
|||
putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
waitForFilesAvailable();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||
|
||||
|
@ -91,6 +93,7 @@ public class ITListS3 extends AbstractS3IT {
|
|||
putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
waitForFilesAvailable();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||
|
||||
|
@ -111,6 +114,7 @@ public class ITListS3 extends AbstractS3IT {
|
|||
putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
waitForFilesAvailable();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||
|
||||
|
@ -131,6 +135,7 @@ public class ITListS3 extends AbstractS3IT {
|
|||
putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
waitForFilesAvailable();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||
|
||||
|
@ -153,12 +158,13 @@ public class ITListS3 extends AbstractS3IT {
|
|||
objectTags.add(new Tag("dummytag1", "dummyvalue1"));
|
||||
objectTags.add(new Tag("dummytag2", "dummyvalue2"));
|
||||
|
||||
putFileWithObjectTag("b/fileWithTag", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), objectTags);
|
||||
putFileWithObjectTag("t/fileWithTag", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), objectTags);
|
||||
waitForFilesAvailable();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||
|
||||
runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(ListS3.PREFIX, "b/");
|
||||
runner.setProperty(ListS3.PREFIX, "t/");
|
||||
runner.setProperty(ListS3.REGION, REGION);
|
||||
runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
|
||||
runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true");
|
||||
|
@ -169,7 +175,7 @@ public class ITListS3 extends AbstractS3IT {
|
|||
|
||||
MockFlowFile flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
|
||||
|
||||
flowFiles.assertAttributeEquals("filename", "b/fileWithTag");
|
||||
flowFiles.assertAttributeEquals("filename", "t/fileWithTag");
|
||||
flowFiles.assertAttributeExists("s3.tag.dummytag1");
|
||||
flowFiles.assertAttributeExists("s3.tag.dummytag2");
|
||||
flowFiles.assertAttributeEquals("s3.tag.dummytag1", "dummyvalue1");
|
||||
|
@ -182,12 +188,13 @@ public class ITListS3 extends AbstractS3IT {
|
|||
userMetadata.put("dummy.metadata.1", "dummyvalue1");
|
||||
userMetadata.put("dummy.metadata.2", "dummyvalue2");
|
||||
|
||||
putFileWithUserMetadata("b/fileWithUserMetadata", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), userMetadata);
|
||||
putFileWithUserMetadata("m/fileWithUserMetadata", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), userMetadata);
|
||||
waitForFilesAvailable();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||
|
||||
runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(ListS3.PREFIX, "b/");
|
||||
runner.setProperty(ListS3.PREFIX, "m/");
|
||||
runner.setProperty(ListS3.REGION, REGION);
|
||||
runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
|
||||
runner.setProperty(ListS3.WRITE_USER_METADATA, "true");
|
||||
|
@ -198,11 +205,19 @@ public class ITListS3 extends AbstractS3IT {
|
|||
|
||||
MockFlowFile flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
|
||||
|
||||
flowFiles.assertAttributeEquals("filename", "b/fileWithUserMetadata");
|
||||
flowFiles.assertAttributeEquals("filename", "m/fileWithUserMetadata");
|
||||
flowFiles.assertAttributeExists("s3.user.metadata.dummy.metadata.1");
|
||||
flowFiles.assertAttributeExists("s3.user.metadata.dummy.metadata.2");
|
||||
flowFiles.assertAttributeEquals("s3.user.metadata.dummy.metadata.1", "dummyvalue1");
|
||||
flowFiles.assertAttributeEquals("s3.user.metadata.dummy.metadata.2", "dummyvalue2");
|
||||
}
|
||||
|
||||
private void waitForFilesAvailable() {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -577,7 +578,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
|||
runner.run();
|
||||
|
||||
assertEquals(BUCKET_NAME, context.getProperty(PutS3Object.BUCKET).toString());
|
||||
assertEquals(TESTKEY, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions().toString());
|
||||
assertEquals(TESTKEY, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions(Collections.emptyMap()).toString());
|
||||
assertEquals(TEST_ENDPOINT, context.getProperty(PutS3Object.ENDPOINT_OVERRIDE).toString());
|
||||
|
||||
String s3url = ((TestablePutS3Object)processor).testable_getClient().getResourceUrl(BUCKET_NAME, TESTKEY);
|
||||
|
@ -598,7 +599,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
|||
runner.setProperty(PutS3Object.KEY, AbstractS3IT.SAMPLE_FILE_RESOURCE_NAME);
|
||||
|
||||
assertEquals(BUCKET_NAME, context.getProperty(PutS3Object.BUCKET).toString());
|
||||
assertEquals(SAMPLE_FILE_RESOURCE_NAME, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions().toString());
|
||||
assertEquals(SAMPLE_FILE_RESOURCE_NAME, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions(Collections.emptyMap()).toString());
|
||||
assertEquals(TEST_PARTSIZE_LONG.longValue(),
|
||||
context.getProperty(PutS3Object.MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue());
|
||||
}
|
||||
|
@ -609,7 +610,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
|||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
|
||||
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
|
||||
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
|
||||
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).evaluateAttributeExpressions(Collections.emptyMap()).getValue();
|
||||
final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key;
|
||||
final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v2";
|
||||
final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v3";
|
||||
|
@ -682,7 +683,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
|||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
|
||||
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
|
||||
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
|
||||
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).evaluateAttributeExpressions(Collections.emptyMap()).getValue();
|
||||
final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv1";
|
||||
final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv2";
|
||||
final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv3";
|
||||
|
@ -758,7 +759,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
|||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
|
||||
final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
|
||||
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
|
||||
final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).evaluateAttributeExpressions(Collections.emptyMap()).getValue();
|
||||
final String cacheKey = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-sr";
|
||||
|
||||
final List<MultipartUpload> uploadList = new ArrayList<>();
|
||||
|
|
|
@ -137,7 +137,7 @@ public class TestDeleteS3Object {
|
|||
public void testGetPropertyDescriptors() {
|
||||
DeleteS3Object processor = new DeleteS3Object();
|
||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||
assertEquals(23, pd.size(), "size should be eq");
|
||||
assertEquals(25, pd.size(), "size should be eq");
|
||||
assertTrue(pd.contains(processor.ACCESS_KEY));
|
||||
assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||
assertTrue(pd.contains(processor.BUCKET));
|
||||
|
@ -151,6 +151,8 @@ public class TestDeleteS3Object {
|
|||
assertTrue(pd.contains(processor.REGION));
|
||||
assertTrue(pd.contains(processor.SECRET_KEY));
|
||||
assertTrue(pd.contains(processor.SIGNER_OVERRIDE));
|
||||
assertTrue(pd.contains(processor.S3_CUSTOM_SIGNER_CLASS_NAME));
|
||||
assertTrue(pd.contains(processor.S3_CUSTOM_SIGNER_MODULE_LOCATION));
|
||||
assertTrue(pd.contains(processor.SSL_CONTEXT_SERVICE));
|
||||
assertTrue(pd.contains(processor.TIMEOUT));
|
||||
assertTrue(pd.contains(processor.VERSION_ID));
|
||||
|
|
|
@ -362,7 +362,7 @@ public class TestFetchS3Object {
|
|||
public void testGetPropertyDescriptors() {
|
||||
FetchS3Object processor = new FetchS3Object();
|
||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||
assertEquals("size should be eq", 21, pd.size());
|
||||
assertEquals("size should be eq", 23, pd.size());
|
||||
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
|
||||
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||
assertTrue(pd.contains(FetchS3Object.BUCKET));
|
||||
|
@ -372,6 +372,8 @@ public class TestFetchS3Object {
|
|||
assertTrue(pd.contains(FetchS3Object.REGION));
|
||||
assertTrue(pd.contains(FetchS3Object.SECRET_KEY));
|
||||
assertTrue(pd.contains(FetchS3Object.SIGNER_OVERRIDE));
|
||||
assertTrue(pd.contains(FetchS3Object.S3_CUSTOM_SIGNER_CLASS_NAME));
|
||||
assertTrue(pd.contains(FetchS3Object.S3_CUSTOM_SIGNER_MODULE_LOCATION));
|
||||
assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE));
|
||||
assertTrue(pd.contains(FetchS3Object.TIMEOUT));
|
||||
assertTrue(pd.contains(FetchS3Object.VERSION_ID));
|
||||
|
|
|
@ -19,7 +19,11 @@ package org.apache.nifi.processors.aws.s3;
|
|||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
|
||||
import com.amazonaws.auth.Signer;
|
||||
import com.amazonaws.auth.SignerFactory;
|
||||
import com.amazonaws.auth.SignerParams;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.internal.AWSS3V4Signer;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
|
||||
import com.amazonaws.services.s3.model.MultipartUploadListing;
|
||||
|
@ -33,6 +37,7 @@ import org.apache.nifi.components.AllowableValue;
|
|||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processors.aws.signer.AwsSignerType;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
@ -51,7 +56,10 @@ import java.util.Map;
|
|||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
|
||||
public class TestPutS3Object {
|
||||
|
@ -62,7 +70,7 @@ public class TestPutS3Object {
|
|||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
mockS3Client = Mockito.mock(AmazonS3Client.class);
|
||||
mockS3Client = mock(AmazonS3Client.class);
|
||||
putS3Object = new PutS3Object() {
|
||||
@Override
|
||||
protected AmazonS3Client getClient() {
|
||||
|
@ -116,11 +124,11 @@ public class TestPutS3Object {
|
|||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
|
||||
final List<AllowableValue> allowableSignerValues = PutS3Object.SIGNER_OVERRIDE.getAllowableValues();
|
||||
final String defaultSignerValue = PutS3Object.SIGNER_OVERRIDE.getDefaultValue();
|
||||
final String customSignerValue = AwsSignerType.CUSTOM_SIGNER.getValue(); // Custom Signer is tested separately
|
||||
|
||||
for (AllowableValue allowableSignerValue : allowableSignerValues) {
|
||||
String signerType = allowableSignerValue.getValue();
|
||||
if (!signerType.equals(defaultSignerValue)) {
|
||||
if (!signerType.equals(customSignerValue)) {
|
||||
runner.setProperty(PutS3Object.SIGNER_OVERRIDE, signerType);
|
||||
ProcessContext context = runner.getProcessContext();
|
||||
assertDoesNotThrow(() -> processor.createClient(context, credentialsProvider, config));
|
||||
|
@ -241,7 +249,7 @@ public class TestPutS3Object {
|
|||
public void testGetPropertyDescriptors() {
|
||||
PutS3Object processor = new PutS3Object();
|
||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||
assertEquals(39, pd.size(), "size should be eq");
|
||||
assertEquals(41, pd.size(), "size should be eq");
|
||||
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
||||
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||
assertTrue(pd.contains(PutS3Object.BUCKET));
|
||||
|
@ -256,6 +264,8 @@ public class TestPutS3Object {
|
|||
assertTrue(pd.contains(PutS3Object.REGION));
|
||||
assertTrue(pd.contains(PutS3Object.SECRET_KEY));
|
||||
assertTrue(pd.contains(PutS3Object.SIGNER_OVERRIDE));
|
||||
assertTrue(pd.contains(PutS3Object.S3_CUSTOM_SIGNER_CLASS_NAME));
|
||||
assertTrue(pd.contains(PutS3Object.S3_CUSTOM_SIGNER_MODULE_LOCATION));
|
||||
assertTrue(pd.contains(PutS3Object.SSL_CONTEXT_SERVICE));
|
||||
assertTrue(pd.contains(PutS3Object.TIMEOUT));
|
||||
assertTrue(pd.contains(PutS3Object.EXPIRATION_RULE_ID));
|
||||
|
@ -282,4 +292,28 @@ public class TestPutS3Object {
|
|||
assertTrue(pd.contains(PutS3Object.MULTIPART_S3_MAX_AGE));
|
||||
assertTrue(pd.contains(PutS3Object.MULTIPART_TEMP_DIR));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomSigner() {
|
||||
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
|
||||
final ClientConfiguration config = new ClientConfiguration();
|
||||
final PutS3Object processor = new PutS3Object();
|
||||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
|
||||
runner.setProperty(PutS3Object.SIGNER_OVERRIDE, AwsSignerType.CUSTOM_SIGNER.getValue());
|
||||
runner.setProperty(PutS3Object.S3_CUSTOM_SIGNER_CLASS_NAME, CustomS3Signer.class.getName());
|
||||
|
||||
ProcessContext context = runner.getProcessContext();
|
||||
processor.createClient(context, credentialsProvider, config);
|
||||
|
||||
final String signerName = config.getSignerOverride();
|
||||
assertNotNull(signerName);
|
||||
final Signer signer = SignerFactory.createSigner(signerName, new SignerParams("s3", "us-west-2"));
|
||||
assertNotNull(signer);
|
||||
assertSame(CustomS3Signer.class, signer.getClass());
|
||||
}
|
||||
|
||||
public static class CustomS3Signer extends AWSS3V4Signer {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -244,7 +244,7 @@ public class TestTagS3Object {
|
|||
public void testGetPropertyDescriptors() throws Exception {
|
||||
TagS3Object processor = new TagS3Object();
|
||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||
assertEquals(20, pd.size(), "size should be eq");
|
||||
assertEquals(22, pd.size(), "size should be eq");
|
||||
assertTrue(pd.contains(TagS3Object.ACCESS_KEY));
|
||||
assertTrue(pd.contains(TagS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||
assertTrue(pd.contains(TagS3Object.BUCKET));
|
||||
|
@ -254,6 +254,8 @@ public class TestTagS3Object {
|
|||
assertTrue(pd.contains(TagS3Object.REGION));
|
||||
assertTrue(pd.contains(TagS3Object.SECRET_KEY));
|
||||
assertTrue(pd.contains(TagS3Object.SIGNER_OVERRIDE));
|
||||
assertTrue(pd.contains(TagS3Object.S3_CUSTOM_SIGNER_CLASS_NAME));
|
||||
assertTrue(pd.contains(TagS3Object.S3_CUSTOM_SIGNER_MODULE_LOCATION));
|
||||
assertTrue(pd.contains(TagS3Object.SSL_CONTEXT_SERVICE));
|
||||
assertTrue(pd.contains(TagS3Object.TIMEOUT));
|
||||
assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
|
||||
|
|
Loading…
Reference in New Issue