mirror of https://github.com/apache/nifi.git
NIFI-13194 Removed InvokeAWSGatewayApi Processor
This closes #8787 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
53207a20a0
commit
1592b98298
|
@ -1,630 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.wag;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.client.builder.AwsClientBuilder;
|
||||
import com.amazonaws.http.AmazonHttpClient;
|
||||
import com.amazonaws.http.HttpMethodName;
|
||||
import com.amazonaws.regions.Region;
|
||||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.NameValuePair;
|
||||
import org.apache.http.client.utils.URLEncodedUtils;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
|
||||
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayClient;
|
||||
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayClientBuilder;
|
||||
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayRequest;
|
||||
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayRequestBuilder;
|
||||
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayResponse;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.InputStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
|
||||
|
||||
/**
|
||||
* This class is the base class for invoking aws gateway api endpoints
|
||||
*/
|
||||
public abstract class AbstractAWSGatewayApiProcessor extends AbstractAWSCredentialsProviderProcessor<GenericApiGatewayClient> {
|
||||
|
||||
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
|
||||
private volatile Pattern regexAttributesToSend = null;
|
||||
private volatile AmazonHttpClient providedClient = null;
|
||||
|
||||
public final static String STATUS_CODE = "aws.gateway.api.status.code";
|
||||
public final static String STATUS_MESSAGE = "aws.gateway.api.status.message";
|
||||
public final static String RESPONSE_BODY = "aws.gateway.api.response.body";
|
||||
public final static String RESOURCE_NAME_ATTR = "aws.gateway.api.resource";
|
||||
public final static String ENDPOINT_ATTR = "aws.gateway.api.endpoint";
|
||||
public final static String TRANSACTION_ID = "aws.gateway.api.tx.id";
|
||||
public final static String EXCEPTION_CLASS = "aws.gateway.api.java.exception.class";
|
||||
public final static String EXCEPTION_MESSAGE = "aws.gateway.api.java.exception.message";
|
||||
|
||||
protected static final String REL_RESPONSE_NAME = "Response";
|
||||
protected static final String REL_SUCCESS_REQ_NAME = "Original";
|
||||
protected static final String REL_RETRY_NAME = "Retry";
|
||||
protected static final String REL_NO_RETRY_NAME = "No Retry";
|
||||
protected static final String REL_FAILURE_NAME = "Failure";
|
||||
public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
|
||||
|
||||
public AbstractAWSGatewayApiProcessor() {
|
||||
}
|
||||
|
||||
public AbstractAWSGatewayApiProcessor(final AmazonHttpClient client) {
|
||||
providedClient = client;
|
||||
}
|
||||
|
||||
|
||||
// Set of flowfile attributes which we generally always ignore during
|
||||
// processing, including when converting http headers, copying attributes, etc.
|
||||
// This set includes our strings defined above as well as some standard flowfile
|
||||
// attributes.
|
||||
public static final Set<String> IGNORED_ATTRIBUTES = Set.of(STATUS_CODE,
|
||||
STATUS_MESSAGE,
|
||||
RESOURCE_NAME_ATTR,
|
||||
TRANSACTION_ID,
|
||||
CoreAttributes.UUID.key(),
|
||||
CoreAttributes.FILENAME.key(),
|
||||
CoreAttributes.PATH.key());
|
||||
|
||||
public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-http-method")
|
||||
.displayName("HTTP Method")
|
||||
.description("HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS)."
|
||||
+ "Methods other than POST, PUT and PATCH will be sent without a message body.")
|
||||
.required(true)
|
||||
.defaultValue("GET")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_AWS_API_KEY = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-api-key")
|
||||
.displayName("Amazon Gateway Api Key")
|
||||
.description("The API Key")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_AWS_GATEWAY_API_ENDPOINT = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-api-endpoint")
|
||||
.displayName("Amazon Gateway Api Endpoint")
|
||||
.description("The Api Endpoint")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.URL_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_RESOURCE_NAME = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-resource")
|
||||
.displayName("Amazon Gateway Api ResourceName")
|
||||
.description("The Name of the Gateway API Resource")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||
|
||||
public static final PropertyDescriptor PROP_QUERY_PARAMS = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-query-parameters")
|
||||
.displayName("Query Parameters")
|
||||
.description("The query parameters for this request in the form of Name=Value separated by &")
|
||||
.displayName("Query Parameters")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-attributes-to-send")
|
||||
.displayName("Attributes to Send")
|
||||
.description(
|
||||
"Regular expression that defines which attributes to send as HTTP headers in the request. "
|
||||
+ "If not defined, no attributes are sent as headers. Also any dynamic properties set will be sent as headers. "
|
||||
+ "The dynamic property key will be the header key and the dynamic property value will be interpreted as expression "
|
||||
+ "language will be the header value.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-put-response-body-in-attribute")
|
||||
.displayName("Put Response Body In Attribute")
|
||||
.description("If set, the response body received back will be put into an attribute of the original FlowFile instead of a separate "
|
||||
+ "FlowFile. The attribute key to put to is determined by evaluating value of this property. ")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_OUTPUT_RESPONSE_REGARDLESS = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-always-output-response")
|
||||
.displayName("Always Output Response")
|
||||
.description("Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is "
|
||||
+ "or if the processor is configured to put the server response body in the request attribute. In the later configuration a request FlowFile with the "
|
||||
+ "response body in the attribute and a typical response FlowFile will be emitted to their respective relationships.")
|
||||
.required(false)
|
||||
.defaultValue("false")
|
||||
.allowableValues("true", "false")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_PENALIZE_NO_RETRY = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-penalize-no-retry")
|
||||
.displayName("Penalize on \"No Retry\"")
|
||||
.description("Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.")
|
||||
.required(false)
|
||||
.defaultValue("false")
|
||||
.allowableValues("true", "false")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-max-attribute-length")
|
||||
.displayName("Max Length To Put In Attribute")
|
||||
.description(
|
||||
"If routing the response body to an attribute of the original (by setting the \"Put response body in attribute\" "
|
||||
+ "property or by receiving an error status code), the number of characters put to the attribute value will be at "
|
||||
+ "most this amount. This is important because attributes are held in memory and large attributes will quickly "
|
||||
+ "cause out of memory issues. If the output goes longer than this value, it will be truncated to fit. "
|
||||
+ "Consider making this smaller if able.")
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.defaultValue("256")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_CONTENT_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-content-type")
|
||||
.displayName("Content-Type")
|
||||
.description(
|
||||
"The Content-Type to specify for when content is being transmitted through a PUT, POST or PATCH. "
|
||||
+ "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to "
|
||||
+ DEFAULT_CONTENT_TYPE)
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}")
|
||||
.addValidator(Validator.VALID)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_SEND_BODY = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-send-message-body")
|
||||
.displayName("Send Message Body")
|
||||
.description("If true, sends the HTTP message body on POST/PUT/PATCH requests (default). If false, "
|
||||
+ "suppresses the message body and content-type header for these requests.")
|
||||
.defaultValue("true")
|
||||
.allowableValues("true", "false")
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_ADD_HEADERS_TO_REQUEST = new PropertyDescriptor.Builder()
|
||||
.name("aws-gateway-add-response-headers-request")
|
||||
.displayName("Add Response Headers To Request")
|
||||
.description("Enabling this property saves all the response "
|
||||
+ "headers to the original request. This may be when the response headers are needed "
|
||||
+ "but a response is not generated due to the status code received.")
|
||||
.required(false)
|
||||
.defaultValue("false")
|
||||
.allowableValues("true", "false")
|
||||
.build();
|
||||
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.required(false)
|
||||
.name(propertyDescriptorName)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.dynamic(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue,
|
||||
final String newValue) {
|
||||
if (descriptor.isDynamic()) {
|
||||
final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
|
||||
if (newValue == null) {
|
||||
newDynamicPropertyNames.remove(descriptor.getName());
|
||||
} else if (oldValue == null) { // new property
|
||||
newDynamicPropertyNames.add(descriptor.getName());
|
||||
}
|
||||
this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
|
||||
} else {
|
||||
// compile the attributes-to-send filter pattern
|
||||
if (PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) {
|
||||
if (newValue == null || newValue.isEmpty()) {
|
||||
regexAttributesToSend = null;
|
||||
} else {
|
||||
final String trimmedValue = StringUtils.trimToEmpty(newValue);
|
||||
regexAttributesToSend = Pattern.compile(trimmedValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final List<ValidationResult> results = new ArrayList<>(3);
|
||||
results.addAll(super.customValidate(validationContext));
|
||||
final boolean querySet = validationContext.getProperty(PROP_QUERY_PARAMS).isSet();
|
||||
|
||||
if (querySet) {
|
||||
final String input = validationContext.getProperty(PROP_QUERY_PARAMS).getValue();
|
||||
// if we have expressions, we don't do further validation
|
||||
if (!(validationContext.isExpressionLanguageSupported(PROP_QUERY_PARAMS.getName())
|
||||
&& validationContext.isExpressionLanguagePresent(input))) {
|
||||
|
||||
try {
|
||||
final String evaluatedInput = validationContext.newPropertyValue(input)
|
||||
.evaluateAttributeExpressions()
|
||||
.getValue();
|
||||
// user is not expected to encode, that will be done by the aws client
|
||||
// but we may need to when validating
|
||||
final String encodedInput = URLEncoder.encode(evaluatedInput, StandardCharsets.UTF_8);
|
||||
final String url = String.format("http://www.foo.com?%s", encodedInput);
|
||||
URI.create(url).toURL();
|
||||
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(PROP_QUERY_PARAMS.getName())
|
||||
.input(input)
|
||||
.explanation("Valid URL params")
|
||||
.valid(true)
|
||||
.build());
|
||||
} catch (final Exception e) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(PROP_QUERY_PARAMS.getName())
|
||||
.input(input)
|
||||
.explanation("Not a valid set of URL params")
|
||||
.valid(false)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final String method = trimToEmpty(validationContext.getProperty(PROP_METHOD).getValue()).toUpperCase();
|
||||
|
||||
// if there are expressions do not validate
|
||||
if (!(validationContext.isExpressionLanguageSupported(PROP_METHOD.getName())
|
||||
&& validationContext.isExpressionLanguagePresent(method))) {
|
||||
try {
|
||||
HttpMethodName.fromValue(method);
|
||||
} catch (final IllegalArgumentException e) {
|
||||
results.add(new ValidationResult.Builder().subject(PROP_METHOD.getName()).input(method)
|
||||
.explanation("Unsupported METHOD")
|
||||
.valid(false).build());
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GenericApiGatewayClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
|
||||
final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
|
||||
GenericApiGatewayClientBuilder builder = new GenericApiGatewayClientBuilder()
|
||||
.withCredentials(credentialsProvider).withClientConfiguration(config)
|
||||
.withEndpoint(context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue())
|
||||
.withRegion(region)
|
||||
.withApiKey(context.getProperty(PROP_AWS_API_KEY).evaluateAttributeExpressions().getValue());
|
||||
|
||||
if (providedClient != null) {
|
||||
builder = builder.withHttpClient(providedClient);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
protected GenericApiGatewayRequest configureRequest(final ProcessContext context,
|
||||
final ProcessSession session,
|
||||
final String resourcePath,
|
||||
final FlowFile requestFlowFile,
|
||||
final Map<String, String> attributes) {
|
||||
final String method = trimToEmpty(
|
||||
context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile)
|
||||
.getValue()).toUpperCase();
|
||||
final HttpMethodName methodName = HttpMethodName.fromValue(method);
|
||||
return configureRequest(context, session, resourcePath, requestFlowFile, methodName, attributes);
|
||||
}
|
||||
|
||||
protected GenericApiGatewayRequest configureRequest(final ProcessContext context,
|
||||
final ProcessSession session,
|
||||
final String resourcePath,
|
||||
final FlowFile requestFlowFile,
|
||||
final HttpMethodName methodName,
|
||||
final Map<String, String> attributes) {
|
||||
|
||||
GenericApiGatewayRequestBuilder builder = new GenericApiGatewayRequestBuilder()
|
||||
.withResourcePath(resourcePath);
|
||||
final Map<String, List<String>> parameters = getParameters(context, attributes);
|
||||
builder = builder.withParameters(parameters);
|
||||
|
||||
InputStream requestBody;
|
||||
switch (methodName) {
|
||||
case GET:
|
||||
builder = builder.withHttpMethod(HttpMethodName.GET);
|
||||
break;
|
||||
case POST:
|
||||
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
|
||||
builder = builder.withHttpMethod(HttpMethodName.POST).withBody(requestBody);
|
||||
break;
|
||||
case PUT:
|
||||
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
|
||||
builder = builder.withHttpMethod(HttpMethodName.PUT).withBody(requestBody);
|
||||
break;
|
||||
case PATCH:
|
||||
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
|
||||
builder = builder.withHttpMethod(HttpMethodName.PATCH).withBody(requestBody);
|
||||
break;
|
||||
case HEAD:
|
||||
builder = builder.withHttpMethod(HttpMethodName.HEAD);
|
||||
break;
|
||||
case DELETE:
|
||||
builder = builder.withHttpMethod(HttpMethodName.DELETE);
|
||||
break;
|
||||
case OPTIONS:
|
||||
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
|
||||
builder = builder.withHttpMethod(HttpMethodName.OPTIONS).withBody(requestBody);
|
||||
break;
|
||||
}
|
||||
|
||||
builder = setHeaderProperties(context, builder, methodName, attributes);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
protected InputStream getRequestBodyToSend(final ProcessSession session,
|
||||
final ProcessContext context,
|
||||
final FlowFile requestFlowFile) {
|
||||
|
||||
if (context.getProperty(PROP_SEND_BODY).asBoolean() && requestFlowFile != null) {
|
||||
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
session.exportTo(requestFlowFile, outputStream);
|
||||
return new ByteArrayInputStream(outputStream.toByteArray());
|
||||
|
||||
} else {
|
||||
return new ByteArrayInputStream(new byte[0]);
|
||||
}
|
||||
}
|
||||
|
||||
protected GenericApiGatewayRequestBuilder setHeaderProperties(final ProcessContext context,
|
||||
GenericApiGatewayRequestBuilder requestBuilder,
|
||||
final HttpMethodName methodName,
|
||||
final Map<String, String> requestAttributes) {
|
||||
|
||||
final Map<String, String> headers = new HashMap<>();
|
||||
for (final String headerKey : dynamicPropertyNames) {
|
||||
final String headerValue = context.getProperty(headerKey).evaluateAttributeExpressions(requestAttributes).getValue();
|
||||
headers.put(headerKey, headerValue);
|
||||
}
|
||||
|
||||
// iterate through the flowfile attributes, adding any attribute that
|
||||
// matches the attributes-to-send pattern. if the pattern is not set
|
||||
// (it's an optional property), ignore that attribute entirely
|
||||
if (regexAttributesToSend != null) {
|
||||
final Matcher m = regexAttributesToSend.matcher("");
|
||||
for (final Map.Entry<String, String> entry : requestAttributes.entrySet()) {
|
||||
final String headerKey = trimToEmpty(entry.getKey());
|
||||
|
||||
// don't include any of the ignored attributes
|
||||
if (IGNORED_ATTRIBUTES.contains(headerKey)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// check if our attribute key matches the pattern
|
||||
// if so, include in the request as a header
|
||||
m.reset(headerKey);
|
||||
if (m.matches()) {
|
||||
String headerVal = trimToEmpty(entry.getValue());
|
||||
headers.put(headerKey, headerVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String contentType = context.getProperty(PROP_CONTENT_TYPE).evaluateAttributeExpressions(requestAttributes).getValue();
|
||||
final boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean();
|
||||
contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE : contentType;
|
||||
if (methodName == HttpMethodName.PUT || methodName == HttpMethodName.POST
|
||||
|| methodName == HttpMethodName.PATCH) {
|
||||
if (sendBody) {
|
||||
headers.put("Content-Type", contentType);
|
||||
}
|
||||
} else {
|
||||
headers.put("Content-Type", contentType);
|
||||
}
|
||||
|
||||
if (!headers.isEmpty()) {
|
||||
requestBuilder = requestBuilder.withHeaders(headers);
|
||||
}
|
||||
|
||||
return requestBuilder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map of Query Parameter Name to Values
|
||||
*
|
||||
* @param context ProcessContext
|
||||
* @param flowFileAttributes map of FlowFile attributes used for EL evaluation
|
||||
* @return Map of names and values
|
||||
*/
|
||||
protected Map<String, List<String>> getParameters(final ProcessContext context, Map<String, String> flowFileAttributes) {
|
||||
|
||||
if (!context.getProperty(PROP_QUERY_PARAMS).isSet()) {
|
||||
return new HashMap<>();
|
||||
}
|
||||
final String queryString = context.getProperty(PROP_QUERY_PARAMS)
|
||||
.evaluateAttributeExpressions(flowFileAttributes).getValue();
|
||||
final List<NameValuePair> params = URLEncodedUtils
|
||||
.parse(queryString, Charsets.toCharset("UTF-8"));
|
||||
|
||||
if (params.isEmpty()) {
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
final Map<String, List<String>> map = new HashMap<>();
|
||||
|
||||
for (final NameValuePair nvp : params) {
|
||||
if (!map.containsKey(nvp.getName())) {
|
||||
map.put(nvp.getName(), new ArrayList<>());
|
||||
}
|
||||
map.get(nvp.getName()).add(nvp.getValue());
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
|
||||
*/
|
||||
protected Map<String, String> convertAttributesFromHeaders(final GenericApiGatewayResponse responseHttp) {
|
||||
// create a new hashmap to store the values from the connection
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
responseHttp.getHttpResponse().getAllHeaders().forEach((key, headers) -> {
|
||||
if (key == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final String joined = headers.stream()
|
||||
.map(String::trim)
|
||||
.filter(str -> !str.isEmpty())
|
||||
.collect(Collectors.joining(","));
|
||||
|
||||
// we ignore any headers with no actual values (rare)
|
||||
if (StringUtils.isBlank(joined)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// put the csv into the map
|
||||
map.put(key, joined);
|
||||
});
|
||||
|
||||
return map;
|
||||
}
|
||||
|
||||
|
||||
protected Relationship getRelationshipForName(String name, Set<Relationship> relationships) {
|
||||
for (Relationship relationship : relationships) {
|
||||
if (relationship.getName().equals(name)) {
|
||||
return relationship;
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("Unknown relationship " + name);
|
||||
}
|
||||
|
||||
protected void route(FlowFile request, final FlowFile response, final ProcessSession session,
|
||||
final ProcessContext context, final int statusCode, final Set<Relationship> relationships) {
|
||||
// check if we should yield the processor
|
||||
if (!isSuccess(statusCode) && request == null) {
|
||||
context.yield();
|
||||
}
|
||||
|
||||
// If the property to output the response flowfile regardless of status code is set then transfer it
|
||||
boolean responseSent = false;
|
||||
if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) {
|
||||
session.transfer(response, getRelationshipForName(REL_RESPONSE_NAME, relationships));
|
||||
responseSent = true;
|
||||
}
|
||||
|
||||
// transfer to the correct relationship
|
||||
// 2xx -> SUCCESS
|
||||
if (isSuccess(statusCode)) {
|
||||
// we have two flowfiles to transfer
|
||||
if (request != null) {
|
||||
session.transfer(request, getRelationshipForName(REL_SUCCESS_REQ_NAME, relationships));
|
||||
}
|
||||
if (response != null && !responseSent) {
|
||||
session.transfer(response, getRelationshipForName(REL_RESPONSE_NAME, relationships));
|
||||
}
|
||||
|
||||
// 5xx -> RETRY
|
||||
} else if (statusCode / 100 == 5) {
|
||||
if (request != null) {
|
||||
request = session.penalize(request);
|
||||
session.transfer(request, getRelationshipForName(REL_RETRY_NAME, relationships));
|
||||
}
|
||||
|
||||
// 1xx, 3xx, 4xx -> NO RETRY
|
||||
} else {
|
||||
if (request != null) {
|
||||
if (context.getProperty(PROP_PENALIZE_NO_RETRY).asBoolean()) {
|
||||
request = session.penalize(request);
|
||||
}
|
||||
session.transfer(request, getRelationshipForName(REL_NO_RETRY_NAME, relationships));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected boolean isSuccess(final int statusCode) {
|
||||
return statusCode / 100 == 2;
|
||||
}
|
||||
|
||||
protected void logRequest(final ComponentLog logger, final URI endpoint, final GenericApiGatewayRequest request) {
|
||||
try {
|
||||
logger.debug("\nRequest to remote service:\n\t{}\t{}\t\n{}",
|
||||
new Object[]{endpoint.toURL().toExternalForm(), request.getHttpMethod(), getLogString(request.getHeaders())});
|
||||
} catch (final MalformedURLException e) {
|
||||
logger.debug(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
protected void logResponse(final ComponentLog logger, final GenericApiGatewayResponse response) {
|
||||
try {
|
||||
logger.debug("\nResponse from remote service:\n\t{}\n{}",
|
||||
new Object[]{response.getHttpResponse().getHttpRequest().getURI().toURL().toExternalForm(), getLogString(response.getHttpResponse().getAllHeaders())});
|
||||
} catch (MalformedURLException e) {
|
||||
logger.debug(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
protected String getLogString(final Map<String, ?> map) {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
if (map != null && map.size() > 0) {
|
||||
for (Map.Entry<String, ?> entry : map.entrySet()) {
|
||||
final Object value = entry.getValue();
|
||||
sb.append("\t");
|
||||
sb.append(entry.getKey());
|
||||
sb.append(": ");
|
||||
sb.append(value);
|
||||
sb.append("\n");
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -1,401 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.wag;
|
||||
|
||||
import com.amazonaws.http.AmazonHttpClient;
|
||||
import org.apache.http.impl.EnglishReasonPhraseCatalog;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.ConfigVerificationResult;
|
||||
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayClient;
|
||||
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayException;
|
||||
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayRequest;
|
||||
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayResponse;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
|
||||
|
||||
@SupportsBatching
|
||||
@InputRequirement(Requirement.INPUT_ALLOWED)
|
||||
@Tags({"Amazon", "AWS", "Client", "Gateway-API", "Rest", "http", "https"})
|
||||
@CapabilityDescription("Client for AWS Gateway API endpoint")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "mime.type", description = "The MIME Type of the flowfiles"),
|
||||
@WritesAttribute(attribute = "aws.gateway.api.status.code", description = "The status code that is returned"),
|
||||
@WritesAttribute(attribute = "aws.gateway.api.status.message", description = "The status message that is returned"),
|
||||
@WritesAttribute(attribute = "aws.gateway.api.response.body", description = "In the instance where the status code received is not a success (2xx)"),
|
||||
@WritesAttribute(attribute = "aws.gateway.api.resource", description = "The request resource"),
|
||||
@WritesAttribute(attribute = "aws.gateway.api.tx.id", description = "The transaction ID that is returned after reading the response"),
|
||||
@WritesAttribute(attribute = "aws.gateway.api.java.exception.class", description = "The Java exception class raised when the processor fails"),
|
||||
@WritesAttribute(attribute = "aws.gateway.api.java.exception.message", description = "The Java exception message raised when the processor fails"),})
|
||||
@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
|
||||
description = "Send request header with a key matching the Dynamic Property Key and a value created by evaluating the Attribute Expression Language set in the value of the Dynamic Property.")
|
||||
public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor {
|
||||
|
||||
private static final Set<String> IDEMPOTENT_METHODS = new HashSet<>(Arrays.asList("GET", "HEAD", "OPTIONS"));
|
||||
|
||||
public static final List<PropertyDescriptor> properties = List.of(
|
||||
PROP_METHOD,
|
||||
REGION,
|
||||
AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||
TIMEOUT,
|
||||
PROP_RESOURCE_NAME,
|
||||
PROP_AWS_GATEWAY_API_ENDPOINT,
|
||||
PROP_AWS_API_KEY,
|
||||
PROP_ATTRIBUTES_TO_SEND,
|
||||
PROP_PUT_OUTPUT_IN_ATTRIBUTE,
|
||||
PROP_CONTENT_TYPE,
|
||||
PROP_SEND_BODY,
|
||||
PROP_OUTPUT_RESPONSE_REGARDLESS,
|
||||
PROP_PENALIZE_NO_RETRY,
|
||||
PROP_QUERY_PARAMS,
|
||||
PROP_PUT_ATTRIBUTE_MAX_LENGTH,
|
||||
PROP_ADD_HEADERS_TO_REQUEST,
|
||||
PROXY_CONFIGURATION_SERVICE);
|
||||
|
||||
|
||||
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
|
||||
.name(REL_SUCCESS_REQ_NAME)
|
||||
.description("The original FlowFile will be routed upon success (2xx status codes). It will have new "
|
||||
+ "attributes detailing the success of the request.")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_RESPONSE = new Relationship.Builder()
|
||||
.name(REL_RESPONSE_NAME)
|
||||
.description("A Response FlowFile will be routed upon success (2xx status codes). If the 'Output Response "
|
||||
+ "Regardless' property is true then the response will be sent to this relationship regardless of "
|
||||
+ "the status code received.")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_RETRY = new Relationship.Builder()
|
||||
.name(REL_RETRY_NAME)
|
||||
.description("The original FlowFile will be routed on any status code that can be retried "
|
||||
+ "(5xx status codes). It will have new attributes detailing the request.")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_NO_RETRY = new Relationship.Builder()
|
||||
.name(REL_NO_RETRY_NAME)
|
||||
.description("The original FlowFile will be routed on any status code that should NOT be retried "
|
||||
+ "(1xx, 3xx, 4xx status codes). It will have new attributes detailing the request.")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name(REL_FAILURE_NAME)
|
||||
.description("The original FlowFile will be routed on any type of connection failure, timeout or general "
|
||||
+ "exception. It will have new attributes detailing the request.")
|
||||
.build();
|
||||
|
||||
public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS_REQ, REL_RESPONSE, REL_RETRY, REL_NO_RETRY, REL_FAILURE);
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
public InvokeAWSGatewayApi() {
|
||||
super();
|
||||
}
|
||||
|
||||
public InvokeAWSGatewayApi(AmazonHttpClient client) {
|
||||
super(client);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
final ComponentLog logger = getLogger();
|
||||
FlowFile requestFlowFile = session.get();
|
||||
|
||||
// Checking to see if the property to put the body of the response in an attribute was set
|
||||
boolean putToAttribute = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).isSet();
|
||||
if (requestFlowFile == null) {
|
||||
final String request = context.getProperty(PROP_METHOD).evaluateAttributeExpressions().getValue().toUpperCase();
|
||||
if ("POST".equals(request) || "PUT".equals(request) || "PATCH".equals(request)) {
|
||||
return;
|
||||
} else if (putToAttribute) {
|
||||
requestFlowFile = session.create();
|
||||
}
|
||||
}
|
||||
|
||||
// Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute.
|
||||
final UUID txId = UUID.randomUUID();
|
||||
FlowFile responseFlowFile = null;
|
||||
|
||||
try {
|
||||
final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
|
||||
|
||||
final String resourceName = context.getProperty(PROP_RESOURCE_NAME).getValue();
|
||||
|
||||
final GenericApiGatewayClient client = getClient(context);
|
||||
|
||||
final long startNanos = System.nanoTime();
|
||||
final Map<String, String> attributes = requestFlowFile == null ? Collections.emptyMap() : requestFlowFile.getAttributes();
|
||||
final GatewayResponse gatewayResponse = invokeGateway(client, context, session, requestFlowFile, attributes, logger);
|
||||
|
||||
final GenericApiGatewayResponse response = gatewayResponse.response;
|
||||
final GenericApiGatewayException exception = gatewayResponse.exception;
|
||||
final int statusCode = gatewayResponse.statusCode;
|
||||
|
||||
final String endpoint = context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue();
|
||||
final boolean outputRegardless = context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean();
|
||||
|
||||
boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute || outputRegardless);
|
||||
boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
|
||||
boolean bodyExists = response != null && response.getBody() != null;
|
||||
|
||||
final String statusExplanation;
|
||||
if (exception != null) {
|
||||
statusExplanation = EnglishReasonPhraseCatalog.INSTANCE.getReason(statusCode, null);
|
||||
} else {
|
||||
statusExplanation = response.getHttpResponse().getStatusText();
|
||||
}
|
||||
|
||||
// Create a map of the status attributes that are always written to the request and response FlowFiles
|
||||
final Map<String, String> statusAttributes = new HashMap<>();
|
||||
statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
|
||||
statusAttributes.put(STATUS_MESSAGE, statusExplanation);
|
||||
statusAttributes.put(ENDPOINT_ATTR, client.getEndpointPrefix());
|
||||
statusAttributes.put(RESOURCE_NAME_ATTR, resourceName);
|
||||
statusAttributes.put(TRANSACTION_ID, txId.toString());
|
||||
|
||||
if (outputBodyToResponseContent) {
|
||||
/*
|
||||
* If successful and putting to response flowfile, store the response body as the flowfile payload
|
||||
* we include additional flowfile attributes including the response headers and the status codes.
|
||||
*/
|
||||
|
||||
// clone the flowfile to capture the response
|
||||
if (requestFlowFile != null) {
|
||||
responseFlowFile = session.create(requestFlowFile);
|
||||
// write attributes to request flowfile
|
||||
requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
|
||||
// If the property to add the response headers to the request flowfile is true then add them
|
||||
if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean()) {
|
||||
// write the response headers as attributes
|
||||
// this will overwrite any existing flowfile attributes
|
||||
requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(response));
|
||||
}
|
||||
} else {
|
||||
responseFlowFile = session.create();
|
||||
}
|
||||
|
||||
// write attributes to response flowfile
|
||||
responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes);
|
||||
|
||||
// write the response headers as attributes
|
||||
// this will overwrite any existing flowfile attributes
|
||||
if (response != null) {
|
||||
responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(response));
|
||||
} else {
|
||||
responseFlowFile = session.putAllAttributes(responseFlowFile, exception.getHttpHeaders());
|
||||
}
|
||||
// transfer the message body to the payload
|
||||
// can potentially be null in edge cases
|
||||
if (bodyExists) {
|
||||
final List<String> contentTypes = response.getHttpResponse().getHeaderValues("Content-Type");
|
||||
if (contentTypes != null && !contentTypes.isEmpty()) {
|
||||
responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), contentTypes.get(0).trim());
|
||||
}
|
||||
|
||||
responseFlowFile = session.importFrom(new ByteArrayInputStream(response.getBody().getBytes()), responseFlowFile);
|
||||
|
||||
// emit provenance event
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
if (requestFlowFile != null) {
|
||||
session.getProvenanceReporter().fetch(responseFlowFile, endpoint, millis);
|
||||
} else {
|
||||
session.getProvenanceReporter().receive(responseFlowFile, endpoint, millis);
|
||||
}
|
||||
} else if (exception != null) {
|
||||
final String contentType = "application/json";
|
||||
responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), contentType.trim());
|
||||
responseFlowFile = session.importFrom(new ByteArrayInputStream(exception.getRawResponse()), responseFlowFile);
|
||||
|
||||
// emit provenance event
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
if (requestFlowFile != null) {
|
||||
session.getProvenanceReporter().fetch(responseFlowFile, endpoint, millis);
|
||||
} else {
|
||||
session.getProvenanceReporter().receive(responseFlowFile, endpoint, millis);
|
||||
}
|
||||
}
|
||||
}
|
||||
// if not successful and request flowfile is not null, store the response body into a flowfile attribute
|
||||
if (outputBodyToRequestAttribute) {
|
||||
String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue();
|
||||
if (attributeKey == null) {
|
||||
attributeKey = RESPONSE_BODY;
|
||||
}
|
||||
final byte[] outputBuffer;
|
||||
int size = 0;
|
||||
outputBuffer = new byte[maxAttributeSize];
|
||||
if (bodyExists) {
|
||||
size = StreamUtils.fillBuffer(new ByteArrayInputStream(response.getBody().getBytes()), outputBuffer, false);
|
||||
} else if (exception != null && exception.getRawResponse() != null && exception.getRawResponse().length > 0) {
|
||||
size = StreamUtils.fillBuffer(new ByteArrayInputStream(exception.getRawResponse()), outputBuffer, false);
|
||||
}
|
||||
|
||||
if (size > 0) {
|
||||
String bodyString = new String(outputBuffer, 0, size, StandardCharsets.UTF_8);
|
||||
requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString);
|
||||
}
|
||||
|
||||
requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
|
||||
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
session.getProvenanceReporter().modifyAttributes(requestFlowFile, String
|
||||
.format("The %s has been added. The value of which is the body of a http call to %s%s. It took %s millis,", attributeKey, endpoint, resourceName, millis));
|
||||
}
|
||||
|
||||
route(requestFlowFile, responseFlowFile, session, context, statusCode, getRelationships());
|
||||
} catch (final Exception e) {
|
||||
// penalize or yield
|
||||
if (requestFlowFile != null) {
|
||||
logger.error("Routing to {} due to exception: {}", REL_FAILURE.getName(), e, e);
|
||||
requestFlowFile = session.penalize(requestFlowFile);
|
||||
requestFlowFile = session.putAttribute(requestFlowFile, EXCEPTION_CLASS, e.getClass().getName());
|
||||
requestFlowFile = session.putAttribute(requestFlowFile, EXCEPTION_MESSAGE, e.getMessage());
|
||||
// transfer original to failure
|
||||
session.transfer(requestFlowFile, getRelationshipForName(REL_FAILURE_NAME, getRelationships()));
|
||||
} else {
|
||||
logger.error("Yielding processor due to exception encountered as a source processor: {}", e);
|
||||
context.yield();
|
||||
}
|
||||
|
||||
// cleanup response flowfile, if applicable
|
||||
try {
|
||||
if (responseFlowFile != null) {
|
||||
session.remove(responseFlowFile);
|
||||
}
|
||||
} catch (final Exception e1) {
|
||||
logger.error("Could not cleanup response flowfile due to exception: {}", e1, e1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
|
||||
final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, verificationLogger, attributes));
|
||||
|
||||
final String method = context.getProperty(PROP_METHOD).getValue();
|
||||
|
||||
if (!IDEMPOTENT_METHODS.contains(method)) {
|
||||
return results;
|
||||
}
|
||||
|
||||
final String endpoint = context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue();
|
||||
final String resource = context.getProperty(PROP_RESOURCE_NAME).getValue();
|
||||
try {
|
||||
final GenericApiGatewayClient client = getClient(context);
|
||||
|
||||
final GatewayResponse gatewayResponse = invokeGateway(client, context, null, null, attributes, verificationLogger);
|
||||
|
||||
final String explanation;
|
||||
if (gatewayResponse.exception != null) {
|
||||
final String statusExplanation = EnglishReasonPhraseCatalog.INSTANCE.getReason(gatewayResponse.statusCode, null);
|
||||
explanation = String.format("Successfully invoked AWS Gateway API [%s %s/%s] with blank request body, receiving error response [%s] with status code [%s]",
|
||||
method, endpoint, resource, statusExplanation, gatewayResponse.statusCode);
|
||||
} else {
|
||||
final String statusExplanation = gatewayResponse.response.getHttpResponse().getStatusText();
|
||||
explanation = String.format("Successfully invoked AWS Gateway API [%s %s/%s] with blank request body, receiving success response [%s] with status code [%s]",
|
||||
method, endpoint, resource, statusExplanation, gatewayResponse.statusCode);
|
||||
}
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.outcome(Outcome.SUCCESSFUL)
|
||||
.verificationStepName("Invoke AWS Gateway API")
|
||||
.explanation(explanation)
|
||||
.build());
|
||||
|
||||
} catch (final Exception e) {
|
||||
verificationLogger.error("Failed to invoke AWS Gateway API " + endpoint, e);
|
||||
results.add(new ConfigVerificationResult.Builder()
|
||||
.outcome(Outcome.FAILED)
|
||||
.verificationStepName("Invoke AWS Gateway API")
|
||||
.explanation(String.format("Failed to invoke AWS Gateway API [%s %s/%s]: %s", method, endpoint, resource, e.getMessage()))
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
private GatewayResponse invokeGateway(final GenericApiGatewayClient client, final ProcessContext context, final ProcessSession session,
|
||||
final FlowFile requestFlowFile, final Map<String, String> attributes, final ComponentLog logger) {
|
||||
final String resourceName = context.getProperty(PROP_RESOURCE_NAME).getValue();
|
||||
|
||||
final GenericApiGatewayRequest request = configureRequest(context, session, resourceName, requestFlowFile, attributes);
|
||||
|
||||
logRequest(logger, client.getEndpoint(), request);
|
||||
GenericApiGatewayResponse response = null;
|
||||
GenericApiGatewayException exception = null;
|
||||
try {
|
||||
response = client.execute(request);
|
||||
logResponse(logger, response);
|
||||
} catch (final GenericApiGatewayException gag) {
|
||||
// ERROR response codes may come back as exceptions, 404 for example
|
||||
exception = gag;
|
||||
}
|
||||
|
||||
final int statusCode;
|
||||
if (exception != null) {
|
||||
statusCode = exception.getStatusCode();
|
||||
} else {
|
||||
statusCode = response.getHttpResponse().getStatusCode();
|
||||
}
|
||||
|
||||
if (statusCode == 0) {
|
||||
throw new IllegalStateException(
|
||||
"Status code unknown, connection hasn't been attempted.");
|
||||
}
|
||||
return new GatewayResponse(response, exception, statusCode);
|
||||
}
|
||||
|
||||
private record GatewayResponse(GenericApiGatewayResponse response, GenericApiGatewayException exception, int statusCode) {
|
||||
}
|
||||
}
|
|
@ -30,7 +30,6 @@ org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB
|
|||
org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream
|
||||
org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream
|
||||
org.apache.nifi.processors.aws.cloudwatch.PutCloudWatchMetric
|
||||
org.apache.nifi.processors.aws.wag.InvokeAWSGatewayApi
|
||||
org.apache.nifi.processors.aws.ml.translate.StartAwsTranslateJob
|
||||
org.apache.nifi.processors.aws.ml.translate.GetAwsTranslateJobStatus
|
||||
org.apache.nifi.processors.aws.ml.polly.StartAwsPollyJob
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,54 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.wag;
|
||||
|
||||
import okhttp3.mockwebserver.MockWebServer;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class TestInvokeAmazonGatewayApi extends TestInvokeAWSGatewayApiCommon {
|
||||
|
||||
@BeforeEach
|
||||
public void before() throws Exception {
|
||||
runner = TestRunners.newTestRunner(InvokeAWSGatewayApi.class);
|
||||
runner.setValidateExpressionUsage(false);
|
||||
setupControllerService();
|
||||
mockWebServer = new MockWebServer();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void after() {
|
||||
runner.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStaticCredentials() throws Exception {
|
||||
runner.clearProperties();
|
||||
|
||||
setupAuth();
|
||||
test200();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCredentialsFile() throws Exception {
|
||||
runner.clearProperties();
|
||||
setupCredFile();
|
||||
test200();
|
||||
}
|
||||
}
|
|
@ -1,214 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.wag;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.http.AmazonHttpClient;
|
||||
import com.amazonaws.http.apache.client.impl.SdkHttpClient;
|
||||
import com.amazonaws.internal.TokenBucket;
|
||||
import com.amazonaws.metrics.RequestMetricCollector;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.HttpVersion;
|
||||
import org.apache.http.client.methods.HttpUriRequest;
|
||||
import org.apache.http.entity.BasicHttpEntity;
|
||||
import org.apache.http.message.BasicHttpResponse;
|
||||
import org.apache.http.message.BasicStatusLine;
|
||||
import org.apache.http.protocol.HttpContext;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processors.aws.testutil.AuthUtils;
|
||||
import org.apache.nifi.processors.aws.util.RegionUtilV1;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
||||
public class TestInvokeAmazonGatewayApiMock {
|
||||
|
||||
private TestRunner runner = null;
|
||||
private SdkHttpClient mockSdkClient = null;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
mockSdkClient = Mockito.mock(SdkHttpClient.class);
|
||||
ClientConfiguration clientConfig = new ClientConfiguration();
|
||||
|
||||
InvokeAWSGatewayApi mockGetApi = new InvokeAWSGatewayApi(
|
||||
new AmazonHttpClient(clientConfig, mockSdkClient, RequestMetricCollector.NONE, new TokenBucket()));
|
||||
runner = TestRunners.newTestRunner(mockGetApi);
|
||||
runner.setValidateExpressionUsage(false);
|
||||
|
||||
AuthUtils.enableAccessKey(runner, "awsAccessKey", "awsSecretKey");
|
||||
|
||||
runner.setProperty(RegionUtilV1.REGION, "us-east-1");
|
||||
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_API_KEY, "abcd");
|
||||
runner.setProperty(InvokeAWSGatewayApi.PROP_RESOURCE_NAME, "/TEST");
|
||||
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_ENDPOINT, "https://foobar.execute-api.us-east-1.amazonaws.com");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetApiSimple() throws Exception {
|
||||
|
||||
HttpResponse resp = new BasicHttpResponse(
|
||||
new BasicStatusLine(HttpVersion.HTTP_1_1, 200, "OK"));
|
||||
BasicHttpEntity entity = new BasicHttpEntity();
|
||||
entity.setContent(new ByteArrayInputStream("test payload".getBytes()));
|
||||
resp.setEntity(entity);
|
||||
Mockito.doReturn(resp).when(mockSdkClient)
|
||||
.execute(any(HttpUriRequest.class), any(HttpContext.class));
|
||||
|
||||
// execute
|
||||
runner.assertValid();
|
||||
runner.run(1);
|
||||
|
||||
// check
|
||||
Mockito.verify(mockSdkClient, times(1))
|
||||
.execute(argThat(argument -> argument.getMethod().equals("GET")
|
||||
&& argument.getFirstHeader("x-api-key").getValue().equals("abcd")
|
||||
&& argument.getFirstHeader("Authorization").getValue().startsWith("AWS4")
|
||||
&& argument.getURI().toString().equals("https://foobar.execute-api.us-east-1.amazonaws.com/TEST")),
|
||||
any(HttpContext.class));
|
||||
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_SUCCESS_REQ, 0);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RESPONSE, 1);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RETRY, 0);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_NO_RETRY, 0);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_FAILURE, 0);
|
||||
|
||||
final List<MockFlowFile> flowFiles = runner
|
||||
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE);
|
||||
final MockFlowFile ff0 = flowFiles.get(0);
|
||||
|
||||
ff0.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
|
||||
ff0.assertContentEquals("test payload");
|
||||
ff0.assertAttributeExists(InvokeAWSGatewayApi.TRANSACTION_ID);
|
||||
ff0.assertAttributeEquals(InvokeAWSGatewayApi.RESOURCE_NAME_ATTR, "/TEST");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendAttributes() throws Exception {
|
||||
|
||||
HttpResponse resp = new BasicHttpResponse(
|
||||
new BasicStatusLine(HttpVersion.HTTP_1_1, 200, "OK"));
|
||||
BasicHttpEntity entity = new BasicHttpEntity();
|
||||
entity.setContent(new ByteArrayInputStream("test payload".getBytes()));
|
||||
resp.setEntity(entity);
|
||||
Mockito.doReturn(resp).when(mockSdkClient)
|
||||
.execute(any(HttpUriRequest.class), any(HttpContext.class));
|
||||
|
||||
// add dynamic property
|
||||
runner.setProperty("dynamicHeader", "yes!");
|
||||
// set the regex
|
||||
runner.setProperty(InvokeAWSGatewayApi.PROP_ATTRIBUTES_TO_SEND, "F.*");
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
|
||||
attributes.put("Foo", "Bar");
|
||||
runner.enqueue("Hello".getBytes(StandardCharsets.UTF_8), attributes);
|
||||
// execute
|
||||
runner.assertValid();
|
||||
runner.run(1);
|
||||
|
||||
Mockito.verify(mockSdkClient, times(1))
|
||||
.execute(argThat(argument -> argument.getMethod().equals("GET")
|
||||
&& argument.getFirstHeader("x-api-key").getValue().equals("abcd")
|
||||
&& argument.getFirstHeader("Authorization").getValue().startsWith("AWS4")
|
||||
&& argument.getFirstHeader("dynamicHeader").getValue().equals("yes!")
|
||||
&& argument.getFirstHeader("Foo").getValue().equals("Bar")
|
||||
&& argument.getURI().toString().equals("https://foobar.execute-api.us-east-1.amazonaws.com/TEST")),
|
||||
any(HttpContext.class));
|
||||
// check
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_SUCCESS_REQ, 1);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RESPONSE, 1);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RETRY, 0);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_NO_RETRY, 0);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_FAILURE, 0);
|
||||
|
||||
final List<MockFlowFile> flowFiles = runner
|
||||
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE);
|
||||
final MockFlowFile ff0 = flowFiles.get(0);
|
||||
|
||||
ff0.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
|
||||
ff0.assertContentEquals("test payload");
|
||||
ff0.assertAttributeExists(InvokeAWSGatewayApi.TRANSACTION_ID);
|
||||
ff0.assertAttributeEquals(InvokeAWSGatewayApi.RESOURCE_NAME_ATTR, "/TEST");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendQueryParams() throws Exception {
|
||||
|
||||
HttpResponse resp = new BasicHttpResponse(
|
||||
new BasicStatusLine(HttpVersion.HTTP_1_1, 200, "OK"));
|
||||
BasicHttpEntity entity = new BasicHttpEntity();
|
||||
entity.setContent(new ByteArrayInputStream("test payload".getBytes()));
|
||||
resp.setEntity(entity);
|
||||
Mockito.doReturn(resp).when(mockSdkClient)
|
||||
.execute(any(HttpUriRequest.class), any(HttpContext.class));
|
||||
|
||||
// add dynamic property
|
||||
runner.setProperty("dynamicHeader", "yes!");
|
||||
runner.setProperty(InvokeAWSGatewayApi.PROP_QUERY_PARAMS, "apples=oranges&dogs=cats&filename=${filename}");
|
||||
|
||||
// set the regex
|
||||
runner.setProperty(InvokeAWSGatewayApi.PROP_ATTRIBUTES_TO_SEND, "F.*");
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
|
||||
attributes.put("Foo", "Bar");
|
||||
attributes.put("filename", "testfile");
|
||||
runner.enqueue("Hello".getBytes(StandardCharsets.UTF_8), attributes);
|
||||
// execute
|
||||
runner.assertValid();
|
||||
runner.run(1);
|
||||
|
||||
Mockito.verify(mockSdkClient, times(1))
|
||||
.execute(argThat(argument -> argument.getMethod().equals("GET")
|
||||
&& argument.getFirstHeader("x-api-key").getValue().equals("abcd")
|
||||
&& argument.getFirstHeader("Authorization").getValue().startsWith("AWS4")
|
||||
&& argument.getFirstHeader("dynamicHeader").getValue().equals("yes!")
|
||||
&& argument.getFirstHeader("Foo").getValue().equals("Bar")
|
||||
&& argument.getURI().toString().equals("https://foobar.execute-api.us-east-1.amazonaws.com/TEST?filename=testfile&dogs=cats&apples=oranges")),
|
||||
any(HttpContext.class));
|
||||
// check
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_SUCCESS_REQ, 1);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RESPONSE, 1);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RETRY, 0);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_NO_RETRY, 0);
|
||||
runner.assertTransferCount(InvokeAWSGatewayApi.REL_FAILURE, 0);
|
||||
|
||||
final List<MockFlowFile> flowFiles = runner
|
||||
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE);
|
||||
final MockFlowFile ff0 = flowFiles.get(0);
|
||||
|
||||
ff0.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
|
||||
ff0.assertContentEquals("test payload");
|
||||
ff0.assertAttributeExists(InvokeAWSGatewayApi.TRANSACTION_ID);
|
||||
ff0.assertAttributeEquals(InvokeAWSGatewayApi.RESOURCE_NAME_ATTR, "/TEST");
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue