mirror of https://github.com/apache/nifi.git
NIFI-7591: Allow PutS3Object to post to AWS Snowball
Added properties to enable/disable chunked encoding and path-style access for endpoints that do not support chunked encoding / only support path-style access. Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #4386.
This commit is contained in:
parent
1fc25db47c
commit
ae877b9908
|
@ -136,6 +136,21 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
||||||
.required(false)
|
.required(false)
|
||||||
.identifiesControllerService(AmazonS3EncryptionService.class)
|
.identifiesControllerService(AmazonS3EncryptionService.class)
|
||||||
.build();
|
.build();
|
||||||
|
public static final PropertyDescriptor USE_CHUNKED_ENCODING = new PropertyDescriptor.Builder()
|
||||||
|
.name("use-chunked-encoding")
|
||||||
|
.displayName("Use Chunked Encoding")
|
||||||
|
.description("Enables / disables chunked encoding for upload requests. Set it to false only if your endpoint does not support chunked uploading.")
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("true")
|
||||||
|
.build();
|
||||||
|
public static final PropertyDescriptor USE_PATH_STYLE_ACCESS = new PropertyDescriptor.Builder()
|
||||||
|
.name("use-path-style-access")
|
||||||
|
.displayName("Use Path Style Access")
|
||||||
|
.description("Path-style access can be enforced by setting this property to true. Set it to true if your endpoint does not support " +
|
||||||
|
"virtual-hosted-style requests, only path-style requests.")
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("false")
|
||||||
|
.build();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create client using credentials provider. This is the preferred way for creating clients
|
* Create client using credentials provider. This is the preferred way for creating clients
|
||||||
|
@ -155,17 +170,32 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
||||||
s3 = new AmazonS3Client(credentialsProvider, config);
|
s3 = new AmazonS3Client(credentialsProvider, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
initalizeEndpointOverride(context, s3);
|
configureClientOptions(context, s3);
|
||||||
|
|
||||||
return s3;
|
return s3;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initalizeEndpointOverride(final ProcessContext context, final AmazonS3Client s3) {
|
private void configureClientOptions(final ProcessContext context, final AmazonS3Client s3) {
|
||||||
// if ENDPOINT_OVERRIDE is set, use PathStyleAccess
|
S3ClientOptions.Builder builder = S3ClientOptions.builder();
|
||||||
if(StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue()).isEmpty() == false){
|
|
||||||
final S3ClientOptions s3Options = new S3ClientOptions();
|
// disable chunked encoding if "Use Chunked Encoding" has been set to false, otherwise use the default (not disabled)
|
||||||
s3Options.setPathStyleAccess(true);
|
Boolean useChunkedEncoding = context.getProperty(USE_CHUNKED_ENCODING).asBoolean();
|
||||||
s3.setS3ClientOptions(s3Options);
|
if (useChunkedEncoding != null && !useChunkedEncoding) {
|
||||||
|
builder.disableChunkedEncoding();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// use PathStyleAccess if "Use Path Style Access" has been set to true, otherwise use the default (false)
|
||||||
|
Boolean usePathStyleAccess = context.getProperty(USE_PATH_STYLE_ACCESS).asBoolean();
|
||||||
|
if (usePathStyleAccess != null && usePathStyleAccess) {
|
||||||
|
builder.setPathStyleAccess(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// if ENDPOINT_OVERRIDE is set, use PathStyleAccess
|
||||||
|
if (!StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue()).isEmpty()){
|
||||||
|
builder.setPathStyleAccess(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
s3.setS3ClientOptions(builder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initializeSignerOverride(final ProcessContext context, final ClientConfiguration config) {
|
private void initializeSignerOverride(final ProcessContext context, final ClientConfiguration config) {
|
||||||
|
|
|
@ -235,8 +235,8 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX,
|
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX,
|
||||||
STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
|
STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
|
||||||
CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL,
|
CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL,
|
||||||
MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST,
|
MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS,
|
||||||
PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
|
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
|
||||||
|
|
||||||
final static String S3_BUCKET_KEY = "s3.bucket";
|
final static String S3_BUCKET_KEY = "s3.bucket";
|
||||||
final static String S3_OBJECT_KEY = "s3.key";
|
final static String S3_OBJECT_KEY = "s3.key";
|
||||||
|
|
|
@ -113,11 +113,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimplePut() throws IOException {
|
public void testSimplePut() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
|
||||||
runner.setProperty(PutS3Object.REGION, REGION);
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
|
||||||
|
|
||||||
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
|
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
|
||||||
|
|
||||||
|
@ -133,11 +129,8 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimplePutEncrypted() throws IOException {
|
public void testSimplePutEncrypted() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
|
||||||
runner.setProperty(PutS3Object.REGION, REGION);
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
|
||||||
runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
||||||
|
|
||||||
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
|
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
|
||||||
|
@ -158,11 +151,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimplePutFilenameWithNationalCharacters() throws IOException {
|
public void testSimplePutFilenameWithNationalCharacters() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
|
||||||
runner.setProperty(PutS3Object.REGION, REGION);
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
|
||||||
|
|
||||||
final Map<String, String> attrs = new HashMap<>();
|
final Map<String, String> attrs = new HashMap<>();
|
||||||
attrs.put("filename", "Iñtërnâtiônàližætiøn.txt");
|
attrs.put("filename", "Iñtërnâtiônàližætiøn.txt");
|
||||||
|
@ -176,11 +165,8 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
private void testPutThenFetch(String sseAlgorithm) throws IOException {
|
private void testPutThenFetch(String sseAlgorithm) throws IOException {
|
||||||
|
|
||||||
// Put
|
// Put
|
||||||
TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
|
||||||
runner.setProperty(PutS3Object.REGION, REGION);
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
|
||||||
if(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION.equals(sseAlgorithm)){
|
if(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION.equals(sseAlgorithm)){
|
||||||
runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, sseAlgorithm);
|
runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, sseAlgorithm);
|
||||||
}
|
}
|
||||||
|
@ -292,12 +278,8 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testContentType() throws IOException {
|
public void testContentType() throws IOException {
|
||||||
PutS3Object processor = new PutS3Object();
|
TestRunner runner = initTestRunner();
|
||||||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
|
||||||
|
|
||||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
|
||||||
runner.setProperty(PutS3Object.REGION, REGION);
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
|
||||||
runner.setProperty(PutS3Object.CONTENT_TYPE, "text/plain");
|
runner.setProperty(PutS3Object.CONTENT_TYPE, "text/plain");
|
||||||
|
|
||||||
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
@ -312,10 +294,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutInFolder() throws IOException {
|
public void testPutInFolder() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
TestRunner runner = initTestRunner();
|
||||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
|
||||||
runner.setProperty(PutS3Object.REGION, REGION);
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
|
||||||
|
|
||||||
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
|
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
|
||||||
runner.assertValid();
|
runner.assertValid();
|
||||||
|
@ -331,11 +310,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStorageClasses() throws IOException {
|
public void testStorageClasses() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
|
||||||
runner.setProperty(PutS3Object.REGION, REGION);
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
|
||||||
|
|
||||||
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
|
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
|
||||||
|
|
||||||
|
@ -358,11 +333,8 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStorageClassesMultipart() throws IOException {
|
public void testStorageClassesMultipart() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
|
||||||
runner.setProperty(PutS3Object.REGION, REGION);
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
|
||||||
runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, "50 MB");
|
runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, "50 MB");
|
||||||
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, "50 MB");
|
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, "50 MB");
|
||||||
|
|
||||||
|
@ -387,12 +359,9 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPermissions() throws IOException {
|
public void testPermissions() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
|
||||||
runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,"28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
|
runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,"28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
|
||||||
runner.setProperty(PutS3Object.REGION, REGION);
|
|
||||||
|
|
||||||
final Map<String, String> attrs = new HashMap<>();
|
final Map<String, String> attrs = new HashMap<>();
|
||||||
attrs.put("filename", "folder/4.txt");
|
attrs.put("filename", "folder/4.txt");
|
||||||
|
@ -918,10 +887,8 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testObjectTags() throws IOException, InterruptedException {
|
public void testObjectTags() throws IOException, InterruptedException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
TestRunner runner = initTestRunner();
|
||||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
|
||||||
runner.setProperty(PutS3Object.REGION, REGION);
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
|
||||||
runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
|
runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
|
||||||
runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "true");
|
runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "true");
|
||||||
|
|
||||||
|
@ -1194,4 +1161,41 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
return this.getClient();
|
return this.getClient();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChunkedEncodingDisabled() throws IOException {
|
||||||
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
|
runner.setProperty(PutS3Object.USE_CHUNKED_ENCODING, "false");
|
||||||
|
|
||||||
|
executeSimplePutTest(runner);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPathStyleAccessEnabled() throws IOException {
|
||||||
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
|
runner.setProperty(PutS3Object.USE_PATH_STYLE_ACCESS, "true");
|
||||||
|
|
||||||
|
executeSimplePutTest(runner);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TestRunner initTestRunner() {
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(PutS3Object.class);
|
||||||
|
|
||||||
|
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(PutS3Object.REGION, REGION);
|
||||||
|
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
||||||
|
|
||||||
|
return runner;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeSimplePutTest(TestRunner runner) throws IOException {
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -209,7 +209,7 @@ public class TestPutS3Object {
|
||||||
public void testGetPropertyDescriptors() {
|
public void testGetPropertyDescriptors() {
|
||||||
PutS3Object processor = new PutS3Object();
|
PutS3Object processor = new PutS3Object();
|
||||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
assertEquals("size should be eq", 34, pd.size());
|
assertEquals("size should be eq", 36, pd.size());
|
||||||
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
||||||
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
assertTrue(pd.contains(PutS3Object.BUCKET));
|
assertTrue(pd.contains(PutS3Object.BUCKET));
|
||||||
|
@ -232,6 +232,8 @@ public class TestPutS3Object {
|
||||||
assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
|
assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
|
||||||
assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
|
assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
|
||||||
assertTrue(pd.contains(PutS3Object.ENCRYPTION_SERVICE));
|
assertTrue(pd.contains(PutS3Object.ENCRYPTION_SERVICE));
|
||||||
|
assertTrue(pd.contains(PutS3Object.USE_CHUNKED_ENCODING));
|
||||||
|
assertTrue(pd.contains(PutS3Object.USE_PATH_STYLE_ACCESS));
|
||||||
assertTrue(pd.contains(PutS3Object.PROXY_CONFIGURATION_SERVICE));
|
assertTrue(pd.contains(PutS3Object.PROXY_CONFIGURATION_SERVICE));
|
||||||
assertTrue(pd.contains(PutS3Object.PROXY_HOST));
|
assertTrue(pd.contains(PutS3Object.PROXY_HOST));
|
||||||
assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT));
|
assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT));
|
||||||
|
|
Loading…
Reference in New Issue