first commit for aws lambda

corrected attributes of flow file

added shutdown + provenence calls

minor formatting and unused imports correction

removed unused property

updated to populate exception attributes in flow file

updated write attributes
This commit is contained in:
mans2singh 2016-02-10 20:45:53 -08:00
parent f44eb643dd
commit 180a90d12b
5 changed files with 459 additions and 0 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.aws;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.processor.ProcessContext;
@ -75,6 +76,13 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends
}
@OnShutdown
public void onShutDown() {
if ( this.client != null ) {
this.client.shutdown();
}
}
/**
* Get credentials provider using the {@link AWSCredentialsProviderService}
* @param context the process context

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.lambda;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.lambda.AWSLambdaClient;
/**
* This class provides processor the base class for invoking aws lambda
*/
public abstract class AbstractAWSLambdaProcessor extends AbstractAWSCredentialsProviderProcessor<AWSLambdaClient> {
public static final PropertyDescriptor AWS_LAMBDA_FUNCTION_NAME = new PropertyDescriptor.Builder()
.name("Amazon Lambda Name")
.description("The Lambda Function Name")
.expressionLanguageSupported(false)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor AWS_LAMBDA_FUNCTION_QUALIFIER = new PropertyDescriptor.Builder()
.name("Amazon Lambda Qualifier (version)")
.description("The Lambda Function Version")
.defaultValue("$LATEST")
.expressionLanguageSupported(false)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
/**
* Create client using aws credentials provider. This is the preferred way for creating clients
*/
@Override
protected AWSLambdaClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials provider");
return new AWSLambdaClient(credentialsProvider, config);
}
/**
* Create client using AWSCredentails
*
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Override
protected AWSLambdaClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials");
return new AWSLambdaClient(credentials, config);
}
}

View File

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.lambda;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.lambda.AWSLambdaClient;
import com.amazonaws.services.lambda.model.InvalidParameterValueException;
import com.amazonaws.services.lambda.model.InvalidRequestContentException;
import com.amazonaws.services.lambda.model.InvocationType;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import com.amazonaws.services.lambda.model.LogType;
import com.amazonaws.services.lambda.model.RequestTooLargeException;
import com.amazonaws.services.lambda.model.ResourceNotFoundException;
import com.amazonaws.services.lambda.model.ServiceException;
import com.amazonaws.services.lambda.model.TooManyRequestsException;
import com.amazonaws.services.lambda.model.UnsupportedMediaTypeException;
import com.amazonaws.util.Base64;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"amazon", "aws", "lambda", "put"})
@CapabilityDescription("Sends the contents to a specified Amazon Lamba Function")
@WritesAttributes({
@WritesAttribute(attribute = "aws.lambda.result.function.error", description = "Function error message in result on posting message to AWS Lambda"),
@WritesAttribute(attribute = "aws.lambda.result.status.code", description = "Status code in the result for the message when posting to AWS Lambda"),
@WritesAttribute(attribute = "aws.lambda.result.payload", description = "Payload in the result from AWS Lambda"),
@WritesAttribute(attribute = "aws.lambda.result.log", description = "Log in the result of the message posted to Lambda"),
@WritesAttribute(attribute = "aws.lambda.exception.message", description = "Exception message on invoking from AWS Lambda"),
@WritesAttribute(attribute = "aws.lambda.exception.cause", description = "Exception cause on invoking from AWS Lambda"),
@WritesAttribute(attribute = "aws.lambda.exception.error.code", description = "Exception error code on invoking from AWS Lambda"),
@WritesAttribute(attribute = "aws.lambda.exception.request.id", description = "Exception request id on invoking from AWS Lambda"),
@WritesAttribute(attribute = "aws.lambda.exception.status.code", description = "Exception status code on invoking from AWS Lambda"),
@WritesAttribute(attribute = "aws.lambda.exception.error.type", description = "Exception error type on invoking from AWS Lambda")
})
public class PutLambda extends AbstractAWSLambdaProcessor {
/**
* Lambda result function error message
*/
public static final String AWS_LAMBDA_RESULT_FUNCTION_ERROR = "aws.lambda.result.function.error";
/**
* Lambda response status code
*/
public static final String AWS_LAMBDA_RESULT_STATUS_CODE = "aws.lambda.result.status.code";
/**
* Lambda response log tail (4kb)
*/
public static final String AWS_LAMBDA_RESULT_LOG = "aws.lambda.result.log";
/**
* Lambda payload in response
*/
public static final String AWS_LAMBDA_RESULT_PAYLOAD = "aws.lambda.result.payload";
/**
* Lambda exception field
*/
public static final String AWS_LAMBDA_EXCEPTION_MESSAGE = "aws.lambda.exception.message";
/**
* Lambda exception field
*/
public static final String AWS_LAMBDA_EXCEPTION_CAUSE = "aws.lambda.exception.cause";
/**
* Lambda exception field
*/
public static final String AWS_LAMBDA_EXCEPTION_ERROR_CODE = "aws.lambda.exception.error.code";
/**
* Lambda exception field
*/
public static final String AWS_LAMBDA_EXCEPTION_REQUEST_ID = "aws.lambda.exception.request.id";
/**
* Lambda exception field
*/
public static final String AWS_LAMBDA_EXCEPTION_STATUS_CODE = "aws.lambda.exception.status.code";
/**
* Lambda exception field
*/
public static final String AWS_LAMBDA_EXCEPTION_ERROR_TYPE = "aws.lambda.exception.error.type";
/**
* Max request body size
*/
public static final long MAX_REQUEST_SIZE = 6 * 1000 * 1000;
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT
));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String functionName = context.getProperty(AWS_LAMBDA_FUNCTION_NAME).getValue();
final String qualifier = context.getProperty(AWS_LAMBDA_FUNCTION_QUALIFIER).getValue();
// Max size of message is 6 MB
if ( flowFile.getSize() > MAX_REQUEST_SIZE) {
getLogger().error("Max size for request body is 6mb but was {} for flow file {} for function {}",
new Object[]{flowFile.getSize(), flowFile, functionName});
session.transfer(flowFile, REL_FAILURE);
return;
}
final AWSLambdaClient client = getClient();
try {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
InvokeRequest invokeRequest = new InvokeRequest()
.withFunctionName(functionName)
.withLogType(LogType.Tail).withInvocationType(InvocationType.RequestResponse)
.withPayload(ByteBuffer.wrap(baos.toByteArray()))
.withQualifier(qualifier);
long startTime = System.nanoTime();
InvokeResult result = client.invoke(invokeRequest);
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_STATUS_CODE, result.getStatusCode().toString());
if ( !StringUtils.isBlank(result.getLogResult() )) {
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_LOG, new String(Base64.decode(result.getLogResult()),Charset.defaultCharset()));
}
if ( result.getPayload() != null ) {
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_PAYLOAD, new String(result.getPayload().array(),Charset.defaultCharset()));
}
if ( ! StringUtils.isBlank(result.getFunctionError()) ){
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_FUNCTION_ERROR, result.getFunctionError());
session.transfer(flowFile, REL_FAILURE);
} else {
session.transfer(flowFile, REL_SUCCESS);
final long totalTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
session.getProvenanceReporter().send(flowFile, functionName, totalTimeMillis);
}
} catch (final InvalidRequestContentException
| InvalidParameterValueException
| RequestTooLargeException
| ResourceNotFoundException
| UnsupportedMediaTypeException unrecoverableException) {
getLogger().error("Failed to invoke lambda {} with unrecoverable exception {} for flow file {}",
new Object[]{functionName, unrecoverableException, flowFile});
flowFile = populateExceptionAttributes(session, flowFile, unrecoverableException);
session.transfer(flowFile, REL_FAILURE);
} catch (final ServiceException | TooManyRequestsException exception) {
getLogger().error("Failed to invoke lambda {} with exception {} for flow file {}, therefore penalizing flowfile",
new Object[]{functionName, exception, flowFile});
flowFile = populateExceptionAttributes(session, flowFile, exception);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
context.yield();
} catch (final Exception exception) {
getLogger().error("Failed to invoke lambda {} with exception {} for flow file {}",
new Object[]{functionName, exception, flowFile});
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
/**
* Populate exception attributes in the flow file
* @param session process session
* @param flowFile the flow file
* @param exception exception thrown during invocation
* @return FlowFile the updated flow file
*/
private FlowFile populateExceptionAttributes(final ProcessSession session, FlowFile flowFile,
final AmazonServiceException exception) {
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_EXCEPTION_MESSAGE, exception.getErrorMessage());
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_EXCEPTION_ERROR_CODE, exception.getErrorCode());
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_EXCEPTION_REQUEST_ID, exception.getRequestId());
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_EXCEPTION_STATUS_CODE, Integer.toString(exception.getStatusCode()));
if ( exception.getCause() != null )
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_EXCEPTION_CAUSE, exception.getCause().getMessage());
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_EXCEPTION_ERROR_TYPE, exception.getErrorType().toString());
flowFile = session.putAttribute(flowFile, AWS_LAMBDA_EXCEPTION_MESSAGE, exception.getErrorMessage());
return flowFile;
}
}

