mirror of https://github.com/apache/nifi.git
Merge branch 'nifi-1509' of https://github.com/mans2singh/nifi into NIFI-1509
This commit is contained in:
commit
0f9b55afb6
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.aws;
|
package org.apache.nifi.processors.aws;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnShutdown;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.controller.ControllerService;
|
import org.apache.nifi.controller.ControllerService;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
@ -75,6 +76,13 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@OnShutdown
|
||||||
|
public void onShutDown() {
|
||||||
|
if ( this.client != null ) {
|
||||||
|
this.client.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get credentials provider using the {@link AWSCredentialsProviderService}
|
* Get credentials provider using the {@link AWSCredentialsProviderService}
|
||||||
* @param context the process context
|
* @param context the process context
|
||||||
|
|
|
@ -64,9 +64,9 @@ import com.amazonaws.regions.Regions;
|
||||||
public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor {
|
public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor {
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||||
.description("FlowFiles are routed to success after being successfully copied to Amazon S3").build();
|
.description("FlowFiles are routed to success relationship").build();
|
||||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||||
.description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build();
|
.description("FlowFiles are routed to failure relationship").build();
|
||||||
|
|
||||||
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
||||||
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.lambda;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
|
||||||
|
|
||||||
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
import com.amazonaws.auth.AWSCredentials;
|
||||||
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
|
import com.amazonaws.services.lambda.AWSLambdaClient;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is the base class for invoking aws lambda function
|
||||||
|
*/
|
||||||
|
public abstract class AbstractAWSLambdaProcessor extends AbstractAWSCredentialsProviderProcessor<AWSLambdaClient> {
|
||||||
|
|
||||||
|
public static final PropertyDescriptor AWS_LAMBDA_FUNCTION_NAME = new PropertyDescriptor.Builder()
|
||||||
|
.name("Amazon Lambda Name")
|
||||||
|
.description("The Lambda Function Name")
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor AWS_LAMBDA_FUNCTION_QUALIFIER = new PropertyDescriptor.Builder()
|
||||||
|
.name("Amazon Lambda Qualifier (version)")
|
||||||
|
.description("The Lambda Function Version")
|
||||||
|
.defaultValue("$LATEST")
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create client using aws credentials provider. This is the preferred way for creating clients
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected AWSLambdaClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
|
||||||
|
getLogger().info("Creating client using aws credentials provider");
|
||||||
|
|
||||||
|
return new AWSLambdaClient(credentialsProvider, config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create client using AWSCredentails
|
||||||
|
*
|
||||||
|
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected AWSLambdaClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
|
||||||
|
getLogger().info("Creating client using aws credentials");
|
||||||
|
|
||||||
|
return new AWSLambdaClient(credentials, config);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,243 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.lambda;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
|
||||||
|
import com.amazonaws.AmazonServiceException;
|
||||||
|
import com.amazonaws.services.lambda.AWSLambdaClient;
|
||||||
|
import com.amazonaws.services.lambda.model.InvalidParameterValueException;
|
||||||
|
import com.amazonaws.services.lambda.model.InvalidRequestContentException;
|
||||||
|
import com.amazonaws.services.lambda.model.InvocationType;
|
||||||
|
import com.amazonaws.services.lambda.model.InvokeRequest;
|
||||||
|
import com.amazonaws.services.lambda.model.InvokeResult;
|
||||||
|
import com.amazonaws.services.lambda.model.LogType;
|
||||||
|
import com.amazonaws.services.lambda.model.RequestTooLargeException;
|
||||||
|
import com.amazonaws.services.lambda.model.ResourceNotFoundException;
|
||||||
|
import com.amazonaws.services.lambda.model.TooManyRequestsException;
|
||||||
|
import com.amazonaws.services.lambda.model.UnsupportedMediaTypeException;
|
||||||
|
import com.amazonaws.util.Base64;
|
||||||
|
|
||||||
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
|
@Tags({"amazon", "aws", "lambda", "put"})
|
||||||
|
@CapabilityDescription("Sends the contents to a specified Amazon Lamba Function. "
|
||||||
|
+ "The AWS credentials used for authentication must have permissions execute the Lambda function (lambda:InvokeFunction)."
|
||||||
|
+ "The FlowFile content must be JSON.")
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute = "aws.lambda.result.function.error", description = "Function error message in result on posting message to AWS Lambda"),
|
||||||
|
@WritesAttribute(attribute = "aws.lambda.result.status.code", description = "Status code in the result for the message when posting to AWS Lambda"),
|
||||||
|
@WritesAttribute(attribute = "aws.lambda.result.payload", description = "Payload in the result from AWS Lambda"),
|
||||||
|
@WritesAttribute(attribute = "aws.lambda.result.log", description = "Log in the result of the message posted to Lambda"),
|
||||||
|
@WritesAttribute(attribute = "aws.lambda.exception.message", description = "Exception message on invoking from AWS Lambda"),
|
||||||
|
@WritesAttribute(attribute = "aws.lambda.exception.cause", description = "Exception cause on invoking from AWS Lambda"),
|
||||||
|
@WritesAttribute(attribute = "aws.lambda.exception.error.code", description = "Exception error code on invoking from AWS Lambda"),
|
||||||
|
@WritesAttribute(attribute = "aws.lambda.exception.request.id", description = "Exception request id on invoking from AWS Lambda"),
|
||||||
|
@WritesAttribute(attribute = "aws.lambda.exception.status.code", description = "Exception status code on invoking from AWS Lambda"),
|
||||||
|
@WritesAttribute(attribute = "aws.lambda.exception.error.type", description = "Exception error type on invoking from AWS Lambda")
|
||||||
|
})
|
||||||
|
public class PutLambda extends AbstractAWSLambdaProcessor {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lambda result function error message
|
||||||
|
*/
|
||||||
|
public static final String AWS_LAMBDA_RESULT_FUNCTION_ERROR = "aws.lambda.result.function.error";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lambda response status code
|
||||||
|
*/
|
||||||
|
public static final String AWS_LAMBDA_RESULT_STATUS_CODE = "aws.lambda.result.status.code";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lambda response log tail (4kb)
|
||||||
|
*/
|
||||||
|
public static final String AWS_LAMBDA_RESULT_LOG = "aws.lambda.result.log";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lambda payload in response
|
||||||
|
*/
|
||||||
|
public static final String AWS_LAMBDA_RESULT_PAYLOAD = "aws.lambda.result.payload";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lambda exception field
|
||||||
|
*/
|
||||||
|
public static final String AWS_LAMBDA_EXCEPTION_MESSAGE = "aws.lambda.exception.message";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lambda exception field
|
||||||
|
*/
|
||||||
|
public static final String AWS_LAMBDA_EXCEPTION_CAUSE = "aws.lambda.exception.cause";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lambda exception field
|
||||||
|
*/
|
||||||
|
public static final String AWS_LAMBDA_EXCEPTION_ERROR_CODE = "aws.lambda.exception.error.code";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lambda exception field
|
||||||
|
*/
|
||||||
|
public static final String AWS_LAMBDA_EXCEPTION_REQUEST_ID = "aws.lambda.exception.request.id";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lambda exception field
|
||||||
|
*/
|
||||||
|
public static final String AWS_LAMBDA_EXCEPTION_STATUS_CODE = "aws.lambda.exception.status.code";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lambda exception field
|
||||||
|
*/
|
||||||
|
public static final String AWS_LAMBDA_EXCEPTION_ERROR_TYPE = "aws.lambda.exception.error.type";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Max request body size
|
||||||
|
*/
|
||||||
|
public static final long MAX_REQUEST_SIZE = 6 * 1000 * 1000;
|
||||||
|
|
||||||
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
|
Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT
|
||||||
|
));
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
|
|
||||||
|
FlowFile flowFile = session.get();
|
||||||
|
if (flowFile == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String functionName = context.getProperty(AWS_LAMBDA_FUNCTION_NAME).getValue();
|
||||||
|
|
||||||
|
final String qualifier = context.getProperty(AWS_LAMBDA_FUNCTION_QUALIFIER).getValue();
|
||||||
|
|
||||||
|
// Max size of message is 6 MB
|
||||||
|
if ( flowFile.getSize() > MAX_REQUEST_SIZE) {
|
||||||
|
getLogger().error("Max size for request body is 6mb but was {} for flow file {} for function {}",
|
||||||
|
new Object[]{flowFile.getSize(), flowFile, functionName});
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final AWSLambdaClient client = getClient();
|
||||||
|
|
||||||
|
try {
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
session.exportTo(flowFile, baos);
|
||||||
|
|
||||||
|
InvokeRequest invokeRequest = new InvokeRequest()
|
||||||
|
.withFunctionName(functionName)
|
||||||
|
.withLogType(LogType.Tail).withInvocationType(InvocationType.RequestResponse)
|
||||||
|
.withPayload(ByteBuffer.wrap(baos.toByteArray()))
|
||||||
|
.withQualifier(qualifier);
|
||||||
|
long startTime = System.nanoTime();
|
||||||
|
|
||||||
|
InvokeResult result = client.invoke(invokeRequest);
|
||||||
|
|
||||||
|
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_STATUS_CODE, result.getStatusCode().toString());
|
||||||
|
|
||||||
|
if ( !StringUtils.isBlank(result.getLogResult() )) {
|
||||||
|
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_LOG, new String(Base64.decode(result.getLogResult()),Charset.defaultCharset()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( result.getPayload() != null ) {
|
||||||
|
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_PAYLOAD, new String(result.getPayload().array(),Charset.defaultCharset()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( ! StringUtils.isBlank(result.getFunctionError()) ){
|
||||||
|
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_FUNCTION_ERROR, result.getFunctionError());
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
} else {
|
||||||
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
final long totalTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
|
||||||
|
session.getProvenanceReporter().send(flowFile, functionName, totalTimeMillis);
|
||||||
|
}
|
||||||
|
} catch (final InvalidRequestContentException
|
||||||
|
| InvalidParameterValueException
|
||||||
|
| RequestTooLargeException
|
||||||
|
| ResourceNotFoundException
|
||||||
|
| UnsupportedMediaTypeException unrecoverableException) {
|
||||||
|
getLogger().error("Failed to invoke lambda {} with unrecoverable exception {} for flow file {}",
|
||||||
|
new Object[]{functionName, unrecoverableException, flowFile});
|
||||||
|
flowFile = populateExceptionAttributes(session, flowFile, unrecoverableException);
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
} catch (final TooManyRequestsException retryableServiceException) {
|
||||||
|
getLogger().error("Failed to invoke lambda {} with exception {} for flow file {}, therefore penalizing flowfile",
|
||||||
|
new Object[]{functionName, retryableServiceException, flowFile});
|
||||||
|
flowFile = populateExceptionAttributes(session, flowFile, retryableServiceException);
|
||||||
|
flowFile = session.penalize(flowFile);
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
context.yield();
|
||||||
|
} catch (final AmazonServiceException unrecoverableServiceException) {
|
||||||
|
getLogger().error("Failed to invoke lambda {} with exception {} for flow file {} sending to fail",
|
||||||
|
new Object[]{functionName, unrecoverableServiceException, flowFile});
|
||||||
|
flowFile = populateExceptionAttributes(session, flowFile, unrecoverableServiceException);
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
context.yield();
|
||||||
|
} catch (final Exception exception) {
|
||||||
|
getLogger().error("Failed to invoke lambda {} with exception {} for flow file {}",
|
||||||
|
new Object[]{functionName, exception, flowFile});
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
context.yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Populate exception attributes in the flow file
|
||||||
|
* @param session process session
|
||||||
|
* @param flowFile the flow file
|
||||||
|
* @param exception exception thrown during invocation
|
||||||
|
* @return FlowFile the updated flow file
|
||||||
|
*/
|
||||||
|
private FlowFile populateExceptionAttributes(final ProcessSession session, FlowFile flowFile,
|
||||||
|
final AmazonServiceException exception) {
|
||||||
|
Map<String,String> attributes = new HashMap<>();
|
||||||
|
attributes.put(AWS_LAMBDA_EXCEPTION_MESSAGE, exception.getErrorMessage());
|
||||||
|
attributes.put(AWS_LAMBDA_EXCEPTION_ERROR_CODE, exception.getErrorCode());
|
||||||
|
attributes.put(AWS_LAMBDA_EXCEPTION_REQUEST_ID, exception.getRequestId());
|
||||||
|
attributes.put(AWS_LAMBDA_EXCEPTION_STATUS_CODE, Integer.toString(exception.getStatusCode()));
|
||||||
|
if ( exception.getCause() != null )
|
||||||
|
attributes.put(AWS_LAMBDA_EXCEPTION_CAUSE, exception.getCause().getMessage());
|
||||||
|
attributes.put(AWS_LAMBDA_EXCEPTION_ERROR_TYPE, exception.getErrorType().toString());
|
||||||
|
attributes.put(AWS_LAMBDA_EXCEPTION_MESSAGE, exception.getErrorMessage());
|
||||||
|
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||||
|
return flowFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -19,3 +19,4 @@ org.apache.nifi.processors.aws.sns.PutSNS
|
||||||
org.apache.nifi.processors.aws.sqs.GetSQS
|
org.apache.nifi.processors.aws.sqs.GetSQS
|
||||||
org.apache.nifi.processors.aws.sqs.PutSQS
|
org.apache.nifi.processors.aws.sqs.PutSQS
|
||||||
org.apache.nifi.processors.aws.sqs.DeleteSQS
|
org.apache.nifi.processors.aws.sqs.DeleteSQS
|
||||||
|
org.apache.nifi.processors.aws.lambda.PutLambda
|
||||||
|
|
|
@ -0,0 +1,145 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.lambda;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test contains both unit and integration test (integration tests are ignored by default)
|
||||||
|
*/
|
||||||
|
public class ITPutLambda {
|
||||||
|
|
||||||
|
private TestRunner runner;
|
||||||
|
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
runner = TestRunners.newTestRunner(PutLambda.class);
|
||||||
|
runner.setProperty(PutLambda.ACCESS_KEY, "abcd");
|
||||||
|
runner.setProperty(PutLambda.SECRET_KEY, "secret key");
|
||||||
|
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "functionName");
|
||||||
|
runner.assertValid();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
runner = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSizeGreaterThan6MB() throws Exception {
|
||||||
|
runner = TestRunners.newTestRunner(PutLambda.class);
|
||||||
|
runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
|
||||||
|
runner.assertValid();
|
||||||
|
byte [] largeInput = new byte[6000001];
|
||||||
|
for (int i = 0; i < 6000001; i++) {
|
||||||
|
largeInput[i] = 'a';
|
||||||
|
}
|
||||||
|
runner.enqueue(largeInput);
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Comment out ignore for integration tests (requires creds files)
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
@Ignore
|
||||||
|
public void testIntegrationSuccess() throws Exception {
|
||||||
|
runner = TestRunners.newTestRunner(PutLambda.class);
|
||||||
|
runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.enqueue("{\"test\":\"hi\"}".getBytes());
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutLambda.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutLambda.REL_SUCCESS);
|
||||||
|
final MockFlowFile out = ffs.iterator().next();
|
||||||
|
assertNull("Function error should be null " + out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR), out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR));
|
||||||
|
assertNotNull("log should not be null", out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_LOG));
|
||||||
|
assertEquals("Status should be equal", "200",out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Comment out ignore for integration tests (requires creds files)
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
@Ignore
|
||||||
|
public void testIntegrationClientErrorBadMessageBody() throws Exception {
|
||||||
|
runner = TestRunners.newTestRunner(PutLambda.class);
|
||||||
|
runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.enqueue("badbod".getBytes());
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
|
||||||
|
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutLambda.REL_FAILURE);
|
||||||
|
final MockFlowFile out = ffs.iterator().next();
|
||||||
|
assertNull("Function error should be null since there is exception"
|
||||||
|
+ out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR), out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR));
|
||||||
|
assertNull("log should not be null", out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_LOG));
|
||||||
|
assertEquals("Status should be equal", null,out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE));
|
||||||
|
assertEquals("exception error code should be equal", "InvalidRequestContentException",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_ERROR_CODE));
|
||||||
|
assertEquals("exception exception error type should be equal", "Client",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_ERROR_TYPE));
|
||||||
|
assertEquals("exception exception error code should be equal", "400",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_STATUS_CODE));
|
||||||
|
assertTrue("exception exception error message should be start with",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_MESSAGE)
|
||||||
|
.startsWith("Could not parse request body into json: Unrecognized token 'badbod': was expecting ('true', 'false' or 'null')"));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Comment out ignore for integration tests (requires creds files)
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
@Ignore
|
||||||
|
public void testIntegrationFailedBadStreamName() throws Exception {
|
||||||
|
runner = TestRunners.newTestRunner(PutLambda.class);
|
||||||
|
runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "bad-function-name");
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.enqueue("{\"test\":\"hi\"}".getBytes());
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
|
||||||
|
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutLambda.REL_FAILURE);
|
||||||
|
final MockFlowFile out = ffs.iterator().next();
|
||||||
|
assertNull("Function error should be null since there is exception"
|
||||||
|
+ out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR), out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR));
|
||||||
|
assertNull("log should not be null", out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_LOG));
|
||||||
|
assertEquals("Status should be equal", null,out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE));
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue