NIFI-3903 Unit Tests for AWS Processors

* Added non-integration unit tests for Lambda, S3, SNS, and SQS processors
* Moved non-integration tests out of integration test files
* Moved SQS integration tests to IT* files for clarification

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1814.
This commit is contained in:
James Wing 2017-05-11 14:48:57 -07:00 committed by Pierre Villard
parent 0a7d149656
commit 9238fdb493
16 changed files with 1536 additions and 200 deletions

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws;
import java.util.Arrays;
import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Unit tests for AWS Credential specification based on {@link AbstractAWSProcessor} and
* [@link AbstractAWSCredentialsProviderProcessor}, without interaction with S3.
*/
public class TestAWSCredentials {
private TestRunner runner = null;
private AbstractAWSProcessor mockAwsProcessor = null;
private AWSCredentials awsCredentials = null;
private AWSCredentialsProvider awsCredentialsProvider = null;
private ClientConfiguration clientConfiguration = null;
@Before
public void setUp() {
mockAwsProcessor = new AbstractAWSCredentialsProviderProcessor<AmazonS3Client>() {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(
AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE,
AbstractAWSProcessor.CREDENTIALS_FILE,
AbstractAWSProcessor.ACCESS_KEY,
AbstractAWSProcessor.SECRET_KEY,
AbstractAWSProcessor.TIMEOUT
);
}
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
awsCredentials = credentials;
clientConfiguration = config;
final AmazonS3Client s3 = new AmazonS3Client(credentials, config);
return s3;
}
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
awsCredentialsProvider = credentialsProvider;
clientConfiguration = config;
final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, config);
return s3;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
}
};
runner = TestRunners.newTestRunner(mockAwsProcessor);
}
@Test
public void testAnonymousByDefault() {
runner.assertValid();
runner.run(1);
assertEquals(AnonymousAWSCredentials.class, awsCredentials.getClass());
assertNull(awsCredentialsProvider);
}
@Test
public void testAccessKeySecretKey() {
runner.setProperty(AbstractAWSProcessor.ACCESS_KEY, "testAccessKey");
runner.setProperty(AbstractAWSProcessor.SECRET_KEY, "testSecretKey");
runner.assertValid();
runner.run(1);
assertEquals(BasicAWSCredentials.class, awsCredentials.getClass());
assertNull(awsCredentialsProvider);
}
@Test
public void testCredentialsFile() {
runner.setProperty(AbstractAWSProcessor.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
runner.assertValid();
runner.run(1);
assertEquals(PropertiesCredentials.class, awsCredentials.getClass());
assertNull(awsCredentialsProvider);
}
@Test
public void testCredentialsProviderControllerService() throws InitializationException {
final AWSCredentialsProviderControllerService credsService = new AWSCredentialsProviderControllerService();
runner.addControllerService("awsCredentialsProvider", credsService);
runner.setProperty(credsService, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey");
runner.setProperty(credsService, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey");
runner.enableControllerService(credsService);
runner.setProperty(AbstractAWSCredentialsProviderProcessor.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
runner.assertValid();
runner.run(1);
assertEquals(StaticCredentialsProvider.class, awsCredentialsProvider.getClass());
assertNull(awsCredentials);
}
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.lambda;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.lambda.AWSLambdaClient;
import com.amazonaws.services.lambda.model.InvalidParameterValueException;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import com.amazonaws.services.lambda.model.TooManyRequestsException;
import com.amazonaws.util.Base64;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestPutLambda {
private TestRunner runner = null;
private PutLambda mockPutLambda = null;
private AWSLambdaClient actualLambdaClient = null;
private AWSLambdaClient mockLambdaClient = null;
@Before
public void setUp() {
mockLambdaClient = Mockito.mock(AWSLambdaClient.class);
mockPutLambda = new PutLambda() {
protected AWSLambdaClient getClient() {
actualLambdaClient = client;
return mockLambdaClient;
}
};
runner = TestRunners.newTestRunner(mockPutLambda);
}
@Test
public void testSizeGreaterThan6MB() throws Exception {
runner = TestRunners.newTestRunner(PutLambda.class);
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
runner.assertValid();
byte [] largeInput = new byte[6000001];
for (int i = 0; i < 6000001; i++) {
largeInput[i] = 'a';
}
runner.enqueue(largeInput);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
}
@Test
public void testPutLambdaSimple() {
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "test-function");
runner.enqueue("TestContent");
InvokeResult invokeResult = new InvokeResult();
invokeResult.setStatusCode(200);
invokeResult.setLogResult(Base64.encodeAsString("test-log-result".getBytes()));
invokeResult.setPayload(ByteBuffer.wrap("test-payload".getBytes()));
Mockito.when(mockLambdaClient.invoke(Mockito.any(InvokeRequest.class))).thenReturn(invokeResult);
runner.assertValid();
runner.run(1);
ArgumentCaptor<InvokeRequest> captureRequest = ArgumentCaptor.forClass(InvokeRequest.class);
Mockito.verify(mockLambdaClient, Mockito.times(1)).invoke(captureRequest.capture());
InvokeRequest request = captureRequest.getValue();
assertEquals("test-function", request.getFunctionName());
runner.assertAllFlowFilesTransferred(PutLambda.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutLambda.REL_SUCCESS);
final MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE, "200");
ff0.assertAttributeEquals(PutLambda.AWS_LAMBDA_RESULT_LOG, "test-log-result");
ff0.assertAttributeEquals(PutLambda.AWS_LAMBDA_RESULT_PAYLOAD, "test-payload");
}
@Test
public void testPutLambdaParameterException() {
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "test-function");
runner.enqueue("TestContent");
Mockito.when(mockLambdaClient.invoke(Mockito.any(InvokeRequest.class))).thenThrow(new InvalidParameterValueException("TestFail"));
runner.assertValid();
runner.run(1);
runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
}
@Test
public void testPutLambdaTooManyRequestsException() {
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "test-function");
runner.enqueue("TestContent");
Mockito.when(mockLambdaClient.invoke(Mockito.any(InvokeRequest.class))).thenThrow(new TooManyRequestsException("TestFail"));
runner.assertValid();
runner.run(1);
runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutLambda.REL_FAILURE);
final MockFlowFile ff0 = flowFiles.get(0);
assertTrue(ff0.isPenalized());
}
@Test
public void testPutLambdaAmazonException() {
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "test-function");
runner.enqueue("TestContent");
Mockito.when(mockLambdaClient.invoke(Mockito.any(InvokeRequest.class))).thenThrow(new AmazonServiceException("TestFail"));
runner.assertValid();
runner.run(1);
runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
}
}

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.nifi.processors.aws.s3; package org.apache.nifi.processors.aws.s3;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processors.aws.AbstractAWSProcessor; import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
@ -25,11 +24,8 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** /**
* Provides integration level testing with actual AWS S3 resources for {@link DeleteS3Object} and requires additional configuration and resources to work. * Provides integration level testing with actual AWS S3 resources for {@link DeleteS3Object} and requires additional configuration and resources to work.
@ -142,28 +138,4 @@ public class ITDeleteS3Object extends AbstractS3IT {
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
} }
@Test
public void testGetPropertyDescriptors() throws Exception {
DeleteS3Object processor = new DeleteS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 20, pd.size());
assertTrue(pd.contains(processor.ACCESS_KEY));
assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(processor.BUCKET));
assertTrue(pd.contains(processor.CREDENTIALS_FILE));
assertTrue(pd.contains(processor.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(processor.FULL_CONTROL_USER_LIST));
assertTrue(pd.contains(processor.KEY));
assertTrue(pd.contains(processor.OWNER));
assertTrue(pd.contains(processor.READ_ACL_LIST));
assertTrue(pd.contains(processor.READ_USER_LIST));
assertTrue(pd.contains(processor.REGION));
assertTrue(pd.contains(processor.SECRET_KEY));
assertTrue(pd.contains(processor.SIGNER_OVERRIDE));
assertTrue(pd.contains(processor.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(processor.TIMEOUT));
assertTrue(pd.contains(processor.VERSION_ID));
assertTrue(pd.contains(processor.WRITE_ACL_LIST));
assertTrue(pd.contains(processor.WRITE_USER_LIST));
}
} }

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.aws.s3; package org.apache.nifi.processors.aws.s3;
import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processors.aws.AbstractAWSProcessor; import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
@ -31,8 +30,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** /**
* Provides integration level testing with actual AWS S3 resources for {@link FetchS3Object} and requires additional configuration and resources to work. * Provides integration level testing with actual AWS S3 resources for {@link FetchS3Object} and requires additional configuration and resources to work.
@ -159,23 +156,4 @@ public class ITFetchS3Object extends AbstractS3IT {
} }
} }
@Test
public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 14, pd.size());
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(FetchS3Object.BUCKET));
assertTrue(pd.contains(FetchS3Object.CREDENTIALS_FILE));
assertTrue(pd.contains(FetchS3Object.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(FetchS3Object.KEY));
assertTrue(pd.contains(FetchS3Object.REGION));
assertTrue(pd.contains(FetchS3Object.SECRET_KEY));
assertTrue(pd.contains(FetchS3Object.SIGNER_OVERRIDE));
assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(FetchS3Object.TIMEOUT));
assertTrue(pd.contains(FetchS3Object.VERSION_ID));
}
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.nifi.processors.aws.s3; package org.apache.nifi.processors.aws.s3;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processors.aws.AbstractAWSProcessor; import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
@ -27,8 +26,6 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** /**
* Provides integration level testing with actual AWS S3 resources for {@link ListS3} and requires additional configuration and resources to work. * Provides integration level testing with actual AWS S3 resources for {@link ListS3} and requires additional configuration and resources to work.
@ -145,25 +142,4 @@ public class ITListS3 extends AbstractS3IT {
flowFiles.get(0).assertAttributeEquals("filename", "b/c"); flowFiles.get(0).assertAttributeEquals("filename", "b/c");
} }
@Test
public void testGetPropertyDescriptors() throws Exception {
ListS3 processor = new ListS3();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 15, pd.size());
assertTrue(pd.contains(ListS3.ACCESS_KEY));
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(ListS3.BUCKET));
assertTrue(pd.contains(ListS3.CREDENTIALS_FILE));
assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(ListS3.REGION));
assertTrue(pd.contains(ListS3.SECRET_KEY));
assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(ListS3.TIMEOUT));
assertTrue(pd.contains(ListS3.PROXY_HOST));
assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
assertTrue(pd.contains(ListS3.DELIMITER));
assertTrue(pd.contains(ListS3.PREFIX));
assertTrue(pd.contains(ListS3.USE_VERSIONS));
}
} }

View File

@ -16,9 +16,6 @@
*/ */
package org.apache.nifi.processors.aws.s3; package org.apache.nifi.processors.aws.s3;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
@ -329,35 +326,6 @@ public class ITPutS3Object extends AbstractS3IT {
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
} }
@Test
public void testGetPropertyDescriptors() throws Exception {
PutS3Object processor = new PutS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 28, pd.size());
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(PutS3Object.BUCKET));
assertTrue(pd.contains(PutS3Object.CANNED_ACL));
assertTrue(pd.contains(PutS3Object.CREDENTIALS_FILE));
assertTrue(pd.contains(PutS3Object.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(PutS3Object.FULL_CONTROL_USER_LIST));
assertTrue(pd.contains(PutS3Object.KEY));
assertTrue(pd.contains(PutS3Object.OWNER));
assertTrue(pd.contains(PutS3Object.READ_ACL_LIST));
assertTrue(pd.contains(PutS3Object.READ_USER_LIST));
assertTrue(pd.contains(PutS3Object.REGION));
assertTrue(pd.contains(PutS3Object.SECRET_KEY));
assertTrue(pd.contains(PutS3Object.SIGNER_OVERRIDE));
assertTrue(pd.contains(PutS3Object.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(PutS3Object.TIMEOUT));
assertTrue(pd.contains(PutS3Object.EXPIRATION_RULE_ID));
assertTrue(pd.contains(PutS3Object.STORAGE_CLASS));
assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST));
assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
}
@Test @Test
public void testDynamicProperty() throws IOException { public void testDynamicProperty() throws IOException {
final String DYNAMIC_ATTRIB_KEY = "fs.runTimestamp"; final String DYNAMIC_ATTRIB_KEY = "fs.runTimestamp";

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteVersionRequest;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestDeleteS3Object {
private TestRunner runner = null;
private DeleteS3Object mockDeleteS3Object = null;
private AmazonS3Client actualS3Client = null;
private AmazonS3Client mockS3Client = null;
@Before
public void setUp() {
mockS3Client = Mockito.mock(AmazonS3Client.class);
mockDeleteS3Object = new DeleteS3Object() {
protected AmazonS3Client getClient() {
actualS3Client = client;
return mockS3Client;
}
};
runner = TestRunners.newTestRunner(mockDeleteS3Object);
}
@Test
public void testDeleteObjectSimple() throws IOException {
runner.setProperty(DeleteS3Object.REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET, "test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
ArgumentCaptor<DeleteObjectRequest> captureRequest = ArgumentCaptor.forClass(DeleteObjectRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).deleteObject(captureRequest.capture());
DeleteObjectRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertEquals("delete-key", request.getKey());
Mockito.verify(mockS3Client, Mockito.never()).deleteVersion(Mockito.any(DeleteVersionRequest.class));
}
@Test
public void testDeleteObjectS3Exception() {
runner.setProperty(DeleteS3Object.REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET, "test-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "delete-key");
runner.enqueue(new byte[0], attrs);
Mockito.doThrow(new AmazonS3Exception("NoSuchBucket")).when(mockS3Client).deleteObject(Mockito.any());
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
ArgumentCaptor<DeleteObjectRequest> captureRequest = ArgumentCaptor.forClass(DeleteObjectRequest.class);
Mockito.verify(mockS3Client, Mockito.never()).deleteVersion(Mockito.any(DeleteVersionRequest.class));
}
@Test
public void testDeleteVersionSimple() {
runner.setProperty(DeleteS3Object.REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET, "test-bucket");
runner.setProperty(DeleteS3Object.VERSION_ID, "test-version");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "test-key");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
ArgumentCaptor<DeleteVersionRequest> captureRequest = ArgumentCaptor.forClass(DeleteVersionRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).deleteVersion(captureRequest.capture());
DeleteVersionRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertEquals("test-key", request.getKey());
assertEquals("test-version", request.getVersionId());
Mockito.verify(mockS3Client, Mockito.never()).deleteObject(Mockito.any(DeleteObjectRequest.class));
}
@Test
public void testDeleteVersionFromExpressions() {
runner.setProperty(DeleteS3Object.REGION, "us-west-2");
runner.setProperty(DeleteS3Object.BUCKET, "${s3.bucket}");
runner.setProperty(DeleteS3Object.VERSION_ID, "${s3.version}");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "test-key");
attrs.put("s3.bucket", "test-bucket");
attrs.put("s3.version", "test-version");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
ArgumentCaptor<DeleteVersionRequest> captureRequest = ArgumentCaptor.forClass(DeleteVersionRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).deleteVersion(captureRequest.capture());
DeleteVersionRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertEquals("test-key", request.getKey());
assertEquals("test-version", request.getVersionId());
Mockito.verify(mockS3Client, Mockito.never()).deleteObject(Mockito.any(DeleteObjectRequest.class));
}
@Test
public void testGetPropertyDescriptors() throws Exception {
DeleteS3Object processor = new DeleteS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 20, pd.size());
assertTrue(pd.contains(processor.ACCESS_KEY));
assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(processor.BUCKET));
assertTrue(pd.contains(processor.CREDENTIALS_FILE));
assertTrue(pd.contains(processor.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(processor.FULL_CONTROL_USER_LIST));
assertTrue(pd.contains(processor.KEY));
assertTrue(pd.contains(processor.OWNER));
assertTrue(pd.contains(processor.READ_ACL_LIST));
assertTrue(pd.contains(processor.READ_USER_LIST));
assertTrue(pd.contains(processor.REGION));
assertTrue(pd.contains(processor.SECRET_KEY));
assertTrue(pd.contains(processor.SIGNER_OVERRIDE));
assertTrue(pd.contains(processor.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(processor.TIMEOUT));
assertTrue(pd.contains(processor.VERSION_ID));
assertTrue(pd.contains(processor.WRITE_ACL_LIST));
assertTrue(pd.contains(processor.WRITE_USER_LIST));
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.util.StringInputStream;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestFetchS3Object {
private TestRunner runner = null;
private FetchS3Object mockFetchS3Object = null;
private AmazonS3Client actualS3Client = null;
private AmazonS3Client mockS3Client = null;
@Before
public void setUp() {
mockS3Client = Mockito.mock(AmazonS3Client.class);
mockFetchS3Object = new FetchS3Object() {
protected AmazonS3Client getClient() {
actualS3Client = client;
return mockS3Client;
}
};
runner = TestRunners.newTestRunner(mockFetchS3Object);
}
@Test
public void testGetObject() throws IOException {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
runner.enqueue(new byte[0], attrs);
S3Object s3ObjectResponse = new S3Object();
s3ObjectResponse.setBucketName("response-bucket-name");
s3ObjectResponse.setKey("response-key");
s3ObjectResponse.setObjectContent(new StringInputStream("Some Content"));
ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class);
metadata.setContentDisposition("key/path/to/file.txt");
metadata.setContentType("text/plain");
metadata.setContentMD5("testMD5hash");
Date expiration = new Date();
metadata.setExpirationTime(expiration);
metadata.setExpirationTimeRuleId("testExpirationRuleId");
Map<String, String> userMetadata = new HashMap<>();
userMetadata.put("userKey1", "userValue1");
userMetadata.put("userKey2", "userValue2");
metadata.setUserMetadata(userMetadata);
metadata.setSSEAlgorithm("testAlgorithm");
Mockito.when(metadata.getETag()).thenReturn("test-etag");
s3ObjectResponse.setObjectMetadata(metadata);
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
runner.run(1);
ArgumentCaptor<GetObjectRequest> captureRequest = ArgumentCaptor.forClass(GetObjectRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).getObject(captureRequest.capture());
GetObjectRequest request = captureRequest.getValue();
assertEquals("request-bucket", request.getBucketName());
assertEquals("request-key", request.getKey());
assertNull(request.getVersionId());
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
MockFlowFile ff = ffs.get(0);
ff.assertAttributeEquals("s3.bucket", "response-bucket-name");
ff.assertAttributeEquals(CoreAttributes.FILENAME.key(), "file.txt");
ff.assertAttributeEquals(CoreAttributes.PATH.key(), "key/path/to");
ff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), "key/path/to/file.txt");
ff.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
ff.assertAttributeEquals("hash.value", "testMD5hash");
ff.assertAttributeEquals("hash.algorithm", "MD5");
ff.assertAttributeEquals("s3.etag", "test-etag");
ff.assertAttributeEquals("s3.expirationTime", String.valueOf(expiration.getTime()));
ff.assertAttributeEquals("s3.expirationTimeRuleId", "testExpirationRuleId");
ff.assertAttributeEquals("userKey1", "userValue1");
ff.assertAttributeEquals("userKey2", "userValue2");
ff.assertAttributeEquals("s3.sseAlgorithm", "testAlgorithm");
ff.assertContentEquals("Some Content");
}
@Test
public void testGetObjectVersion() throws IOException {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
runner.setProperty(FetchS3Object.VERSION_ID, "${s3.version}");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
attrs.put("s3.version", "request-version");
runner.enqueue(new byte[0], attrs);
S3Object s3ObjectResponse = new S3Object();
s3ObjectResponse.setBucketName("response-bucket-name");
s3ObjectResponse.setObjectContent(new StringInputStream("Some Content"));
ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class);
metadata.setContentDisposition("key/path/to/file.txt");
Mockito.when(metadata.getVersionId()).thenReturn("response-version");
s3ObjectResponse.setObjectMetadata(metadata);
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
runner.run(1);
ArgumentCaptor<GetObjectRequest> captureRequest = ArgumentCaptor.forClass(GetObjectRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).getObject(captureRequest.capture());
GetObjectRequest request = captureRequest.getValue();
assertEquals("request-bucket", request.getBucketName());
assertEquals("request-key", request.getKey());
assertEquals("request-version", request.getVersionId());
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
MockFlowFile ff = ffs.get(0);
ff.assertAttributeEquals("s3.bucket", "response-bucket-name");
ff.assertAttributeEquals(CoreAttributes.FILENAME.key(), "file.txt");
ff.assertAttributeEquals(CoreAttributes.PATH.key(), "key/path/to");
ff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), "key/path/to/file.txt");
ff.assertAttributeEquals("s3.version", "response-version");
ff.assertContentEquals("Some Content");
}
@Test
public void testGetObjectExceptionGoesToFailure() throws IOException {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
runner.enqueue(new byte[0], attrs);
Mockito.doThrow(new AmazonS3Exception("NoSuchBucket")).when(mockS3Client).getObject(Mockito.any());
runner.run(1);
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
}
@Test
public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 14, pd.size());
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(FetchS3Object.BUCKET));
assertTrue(pd.contains(FetchS3Object.CREDENTIALS_FILE));
assertTrue(pd.contains(FetchS3Object.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(FetchS3Object.KEY));
assertTrue(pd.contains(FetchS3Object.REGION));
assertTrue(pd.contains(FetchS3Object.SECRET_KEY));
assertTrue(pd.contains(FetchS3Object.SIGNER_OVERRIDE));
assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(FetchS3Object.TIMEOUT));
assertTrue(pd.contains(FetchS3Object.VERSION_ID));
}
}

View File

@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ListVersionsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestListS3 {
private TestRunner runner = null;
private ListS3 mockListS3 = null;
private AmazonS3Client actualS3Client = null;
private AmazonS3Client mockS3Client = null;
@Before
public void setUp() {
mockS3Client = Mockito.mock(AmazonS3Client.class);
mockListS3 = new ListS3() {
protected AmazonS3Client getClient() {
actualS3Client = client;
return mockS3Client;
}
};
runner = TestRunners.newTestRunner(mockListS3);
}
@Test
public void testList() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET, "test-bucket");
Date lastModified = new Date();
ObjectListing objectListing = new ObjectListing();
S3ObjectSummary objectSummary1 = new S3ObjectSummary();
objectSummary1.setBucketName("test-bucket");
objectSummary1.setKey("a");
objectSummary1.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary1);
S3ObjectSummary objectSummary2 = new S3ObjectSummary();
objectSummary2.setBucketName("test-bucket");
objectSummary2.setKey("b/c");
objectSummary2.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary2);
S3ObjectSummary objectSummary3 = new S3ObjectSummary();
objectSummary3.setBucketName("test-bucket");
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("filename", "a");
ff0.assertAttributeEquals("s3.bucket", "test-bucket");
String lastModifiedTimestamp = String.valueOf(lastModified.getTime());
ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
flowFiles.get(2).assertAttributeEquals("filename", "d/e");
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
}
@Test
public void testListVersions() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET, "test-bucket");
runner.setProperty(ListS3.USE_VERSIONS, "true");
Date lastModified = new Date();
VersionListing versionListing = new VersionListing();
S3VersionSummary versionSummary1 = new S3VersionSummary();
versionSummary1.setBucketName("test-bucket");
versionSummary1.setKey("test-key");
versionSummary1.setVersionId("1");
versionSummary1.setLastModified(lastModified);
versionListing.getVersionSummaries().add(versionSummary1);
S3VersionSummary versionSummary2 = new S3VersionSummary();
versionSummary2.setBucketName("test-bucket");
versionSummary2.setKey("test-key");
versionSummary2.setVersionId("2");
versionSummary2.setLastModified(lastModified);
versionListing.getVersionSummaries().add(versionSummary2);
Mockito.when(mockS3Client.listVersions(Mockito.any(ListVersionsRequest.class))).thenReturn(versionListing);
runner.run();
ArgumentCaptor<ListVersionsRequest> captureRequest = ArgumentCaptor.forClass(ListVersionsRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).listVersions(captureRequest.capture());
ListVersionsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
Mockito.verify(mockS3Client, Mockito.never()).listObjects(Mockito.any(ListObjectsRequest.class));
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("filename", "test-key");
ff0.assertAttributeEquals("s3.bucket", "test-bucket");
ff0.assertAttributeEquals("s3.lastModified", String.valueOf(lastModified.getTime()));
ff0.assertAttributeEquals("s3.version", "1");
MockFlowFile ff1 = flowFiles.get(1);
ff1.assertAttributeEquals("filename", "test-key");
ff1.assertAttributeEquals("s3.bucket", "test-bucket");
ff1.assertAttributeEquals("s3.lastModified", String.valueOf(lastModified.getTime()));
ff1.assertAttributeEquals("s3.version", "2");
}
@Test
public void testListObjectsNothingNew() throws IOException {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET, "test-bucket");
Calendar calendar = Calendar.getInstance();
calendar.set(2017, 5, 2);
Date objectLastModified = calendar.getTime();
long stateCurrentTimestamp = objectLastModified.getTime();
Map<String, String> state = new HashMap<>();
state.put(ListS3.CURRENT_TIMESTAMP, String.valueOf(stateCurrentTimestamp));
state.put(ListS3.CURRENT_KEY_PREFIX+"0", "test-key");
MockStateManager mockStateManager = runner.getStateManager();
mockStateManager.setState(state, Scope.CLUSTER);
ObjectListing objectListing = new ObjectListing();
S3ObjectSummary objectSummary1 = new S3ObjectSummary();
objectSummary1.setBucketName("test-bucket");
objectSummary1.setKey("test-key");
objectSummary1.setLastModified(objectLastModified);
objectListing.getObjectSummaries().add(objectSummary1);
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 0);
}
@Test
public void testGetPropertyDescriptors() throws Exception {
ListS3 processor = new ListS3();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 15, pd.size());
assertTrue(pd.contains(ListS3.ACCESS_KEY));
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(ListS3.BUCKET));
assertTrue(pd.contains(ListS3.CREDENTIALS_FILE));
assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(ListS3.REGION));
assertTrue(pd.contains(ListS3.SECRET_KEY));
assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(ListS3.TIMEOUT));
assertTrue(pd.contains(ListS3.PROXY_HOST));
assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
assertTrue(pd.contains(ListS3.DELIMITER));
assertTrue(pd.contains(ListS3.PREFIX));
assertTrue(pd.contains(ListS3.USE_VERSIONS));
}
}

