diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/TestAWSCredentials.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/TestAWSCredentials.java new file mode 100644 index 0000000000..3a456e8786 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/TestAWSCredentials.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws; + +import java.util.Arrays; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.PropertiesCredentials; +import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3Client; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + + +/** + * Unit tests for AWS Credential specification based on {@link AbstractAWSProcessor} and + * [@link AbstractAWSCredentialsProviderProcessor}, without interaction with S3. + */ +public class TestAWSCredentials { + + private TestRunner runner = null; + private AbstractAWSProcessor mockAwsProcessor = null; + private AWSCredentials awsCredentials = null; + private AWSCredentialsProvider awsCredentialsProvider = null; + private ClientConfiguration clientConfiguration = null; + + @Before + public void setUp() { + mockAwsProcessor = new AbstractAWSCredentialsProviderProcessor() { + + protected List getSupportedPropertyDescriptors() { + return Arrays.asList( + AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE, + AbstractAWSProcessor.CREDENTIALS_FILE, + AbstractAWSProcessor.ACCESS_KEY, + AbstractAWSProcessor.SECRET_KEY, + AbstractAWSProcessor.TIMEOUT + ); + } + + @Override + protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + awsCredentials = credentials; + clientConfiguration = config; + final AmazonS3Client s3 = new AmazonS3Client(credentials, config); + return s3; + } + + @Override + protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { + awsCredentialsProvider = credentialsProvider; + clientConfiguration = config; + final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, config); + return s3; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + } + }; + runner = TestRunners.newTestRunner(mockAwsProcessor); + } + + @Test + public void testAnonymousByDefault() { + runner.assertValid(); + runner.run(1); + + assertEquals(AnonymousAWSCredentials.class, awsCredentials.getClass()); + assertNull(awsCredentialsProvider); + } + + @Test + public void testAccessKeySecretKey() { + runner.setProperty(AbstractAWSProcessor.ACCESS_KEY, "testAccessKey"); + runner.setProperty(AbstractAWSProcessor.SECRET_KEY, "testSecretKey"); + + runner.assertValid(); + runner.run(1); + + assertEquals(BasicAWSCredentials.class, awsCredentials.getClass()); + assertNull(awsCredentialsProvider); + } + + @Test + public void testCredentialsFile() { + runner.setProperty(AbstractAWSProcessor.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties"); + + runner.assertValid(); + runner.run(1); + + assertEquals(PropertiesCredentials.class, awsCredentials.getClass()); + assertNull(awsCredentialsProvider); + } + + + @Test + public void testCredentialsProviderControllerService() throws InitializationException { + final AWSCredentialsProviderControllerService credsService = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", credsService); + runner.setProperty(credsService, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(credsService, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.enableControllerService(credsService); + + runner.setProperty(AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + + runner.assertValid(); + runner.run(1); + + assertEquals(StaticCredentialsProvider.class, awsCredentialsProvider.getClass()); + assertNull(awsCredentials); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/lambda/TestPutLambda.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/lambda/TestPutLambda.java new file mode 100644 index 0000000000..a8a3a9b414 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/lambda/TestPutLambda.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.lambda; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.lambda.AWSLambdaClient; +import com.amazonaws.services.lambda.model.InvalidParameterValueException; +import com.amazonaws.services.lambda.model.InvokeRequest; +import com.amazonaws.services.lambda.model.InvokeResult; +import com.amazonaws.services.lambda.model.TooManyRequestsException; +import com.amazonaws.util.Base64; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class TestPutLambda { + + private TestRunner runner = null; + private PutLambda mockPutLambda = null; + private AWSLambdaClient actualLambdaClient = null; + private AWSLambdaClient mockLambdaClient = null; + + @Before + public void setUp() { + mockLambdaClient = Mockito.mock(AWSLambdaClient.class); + mockPutLambda = new PutLambda() { + protected AWSLambdaClient getClient() { + actualLambdaClient = client; + return mockLambdaClient; + } + }; + runner = TestRunners.newTestRunner(mockPutLambda); + } + + @Test + public void testSizeGreaterThan6MB() throws Exception { + runner = TestRunners.newTestRunner(PutLambda.class); + runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello"); + runner.assertValid(); + byte [] largeInput = new byte[6000001]; + for (int i = 0; i < 6000001; i++) { + largeInput[i] = 'a'; + } + runner.enqueue(largeInput); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1); + } + + @Test + public void testPutLambdaSimple() { + runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "test-function"); + runner.enqueue("TestContent"); + + InvokeResult invokeResult = new InvokeResult(); + invokeResult.setStatusCode(200); + invokeResult.setLogResult(Base64.encodeAsString("test-log-result".getBytes())); + invokeResult.setPayload(ByteBuffer.wrap("test-payload".getBytes())); + Mockito.when(mockLambdaClient.invoke(Mockito.any(InvokeRequest.class))).thenReturn(invokeResult); + + runner.assertValid(); + runner.run(1); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(InvokeRequest.class); + Mockito.verify(mockLambdaClient, Mockito.times(1)).invoke(captureRequest.capture()); + InvokeRequest request = captureRequest.getValue(); + assertEquals("test-function", request.getFunctionName()); + + runner.assertAllFlowFilesTransferred(PutLambda.REL_SUCCESS, 1); + final List flowFiles = runner.getFlowFilesForRelationship(PutLambda.REL_SUCCESS); + final MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE, "200"); + ff0.assertAttributeEquals(PutLambda.AWS_LAMBDA_RESULT_LOG, "test-log-result"); + ff0.assertAttributeEquals(PutLambda.AWS_LAMBDA_RESULT_PAYLOAD, "test-payload"); + } + + @Test + public void testPutLambdaParameterException() { + runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "test-function"); + runner.enqueue("TestContent"); + Mockito.when(mockLambdaClient.invoke(Mockito.any(InvokeRequest.class))).thenThrow(new InvalidParameterValueException("TestFail")); + + runner.assertValid(); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1); + } + + @Test + public void testPutLambdaTooManyRequestsException() { + runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "test-function"); + runner.enqueue("TestContent"); + Mockito.when(mockLambdaClient.invoke(Mockito.any(InvokeRequest.class))).thenThrow(new TooManyRequestsException("TestFail")); + + runner.assertValid(); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1); + final List flowFiles = runner.getFlowFilesForRelationship(PutLambda.REL_FAILURE); + final MockFlowFile ff0 = flowFiles.get(0); + assertTrue(ff0.isPenalized()); + } + + @Test + public void testPutLambdaAmazonException() { + runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "test-function"); + runner.enqueue("TestContent"); + Mockito.when(mockLambdaClient.invoke(Mockito.any(InvokeRequest.class))).thenThrow(new AmazonServiceException("TestFail")); + + runner.assertValid(); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1); + } + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java index ba7aedeccb..dc52c6b3d8 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.aws.s3; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processors.aws.AbstractAWSProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.util.TestRunner; @@ -25,11 +24,8 @@ import org.junit.Test; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * Provides integration level testing with actual AWS S3 resources for {@link DeleteS3Object} and requires additional configuration and resources to work. @@ -142,28 +138,4 @@ public class ITDeleteS3Object extends AbstractS3IT { runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1); } - @Test - public void testGetPropertyDescriptors() throws Exception { - DeleteS3Object processor = new DeleteS3Object(); - List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 20, pd.size()); - assertTrue(pd.contains(processor.ACCESS_KEY)); - assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE)); - assertTrue(pd.contains(processor.BUCKET)); - assertTrue(pd.contains(processor.CREDENTIALS_FILE)); - assertTrue(pd.contains(processor.ENDPOINT_OVERRIDE)); - assertTrue(pd.contains(processor.FULL_CONTROL_USER_LIST)); - assertTrue(pd.contains(processor.KEY)); - assertTrue(pd.contains(processor.OWNER)); - assertTrue(pd.contains(processor.READ_ACL_LIST)); - assertTrue(pd.contains(processor.READ_USER_LIST)); - assertTrue(pd.contains(processor.REGION)); - assertTrue(pd.contains(processor.SECRET_KEY)); - assertTrue(pd.contains(processor.SIGNER_OVERRIDE)); - assertTrue(pd.contains(processor.SSL_CONTEXT_SERVICE)); - assertTrue(pd.contains(processor.TIMEOUT)); - assertTrue(pd.contains(processor.VERSION_ID)); - assertTrue(pd.contains(processor.WRITE_ACL_LIST)); - assertTrue(pd.contains(processor.WRITE_USER_LIST)); - } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java index bfb1e1e098..8bbc1f152c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.aws.s3; import com.amazonaws.services.s3.model.ObjectMetadata; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processors.aws.AbstractAWSProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.util.MockFlowFile; @@ -31,8 +30,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * Provides integration level testing with actual AWS S3 resources for {@link FetchS3Object} and requires additional configuration and resources to work. @@ -159,23 +156,4 @@ public class ITFetchS3Object extends AbstractS3IT { } } - - @Test - public void testGetPropertyDescriptors() throws Exception { - FetchS3Object processor = new FetchS3Object(); - List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 14, pd.size()); - assertTrue(pd.contains(FetchS3Object.ACCESS_KEY)); - assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); - assertTrue(pd.contains(FetchS3Object.BUCKET)); - assertTrue(pd.contains(FetchS3Object.CREDENTIALS_FILE)); - assertTrue(pd.contains(FetchS3Object.ENDPOINT_OVERRIDE)); - assertTrue(pd.contains(FetchS3Object.KEY)); - assertTrue(pd.contains(FetchS3Object.REGION)); - assertTrue(pd.contains(FetchS3Object.SECRET_KEY)); - assertTrue(pd.contains(FetchS3Object.SIGNER_OVERRIDE)); - assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE)); - assertTrue(pd.contains(FetchS3Object.TIMEOUT)); - assertTrue(pd.contains(FetchS3Object.VERSION_ID)); - } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java index ce876415c6..e7a4482d3d 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.aws.s3; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processors.aws.AbstractAWSProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.util.MockFlowFile; @@ -27,8 +26,6 @@ import org.junit.Test; import java.io.IOException; import java.util.List; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * Provides integration level testing with actual AWS S3 resources for {@link ListS3} and requires additional configuration and resources to work. @@ -145,25 +142,4 @@ public class ITListS3 extends AbstractS3IT { flowFiles.get(0).assertAttributeEquals("filename", "b/c"); } - @Test - public void testGetPropertyDescriptors() throws Exception { - ListS3 processor = new ListS3(); - List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 15, pd.size()); - assertTrue(pd.contains(ListS3.ACCESS_KEY)); - assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); - assertTrue(pd.contains(ListS3.BUCKET)); - assertTrue(pd.contains(ListS3.CREDENTIALS_FILE)); - assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE)); - assertTrue(pd.contains(ListS3.REGION)); - assertTrue(pd.contains(ListS3.SECRET_KEY)); - assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE)); - assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE)); - assertTrue(pd.contains(ListS3.TIMEOUT)); - assertTrue(pd.contains(ListS3.PROXY_HOST)); - assertTrue(pd.contains(ListS3.PROXY_HOST_PORT)); - assertTrue(pd.contains(ListS3.DELIMITER)); - assertTrue(pd.contains(ListS3.PREFIX)); - assertTrue(pd.contains(ListS3.USE_VERSIONS)); - } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index 096ba2e096..2ac2d94547 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -16,9 +16,6 @@ */ package org.apache.nifi.processors.aws.s3; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; @@ -329,35 +326,6 @@ public class ITPutS3Object extends AbstractS3IT { runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); } - - @Test - public void testGetPropertyDescriptors() throws Exception { - PutS3Object processor = new PutS3Object(); - List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 28, pd.size()); - assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); - assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); - assertTrue(pd.contains(PutS3Object.BUCKET)); - assertTrue(pd.contains(PutS3Object.CANNED_ACL)); - assertTrue(pd.contains(PutS3Object.CREDENTIALS_FILE)); - assertTrue(pd.contains(PutS3Object.ENDPOINT_OVERRIDE)); - assertTrue(pd.contains(PutS3Object.FULL_CONTROL_USER_LIST)); - assertTrue(pd.contains(PutS3Object.KEY)); - assertTrue(pd.contains(PutS3Object.OWNER)); - assertTrue(pd.contains(PutS3Object.READ_ACL_LIST)); - assertTrue(pd.contains(PutS3Object.READ_USER_LIST)); - assertTrue(pd.contains(PutS3Object.REGION)); - assertTrue(pd.contains(PutS3Object.SECRET_KEY)); - assertTrue(pd.contains(PutS3Object.SIGNER_OVERRIDE)); - assertTrue(pd.contains(PutS3Object.SSL_CONTEXT_SERVICE)); - assertTrue(pd.contains(PutS3Object.TIMEOUT)); - assertTrue(pd.contains(PutS3Object.EXPIRATION_RULE_ID)); - assertTrue(pd.contains(PutS3Object.STORAGE_CLASS)); - assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST)); - assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST)); - assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION)); - } - @Test public void testDynamicProperty() throws IOException { final String DYNAMIC_ATTRIB_KEY = "fs.runTimestamp"; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java new file mode 100644 index 0000000000..381046c29a --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.DeleteObjectRequest; +import com.amazonaws.services.s3.model.DeleteVersionRequest; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class TestDeleteS3Object { + + private TestRunner runner = null; + private DeleteS3Object mockDeleteS3Object = null; + private AmazonS3Client actualS3Client = null; + private AmazonS3Client mockS3Client = null; + + @Before + public void setUp() { + mockS3Client = Mockito.mock(AmazonS3Client.class); + mockDeleteS3Object = new DeleteS3Object() { + protected AmazonS3Client getClient() { + actualS3Client = client; + return mockS3Client; + } + }; + runner = TestRunners.newTestRunner(mockDeleteS3Object); + } + + @Test + public void testDeleteObjectSimple() throws IOException { + runner.setProperty(DeleteS3Object.REGION, "us-west-2"); + runner.setProperty(DeleteS3Object.BUCKET, "test-bucket"); + final Map attrs = new HashMap<>(); + attrs.put("filename", "delete-key"); + runner.enqueue(new byte[0], attrs); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1); + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(DeleteObjectRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).deleteObject(captureRequest.capture()); + DeleteObjectRequest request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + assertEquals("delete-key", request.getKey()); + Mockito.verify(mockS3Client, Mockito.never()).deleteVersion(Mockito.any(DeleteVersionRequest.class)); + } + + @Test + public void testDeleteObjectS3Exception() { + runner.setProperty(DeleteS3Object.REGION, "us-west-2"); + runner.setProperty(DeleteS3Object.BUCKET, "test-bucket"); + final Map attrs = new HashMap<>(); + attrs.put("filename", "delete-key"); + runner.enqueue(new byte[0], attrs); + Mockito.doThrow(new AmazonS3Exception("NoSuchBucket")).when(mockS3Client).deleteObject(Mockito.any()); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1); + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(DeleteObjectRequest.class); + Mockito.verify(mockS3Client, Mockito.never()).deleteVersion(Mockito.any(DeleteVersionRequest.class)); + } + + @Test + public void testDeleteVersionSimple() { + runner.setProperty(DeleteS3Object.REGION, "us-west-2"); + runner.setProperty(DeleteS3Object.BUCKET, "test-bucket"); + runner.setProperty(DeleteS3Object.VERSION_ID, "test-version"); + final Map attrs = new HashMap<>(); + attrs.put("filename", "test-key"); + runner.enqueue(new byte[0], attrs); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1); + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(DeleteVersionRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).deleteVersion(captureRequest.capture()); + DeleteVersionRequest request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + assertEquals("test-key", request.getKey()); + assertEquals("test-version", request.getVersionId()); + Mockito.verify(mockS3Client, Mockito.never()).deleteObject(Mockito.any(DeleteObjectRequest.class)); + } + + @Test + public void testDeleteVersionFromExpressions() { + runner.setProperty(DeleteS3Object.REGION, "us-west-2"); + runner.setProperty(DeleteS3Object.BUCKET, "${s3.bucket}"); + runner.setProperty(DeleteS3Object.VERSION_ID, "${s3.version}"); + final Map attrs = new HashMap<>(); + attrs.put("filename", "test-key"); + attrs.put("s3.bucket", "test-bucket"); + attrs.put("s3.version", "test-version"); + runner.enqueue(new byte[0], attrs); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1); + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(DeleteVersionRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).deleteVersion(captureRequest.capture()); + DeleteVersionRequest request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + assertEquals("test-key", request.getKey()); + assertEquals("test-version", request.getVersionId()); + Mockito.verify(mockS3Client, Mockito.never()).deleteObject(Mockito.any(DeleteObjectRequest.class)); + } + + @Test + public void testGetPropertyDescriptors() throws Exception { + DeleteS3Object processor = new DeleteS3Object(); + List pd = processor.getSupportedPropertyDescriptors(); + assertEquals("size should be eq", 20, pd.size()); + assertTrue(pd.contains(processor.ACCESS_KEY)); + assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE)); + assertTrue(pd.contains(processor.BUCKET)); + assertTrue(pd.contains(processor.CREDENTIALS_FILE)); + assertTrue(pd.contains(processor.ENDPOINT_OVERRIDE)); + assertTrue(pd.contains(processor.FULL_CONTROL_USER_LIST)); + assertTrue(pd.contains(processor.KEY)); + assertTrue(pd.contains(processor.OWNER)); + assertTrue(pd.contains(processor.READ_ACL_LIST)); + assertTrue(pd.contains(processor.READ_USER_LIST)); + assertTrue(pd.contains(processor.REGION)); + assertTrue(pd.contains(processor.SECRET_KEY)); + assertTrue(pd.contains(processor.SIGNER_OVERRIDE)); + assertTrue(pd.contains(processor.SSL_CONTEXT_SERVICE)); + assertTrue(pd.contains(processor.TIMEOUT)); + assertTrue(pd.contains(processor.VERSION_ID)); + assertTrue(pd.contains(processor.WRITE_ACL_LIST)); + assertTrue(pd.contains(processor.WRITE_USER_LIST)); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java new file mode 100644 index 0000000000..1ebf79bbd0 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.io.IOException; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.util.StringInputStream; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + + +public class TestFetchS3Object { + + private TestRunner runner = null; + private FetchS3Object mockFetchS3Object = null; + private AmazonS3Client actualS3Client = null; + private AmazonS3Client mockS3Client = null; + + @Before + public void setUp() { + mockS3Client = Mockito.mock(AmazonS3Client.class); + mockFetchS3Object = new FetchS3Object() { + protected AmazonS3Client getClient() { + actualS3Client = client; + return mockS3Client; + } + }; + runner = TestRunners.newTestRunner(mockFetchS3Object); + } + + @Test + public void testGetObject() throws IOException { + runner.setProperty(FetchS3Object.REGION, "us-east-1"); + runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); + final Map attrs = new HashMap<>(); + attrs.put("filename", "request-key"); + runner.enqueue(new byte[0], attrs); + + S3Object s3ObjectResponse = new S3Object(); + s3ObjectResponse.setBucketName("response-bucket-name"); + s3ObjectResponse.setKey("response-key"); + s3ObjectResponse.setObjectContent(new StringInputStream("Some Content")); + ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class); + metadata.setContentDisposition("key/path/to/file.txt"); + metadata.setContentType("text/plain"); + metadata.setContentMD5("testMD5hash"); + Date expiration = new Date(); + metadata.setExpirationTime(expiration); + metadata.setExpirationTimeRuleId("testExpirationRuleId"); + Map userMetadata = new HashMap<>(); + userMetadata.put("userKey1", "userValue1"); + userMetadata.put("userKey2", "userValue2"); + metadata.setUserMetadata(userMetadata); + metadata.setSSEAlgorithm("testAlgorithm"); + Mockito.when(metadata.getETag()).thenReturn("test-etag"); + s3ObjectResponse.setObjectMetadata(metadata); + Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse); + + runner.run(1); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(GetObjectRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).getObject(captureRequest.capture()); + GetObjectRequest request = captureRequest.getValue(); + assertEquals("request-bucket", request.getBucketName()); + assertEquals("request-key", request.getKey()); + assertNull(request.getVersionId()); + + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); + final List ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS); + MockFlowFile ff = ffs.get(0); + ff.assertAttributeEquals("s3.bucket", "response-bucket-name"); + ff.assertAttributeEquals(CoreAttributes.FILENAME.key(), "file.txt"); + ff.assertAttributeEquals(CoreAttributes.PATH.key(), "key/path/to"); + ff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), "key/path/to/file.txt"); + ff.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain"); + ff.assertAttributeEquals("hash.value", "testMD5hash"); + ff.assertAttributeEquals("hash.algorithm", "MD5"); + ff.assertAttributeEquals("s3.etag", "test-etag"); + ff.assertAttributeEquals("s3.expirationTime", String.valueOf(expiration.getTime())); + ff.assertAttributeEquals("s3.expirationTimeRuleId", "testExpirationRuleId"); + ff.assertAttributeEquals("userKey1", "userValue1"); + ff.assertAttributeEquals("userKey2", "userValue2"); + ff.assertAttributeEquals("s3.sseAlgorithm", "testAlgorithm"); + ff.assertContentEquals("Some Content"); + } + + @Test + public void testGetObjectVersion() throws IOException { + runner.setProperty(FetchS3Object.REGION, "us-east-1"); + runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); + runner.setProperty(FetchS3Object.VERSION_ID, "${s3.version}"); + final Map attrs = new HashMap<>(); + attrs.put("filename", "request-key"); + attrs.put("s3.version", "request-version"); + runner.enqueue(new byte[0], attrs); + + S3Object s3ObjectResponse = new S3Object(); + s3ObjectResponse.setBucketName("response-bucket-name"); + s3ObjectResponse.setObjectContent(new StringInputStream("Some Content")); + ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class); + metadata.setContentDisposition("key/path/to/file.txt"); + Mockito.when(metadata.getVersionId()).thenReturn("response-version"); + s3ObjectResponse.setObjectMetadata(metadata); + Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse); + + runner.run(1); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(GetObjectRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).getObject(captureRequest.capture()); + GetObjectRequest request = captureRequest.getValue(); + assertEquals("request-bucket", request.getBucketName()); + assertEquals("request-key", request.getKey()); + assertEquals("request-version", request.getVersionId()); + + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); + final List ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS); + MockFlowFile ff = ffs.get(0); + ff.assertAttributeEquals("s3.bucket", "response-bucket-name"); + ff.assertAttributeEquals(CoreAttributes.FILENAME.key(), "file.txt"); + ff.assertAttributeEquals(CoreAttributes.PATH.key(), "key/path/to"); + ff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), "key/path/to/file.txt"); + ff.assertAttributeEquals("s3.version", "response-version"); + ff.assertContentEquals("Some Content"); + } + + + @Test + public void testGetObjectExceptionGoesToFailure() throws IOException { + runner.setProperty(FetchS3Object.REGION, "us-east-1"); + runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); + final Map attrs = new HashMap<>(); + attrs.put("filename", "request-key"); + runner.enqueue(new byte[0], attrs); + Mockito.doThrow(new AmazonS3Exception("NoSuchBucket")).when(mockS3Client).getObject(Mockito.any()); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1); + } + + @Test + public void testGetPropertyDescriptors() throws Exception { + FetchS3Object processor = new FetchS3Object(); + List pd = processor.getSupportedPropertyDescriptors(); + assertEquals("size should be eq", 14, pd.size()); + assertTrue(pd.contains(FetchS3Object.ACCESS_KEY)); + assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); + assertTrue(pd.contains(FetchS3Object.BUCKET)); + assertTrue(pd.contains(FetchS3Object.CREDENTIALS_FILE)); + assertTrue(pd.contains(FetchS3Object.ENDPOINT_OVERRIDE)); + assertTrue(pd.contains(FetchS3Object.KEY)); + assertTrue(pd.contains(FetchS3Object.REGION)); + assertTrue(pd.contains(FetchS3Object.SECRET_KEY)); + assertTrue(pd.contains(FetchS3Object.SIGNER_OVERRIDE)); + assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE)); + assertTrue(pd.contains(FetchS3Object.TIMEOUT)); + assertTrue(pd.contains(FetchS3Object.VERSION_ID)); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java new file mode 100644 index 0000000000..06b3683c81 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.io.IOException; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ListVersionsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.S3VersionSummary; +import com.amazonaws.services.s3.model.VersionListing; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class TestListS3 { + + private TestRunner runner = null; + private ListS3 mockListS3 = null; + private AmazonS3Client actualS3Client = null; + private AmazonS3Client mockS3Client = null; + + @Before + public void setUp() { + mockS3Client = Mockito.mock(AmazonS3Client.class); + mockListS3 = new ListS3() { + protected AmazonS3Client getClient() { + actualS3Client = client; + return mockS3Client; + } + }; + runner = TestRunners.newTestRunner(mockListS3); + } + + + @Test + public void testList() { + runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(ListS3.BUCKET, "test-bucket"); + + Date lastModified = new Date(); + ObjectListing objectListing = new ObjectListing(); + S3ObjectSummary objectSummary1 = new S3ObjectSummary(); + objectSummary1.setBucketName("test-bucket"); + objectSummary1.setKey("a"); + objectSummary1.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary1); + S3ObjectSummary objectSummary2 = new S3ObjectSummary(); + objectSummary2.setBucketName("test-bucket"); + objectSummary2.setKey("b/c"); + objectSummary2.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary2); + S3ObjectSummary objectSummary3 = new S3ObjectSummary(); + objectSummary3.setBucketName("test-bucket"); + objectSummary3.setKey("d/e"); + objectSummary3.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary3); + Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture()); + ListObjectsRequest request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any()); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3); + List flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("filename", "a"); + ff0.assertAttributeEquals("s3.bucket", "test-bucket"); + String lastModifiedTimestamp = String.valueOf(lastModified.getTime()); + ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp); + flowFiles.get(1).assertAttributeEquals("filename", "b/c"); + flowFiles.get(2).assertAttributeEquals("filename", "d/e"); + runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); + } + + @Test + public void testListVersions() { + runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(ListS3.BUCKET, "test-bucket"); + runner.setProperty(ListS3.USE_VERSIONS, "true"); + + Date lastModified = new Date(); + VersionListing versionListing = new VersionListing(); + S3VersionSummary versionSummary1 = new S3VersionSummary(); + versionSummary1.setBucketName("test-bucket"); + versionSummary1.setKey("test-key"); + versionSummary1.setVersionId("1"); + versionSummary1.setLastModified(lastModified); + versionListing.getVersionSummaries().add(versionSummary1); + S3VersionSummary versionSummary2 = new S3VersionSummary(); + versionSummary2.setBucketName("test-bucket"); + versionSummary2.setKey("test-key"); + versionSummary2.setVersionId("2"); + versionSummary2.setLastModified(lastModified); + versionListing.getVersionSummaries().add(versionSummary2); + Mockito.when(mockS3Client.listVersions(Mockito.any(ListVersionsRequest.class))).thenReturn(versionListing); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ListVersionsRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).listVersions(captureRequest.capture()); + ListVersionsRequest request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + Mockito.verify(mockS3Client, Mockito.never()).listObjects(Mockito.any(ListObjectsRequest.class)); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("filename", "test-key"); + ff0.assertAttributeEquals("s3.bucket", "test-bucket"); + ff0.assertAttributeEquals("s3.lastModified", String.valueOf(lastModified.getTime())); + ff0.assertAttributeEquals("s3.version", "1"); + MockFlowFile ff1 = flowFiles.get(1); + ff1.assertAttributeEquals("filename", "test-key"); + ff1.assertAttributeEquals("s3.bucket", "test-bucket"); + ff1.assertAttributeEquals("s3.lastModified", String.valueOf(lastModified.getTime())); + ff1.assertAttributeEquals("s3.version", "2"); + } + + @Test + public void testListObjectsNothingNew() throws IOException { + runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(ListS3.BUCKET, "test-bucket"); + + Calendar calendar = Calendar.getInstance(); + calendar.set(2017, 5, 2); + Date objectLastModified = calendar.getTime(); + long stateCurrentTimestamp = objectLastModified.getTime(); + + Map state = new HashMap<>(); + state.put(ListS3.CURRENT_TIMESTAMP, String.valueOf(stateCurrentTimestamp)); + state.put(ListS3.CURRENT_KEY_PREFIX+"0", "test-key"); + MockStateManager mockStateManager = runner.getStateManager(); + mockStateManager.setState(state, Scope.CLUSTER); + + ObjectListing objectListing = new ObjectListing(); + S3ObjectSummary objectSummary1 = new S3ObjectSummary(); + objectSummary1.setBucketName("test-bucket"); + objectSummary1.setKey("test-key"); + objectSummary1.setLastModified(objectLastModified); + objectListing.getObjectSummaries().add(objectSummary1); + Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture()); + ListObjectsRequest request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any()); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 0); + } + + @Test + public void testGetPropertyDescriptors() throws Exception { + ListS3 processor = new ListS3(); + List pd = processor.getSupportedPropertyDescriptors(); + assertEquals("size should be eq", 15, pd.size()); + assertTrue(pd.contains(ListS3.ACCESS_KEY)); + assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); + assertTrue(pd.contains(ListS3.BUCKET)); + assertTrue(pd.contains(ListS3.CREDENTIALS_FILE)); + assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE)); + assertTrue(pd.contains(ListS3.REGION)); + assertTrue(pd.contains(ListS3.SECRET_KEY)); + assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE)); + assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE)); + assertTrue(pd.contains(ListS3.TIMEOUT)); + assertTrue(pd.contains(ListS3.PROXY_HOST)); + assertTrue(pd.contains(ListS3.PROXY_HOST_PORT)); + assertTrue(pd.contains(ListS3.DELIMITER)); + assertTrue(pd.contains(ListS3.PREFIX)); + assertTrue(pd.contains(ListS3.USE_VERSIONS)); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index 00f4423911..0ee779240d 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -16,10 +16,16 @@ */ package org.apache.nifi.processors.aws.s3; +import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -27,15 +33,94 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; +import com.amazonaws.services.s3.model.MultipartUploadListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; -/** - * Unit tests for {@link PutS3Object}, without interaction with S3. - */ public class TestPutS3Object { + private TestRunner runner = null; + private PutS3Object mockPutS3Object = null; + private AmazonS3Client actualS3Client = null; + private AmazonS3Client mockS3Client = null; + + @Before + public void setUp() { + mockS3Client = Mockito.mock(AmazonS3Client.class); + mockPutS3Object = new PutS3Object() { + protected AmazonS3Client getClient() { + actualS3Client = client; + return mockS3Client; + } + }; + runner = TestRunners.newTestRunner(mockPutS3Object); + } + + @Test + public void testPutSinglePart() { + runner.setProperty(PutS3Object.REGION, "ap-northeast-1"); + runner.setProperty(PutS3Object.BUCKET, "test-bucket"); + runner.setProperty("x-custom-prop", "hello"); + final Map ffAttributes = new HashMap<>(); + ffAttributes.put("filename", "testfile.txt"); + runner.enqueue("Test Content", ffAttributes); + + PutObjectResult putObjectResult = Mockito.spy(PutObjectResult.class); + Date expiration = new Date(); + putObjectResult.setExpirationTime(expiration); + putObjectResult.setMetadata(new ObjectMetadata()); + putObjectResult.setVersionId("test-version"); + Mockito.when(putObjectResult.getETag()).thenReturn("test-etag"); + Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult); + MultipartUploadListing uploadListing = new MultipartUploadListing(); + Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing); + Mockito.when(mockS3Client.getResourceUrl(Mockito.anyString(), Mockito.anyString())).thenReturn("test-s3-url"); + + runner.assertValid(); + runner.run(1); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); + PutObjectRequest request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "testfile.txt"); + ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag"); + ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY, "test-version"); + } + + @Test + public void testPutSinglePartException() { + runner.setProperty(PutS3Object.REGION, "ap-northeast-1"); + runner.setProperty(PutS3Object.BUCKET, "test-bucket"); + final Map ffAttributes = new HashMap<>(); + ffAttributes.put("filename", "testfile.txt"); + runner.enqueue("Test Content", ffAttributes); + + MultipartUploadListing uploadListing = new MultipartUploadListing(); + Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing); + Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new AmazonS3Exception("TestFail")); + + runner.assertValid(); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_FAILURE, 1); + } @Test public void testSignerOverrideOptions() { @@ -61,4 +146,32 @@ public class TestPutS3Object { } } + @Test + public void testGetPropertyDescriptors() throws Exception { + PutS3Object processor = new PutS3Object(); + List pd = processor.getSupportedPropertyDescriptors(); + assertEquals("size should be eq", 28, pd.size()); + assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); + assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); + assertTrue(pd.contains(PutS3Object.BUCKET)); + assertTrue(pd.contains(PutS3Object.CANNED_ACL)); + assertTrue(pd.contains(PutS3Object.CREDENTIALS_FILE)); + assertTrue(pd.contains(PutS3Object.ENDPOINT_OVERRIDE)); + assertTrue(pd.contains(PutS3Object.FULL_CONTROL_USER_LIST)); + assertTrue(pd.contains(PutS3Object.KEY)); + assertTrue(pd.contains(PutS3Object.OWNER)); + assertTrue(pd.contains(PutS3Object.READ_ACL_LIST)); + assertTrue(pd.contains(PutS3Object.READ_USER_LIST)); + assertTrue(pd.contains(PutS3Object.REGION)); + assertTrue(pd.contains(PutS3Object.SECRET_KEY)); + assertTrue(pd.contains(PutS3Object.SIGNER_OVERRIDE)); + assertTrue(pd.contains(PutS3Object.SSL_CONTEXT_SERVICE)); + assertTrue(pd.contains(PutS3Object.TIMEOUT)); + assertTrue(pd.contains(PutS3Object.EXPIRATION_RULE_ID)); + assertTrue(pd.contains(PutS3Object.STORAGE_CLASS)); + assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST)); + assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST)); + assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION)); + } + } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java new file mode 100644 index 0000000000..663af3377c --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sns; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import com.amazonaws.services.sns.AmazonSNSClient; +import com.amazonaws.services.sns.model.AmazonSNSException; +import com.amazonaws.services.sns.model.PublishRequest; +import com.amazonaws.services.sns.model.PublishResult; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class TestPutSNS { + + private TestRunner runner = null; + private PutSNS mockPutSNS = null; + private AmazonSNSClient actualSNSClient = null; + private AmazonSNSClient mockSNSClient = null; + + @Before + public void setUp() { + mockSNSClient = Mockito.mock(AmazonSNSClient.class); + mockPutSNS = new PutSNS() { + protected AmazonSNSClient getClient() { + actualSNSClient = client; + return mockSNSClient; + } + }; + runner = TestRunners.newTestRunner(mockPutSNS); + } + + @Test + public void testPublish() throws IOException { + runner.setValidateExpressionUsage(false); + runner.setProperty(PutSNS.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties"); + runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1"); + runner.setProperty(PutSNS.SUBJECT, "${eval.subject}"); + assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid()); + final Map ffAttributes = new HashMap<>(); + ffAttributes.put("filename", "1.txt"); + ffAttributes.put("eval.subject", "test-subject"); + runner.enqueue("Test Message Content", ffAttributes); + + PublishResult mockPublishResult = new PublishResult(); + Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResult); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PublishRequest.class); + Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture()); + PublishRequest request = captureRequest.getValue(); + assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1", request.getTopicArn()); + assertEquals("Test Message Content", request.getMessage()); + assertEquals("test-subject", request.getSubject()); + assertEquals("hello!", request.getMessageAttributes().get("DynamicProperty").getStringValue()); + + runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutSNS.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "1.txt"); + } + + @Test + public void testPublishFailure() throws IOException { + runner.setValidateExpressionUsage(false); + runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1"); + final Map ffAttributes = new HashMap<>(); + ffAttributes.put("filename", "1.txt"); + runner.enqueue("Test Message Content", ffAttributes); + Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenThrow(new AmazonSNSException("Fail")); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PublishRequest.class); + Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture()); + runner.assertAllFlowFilesTransferred(PutSNS.REL_FAILURE, 1); + } + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITGetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITGetSQS.java new file mode 100644 index 0000000000..ac9383e810 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITGetSQS.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sqs; + +import java.util.List; + +import org.apache.nifi.processors.aws.AbstractAWSProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.processors.aws.sns.PutSNS; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") +public class ITGetSQS { + + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Test + public void testSimpleGet() { + final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); + runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(GetSQS.TIMEOUT, "30 secs"); + runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"); + + runner.run(1); + + final List flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); + for (final MockFlowFile mff : flowFiles) { + System.out.println(mff.getAttributes()); + System.out.println(new String(mff.toByteArray())); + } + } + + @Test + public void testSimpleGetWithEL() { + System.setProperty("test-account-property", "100515378163"); + System.setProperty("test-queue-property", "test-queue-000000000"); + final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); + runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(GetSQS.TIMEOUT, "30 secs"); + runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/${test-account-property}/${test-queue-property}"); + + runner.run(1); + + final List flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); + for (final MockFlowFile mff : flowFiles) { + System.out.println(mff.getAttributes()); + System.out.println(new String(mff.toByteArray())); + } + } + + @Test + public void testSimpleGetUsingCredentialsProviderService() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); + + runner.setProperty(GetSQS.TIMEOUT, "30 secs"); + String queueUrl = "Add queue url here"; + runner.setProperty(GetSQS.QUEUE_URL, queueUrl); + + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + + runner.setProperty(GetSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + runner.run(1); + + final List flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); + for (final MockFlowFile mff : flowFiles) { + System.out.println(mff.getAttributes()); + System.out.println(new String(mff.toByteArray())); + } + } + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java new file mode 100644 index 0000000000..69544a23c5 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITPutSQS.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sqs; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.processors.aws.AbstractAWSProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.processors.aws.sns.PutSNS; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") +public class ITPutSQS { + + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Test + public void testSimplePut() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutSQS()); + runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutSQS.TIMEOUT, "30 secs"); + runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"); + Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "1.txt"); + runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1); + } + + @Test + public void testSimplePutUsingCredentialsProviderService() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(new PutSQS()); + + runner.setProperty(PutSQS.TIMEOUT, "30 secs"); + String queueUrl = "Add queue url here"; + runner.setProperty(PutSQS.QUEUE_URL, queueUrl); + runner.setValidateExpressionUsage(false); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "1.txt"); + runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); + runner.setProperty(PutSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + runner.run(1); + + final List flowFiles = runner.getFlowFilesForRelationship(PutSQS.REL_SUCCESS); + for (final MockFlowFile mff : flowFiles) { + System.out.println(mff.getAttributes()); + System.out.println(new String(mff.toByteArray())); + } + + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java new file mode 100644 index 0000000000..11cc7cdbc7 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sqs; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.AmazonSQSException; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; + + +public class TestDeleteSQS { + + private TestRunner runner = null; + private DeleteSQS mockDeleteSQS = null; + private AmazonSQSClient actualSQSClient = null; + private AmazonSQSClient mockSQSClient = null; + + @Before + public void setUp() { + mockSQSClient = Mockito.mock(AmazonSQSClient.class); + mockDeleteSQS = new DeleteSQS() { + + protected List getSupportedPropertyDescriptors() { + return Arrays.asList(RECEIPT_HANDLE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT); + } + + protected AmazonSQSClient getClient() { + actualSQSClient = client; + return mockSQSClient; + } + }; + runner = TestRunners.newTestRunner(mockDeleteSQS); + } + + @Test + public void testDeleteSingleMessage() { + runner.setProperty(DeleteSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); + final Map ffAttributes = new HashMap<>(); + ffAttributes.put("filename", "1.txt"); + ffAttributes.put("sqs.receipt.handle", "test-receipt-handle-1"); + runner.enqueue("TestMessageBody", ffAttributes); + + runner.assertValid(); + runner.run(1); + + ArgumentCaptor captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture()); + DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.getQueueUrl()); + assertEquals("test-receipt-handle-1", deleteRequest.getEntries().get(0).getReceiptHandle()); + + runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1); + } + + @Test + public void testDeleteException() { + runner.setProperty(DeleteSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); + final Map ff1Attributes = new HashMap<>(); + ff1Attributes.put("filename", "1.txt"); + ff1Attributes.put("sqs.receipt.handle", "test-receipt-handle-1"); + runner.enqueue("TestMessageBody1", ff1Attributes); + Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class))) + .thenThrow(new AmazonSQSException("TestFail")); + + runner.assertValid(); + runner.run(1); + + ArgumentCaptor captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture()); + + runner.assertAllFlowFilesTransferred(DeleteSQS.REL_FAILURE, 1); + } + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java index 22d4d9bf11..1253e737ac 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java @@ -18,79 +18,134 @@ package org.apache.nifi.processors.aws.sqs; import java.util.List; -import org.apache.nifi.processors.aws.AbstractAWSProcessor; -import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; -import org.apache.nifi.processors.aws.sns.PutSNS; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.junit.Ignore; -import org.junit.Test; -@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; + + public class TestGetSQS { - private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + private TestRunner runner = null; + private GetSQS mockGetSQS = null; + private AmazonSQSClient actualSQSClient = null; + private AmazonSQSClient mockSQSClient = null; - @Test - public void testSimpleGet() { - final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); - runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(GetSQS.TIMEOUT, "30 secs"); - runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"); - - runner.run(1); - - final List flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); - for (final MockFlowFile mff : flowFiles) { - System.out.println(mff.getAttributes()); - System.out.println(new String(mff.toByteArray())); - } + @Before + public void setUp() { + mockSQSClient = Mockito.mock(AmazonSQSClient.class); + mockGetSQS = new GetSQS() { + protected AmazonSQSClient getClient() { + actualSQSClient = client; + return mockSQSClient; + } + }; + runner = TestRunners.newTestRunner(mockGetSQS); } @Test - public void testSimpleGetWithEL() { - System.setProperty("test-account-property", "100515378163"); - System.setProperty("test-queue-property", "test-queue-000000000"); - final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); - runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(GetSQS.TIMEOUT, "30 secs"); - runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/${test-account-property}/${test-queue-property}"); + public void testGetMessageNoAutoDelete() { + runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); + runner.setProperty(GetSQS.AUTO_DELETE, "false"); + + Message message1 = new Message(); + message1.setBody("TestMessage1"); + message1.addAttributesEntry("attrib-key-1", "attrib-value-1"); + MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); + messageAttributeValue.setStringValue("msg-attrib-value-1"); + message1.addMessageAttributesEntry("msg-attrib-key-1", messageAttributeValue); + message1.setMD5OfBody("test-md5-hash-1"); + message1.setMessageId("test-message-id-1"); + message1.setReceiptHandle("test-receipt-handle-1"); + ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult() + .withMessages(message1); + Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); runner.run(1); - final List flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); - for (final MockFlowFile mff : flowFiles) { - System.out.println(mff.getAttributes()); - System.out.println(new String(mff.toByteArray())); - } + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureRequest.capture()); + ReceiveMessageRequest request = captureRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); + Mockito.verify(mockSQSClient, Mockito.never()).deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class)); + + runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("sqs.attrib-key-1", "attrib-value-1"); + ff0.assertAttributeEquals("sqs.msg-attrib-key-1", "msg-attrib-value-1"); + ff0.assertAttributeEquals("hash.value", "test-md5-hash-1"); + ff0.assertAttributeEquals("hash.algorithm", "md5"); + ff0.assertAttributeEquals("sqs.message.id", "test-message-id-1"); + ff0.assertAttributeEquals("sqs.receipt.handle", "test-receipt-handle-1"); } @Test - public void testSimpleGetUsingCredentialsProviderService() throws Throwable { - final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); + public void testGetNoMessages() { + runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); + ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(); + Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); - runner.setProperty(GetSQS.TIMEOUT, "30 secs"); - String queueUrl = "Add queue url here"; - runner.setProperty(GetSQS.QUEUE_URL, queueUrl); - - final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); - - runner.addControllerService("awsCredentialsProvider", serviceImpl); - - runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); - runner.enableControllerService(serviceImpl); - - runner.assertValid(serviceImpl); - - runner.setProperty(GetSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); runner.run(1); - final List flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); - for (final MockFlowFile mff : flowFiles) { - System.out.println(mff.getAttributes()); - System.out.println(new String(mff.toByteArray())); - } + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureRequest.capture()); + ReceiveMessageRequest request = captureRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); + + runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 0); + } + + @Test + public void testGetMessageAndAutoDelete() { + runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); + runner.setProperty(GetSQS.AUTO_DELETE, "true"); + + Message message1 = new Message(); + message1.setBody("TestMessage1"); + message1.setMessageId("test-message-id-1"); + message1.setReceiptHandle("test-receipt-handle-1"); + Message message2 = new Message(); + message2.setBody("TestMessage2"); + message2.setMessageId("test-message-id-2"); + message2.setReceiptHandle("test-receipt-handle-2"); + ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult() + .withMessages(message1, message2); + Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); + + runner.run(1); + + ArgumentCaptor captureReceiveRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureReceiveRequest.capture()); + ReceiveMessageRequest receiveRequest = captureReceiveRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", receiveRequest.getQueueUrl()); + + ArgumentCaptor captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture()); + DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.getQueueUrl()); + assertEquals("test-message-id-1", deleteRequest.getEntries().get(0).getId()); + assertEquals("test-message-id-2", deleteRequest.getEntries().get(1).getId()); + + runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("sqs.message.id", "test-message-id-1"); + MockFlowFile ff1 = flowFiles.get(1); + ff1.assertAttributeEquals("sqs.message.id", "test-message-id-2"); } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java index 7e21b8c24c..39bef8bd62 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java @@ -17,70 +17,90 @@ package org.apache.nifi.processors.aws.sqs; import java.io.IOException; -import java.nio.file.Paths; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.nifi.processors.aws.AbstractAWSProcessor; -import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; -import org.apache.nifi.processors.aws.sns.PutSNS; -import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; -@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.AmazonSQSException; +import com.amazonaws.services.sqs.model.SendMessageBatchRequest; +import com.amazonaws.services.sqs.model.SendMessageBatchResult; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; + + public class TestPutSQS { - private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + private TestRunner runner = null; + private PutSQS mockPutSQS = null; + private AmazonSQSClient actualSQSClient = null; + private AmazonSQSClient mockSQSClient = null; + + @Before + public void setUp() { + mockSQSClient = Mockito.mock(AmazonSQSClient.class); + mockPutSQS = new PutSQS() { + protected AmazonSQSClient getClient() { + actualSQSClient = client; + return mockSQSClient; + } + }; + runner = TestRunners.newTestRunner(mockPutSQS); + } @Test public void testSimplePut() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(new PutSQS()); - runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutSQS.TIMEOUT, "30 secs"); - runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"); + runner.setValidateExpressionUsage(false); + runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); final Map attrs = new HashMap<>(); attrs.put("filename", "1.txt"); - runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); + runner.enqueue("TestMessageBody", attrs); + + SendMessageBatchResult batchResult = new SendMessageBatchResult(); + Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult); + runner.run(1); + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture()); + SendMessageBatchRequest request = captureRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); + assertEquals("hello", request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue()); + assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody()); + runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1); } @Test - public void testSimplePutUsingCredentialsProviderService() throws Throwable { - final TestRunner runner = TestRunners.newTestRunner(new PutSQS()); - - runner.setProperty(PutSQS.TIMEOUT, "30 secs"); - String queueUrl = "Add queue url here"; - runner.setProperty(PutSQS.QUEUE_URL, queueUrl); + public void testPutException() throws IOException { runner.setValidateExpressionUsage(false); - final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); - - runner.addControllerService("awsCredentialsProvider", serviceImpl); - - runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); - runner.enableControllerService(serviceImpl); - - runner.assertValid(serviceImpl); + runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); final Map attrs = new HashMap<>(); attrs.put("filename", "1.txt"); - runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); - runner.setProperty(PutSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + runner.enqueue("TestMessageBody", attrs); + + Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenThrow(new AmazonSQSException("TestFail")); + runner.run(1); - final List flowFiles = runner.getFlowFilesForRelationship(PutSQS.REL_SUCCESS); - for (final MockFlowFile mff : flowFiles) { - System.out.println(mff.getAttributes()); - System.out.println(new String(mff.toByteArray())); - } + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture()); + SendMessageBatchRequest request = captureRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); + assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody()); + runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1); } + }