View File

@ -19,3 +19,4 @@ org.apache.nifi.processors.aws.sns.PutSNS
org.apache.nifi.processors.aws.sqs.GetSQS
org.apache.nifi.processors.aws.sqs.PutSQS
org.apache.nifi.processors.aws.sqs.DeleteSQS
org.apache.nifi.processors.aws.lambda.PutLambda

View File

@ -0,0 +1,146 @@
package org.apache.nifi.processors.aws.lambda;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.List;
import org.apache.nifi.processors.aws.s3.FetchS3Object;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* This test contains both unit and integration test (integration tests are ignored by default)
*/
public class ITPutLambdaTest {
private TestRunner runner;
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Before
public void setUp() throws Exception {
runner = TestRunners.newTestRunner(PutLambda.class);
runner.setProperty(PutLambda.ACCESS_KEY, "abcd");
runner.setProperty(PutLambda.SECRET_KEY, "secret key");
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "functionName");
runner.assertValid();
}
@After
public void tearDown() throws Exception {
runner = null;
}
@Test
public void testSizeGreaterThan6MB() throws Exception {
runner = TestRunners.newTestRunner(PutLambda.class);
runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
runner.assertValid();
byte [] largeInput = new byte[6000001];
for (int i = 0; i < 6000001; i++) {
largeInput[i] = 'a';
}
runner.enqueue(largeInput);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
}
/**
* Comment out ignore for integration tests (requires creds files)
*/
@Test
@Ignore
public void testIntegrationSuccess() throws Exception {
runner = TestRunners.newTestRunner(PutLambda.class);
runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
runner.assertValid();
runner.enqueue("{\"test\":\"hi\"}".getBytes());
runner.run(1);
runner.assertAllFlowFilesTransferred(PutLambda.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
final MockFlowFile out = ffs.iterator().next();
assertNull("Function error should be null " + out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR), out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR));
assertNotNull("log should not be null", out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_LOG));
assertEquals("Status should be equal", "200",out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE));
}
/**
* Comment out ignore for integration tests (requires creds files)
*/
@Test
// @Ignore
public void testIntegrationClientErrorBadMessageBody() throws Exception {
runner = TestRunners.newTestRunner(PutLambda.class);
runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
runner.assertValid();
runner.enqueue("badbod".getBytes());
runner.run(1);
runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
final MockFlowFile out = ffs.iterator().next();
assertNull("Function error should be null since there is exception"
+ out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR), out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR));
assertNull("log should not be null", out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_LOG));
assertEquals("Status should be equal", null,out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE));
assertEquals("exception error code should be equal", "InvalidRequestContentException",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_ERROR_CODE));
assertEquals("exception exception error type should be equal", "Client",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_ERROR_TYPE));
assertEquals("exception exception error code should be equal", "400",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_STATUS_CODE));
assertTrue("exception exception error message should be start with",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_MESSAGE)
.startsWith("Could not parse request body into json: Unrecognized token 'badbod': was expecting ('true', 'false' or 'null')"));
}
/**
* Comment out ignore for integration tests (requires creds files)
*/
@Test
@Ignore
public void testIntegrationFailedBadStreamName() throws Exception {
runner = TestRunners.newTestRunner(PutLambda.class);
runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "bad-function-name");
runner.assertValid();
runner.enqueue("{\"test\":\"hi\"}".getBytes());
runner.run(1);
runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
final MockFlowFile out = ffs.iterator().next();
assertNull("Function error should be null since there is exception"
+ out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR), out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR));
assertNull("log should not be null", out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_LOG));
assertEquals("Status should be equal", null,out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE));
}
}