View File

@ -16,10 +16,16 @@
*/ */
package org.apache.nifi.processors.aws.s3; package org.apache.nifi.processors.aws.s3;
import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -27,15 +33,94 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazonaws.services.s3.model.MultipartUploadListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link PutS3Object}, without interaction with S3.
*/
public class TestPutS3Object { public class TestPutS3Object {
private TestRunner runner = null;
private PutS3Object mockPutS3Object = null;
private AmazonS3Client actualS3Client = null;
private AmazonS3Client mockS3Client = null;
@Before
public void setUp() {
mockS3Client = Mockito.mock(AmazonS3Client.class);
mockPutS3Object = new PutS3Object() {
protected AmazonS3Client getClient() {
actualS3Client = client;
return mockS3Client;
}
};
runner = TestRunners.newTestRunner(mockPutS3Object);
}
@Test
public void testPutSinglePart() {
runner.setProperty(PutS3Object.REGION, "ap-northeast-1");
runner.setProperty(PutS3Object.BUCKET, "test-bucket");
runner.setProperty("x-custom-prop", "hello");
final Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put("filename", "testfile.txt");
runner.enqueue("Test Content", ffAttributes);
PutObjectResult putObjectResult = Mockito.spy(PutObjectResult.class);
Date expiration = new Date();
putObjectResult.setExpirationTime(expiration);
putObjectResult.setMetadata(new ObjectMetadata());
putObjectResult.setVersionId("test-version");
Mockito.when(putObjectResult.getETag()).thenReturn("test-etag");
Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
MultipartUploadListing uploadListing = new MultipartUploadListing();
Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
Mockito.when(mockS3Client.getResourceUrl(Mockito.anyString(), Mockito.anyString())).thenReturn("test-s3-url");
runner.assertValid();
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "testfile.txt");
ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag");
ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY, "test-version");
}
@Test
public void testPutSinglePartException() {
runner.setProperty(PutS3Object.REGION, "ap-northeast-1");
runner.setProperty(PutS3Object.BUCKET, "test-bucket");
final Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put("filename", "testfile.txt");
runner.enqueue("Test Content", ffAttributes);
MultipartUploadListing uploadListing = new MultipartUploadListing();
Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new AmazonS3Exception("TestFail"));
runner.assertValid();
runner.run(1);
runner.assertAllFlowFilesTransferred(PutS3Object.REL_FAILURE, 1);
}
@Test @Test
public void testSignerOverrideOptions() { public void testSignerOverrideOptions() {
@ -61,4 +146,32 @@ public class TestPutS3Object {
} }
} }
@Test
public void testGetPropertyDescriptors() throws Exception {
PutS3Object processor = new PutS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 28, pd.size());
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(PutS3Object.BUCKET));
assertTrue(pd.contains(PutS3Object.CANNED_ACL));
assertTrue(pd.contains(PutS3Object.CREDENTIALS_FILE));
assertTrue(pd.contains(PutS3Object.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(PutS3Object.FULL_CONTROL_USER_LIST));
assertTrue(pd.contains(PutS3Object.KEY));
assertTrue(pd.contains(PutS3Object.OWNER));
assertTrue(pd.contains(PutS3Object.READ_ACL_LIST));
assertTrue(pd.contains(PutS3Object.READ_USER_LIST));
assertTrue(pd.contains(PutS3Object.REGION));
assertTrue(pd.contains(PutS3Object.SECRET_KEY));
assertTrue(pd.contains(PutS3Object.SIGNER_OVERRIDE));
assertTrue(pd.contains(PutS3Object.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(PutS3Object.TIMEOUT));
assertTrue(pd.contains(PutS3Object.EXPIRATION_RULE_ID));
assertTrue(pd.contains(PutS3Object.STORAGE_CLASS));
assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST));
assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
}
} }

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.sns;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.AmazonSNSException;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestPutSNS {
private TestRunner runner = null;
private PutSNS mockPutSNS = null;
private AmazonSNSClient actualSNSClient = null;
private AmazonSNSClient mockSNSClient = null;
@Before
public void setUp() {
mockSNSClient = Mockito.mock(AmazonSNSClient.class);
mockPutSNS = new PutSNS() {
protected AmazonSNSClient getClient() {
actualSNSClient = client;
return mockSNSClient;
}
};
runner = TestRunners.newTestRunner(mockPutSNS);
}
@Test
public void testPublish() throws IOException {
runner.setValidateExpressionUsage(false);
runner.setProperty(PutSNS.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1");
runner.setProperty(PutSNS.SUBJECT, "${eval.subject}");
assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid());
final Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put("filename", "1.txt");
ffAttributes.put("eval.subject", "test-subject");
runner.enqueue("Test Message Content", ffAttributes);
PublishResult mockPublishResult = new PublishResult();
Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResult);
runner.run();
ArgumentCaptor<PublishRequest> captureRequest = ArgumentCaptor.forClass(PublishRequest.class);
Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture());
PublishRequest request = captureRequest.getValue();
assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1", request.getTopicArn());
assertEquals("Test Message Content", request.getMessage());
assertEquals("test-subject", request.getSubject());
assertEquals("hello!", request.getMessageAttributes().get("DynamicProperty").getStringValue());
runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSNS.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "1.txt");
}
@Test
public void testPublishFailure() throws IOException {
runner.setValidateExpressionUsage(false);
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1");
final Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put("filename", "1.txt");
runner.enqueue("Test Message Content", ffAttributes);
Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenThrow(new AmazonSNSException("Fail"));
runner.run();
ArgumentCaptor<PublishRequest> captureRequest = ArgumentCaptor.forClass(PublishRequest.class);
Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture());
runner.assertAllFlowFilesTransferred(PutSNS.REL_FAILURE, 1);
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.sqs;
import java.util.List;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.sns.PutSNS;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class ITGetSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
public void testSimpleGet() {
final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(GetSQS.TIMEOUT, "30 secs");
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
for (final MockFlowFile mff : flowFiles) {
System.out.println(mff.getAttributes());
System.out.println(new String(mff.toByteArray()));
}
}
@Test
public void testSimpleGetWithEL() {
System.setProperty("test-account-property", "100515378163");
System.setProperty("test-queue-property", "test-queue-000000000");
final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(GetSQS.TIMEOUT, "30 secs");
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/${test-account-property}/${test-queue-property}");
runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
for (final MockFlowFile mff : flowFiles) {
System.out.println(mff.getAttributes());
System.out.println(new String(mff.toByteArray()));
}
}
@Test
public void testSimpleGetUsingCredentialsProviderService() throws Throwable {
final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
runner.setProperty(GetSQS.TIMEOUT, "30 secs");
String queueUrl = "Add queue url here";
runner.setProperty(GetSQS.QUEUE_URL, queueUrl);
final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();
runner.addControllerService("awsCredentialsProvider", serviceImpl);
runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
runner.enableControllerService(serviceImpl);
runner.assertValid(serviceImpl);
runner.setProperty(GetSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
for (final MockFlowFile mff : flowFiles) {
System.out.println(mff.getAttributes());
System.out.println(new String(mff.toByteArray()));
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.sqs;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.sns.PutSNS;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class ITPutSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
public void testSimplePut() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSQS.TIMEOUT, "30 secs");
runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
}
@Test
public void testSimplePutUsingCredentialsProviderService() throws Throwable {
final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
runner.setProperty(PutSQS.TIMEOUT, "30 secs");
String queueUrl = "Add queue url here";
runner.setProperty(PutSQS.QUEUE_URL, queueUrl);
runner.setValidateExpressionUsage(false);
final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();
runner.addControllerService("awsCredentialsProvider", serviceImpl);
runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
runner.enableControllerService(serviceImpl);
runner.assertValid(serviceImpl);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.setProperty(PutSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSQS.REL_SUCCESS);
for (final MockFlowFile mff : flowFiles) {
System.out.println(mff.getAttributes());
System.out.println(new String(mff.toByteArray()));
}
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.sqs;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
public class TestDeleteSQS {
private TestRunner runner = null;
private DeleteSQS mockDeleteSQS = null;
private AmazonSQSClient actualSQSClient = null;
private AmazonSQSClient mockSQSClient = null;
@Before
public void setUp() {
mockSQSClient = Mockito.mock(AmazonSQSClient.class);
mockDeleteSQS = new DeleteSQS() {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(RECEIPT_HANDLE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT);
}
protected AmazonSQSClient getClient() {
actualSQSClient = client;
return mockSQSClient;
}
};
runner = TestRunners.newTestRunner(mockDeleteSQS);
}
@Test
public void testDeleteSingleMessage() {
runner.setProperty(DeleteSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
final Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put("filename", "1.txt");
ffAttributes.put("sqs.receipt.handle", "test-receipt-handle-1");
runner.enqueue("TestMessageBody", ffAttributes);
runner.assertValid();
runner.run(1);
ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.getQueueUrl());
assertEquals("test-receipt-handle-1", deleteRequest.getEntries().get(0).getReceiptHandle());
runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
}
@Test
public void testDeleteException() {
runner.setProperty(DeleteSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
final Map<String, String> ff1Attributes = new HashMap<>();
ff1Attributes.put("filename", "1.txt");
ff1Attributes.put("sqs.receipt.handle", "test-receipt-handle-1");
runner.enqueue("TestMessageBody1", ff1Attributes);
Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class)))
.thenThrow(new AmazonSQSException("TestFail"));
runner.assertValid();
runner.run(1);
ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
runner.assertAllFlowFilesTransferred(DeleteSQS.REL_FAILURE, 1);
}
}

View File

@ -18,79 +18,134 @@ package org.apache.nifi.processors.aws.sqs;
import java.util.List; import java.util.List;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.sns.PutSNS;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
public class TestGetSQS { public class TestGetSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; private TestRunner runner = null;
private GetSQS mockGetSQS = null;
private AmazonSQSClient actualSQSClient = null;
private AmazonSQSClient mockSQSClient = null;
@Test @Before
public void testSimpleGet() { public void setUp() {
final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); mockSQSClient = Mockito.mock(AmazonSQSClient.class);
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); mockGetSQS = new GetSQS() {
runner.setProperty(GetSQS.TIMEOUT, "30 secs"); protected AmazonSQSClient getClient() {
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"); actualSQSClient = client;
return mockSQSClient;
runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
for (final MockFlowFile mff : flowFiles) {
System.out.println(mff.getAttributes());
System.out.println(new String(mff.toByteArray()));
} }
};
runner = TestRunners.newTestRunner(mockGetSQS);
} }
@Test @Test
public void testSimpleGetWithEL() { public void testGetMessageNoAutoDelete() {
System.setProperty("test-account-property", "100515378163"); runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
System.setProperty("test-queue-property", "test-queue-000000000"); runner.setProperty(GetSQS.AUTO_DELETE, "false");
final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); Message message1 = new Message();
runner.setProperty(GetSQS.TIMEOUT, "30 secs"); message1.setBody("TestMessage1");
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/${test-account-property}/${test-queue-property}"); message1.addAttributesEntry("attrib-key-1", "attrib-value-1");
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setStringValue("msg-attrib-value-1");
message1.addMessageAttributesEntry("msg-attrib-key-1", messageAttributeValue);
message1.setMD5OfBody("test-md5-hash-1");
message1.setMessageId("test-message-id-1");
message1.setReceiptHandle("test-receipt-handle-1");
ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult()
.withMessages(message1);
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
runner.run(1); runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); ArgumentCaptor<ReceiveMessageRequest> captureRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class);
for (final MockFlowFile mff : flowFiles) { Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureRequest.capture());
System.out.println(mff.getAttributes()); ReceiveMessageRequest request = captureRequest.getValue();
System.out.println(new String(mff.toByteArray())); assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl());
} Mockito.verify(mockSQSClient, Mockito.never()).deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class));
runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("sqs.attrib-key-1", "attrib-value-1");
ff0.assertAttributeEquals("sqs.msg-attrib-key-1", "msg-attrib-value-1");
ff0.assertAttributeEquals("hash.value", "test-md5-hash-1");
ff0.assertAttributeEquals("hash.algorithm", "md5");
ff0.assertAttributeEquals("sqs.message.id", "test-message-id-1");
ff0.assertAttributeEquals("sqs.receipt.handle", "test-receipt-handle-1");
} }
@Test @Test
public void testSimpleGetUsingCredentialsProviderService() throws Throwable { public void testGetNoMessages() {
final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult();
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
runner.setProperty(GetSQS.TIMEOUT, "30 secs");
String queueUrl = "Add queue url here";
runner.setProperty(GetSQS.QUEUE_URL, queueUrl);
final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();
runner.addControllerService("awsCredentialsProvider", serviceImpl);
runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
runner.enableControllerService(serviceImpl);
runner.assertValid(serviceImpl);
runner.setProperty(GetSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
runner.run(1); runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); ArgumentCaptor<ReceiveMessageRequest> captureRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class);
for (final MockFlowFile mff : flowFiles) { Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureRequest.capture());
System.out.println(mff.getAttributes()); ReceiveMessageRequest request = captureRequest.getValue();
System.out.println(new String(mff.toByteArray())); assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl());
runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 0);
} }
@Test
public void testGetMessageAndAutoDelete() {
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
runner.setProperty(GetSQS.AUTO_DELETE, "true");
Message message1 = new Message();
message1.setBody("TestMessage1");
message1.setMessageId("test-message-id-1");
message1.setReceiptHandle("test-receipt-handle-1");
Message message2 = new Message();
message2.setBody("TestMessage2");
message2.setMessageId("test-message-id-2");
message2.setReceiptHandle("test-receipt-handle-2");
ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult()
.withMessages(message1, message2);
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
runner.run(1);
ArgumentCaptor<ReceiveMessageRequest> captureReceiveRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureReceiveRequest.capture());
ReceiveMessageRequest receiveRequest = captureReceiveRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", receiveRequest.getQueueUrl());
ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.getQueueUrl());
assertEquals("test-message-id-1", deleteRequest.getEntries().get(0).getId());
assertEquals("test-message-id-2", deleteRequest.getEntries().get(1).getId());
runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("sqs.message.id", "test-message-id-1");
MockFlowFile ff1 = flowFiles.get(1);
ff1.assertAttributeEquals("sqs.message.id", "test-message-id-2");
} }
} }

View File

@ -17,70 +17,90 @@
package org.apache.nifi.processors.aws.sqs; package org.apache.nifi.processors.aws.sqs;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
import org.apache.nifi.processors.aws.sns.PutSNS;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchResult;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
public class TestPutSQS { public class TestPutSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; private TestRunner runner = null;
private PutSQS mockPutSQS = null;
private AmazonSQSClient actualSQSClient = null;
private AmazonSQSClient mockSQSClient = null;
@Before
public void setUp() {
mockSQSClient = Mockito.mock(AmazonSQSClient.class);
mockPutSQS = new PutSQS() {
protected AmazonSQSClient getClient() {
actualSQSClient = client;
return mockSQSClient;
}
};
runner = TestRunners.newTestRunner(mockPutSQS);
}
@Test @Test
public void testSimplePut() throws IOException { public void testSimplePut() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutSQS()); runner.setValidateExpressionUsage(false);
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
runner.setProperty(PutSQS.TIMEOUT, "30 secs");
runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt"); attrs.put("filename", "1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); runner.enqueue("TestMessageBody", attrs);
SendMessageBatchResult batchResult = new SendMessageBatchResult();
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult);
runner.run(1); runner.run(1);
ArgumentCaptor<SendMessageBatchRequest> captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture());
SendMessageBatchRequest request = captureRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl());
assertEquals("hello", request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue());
assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody());
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
} }
@Test @Test
public void testSimplePutUsingCredentialsProviderService() throws Throwable { public void testPutException() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
runner.setProperty(PutSQS.TIMEOUT, "30 secs");
String queueUrl = "Add queue url here";
runner.setProperty(PutSQS.QUEUE_URL, queueUrl);
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
runner.addControllerService("awsCredentialsProvider", serviceImpl);
runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
runner.enableControllerService(serviceImpl);
runner.assertValid(serviceImpl);
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt"); attrs.put("filename", "1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); runner.enqueue("TestMessageBody", attrs);
runner.setProperty(PutSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenThrow(new AmazonSQSException("TestFail"));
runner.run(1); runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSQS.REL_SUCCESS); ArgumentCaptor<SendMessageBatchRequest> captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class);
for (final MockFlowFile mff : flowFiles) { Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture());
System.out.println(mff.getAttributes()); SendMessageBatchRequest request = captureRequest.getValue();
System.out.println(new String(mff.toByteArray())); assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl());
assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody());
runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1);
} }
} }
}