mirror of https://github.com/apache/nifi.git
NIFI-5022 This closes #2588. InvokeAWSGatewayApi processor
per review, simplify header building and remove new map Per review: - removed apache headers from 3rd party files per https://www.apache.org/legal/src-headers.html#3party - referenced commit/repo in notice file - created rat configuration to account for files without headers Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
e3b0949b6b
commit
946c2fe6b3
25
NOTICE
25
NOTICE
|
@ -64,4 +64,27 @@ This includes derived works from Apache Solr available under Apache Software Lic
|
||||||
|
|
||||||
This includes derived works from Apache Hadoop available under Apache Software License V2. Portions of the code found in
|
This includes derived works from Apache Hadoop available under Apache Software License V2. Portions of the code found in
|
||||||
https://github.com/apache/hadoop/blob/release-2.7.3-RC2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
|
https://github.com/apache/hadoop/blob/release-2.7.3-RC2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
|
||||||
The code can be found nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KeytabUser.java
|
The code can be found nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KeytabUser.java
|
||||||
|
This includes derived works from apigateway-generic-java-sdk (ASLv2 licenced) project (https://github.com/rpgreen/apigateway-generic-java-sdk):
|
||||||
|
The derived work is adapted from
|
||||||
|
main/ca/ryangreen/apigateway/generic/
|
||||||
|
GenericApiGatewayClient.java
|
||||||
|
GenericApiGatewayClientBuilder.java
|
||||||
|
GenericApiGatewayException.java
|
||||||
|
GenericApiGatewayRequest.java
|
||||||
|
GenericApiGatewayRequestBuilder.java
|
||||||
|
test/ca/ryangreen/apigateway/generic/
|
||||||
|
GenericApiGatewayClientTest.java
|
||||||
|
LambdaMatcher.java
|
||||||
|
and can be found in the directories:
|
||||||
|
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/../wag/client/
|
||||||
|
GenericApiGatewayClient.java
|
||||||
|
GenericApiGatewayClientBuilder.java
|
||||||
|
GenericApiGatewayException.java
|
||||||
|
GenericApiGatewayRequest.java
|
||||||
|
GenericApiGatewayRequestBuilder.java
|
||||||
|
Validate.java
|
||||||
|
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/../wag/
|
||||||
|
RequestMatcher.java
|
||||||
|
GetAWSGatewayApiTest.java
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,11 @@
|
||||||
<groupId>com.amazonaws</groupId>
|
<groupId>com.amazonaws</groupId>
|
||||||
<artifactId>aws-java-sdk-sqs</artifactId>
|
<artifactId>aws-java-sdk-sqs</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>commons-io</groupId>
|
||||||
|
<artifactId>commons-io</artifactId>
|
||||||
|
<version>2.6</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.commons</groupId>
|
<groupId>org.apache.commons</groupId>
|
||||||
<artifactId>commons-lang3</artifactId>
|
<artifactId>commons-lang3</artifactId>
|
||||||
|
@ -81,5 +86,19 @@
|
||||||
<artifactId>nifi-proxy-configuration-api</artifactId>
|
<artifactId>nifi-proxy-configuration-api</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.rat</groupId>
|
||||||
|
<artifactId>apache-rat-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<excludes>
|
||||||
|
<!-- 3rd party APL 2.0 code brought in, should not have headers -->
|
||||||
|
<!--https://www.apache.org/legal/src-headers.html#3party-->
|
||||||
|
<exclude>src/main/java/org/apache/nifi/processors/aws/wag/client/*.java</exclude>
|
||||||
|
</excludes>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -156,11 +156,11 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
||||||
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
|
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
|
||||||
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
|
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
|
||||||
|
|
||||||
private static AllowableValue createAllowableValue(final Regions region) {
|
protected static AllowableValue createAllowableValue(final Regions region) {
|
||||||
return new AllowableValue(region.getName(), AWSRegions.getRegionDisplayName(region.getName()));
|
return new AllowableValue(region.getName(), AWSRegions.getRegionDisplayName(region.getName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static AllowableValue[] getAvailableRegions() {
|
protected static AllowableValue[] getAvailableRegions() {
|
||||||
final List<AllowableValue> values = new ArrayList<>();
|
final List<AllowableValue> values = new ArrayList<>();
|
||||||
for (final Regions region : Regions.values()) {
|
for (final Regions region : Regions.values()) {
|
||||||
values.add(createAllowableValue(region));
|
values.add(createAllowableValue(region));
|
||||||
|
|
|
@ -0,0 +1,675 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.wag;
|
||||||
|
|
||||||
|
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
|
||||||
|
|
||||||
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
import com.amazonaws.auth.AWSCredentials;
|
||||||
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
|
import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
||||||
|
import com.amazonaws.http.AmazonHttpClient;
|
||||||
|
import com.amazonaws.http.HttpMethodName;
|
||||||
|
import com.amazonaws.regions.Region;
|
||||||
|
import com.amazonaws.regions.Regions;
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.net.URLEncoder;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
import org.apache.commons.io.Charsets;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.http.NameValuePair;
|
||||||
|
import org.apache.http.client.utils.URLEncodedUtils;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.expression.AttributeExpression;
|
||||||
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
|
||||||
|
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
|
||||||
|
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayClient;
|
||||||
|
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayClientBuilder;
|
||||||
|
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayRequest;
|
||||||
|
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayRequestBuilder;
|
||||||
|
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayResponse;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is the base class for invoking aws gateway api endpoints
|
||||||
|
*/
|
||||||
|
public abstract class AbstractAWSGatewayApiProcessor extends
|
||||||
|
AbstractAWSCredentialsProviderProcessor<GenericApiGatewayClient> {
|
||||||
|
|
||||||
|
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
|
||||||
|
private volatile Pattern regexAttributesToSend = null;
|
||||||
|
private volatile AmazonHttpClient providedClient = null;
|
||||||
|
|
||||||
|
public final static String STATUS_CODE = "aws.gateway.api.status.code";
|
||||||
|
public final static String STATUS_MESSAGE = "aws.gateway.api.status.message";
|
||||||
|
public final static String RESPONSE_BODY = "aws.gateway.api.response.body";
|
||||||
|
public final static String RESOURCE_NAME_ATTR = "aws.gateway.api.resource";
|
||||||
|
public final static String ENDPOINT_ATTR = "aws.gateway.api.endpoint";
|
||||||
|
public final static String TRANSACTION_ID = "aws.gateway.api.tx.id";
|
||||||
|
public final static String EXCEPTION_CLASS = "aws.gateway.api.java.exception.class";
|
||||||
|
public final static String EXCEPTION_MESSAGE = "aws.gateway.api.java.exception.message";
|
||||||
|
|
||||||
|
protected static final String REL_RESPONSE_NAME = "Response";
|
||||||
|
protected static final String REL_SUCCESS_REQ_NAME = "Original";
|
||||||
|
protected static final String REL_RETRY_NAME = "Retry";
|
||||||
|
protected static final String REL_NO_RETRY_NAME = "No Retry";
|
||||||
|
protected static final String REL_FAILURE_NAME = "Failure";
|
||||||
|
public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
|
||||||
|
|
||||||
|
public AbstractAWSGatewayApiProcessor() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbstractAWSGatewayApiProcessor(AmazonHttpClient client) {
|
||||||
|
providedClient = client;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Set of flowfile attributes which we generally always ignore during
|
||||||
|
// processing, including when converting http headers, copying attributes, etc.
|
||||||
|
// This set includes our strings defined above as well as some standard flowfile
|
||||||
|
// attributes.
|
||||||
|
public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(
|
||||||
|
Arrays.asList(STATUS_CODE, STATUS_MESSAGE, RESOURCE_NAME_ATTR, TRANSACTION_ID, "uuid",
|
||||||
|
"filename", "path")));
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-http-method")
|
||||||
|
.displayName("HTTP Method")
|
||||||
|
.description(
|
||||||
|
"HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS)."
|
||||||
|
+ "Methods other than POST, PUT and PATCH will be sent without a message body.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("GET")
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.addValidator(StandardValidators
|
||||||
|
.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_AWS_API_KEY = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-api-key")
|
||||||
|
.displayName("Amazon Gateway Api Key")
|
||||||
|
.description("The API Key")
|
||||||
|
.required(false)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.sensitive(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_AWS_GATEWAY_API_ENDPOINT = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-api-endpoint")
|
||||||
|
.displayName("Amazon Gateway Api Endpoint")
|
||||||
|
.description("The Api Endpoint")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.URL_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// we use our own region, because the way the base sets the region after the client is created
|
||||||
|
// resets the endpoint and breaks everything
|
||||||
|
public static final PropertyDescriptor PROP_AWS_GATEWAY_API_REGION = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-region")
|
||||||
|
.displayName("Amazon Region")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues(AbstractAWSProcessor.getAvailableRegions())
|
||||||
|
.defaultValue(AbstractAWSProcessor.createAllowableValue(Regions.DEFAULT_REGION).getValue())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_RESOURCE_NAME = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-resource")
|
||||||
|
.displayName("Amazon Gateway Api ResourceName")
|
||||||
|
.description("The Name of the Gateway API Resource")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_QUERY_PARAMS = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-query-parameters")
|
||||||
|
.displayName("Query Parameters")
|
||||||
|
.description(
|
||||||
|
"The query parameters for this request in the form of Name=Value separated by &")
|
||||||
|
.displayName("Query Parameters")
|
||||||
|
.required(false)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.addValidator(StandardValidators
|
||||||
|
.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-attributes-to-send")
|
||||||
|
.displayName("Attributes to Send")
|
||||||
|
.description(
|
||||||
|
"Regular expression that defines which attributes to send as HTTP headers in the request. "
|
||||||
|
+ "If not defined, no attributes are sent as headers. Also any dynamic properties set will be sent as headers. "
|
||||||
|
+ "The dynamic property key will be the header key and the dynamic property value will be interpreted as expression "
|
||||||
|
+ "language will be the header value.")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-put-response-body-in-attribute")
|
||||||
|
.displayName("Put Response Body In Attribute")
|
||||||
|
.description(
|
||||||
|
"If set, the response body received back will be put into an attribute of the original FlowFile instead of a separate "
|
||||||
|
+ "FlowFile. The attribute key to put to is determined by evaluating value of this property. ")
|
||||||
|
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
|
||||||
|
AttributeExpression.ResultType.STRING))
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_OUTPUT_RESPONSE_REGARDLESS = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-always-output-response")
|
||||||
|
.displayName("Always Output Response")
|
||||||
|
.description(
|
||||||
|
"Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is "
|
||||||
|
+ "or if the processor is configured to put the server response body in the request attribute. In the later configuration a request FlowFile with the "
|
||||||
|
+ "response body in the attribute and a typical response FlowFile will be emitted to their respective relationships.")
|
||||||
|
.required(false)
|
||||||
|
.defaultValue("false")
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_PENALIZE_NO_RETRY = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-penalize-no-retry")
|
||||||
|
.displayName("Penalize on \"No Retry\"")
|
||||||
|
.description("Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" " +
|
||||||
|
"relationship.")
|
||||||
|
.required(false)
|
||||||
|
.defaultValue("false")
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-max-attribute-length")
|
||||||
|
.displayName("Max Length To Put In Attribute")
|
||||||
|
.description(
|
||||||
|
"If routing the response body to an attribute of the original (by setting the \"Put response body in attribute\" "
|
||||||
|
+ "property or by receiving an error status code), the number of characters put to the attribute value will be at "
|
||||||
|
+ "most this amount. This is important because attributes are held in memory and large attributes will quickly "
|
||||||
|
+ "cause out of memory issues. If the output goes longer than this value, it will be truncated to fit. "
|
||||||
|
+ "Consider making this smaller if able.")
|
||||||
|
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||||
|
.defaultValue("256")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_CONTENT_TYPE = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-content-type")
|
||||||
|
.displayName("Content-Type")
|
||||||
|
.description(
|
||||||
|
"The Content-Type to specify for when content is being transmitted through a PUT, POST or PATCH. "
|
||||||
|
+ "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to "
|
||||||
|
+ DEFAULT_CONTENT_TYPE).required(true).expressionLanguageSupported(true)
|
||||||
|
.defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}")
|
||||||
|
.addValidator(StandardValidators
|
||||||
|
.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_SEND_BODY = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-send-message-body")
|
||||||
|
.displayName("Send Message Body")
|
||||||
|
.description("If true, sends the HTTP message body on POST/PUT/PATCH requests (default). If false, "
|
||||||
|
+ "suppresses the message body and content-type header for these requests.")
|
||||||
|
.defaultValue("true")
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.required(false)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_ADD_HEADERS_TO_REQUEST = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-add-response-headers-request")
|
||||||
|
.displayName("Add Response Headers To Request")
|
||||||
|
.description("Enabling this property saves all the response "
|
||||||
|
+ "headers to the original request. This may be when the response headers are needed "
|
||||||
|
+ "but a response is not generated due to the status code received.")
|
||||||
|
.required(false)
|
||||||
|
.defaultValue("false")
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-connection-timeout")
|
||||||
|
.displayName("Connection Timeout")
|
||||||
|
.description("Max wait time for connection to remote service.")
|
||||||
|
.required(false)
|
||||||
|
.defaultValue("10 secs")
|
||||||
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
|
||||||
|
.name("aws-gateway-read-timeout")
|
||||||
|
.displayName("Read Timeout")
|
||||||
|
.description("Max wait time for response from remote service.")
|
||||||
|
.required(false)
|
||||||
|
.defaultValue("50 secs")
|
||||||
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
|
||||||
|
String propertyDescriptorName) {
|
||||||
|
return new PropertyDescriptor.Builder()
|
||||||
|
.required(false)
|
||||||
|
.name(propertyDescriptorName)
|
||||||
|
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
|
||||||
|
AttributeExpression.ResultType.STRING,
|
||||||
|
true))
|
||||||
|
.dynamic(true)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue,
|
||||||
|
final String newValue) {
|
||||||
|
if (descriptor.isDynamic()) {
|
||||||
|
final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
|
||||||
|
if (newValue == null) {
|
||||||
|
newDynamicPropertyNames.remove(descriptor.getName());
|
||||||
|
} else if (oldValue == null) { // new property
|
||||||
|
newDynamicPropertyNames.add(descriptor.getName());
|
||||||
|
}
|
||||||
|
this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
|
||||||
|
} else {
|
||||||
|
// compile the attributes-to-send filter pattern
|
||||||
|
if (PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) {
|
||||||
|
if (newValue == null || newValue.isEmpty()) {
|
||||||
|
regexAttributesToSend = null;
|
||||||
|
} else {
|
||||||
|
final String trimmedValue = StringUtils.trimToEmpty(newValue);
|
||||||
|
regexAttributesToSend = Pattern.compile(trimmedValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<ValidationResult> customValidate(
|
||||||
|
final ValidationContext validationContext) {
|
||||||
|
final List<ValidationResult> results = new ArrayList<>(3);
|
||||||
|
results.addAll(super.customValidate(validationContext));
|
||||||
|
final boolean querySet = validationContext.getProperty(PROP_QUERY_PARAMS).isSet();
|
||||||
|
|
||||||
|
if (querySet) {
|
||||||
|
String input = validationContext.getProperty(PROP_QUERY_PARAMS).getValue();
|
||||||
|
// if we have expressions, we don't do further validation
|
||||||
|
if (!(validationContext.isExpressionLanguageSupported(PROP_QUERY_PARAMS.getName())
|
||||||
|
&& validationContext.isExpressionLanguagePresent(input))) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
final String evaluatedInput = validationContext.newPropertyValue(input)
|
||||||
|
.evaluateAttributeExpressions()
|
||||||
|
.getValue();
|
||||||
|
// user is not expected to encode, that will be done by the aws client
|
||||||
|
// but we may need to when validating
|
||||||
|
final String encodedInput = URLEncoder.encode(evaluatedInput, "UTF-8");
|
||||||
|
final String url = String.format("http://www.foo.com?%s", encodedInput);
|
||||||
|
new URL(url);
|
||||||
|
results.add(new ValidationResult.Builder().subject(PROP_QUERY_PARAMS.getName())
|
||||||
|
.input(input)
|
||||||
|
.explanation("Valid URL params")
|
||||||
|
.valid(true).build());
|
||||||
|
} catch (final Exception e) {
|
||||||
|
results.add(new ValidationResult.Builder().subject(PROP_QUERY_PARAMS.getName())
|
||||||
|
.input(input).explanation(
|
||||||
|
"Not a valid set of URL params").valid(false).build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String method = trimToEmpty(validationContext.getProperty(PROP_METHOD).getValue())
|
||||||
|
.toUpperCase();
|
||||||
|
|
||||||
|
// if there are expressions do not validate
|
||||||
|
if (!(validationContext.isExpressionLanguageSupported(PROP_METHOD.getName())
|
||||||
|
&& validationContext.isExpressionLanguagePresent(method))) {
|
||||||
|
try {
|
||||||
|
HttpMethodName methodName = HttpMethodName.fromValue(method);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
results.add(new ValidationResult.Builder().subject(PROP_METHOD.getName()).input(method)
|
||||||
|
.explanation("Unsupported METHOD")
|
||||||
|
.valid(false).build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected GenericApiGatewayClient createClient(ProcessContext context,
|
||||||
|
AWSCredentialsProvider awsCredentialsProvider,
|
||||||
|
ClientConfiguration clientConfiguration) {
|
||||||
|
|
||||||
|
GenericApiGatewayClientBuilder builder = new GenericApiGatewayClientBuilder()
|
||||||
|
.withCredentials(awsCredentialsProvider).withClientConfiguration(clientConfiguration)
|
||||||
|
.withEndpoint(context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue()).withRegion(
|
||||||
|
Region.getRegion(
|
||||||
|
Regions.fromName(context.getProperty(PROP_AWS_GATEWAY_API_REGION).getValue())));
|
||||||
|
if (context.getProperty(PROP_AWS_API_KEY).isSet()) {
|
||||||
|
builder = builder.withApiKey(context.getProperty(PROP_AWS_API_KEY).evaluateAttributeExpressions().getValue());
|
||||||
|
}
|
||||||
|
if (providedClient != null) {
|
||||||
|
builder = builder.withHttpClient(providedClient);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Deprecated
|
||||||
|
protected GenericApiGatewayClient createClient(final ProcessContext context,
|
||||||
|
final AWSCredentials credentials,
|
||||||
|
final ClientConfiguration clientConfiguration) {
|
||||||
|
return createClient(context, new AWSStaticCredentialsProvider(credentials),
|
||||||
|
clientConfiguration);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected GenericApiGatewayRequest configureRequest(final ProcessContext context,
|
||||||
|
final ProcessSession session,
|
||||||
|
final String resourcePath,
|
||||||
|
final FlowFile requestFlowFile) {
|
||||||
|
String method = trimToEmpty(
|
||||||
|
context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile)
|
||||||
|
.getValue()).toUpperCase();
|
||||||
|
HttpMethodName methodName = HttpMethodName.fromValue(method);
|
||||||
|
return configureRequest(context, session, resourcePath,requestFlowFile, methodName);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected GenericApiGatewayRequest configureRequest(final ProcessContext context,
|
||||||
|
final ProcessSession session,
|
||||||
|
final String resourcePath,
|
||||||
|
final FlowFile requestFlowFile,
|
||||||
|
final HttpMethodName methodName) {
|
||||||
|
|
||||||
|
GenericApiGatewayRequestBuilder builder = new GenericApiGatewayRequestBuilder()
|
||||||
|
.withResourcePath(resourcePath);
|
||||||
|
final Map<String, List<String>> parameters = getParameters(context);
|
||||||
|
builder = builder.withParameters(parameters);
|
||||||
|
|
||||||
|
InputStream requestBody = null;
|
||||||
|
switch (methodName) {
|
||||||
|
case GET:
|
||||||
|
builder = builder.withHttpMethod(HttpMethodName.GET);
|
||||||
|
break;
|
||||||
|
case POST:
|
||||||
|
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
|
||||||
|
builder = builder.withHttpMethod(HttpMethodName.POST).withBody(requestBody);
|
||||||
|
break;
|
||||||
|
case PUT:
|
||||||
|
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
|
||||||
|
builder = builder.withHttpMethod(HttpMethodName.PUT).withBody(requestBody);
|
||||||
|
break;
|
||||||
|
case PATCH:
|
||||||
|
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
|
||||||
|
builder = builder.withHttpMethod(HttpMethodName.PATCH).withBody(requestBody);
|
||||||
|
break;
|
||||||
|
case HEAD:
|
||||||
|
builder = builder.withHttpMethod(HttpMethodName.HEAD);
|
||||||
|
break;
|
||||||
|
case DELETE:
|
||||||
|
builder = builder.withHttpMethod(HttpMethodName.DELETE);
|
||||||
|
break;
|
||||||
|
case OPTIONS:
|
||||||
|
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
|
||||||
|
builder = builder.withHttpMethod(HttpMethodName.OPTIONS).withBody(requestBody);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
builder = setHeaderProperties(context, builder, methodName, requestFlowFile);
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected InputStream getRequestBodyToSend(final ProcessSession session,
|
||||||
|
final ProcessContext context,
|
||||||
|
final FlowFile requestFlowFile) {
|
||||||
|
|
||||||
|
if (context.getProperty(PROP_SEND_BODY).asBoolean() && requestFlowFile != null) {
|
||||||
|
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||||
|
session.exportTo(requestFlowFile, outputStream);
|
||||||
|
return new ByteArrayInputStream(outputStream.toByteArray());
|
||||||
|
|
||||||
|
} else {
|
||||||
|
return new ByteArrayInputStream(new byte[0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected GenericApiGatewayRequestBuilder setHeaderProperties(final ProcessContext context,
|
||||||
|
GenericApiGatewayRequestBuilder requestBuilder,
|
||||||
|
HttpMethodName methodName,
|
||||||
|
final FlowFile requestFlowFile) {
|
||||||
|
|
||||||
|
Map<String, String> headers = new HashMap<>();
|
||||||
|
for (String headerKey : dynamicPropertyNames) {
|
||||||
|
String headerValue = context.getProperty(headerKey)
|
||||||
|
.evaluateAttributeExpressions(requestFlowFile).getValue();
|
||||||
|
headers.put(headerKey, headerValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
// iterate through the flowfile attributes, adding any attribute that
|
||||||
|
// matches the attributes-to-send pattern. if the pattern is not set
|
||||||
|
// (it's an optional property), ignore that attribute entirely
|
||||||
|
if (regexAttributesToSend != null && requestFlowFile != null) {
|
||||||
|
Map<String, String> attributes = requestFlowFile.getAttributes();
|
||||||
|
Matcher m = regexAttributesToSend.matcher("");
|
||||||
|
for (Map.Entry<String, String> entry : attributes.entrySet()) {
|
||||||
|
String headerKey = trimToEmpty(entry.getKey());
|
||||||
|
|
||||||
|
// don't include any of the ignored attributes
|
||||||
|
if (IGNORED_ATTRIBUTES.contains(headerKey)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if our attribute key matches the pattern
|
||||||
|
// if so, include in the request as a header
|
||||||
|
m.reset(headerKey);
|
||||||
|
if (m.matches()) {
|
||||||
|
String headerVal = trimToEmpty(entry.getValue());
|
||||||
|
headers.put(headerKey, headerVal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String contentType = context.getProperty(PROP_CONTENT_TYPE)
|
||||||
|
.evaluateAttributeExpressions(requestFlowFile).getValue();
|
||||||
|
boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean();
|
||||||
|
contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE : contentType;
|
||||||
|
if (methodName == HttpMethodName.PUT || methodName == HttpMethodName.POST
|
||||||
|
|| methodName == HttpMethodName.PATCH) {
|
||||||
|
if (sendBody) {
|
||||||
|
headers.put("Content-Type", contentType);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
headers.put("Content-Type", contentType);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!headers.isEmpty()) {
|
||||||
|
requestBuilder = requestBuilder.withHeaders(headers);
|
||||||
|
}
|
||||||
|
|
||||||
|
return requestBuilder;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a map of Query Parameter Name to Values
|
||||||
|
*
|
||||||
|
* @param context ProcessContext
|
||||||
|
* @return Map of names and values
|
||||||
|
*/
|
||||||
|
protected Map<String, List<String>> getParameters(ProcessContext context) {
|
||||||
|
|
||||||
|
if (!context.getProperty(PROP_QUERY_PARAMS).isSet()) {
|
||||||
|
return new HashMap<>();
|
||||||
|
}
|
||||||
|
final String queryString = context.getProperty(PROP_QUERY_PARAMS)
|
||||||
|
.evaluateAttributeExpressions().getValue();
|
||||||
|
List<NameValuePair> params = URLEncodedUtils
|
||||||
|
.parse(queryString, Charsets.toCharset("UTF-8"));
|
||||||
|
|
||||||
|
if (params.isEmpty()) {
|
||||||
|
return new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, List<String>> map = new HashMap<>();
|
||||||
|
|
||||||
|
for (NameValuePair nvp : params) {
|
||||||
|
if (!map.containsKey(nvp.getName())) {
|
||||||
|
map.put(nvp.getName(), new ArrayList<>());
|
||||||
|
}
|
||||||
|
map.get(nvp.getName()).add(nvp.getValue());
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
|
||||||
|
*/
|
||||||
|
protected Map<String, String> convertAttributesFromHeaders(
|
||||||
|
GenericApiGatewayResponse responseHttp) {
|
||||||
|
// create a new hashmap to store the values from the connection
|
||||||
|
Map<String, String> map = new HashMap<>();
|
||||||
|
responseHttp.getHttpResponse().getHeaders().entrySet().forEach((entry) -> {
|
||||||
|
|
||||||
|
String key = entry.getKey();
|
||||||
|
String value = entry.getValue();
|
||||||
|
|
||||||
|
if (key == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// we ignore any headers with no actual values (rare)
|
||||||
|
if (StringUtils.isBlank(value)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// put the csv into the map
|
||||||
|
map.put(key, value);
|
||||||
|
});
|
||||||
|
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected Relationship getRelationshipForName(String name, Set<Relationship> relationships) {
|
||||||
|
for (Relationship relationship : relationships) {
|
||||||
|
if (relationship.getName().equals(name)) {
|
||||||
|
return relationship;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new IllegalStateException("Unknown relationship " + name);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void route(FlowFile request, FlowFile response, ProcessSession session,
|
||||||
|
ProcessContext context, int statusCode, Set<Relationship> relationships) {
|
||||||
|
// check if we should yield the processor
|
||||||
|
if (!isSuccess(statusCode) && request == null) {
|
||||||
|
context.yield();
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the property to output the response flowfile regardless of status code is set then transfer it
|
||||||
|
boolean responseSent = false;
|
||||||
|
if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) {
|
||||||
|
session.transfer(response, getRelationshipForName(REL_RESPONSE_NAME, relationships));
|
||||||
|
responseSent = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// transfer to the correct relationship
|
||||||
|
// 2xx -> SUCCESS
|
||||||
|
if (isSuccess(statusCode)) {
|
||||||
|
// we have two flowfiles to transfer
|
||||||
|
if (request != null) {
|
||||||
|
session
|
||||||
|
.transfer(request, getRelationshipForName(REL_SUCCESS_REQ_NAME, relationships));
|
||||||
|
}
|
||||||
|
if (response != null && !responseSent) {
|
||||||
|
session
|
||||||
|
.transfer(response, getRelationshipForName(REL_RESPONSE_NAME, relationships));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5xx -> RETRY
|
||||||
|
} else if (statusCode / 100 == 5) {
|
||||||
|
if (request != null) {
|
||||||
|
request = session.penalize(request);
|
||||||
|
session.transfer(request, getRelationshipForName(REL_RETRY_NAME, relationships));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1xx, 3xx, 4xx -> NO RETRY
|
||||||
|
} else {
|
||||||
|
if (request != null) {
|
||||||
|
if (context.getProperty(PROP_PENALIZE_NO_RETRY).asBoolean()) {
|
||||||
|
request = session.penalize(request);
|
||||||
|
}
|
||||||
|
session.transfer(request, getRelationshipForName(REL_NO_RETRY_NAME, relationships));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isSuccess(int statusCode) {
|
||||||
|
return statusCode / 100 == 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void logRequest(ComponentLog logger, URI endpoint, GenericApiGatewayRequest request) {
|
||||||
|
try {
|
||||||
|
logger.debug("\nRequest to remote service:\n\t{}\t{}\t\n{}",
|
||||||
|
new Object[]{endpoint.toURL().toExternalForm(), request.getHttpMethod(), getLogString(request.getHeaders())});
|
||||||
|
} catch (MalformedURLException e) {
|
||||||
|
logger.debug(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void logResponse(ComponentLog logger, GenericApiGatewayResponse response) {
|
||||||
|
try {
|
||||||
|
logger.debug("\nResponse from remote service:\n\t{}\n{}",
|
||||||
|
new Object[]{response.getHttpResponse().getHttpRequest().getURI().toURL().toExternalForm(), getLogString(response.getHttpResponse().getHeaders())});
|
||||||
|
} catch (MalformedURLException e) {
|
||||||
|
logger.debug(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getLogString(Map<String, String> map) {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
if(map != null && map.size() > 0) {
|
||||||
|
for (Map.Entry<String, String> entry : map.entrySet()) {
|
||||||
|
String value = entry.getValue();
|
||||||
|
sb.append("\t");
|
||||||
|
sb.append(entry.getKey());
|
||||||
|
sb.append(": ");
|
||||||
|
sb.append(value);
|
||||||
|
sb.append("\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,114 @@
|
||||||
|
package org.apache.nifi.processors.aws.wag.client;
|
||||||
|
|
||||||
|
import com.amazonaws.AmazonServiceException;
|
||||||
|
import com.amazonaws.AmazonWebServiceClient;
|
||||||
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
import com.amazonaws.DefaultRequest;
|
||||||
|
import com.amazonaws.auth.AWS4Signer;
|
||||||
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
|
import com.amazonaws.http.AmazonHttpClient;
|
||||||
|
import com.amazonaws.http.ExecutionContext;
|
||||||
|
import com.amazonaws.http.HttpMethodName;
|
||||||
|
import com.amazonaws.http.HttpResponseHandler;
|
||||||
|
import com.amazonaws.http.JsonResponseHandler;
|
||||||
|
import com.amazonaws.internal.auth.DefaultSignerProvider;
|
||||||
|
import com.amazonaws.protocol.json.JsonOperationMetadata;
|
||||||
|
import com.amazonaws.protocol.json.SdkStructuredPlainJsonFactory;
|
||||||
|
import com.amazonaws.regions.Region;
|
||||||
|
import com.amazonaws.transform.JsonErrorUnmarshaller;
|
||||||
|
import com.amazonaws.transform.JsonUnmarshallerContext;
|
||||||
|
import com.amazonaws.transform.Unmarshaller;
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class GenericApiGatewayClient extends AmazonWebServiceClient {
|
||||||
|
private static final String API_GATEWAY_SERVICE_NAME = "execute-api";
|
||||||
|
private static final String API_KEY_HEADER = "x-api-key";
|
||||||
|
|
||||||
|
private final JsonResponseHandler<GenericApiGatewayResponse> responseHandler;
|
||||||
|
private final HttpResponseHandler<AmazonServiceException> errorResponseHandler;
|
||||||
|
private final AWSCredentialsProvider credentials;
|
||||||
|
private String apiKey;
|
||||||
|
private final AWS4Signer signer;
|
||||||
|
|
||||||
|
GenericApiGatewayClient(ClientConfiguration clientConfiguration, String endpoint, Region region,
|
||||||
|
AWSCredentialsProvider credentials, String apiKey, AmazonHttpClient httpClient) {
|
||||||
|
super(clientConfiguration);
|
||||||
|
setRegion(region);
|
||||||
|
setEndpoint(endpoint);
|
||||||
|
this.credentials = credentials;
|
||||||
|
this.apiKey = apiKey;
|
||||||
|
this.signer = new AWS4Signer();
|
||||||
|
this.signer.setServiceName(API_GATEWAY_SERVICE_NAME);
|
||||||
|
this.signer.setRegionName(region.getName());
|
||||||
|
|
||||||
|
final JsonOperationMetadata metadata = new JsonOperationMetadata().withHasStreamingSuccessResponse(false).withPayloadJson(false);
|
||||||
|
final Unmarshaller<GenericApiGatewayResponse, JsonUnmarshallerContext> responseUnmarshaller = in -> new GenericApiGatewayResponse(in.getHttpResponse());
|
||||||
|
this.responseHandler = SdkStructuredPlainJsonFactory.SDK_JSON_FACTORY.createResponseHandler(metadata, responseUnmarshaller);
|
||||||
|
JsonErrorUnmarshaller defaultErrorUnmarshaller = new JsonErrorUnmarshaller(GenericApiGatewayException.class, null) {
|
||||||
|
@Override
|
||||||
|
public AmazonServiceException unmarshall(JsonNode jsonContent) throws Exception {
|
||||||
|
return new GenericApiGatewayException(jsonContent.toString());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
this.errorResponseHandler = SdkStructuredPlainJsonFactory.SDK_JSON_FACTORY.createErrorResponseHandler(
|
||||||
|
Collections.singletonList(defaultErrorUnmarshaller), null);
|
||||||
|
|
||||||
|
if (httpClient != null) {
|
||||||
|
super.client = httpClient;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayResponse execute(GenericApiGatewayRequest request) {
|
||||||
|
return execute(request.getHttpMethod(), request.getResourcePath(), request.getHeaders(), request.getParameters(), request.getBody());
|
||||||
|
}
|
||||||
|
|
||||||
|
private GenericApiGatewayResponse execute(HttpMethodName method, String resourcePath, Map<String, String> headers, Map<String,List<String>> parameters, InputStream content) {
|
||||||
|
final ExecutionContext executionContext = buildExecutionContext();
|
||||||
|
|
||||||
|
DefaultRequest request = new DefaultRequest(API_GATEWAY_SERVICE_NAME);
|
||||||
|
request.setHttpMethod(method);
|
||||||
|
request.setContent(content);
|
||||||
|
request.setEndpoint(this.endpoint);
|
||||||
|
request.setResourcePath(resourcePath);
|
||||||
|
request.setHeaders(buildRequestHeaders(headers, apiKey));
|
||||||
|
if (parameters != null) {
|
||||||
|
request.setParameters(parameters);
|
||||||
|
}
|
||||||
|
return this.client.execute(request, responseHandler, errorResponseHandler, executionContext).getAwsResponse();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ExecutionContext buildExecutionContext() {
|
||||||
|
final ExecutionContext executionContext = ExecutionContext.builder().withSignerProvider(
|
||||||
|
new DefaultSignerProvider(this, signer)).build();
|
||||||
|
executionContext.setCredentialsProvider(credentials);
|
||||||
|
executionContext.setSigner(signer);
|
||||||
|
return executionContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, String> buildRequestHeaders(Map<String, String> headers, String apiKey) {
|
||||||
|
if (headers == null) {
|
||||||
|
headers = new HashMap<>();
|
||||||
|
}
|
||||||
|
if (apiKey != null) {
|
||||||
|
headers.put(API_KEY_HEADER, apiKey);
|
||||||
|
}
|
||||||
|
return headers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public URI getEndpoint() {
|
||||||
|
return this.endpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getServiceNameIntern() {
|
||||||
|
return API_GATEWAY_SERVICE_NAME;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
package org.apache.nifi.processors.aws.wag.client;
|
||||||
|
|
||||||
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
|
import com.amazonaws.http.AmazonHttpClient;
|
||||||
|
import com.amazonaws.regions.Region;
|
||||||
|
|
||||||
|
public class GenericApiGatewayClientBuilder {
|
||||||
|
private String endpoint;
|
||||||
|
private Region region;
|
||||||
|
private AWSCredentialsProvider credentials;
|
||||||
|
private ClientConfiguration clientConfiguration;
|
||||||
|
private String apiKey;
|
||||||
|
private AmazonHttpClient httpClient;
|
||||||
|
|
||||||
|
public GenericApiGatewayClientBuilder withEndpoint(String endpoint) {
|
||||||
|
this.endpoint = endpoint;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayClientBuilder withRegion(Region region) {
|
||||||
|
this.region = region;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayClientBuilder withClientConfiguration(ClientConfiguration clientConfiguration) {
|
||||||
|
this.clientConfiguration = clientConfiguration;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayClientBuilder withCredentials(AWSCredentialsProvider credentials) {
|
||||||
|
this.credentials = credentials;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayClientBuilder withApiKey(String apiKey) {
|
||||||
|
this.apiKey = apiKey;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayClientBuilder withHttpClient(AmazonHttpClient client) {
|
||||||
|
this.httpClient = client;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AWSCredentialsProvider getCredentials() {
|
||||||
|
return credentials;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getApiKey() {
|
||||||
|
return apiKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmazonHttpClient getHttpClient() {
|
||||||
|
return httpClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getEndpoint() {
|
||||||
|
return endpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Region getRegion() {
|
||||||
|
return region;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClientConfiguration getClientConfiguration() {
|
||||||
|
return clientConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayClient build() {
|
||||||
|
Validate.notEmpty(endpoint, "Endpoint");
|
||||||
|
Validate.notNull(region, "Region");
|
||||||
|
return new GenericApiGatewayClient(clientConfiguration, endpoint, region, credentials, apiKey, httpClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.apache.nifi.processors.aws.wag.client;
|
||||||
|
|
||||||
|
import com.amazonaws.AmazonServiceException;
|
||||||
|
|
||||||
|
public class GenericApiGatewayException extends AmazonServiceException {
|
||||||
|
public GenericApiGatewayException(String errorMessage) {
|
||||||
|
super(errorMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayException(String errorMessage, Exception cause) {
|
||||||
|
super(errorMessage, cause);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
package org.apache.nifi.processors.aws.wag.client;
|
||||||
|
|
||||||
|
import com.amazonaws.http.HttpMethodName;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class GenericApiGatewayRequest {
|
||||||
|
|
||||||
|
private final HttpMethodName httpMethod;
|
||||||
|
private final String resourcePath;
|
||||||
|
private final InputStream body;
|
||||||
|
private final Map<String, String> headers;
|
||||||
|
private final Map<String, List<String>> parameters;
|
||||||
|
|
||||||
|
public GenericApiGatewayRequest(HttpMethodName httpMethod, String resourcePath,
|
||||||
|
InputStream body, Map<String, String> headers,
|
||||||
|
Map<String, List<String>> parameters) {
|
||||||
|
this.httpMethod = httpMethod;
|
||||||
|
this.resourcePath = resourcePath;
|
||||||
|
this.body = body;
|
||||||
|
this.headers = headers;
|
||||||
|
this.parameters = parameters;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpMethodName getHttpMethod() {
|
||||||
|
return httpMethod;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getResourcePath() {
|
||||||
|
return resourcePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public InputStream getBody() {
|
||||||
|
return body;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, String> getHeaders() {
|
||||||
|
return headers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, List<String>> getParameters() {
|
||||||
|
return parameters;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
package org.apache.nifi.processors.aws.wag.client;
|
||||||
|
|
||||||
|
import com.amazonaws.http.HttpMethodName;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class GenericApiGatewayRequestBuilder {
|
||||||
|
private HttpMethodName httpMethod;
|
||||||
|
private String resourcePath;
|
||||||
|
private InputStream body;
|
||||||
|
private Map<String, String> headers;
|
||||||
|
private Map<String, List<String>> parameters;
|
||||||
|
|
||||||
|
public GenericApiGatewayRequestBuilder withHttpMethod(HttpMethodName name) {
|
||||||
|
httpMethod = name;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayRequestBuilder withResourcePath(String path) {
|
||||||
|
resourcePath = path;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayRequestBuilder withBody(InputStream content) {
|
||||||
|
this.body = content;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayRequestBuilder withHeaders(Map<String, String> headers) {
|
||||||
|
this.headers = headers;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayRequestBuilder withParameters(Map<String,List<String>> parameters) {
|
||||||
|
this.parameters = parameters;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasBody() {
|
||||||
|
return this.body != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericApiGatewayRequest build() {
|
||||||
|
Validate.notNull(httpMethod, "HTTP method");
|
||||||
|
Validate.notEmpty(resourcePath, "Resource path");
|
||||||
|
return new GenericApiGatewayRequest(httpMethod, resourcePath, body, headers, parameters);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package org.apache.nifi.processors.aws.wag.client;
|
||||||
|
|
||||||
|
import com.amazonaws.http.HttpResponse;
|
||||||
|
import com.amazonaws.util.IOUtils;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class GenericApiGatewayResponse {
|
||||||
|
private final HttpResponse httpResponse;
|
||||||
|
private final String body;
|
||||||
|
|
||||||
|
public GenericApiGatewayResponse(HttpResponse httpResponse) throws IOException {
|
||||||
|
this.httpResponse = httpResponse;
|
||||||
|
if(httpResponse.getContent() != null) {
|
||||||
|
this.body = IOUtils.toString(httpResponse.getContent());
|
||||||
|
}else {
|
||||||
|
this.body = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpResponse getHttpResponse() {
|
||||||
|
return httpResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getBody() {
|
||||||
|
return body;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
package org.apache.nifi.processors.aws.wag.client;
|
||||||
|
|
||||||
|
import com.amazonaws.util.StringUtils;
|
||||||
|
|
||||||
|
public class Validate {
|
||||||
|
public static void notEmpty(String in, String fieldName) {
|
||||||
|
if (StringUtils.isNullOrEmpty(in)) {
|
||||||
|
throw new IllegalArgumentException(String.format("%s cannot be empty", fieldName));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void notNull(Object in, String fieldName) {
|
||||||
|
if (in == null) {
|
||||||
|
throw new IllegalArgumentException(String.format("%s cannot be null", fieldName));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -92,3 +92,28 @@ The following binary components are provided under the Apache Software License v
|
||||||
A list of contributors may be found from CREDITS file, which is included
|
A list of contributors may be found from CREDITS file, which is included
|
||||||
in some artifacts (usually source distributions); but is always available
|
in some artifacts (usually source distributions); but is always available
|
||||||
from the source code management (SCM) system project uses.
|
from the source code management (SCM) system project uses.
|
||||||
|
|
||||||
|
|
||||||
|
(ASLv2) This includes derived works from apigateway-generic-java-sdk project (https://github.com/rpgreen/apigateway-generic-java-sdk)
|
||||||
|
https://github.com/rpgreen/apigateway-generic-java-sdk/commit/32eea44cc855a530c9b4a28b9f3601a41bc85618 as the point reference:
|
||||||
|
The derived work is adapted from
|
||||||
|
main/ca/ryangreen/apigateway/generic/
|
||||||
|
GenericApiGatewayClient.java
|
||||||
|
GenericApiGatewayClientBuilder.java
|
||||||
|
GenericApiGatewayException.java
|
||||||
|
GenericApiGatewayRequest.java
|
||||||
|
GenericApiGatewayRequestBuilder.java
|
||||||
|
test/ca/ryangreen/apigateway/generic/
|
||||||
|
GenericApiGatewayClientTest.java
|
||||||
|
LambdaMatcher.java
|
||||||
|
and can be found in the directories:
|
||||||
|
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/../wag/client/
|
||||||
|
GenericApiGatewayClient.java
|
||||||
|
GenericApiGatewayClientBuilder.java
|
||||||
|
GenericApiGatewayException.java
|
||||||
|
GenericApiGatewayRequest.java
|
||||||
|
GenericApiGatewayRequestBuilder.java
|
||||||
|
Validate.java
|
||||||
|
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/../wag/
|
||||||
|
RequestMatcher.java
|
||||||
|
GetAWSGatewayApiTest.java
|
|
@ -50,6 +50,23 @@
|
||||||
<version>1.7.0-SNAPSHOT</version>
|
<version>1.7.0-SNAPSHOT</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-standard-processors</artifactId>
|
||||||
|
<version>1.7.0-SNAPSHOT</version>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.eclipse.jetty</groupId>
|
||||||
|
<artifactId>jetty-server</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>slf4j-simple</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||||
|
|
|
@ -0,0 +1,385 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.wag;
|
||||||
|
|
||||||
|
import com.amazonaws.http.AmazonHttpClient;
|
||||||
|
import org.apache.http.impl.EnglishReasonPhraseCatalog;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayClient;
|
||||||
|
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayException;
|
||||||
|
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayRequest;
|
||||||
|
import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayResponse;
|
||||||
|
import org.apache.nifi.stream.io.StreamUtils;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@TriggerWhenEmpty
|
||||||
|
@SupportsBatching
|
||||||
|
@InputRequirement(Requirement.INPUT_ALLOWED)
|
||||||
|
@Tags({"Amazon", "AWS", "Client", "Gateway-API", "Rest", "http", "https"})
|
||||||
|
@CapabilityDescription("Client for AWS Gateway API endpoint")
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute = "mime.type", description = "The MIME Type of the flowfiles"),
|
||||||
|
@WritesAttribute(attribute = "aws.gateway.api.status.code", description = "The status code that is returned"),
|
||||||
|
@WritesAttribute(attribute = "aws.gateway.api.status.message", description = "The status message that is returned"),
|
||||||
|
@WritesAttribute(attribute = "aws.gateway.api.response.body", description = "In the instance where the status code received is not a success (2xx)"),
|
||||||
|
@WritesAttribute(attribute = "aws.gateway.api.resource", description = "The request resource"),
|
||||||
|
@WritesAttribute(attribute = "aws.gateway.api.tx.id", description = "The transaction ID that is returned after reading the response"),
|
||||||
|
@WritesAttribute(attribute = "aws.gateway.api.java.exception.class", description = "The Java exception class raised when the processor fails"),
|
||||||
|
@WritesAttribute(attribute = "aws.gateway.api.java.exception.message", description = "The Java exception message raised when the processor fails"),})
|
||||||
|
@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", supportsExpressionLanguage = true, description =
|
||||||
|
"Send request header "
|
||||||
|
+ "with a key matching the Dynamic Property Key and a value created by evaluating the Attribute Expression Language set in the value "
|
||||||
|
+ "of the Dynamic Property.")
|
||||||
|
public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor {
|
||||||
|
|
||||||
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays
|
||||||
|
.asList(
|
||||||
|
PROP_METHOD,
|
||||||
|
PROP_AWS_GATEWAY_API_REGION,
|
||||||
|
ACCESS_KEY,
|
||||||
|
SECRET_KEY,
|
||||||
|
CREDENTIALS_FILE,
|
||||||
|
AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||||
|
TIMEOUT,
|
||||||
|
PROP_RESOURCE_NAME,
|
||||||
|
PROP_AWS_GATEWAY_API_ENDPOINT,
|
||||||
|
PROP_AWS_API_KEY,
|
||||||
|
PROP_ATTRIBUTES_TO_SEND,
|
||||||
|
PROP_PUT_OUTPUT_IN_ATTRIBUTE,
|
||||||
|
PROP_CONTENT_TYPE,
|
||||||
|
PROP_SEND_BODY,
|
||||||
|
PROP_OUTPUT_RESPONSE_REGARDLESS,
|
||||||
|
PROP_PENALIZE_NO_RETRY,
|
||||||
|
PROXY_HOST,
|
||||||
|
PROXY_HOST_PORT,
|
||||||
|
PROXY_USERNAME,
|
||||||
|
PROXY_PASSWORD,
|
||||||
|
PROP_QUERY_PARAMS,
|
||||||
|
PROP_PUT_ATTRIBUTE_MAX_LENGTH,
|
||||||
|
PROP_ADD_HEADERS_TO_REQUEST,
|
||||||
|
PROXY_CONFIGURATION_SERVICE));
|
||||||
|
|
||||||
|
|
||||||
|
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
|
||||||
|
.name(REL_SUCCESS_REQ_NAME)
|
||||||
|
.description("The original FlowFile will be routed upon success (2xx status codes). It will have new "
|
||||||
|
+ "attributes detailing the success of the request.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Relationship REL_RESPONSE = new Relationship.Builder()
|
||||||
|
.name(REL_RESPONSE_NAME)
|
||||||
|
.description("A Response FlowFile will be routed upon success (2xx status codes). If the 'Output Response "
|
||||||
|
+ "Regardless' property is true then the response will be sent to this relationship regardless of "
|
||||||
|
+ "the status code received.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Relationship REL_RETRY = new Relationship.Builder()
|
||||||
|
.name(REL_RETRY_NAME)
|
||||||
|
.description("The original FlowFile will be routed on any status code that can be retried "
|
||||||
|
+ "(5xx status codes). It will have new attributes detailing the request.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Relationship REL_NO_RETRY = new Relationship.Builder()
|
||||||
|
.name(REL_NO_RETRY_NAME)
|
||||||
|
.description("The original FlowFile will be routed on any status code that should NOT be retried "
|
||||||
|
+ "(1xx, 3xx, 4xx status codes). It will have new attributes detailing the request.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||||
|
.name(REL_FAILURE_NAME)
|
||||||
|
.description("The original FlowFile will be routed on any type of connection failure, timeout or general "
|
||||||
|
+ "exception. It will have new attributes detailing the request.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(
|
||||||
|
Arrays.asList(REL_SUCCESS_REQ, REL_RESPONSE, REL_RETRY, REL_NO_RETRY, REL_FAILURE)));
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return RELATIONSHIPS;
|
||||||
|
}
|
||||||
|
|
||||||
|
public InvokeAWSGatewayApi() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public InvokeAWSGatewayApi(AmazonHttpClient client) {
|
||||||
|
super(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||||
|
ComponentLog logger = getLogger();
|
||||||
|
FlowFile requestFlowFile = session.get();
|
||||||
|
|
||||||
|
// Checking to see if the property to put the body of the response in an attribute was set
|
||||||
|
boolean putToAttribute = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).isSet();
|
||||||
|
if (requestFlowFile == null) {
|
||||||
|
String request = context.getProperty(PROP_METHOD).evaluateAttributeExpressions()
|
||||||
|
.getValue().toUpperCase();
|
||||||
|
if ("POST".equals(request) || "PUT".equals(request) || "PATCH".equals(request)) {
|
||||||
|
return;
|
||||||
|
} else if (putToAttribute) {
|
||||||
|
requestFlowFile = session.create();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute.
|
||||||
|
final UUID txId = UUID.randomUUID();
|
||||||
|
FlowFile responseFlowFile = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH)
|
||||||
|
.asInteger();
|
||||||
|
|
||||||
|
final String resourceName = context.getProperty(PROP_RESOURCE_NAME).getValue();
|
||||||
|
|
||||||
|
final GenericApiGatewayClient client = getClient();
|
||||||
|
|
||||||
|
final GenericApiGatewayRequest request = configureRequest(context, session,
|
||||||
|
resourceName,
|
||||||
|
requestFlowFile);
|
||||||
|
|
||||||
|
logRequest(logger, client.getEndpoint(), request);
|
||||||
|
final long startNanos = System.nanoTime();
|
||||||
|
GenericApiGatewayResponse response = null;
|
||||||
|
GenericApiGatewayException exception = null;
|
||||||
|
try {
|
||||||
|
response = client.execute(request);
|
||||||
|
logResponse(logger, response);
|
||||||
|
} catch (GenericApiGatewayException gag) {
|
||||||
|
// ERROR response codes may come back as exceptions, 404 for example
|
||||||
|
exception = gag;
|
||||||
|
}
|
||||||
|
|
||||||
|
final int statusCode;
|
||||||
|
if (exception != null) {
|
||||||
|
statusCode = exception.getStatusCode();
|
||||||
|
} else {
|
||||||
|
statusCode = response.getHttpResponse().getStatusCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (statusCode == 0) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Status code unknown, connection hasn't been attempted.");
|
||||||
|
}
|
||||||
|
final String endpoint = context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue();
|
||||||
|
boolean outputRegardless = context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS)
|
||||||
|
.asBoolean();
|
||||||
|
|
||||||
|
boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute
|
||||||
|
|| outputRegardless);
|
||||||
|
boolean outputBodyToRequestAttribute =
|
||||||
|
(!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
|
||||||
|
boolean bodyExists = response != null && response.getBody() != null;
|
||||||
|
|
||||||
|
final String statusExplanation;
|
||||||
|
if (exception != null) {
|
||||||
|
statusExplanation = EnglishReasonPhraseCatalog.INSTANCE.getReason(statusCode, null);
|
||||||
|
} else {
|
||||||
|
statusExplanation = response.getHttpResponse().getStatusText();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a map of the status attributes that are always written to the request and response FlowFiles
|
||||||
|
final Map<String, String> statusAttributes = new HashMap<>();
|
||||||
|
statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
|
||||||
|
statusAttributes.put(STATUS_MESSAGE, statusExplanation);
|
||||||
|
statusAttributes.put(ENDPOINT_ATTR, client.getEndpointPrefix());
|
||||||
|
statusAttributes.put(RESOURCE_NAME_ATTR, resourceName);
|
||||||
|
statusAttributes.put(TRANSACTION_ID, txId.toString());
|
||||||
|
|
||||||
|
if (outputBodyToResponseContent) {
|
||||||
|
/*
|
||||||
|
* If successful and putting to response flowfile, store the response body as the flowfile payload
|
||||||
|
* we include additional flowfile attributes including the response headers and the status codes.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// clone the flowfile to capture the response
|
||||||
|
if (requestFlowFile != null) {
|
||||||
|
responseFlowFile = session.create(requestFlowFile);
|
||||||
|
// write attributes to request flowfile
|
||||||
|
requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
|
||||||
|
// If the property to add the response headers to the request flowfile is true then add them
|
||||||
|
if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean()) {
|
||||||
|
// write the response headers as attributes
|
||||||
|
// this will overwrite any existing flowfile attributes
|
||||||
|
requestFlowFile = session.putAllAttributes(requestFlowFile,
|
||||||
|
convertAttributesFromHeaders(
|
||||||
|
response));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
responseFlowFile = session.create();
|
||||||
|
}
|
||||||
|
|
||||||
|
// write attributes to response flowfile
|
||||||
|
responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes);
|
||||||
|
|
||||||
|
// write the response headers as attributes
|
||||||
|
// this will overwrite any existing flowfile attributes
|
||||||
|
if (response != null) {
|
||||||
|
responseFlowFile = session
|
||||||
|
.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(response));
|
||||||
|
} else {
|
||||||
|
responseFlowFile = session
|
||||||
|
.putAllAttributes(responseFlowFile, exception.getHttpHeaders());
|
||||||
|
}
|
||||||
|
// transfer the message body to the payload
|
||||||
|
// can potentially be null in edge cases
|
||||||
|
if (bodyExists) {
|
||||||
|
final String contentType = response.getHttpResponse().getHeaders()
|
||||||
|
.get("Content-Type");
|
||||||
|
if (!(contentType == null) && !contentType.trim().isEmpty()) {
|
||||||
|
responseFlowFile = session
|
||||||
|
.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(),
|
||||||
|
contentType.trim());
|
||||||
|
}
|
||||||
|
|
||||||
|
responseFlowFile = session
|
||||||
|
.importFrom(new ByteArrayInputStream(response.getBody().getBytes()),
|
||||||
|
responseFlowFile);
|
||||||
|
|
||||||
|
// emit provenance event
|
||||||
|
final long millis = TimeUnit.NANOSECONDS
|
||||||
|
.toMillis(System.nanoTime() - startNanos);
|
||||||
|
if (requestFlowFile != null) {
|
||||||
|
session.getProvenanceReporter().fetch(responseFlowFile, endpoint, millis);
|
||||||
|
} else {
|
||||||
|
session.getProvenanceReporter().receive(responseFlowFile, endpoint, millis);
|
||||||
|
}
|
||||||
|
} else if (exception != null) {
|
||||||
|
final String contentType = "application/json";
|
||||||
|
responseFlowFile = session
|
||||||
|
.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(),
|
||||||
|
contentType.trim());
|
||||||
|
|
||||||
|
responseFlowFile = session
|
||||||
|
.importFrom(new ByteArrayInputStream(exception.getRawResponse()),
|
||||||
|
responseFlowFile);
|
||||||
|
|
||||||
|
// emit provenance event
|
||||||
|
final long millis = TimeUnit.NANOSECONDS
|
||||||
|
.toMillis(System.nanoTime() - startNanos);
|
||||||
|
if (requestFlowFile != null) {
|
||||||
|
session.getProvenanceReporter().fetch(responseFlowFile, endpoint, millis);
|
||||||
|
} else {
|
||||||
|
session.getProvenanceReporter().receive(responseFlowFile, endpoint, millis);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// if not successful and request flowfile is not null, store the response body into a flowfile attribute
|
||||||
|
if (outputBodyToRequestAttribute) {
|
||||||
|
String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE)
|
||||||
|
.evaluateAttributeExpressions(requestFlowFile)
|
||||||
|
.getValue();
|
||||||
|
if (attributeKey == null) {
|
||||||
|
attributeKey = RESPONSE_BODY;
|
||||||
|
}
|
||||||
|
byte[] outputBuffer;
|
||||||
|
int size = 0;
|
||||||
|
outputBuffer = new byte[maxAttributeSize];
|
||||||
|
if (bodyExists) {
|
||||||
|
size = StreamUtils
|
||||||
|
.fillBuffer(new ByteArrayInputStream(response.getBody().getBytes()),
|
||||||
|
outputBuffer, false);
|
||||||
|
} else if (exception != null && exception.getRawResponse() != null
|
||||||
|
&& exception.getRawResponse().length > 0) {
|
||||||
|
size = StreamUtils
|
||||||
|
.fillBuffer(new ByteArrayInputStream(exception.getRawResponse()),
|
||||||
|
outputBuffer, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (size > 0) {
|
||||||
|
String bodyString = new String(outputBuffer, 0, size, "UTF-8");
|
||||||
|
requestFlowFile = session
|
||||||
|
.putAttribute(requestFlowFile, attributeKey, bodyString);
|
||||||
|
}
|
||||||
|
|
||||||
|
requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
|
||||||
|
|
||||||
|
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||||
|
session.getProvenanceReporter().modifyAttributes(requestFlowFile,
|
||||||
|
"The " + attributeKey
|
||||||
|
+ " has been added. The value of which is the body of a http call to "
|
||||||
|
+ endpoint + resourceName
|
||||||
|
+ ". It took " + millis
|
||||||
|
+ "millis,");
|
||||||
|
}
|
||||||
|
|
||||||
|
route(requestFlowFile, responseFlowFile, session, context, statusCode,
|
||||||
|
getRelationships());
|
||||||
|
} catch (Exception e) {
|
||||||
|
// penalize or yield
|
||||||
|
if (requestFlowFile != null) {
|
||||||
|
logger.error("Routing to {} due to exception: {}",
|
||||||
|
new Object[]{REL_FAILURE.getName(), e}, e);
|
||||||
|
requestFlowFile = session.penalize(requestFlowFile);
|
||||||
|
requestFlowFile = session
|
||||||
|
.putAttribute(requestFlowFile, EXCEPTION_CLASS, e.getClass().getName());
|
||||||
|
requestFlowFile = session
|
||||||
|
.putAttribute(requestFlowFile, EXCEPTION_MESSAGE, e.getMessage());
|
||||||
|
// transfer original to failure
|
||||||
|
session.transfer(requestFlowFile,
|
||||||
|
getRelationshipForName(REL_FAILURE_NAME, getRelationships()));
|
||||||
|
} else {
|
||||||
|
logger.error(
|
||||||
|
"Yielding processor due to exception encountered as a source processor: {}", e);
|
||||||
|
context.yield();
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanup response flowfile, if applicable
|
||||||
|
try {
|
||||||
|
if (responseFlowFile != null) {
|
||||||
|
session.remove(responseFlowFile);
|
||||||
|
}
|
||||||
|
} catch (final Exception e1) {
|
||||||
|
logger.error("Could not cleanup response flowfile due to exception: {}",
|
||||||
|
new Object[]{e1}, e1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -27,4 +27,5 @@ org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
|
||||||
org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB
|
org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB
|
||||||
org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream
|
org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream
|
||||||
org.apache.nifi.processors.aws.cloudwatch.PutCloudWatchMetric
|
org.apache.nifi.processors.aws.cloudwatch.PutCloudWatchMetric
|
||||||
|
org.apache.nifi.processors.aws.wag.InvokeAWSGatewayApi
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.wag;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import org.hamcrest.BaseMatcher;
|
||||||
|
import org.hamcrest.Description;
|
||||||
|
|
||||||
|
public class RequestMatcher<T> extends BaseMatcher<T> {
|
||||||
|
|
||||||
|
private final Predicate<T> matcher;
|
||||||
|
private final Optional<String> description;
|
||||||
|
|
||||||
|
public RequestMatcher(Predicate<T> matcher) {
|
||||||
|
this(matcher, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RequestMatcher(Predicate<T> matcher, String description) {
|
||||||
|
this.matcher = matcher;
|
||||||
|
this.description = Optional.ofNullable(description);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
|
public boolean matches(Object argument) {
|
||||||
|
return matcher.test((T) argument);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void describeTo(Description description) {
|
||||||
|
this.description.ifPresent(description::appendText);
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,223 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.wag;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.argThat;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
|
||||||
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
import com.amazonaws.http.AmazonHttpClient;
|
||||||
|
import com.amazonaws.http.apache.client.impl.SdkHttpClient;
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.http.HttpResponse;
|
||||||
|
import org.apache.http.HttpVersion;
|
||||||
|
import org.apache.http.client.methods.HttpUriRequest;
|
||||||
|
import org.apache.http.entity.BasicHttpEntity;
|
||||||
|
import org.apache.http.message.BasicHttpResponse;
|
||||||
|
import org.apache.http.message.BasicStatusLine;
|
||||||
|
import org.apache.http.protocol.HttpContext;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
public class TestInvokeAmazonGatewayApiMock {
|
||||||
|
|
||||||
|
private TestRunner runner = null;
|
||||||
|
private InvokeAWSGatewayApi mockGetApi = null;
|
||||||
|
private SdkHttpClient mockSdkClient = null;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
mockSdkClient = Mockito.mock(SdkHttpClient.class);
|
||||||
|
ClientConfiguration clientConfig = new ClientConfiguration();
|
||||||
|
|
||||||
|
mockGetApi = new InvokeAWSGatewayApi(
|
||||||
|
new AmazonHttpClient(clientConfig, mockSdkClient, null));
|
||||||
|
runner = TestRunners.newTestRunner(mockGetApi);
|
||||||
|
runner.setValidateExpressionUsage(false);
|
||||||
|
|
||||||
|
final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();
|
||||||
|
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
||||||
|
runner.setProperty(serviceImpl, InvokeAWSGatewayApi.ACCESS_KEY, "awsAccessKey");
|
||||||
|
runner.setProperty(serviceImpl, InvokeAWSGatewayApi.SECRET_KEY, "awsSecretKey");
|
||||||
|
runner.enableControllerService(serviceImpl);
|
||||||
|
|
||||||
|
runner.setProperty(InvokeAWSGatewayApi.AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||||
|
"awsCredentialsProvider");
|
||||||
|
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_REGION, "us-east-1");
|
||||||
|
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_API_KEY, "abcd");
|
||||||
|
runner.setProperty(InvokeAWSGatewayApi.PROP_RESOURCE_NAME, "/TEST");
|
||||||
|
runner.setProperty(InvokeAWSGatewayApi.PROP_AWS_GATEWAY_API_ENDPOINT,
|
||||||
|
"https://foobar.execute-api.us-east-1.amazonaws.com");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetApiSimple() throws Exception {
|
||||||
|
|
||||||
|
HttpResponse resp = new BasicHttpResponse(
|
||||||
|
new BasicStatusLine(HttpVersion.HTTP_1_1, 200, "OK"));
|
||||||
|
BasicHttpEntity entity = new BasicHttpEntity();
|
||||||
|
entity.setContent(new ByteArrayInputStream("test payload".getBytes()));
|
||||||
|
resp.setEntity(entity);
|
||||||
|
Mockito.doReturn(resp).when(mockSdkClient)
|
||||||
|
.execute(any(HttpUriRequest.class), any(HttpContext.class));
|
||||||
|
|
||||||
|
// execute
|
||||||
|
runner.assertValid();
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
// check
|
||||||
|
Mockito.verify(mockSdkClient, times(1))
|
||||||
|
.execute(argThat(new RequestMatcher<HttpUriRequest>(x -> {
|
||||||
|
return x.getMethod().equals("GET") && x.getFirstHeader("x-api-key").getValue()
|
||||||
|
.equals("abcd") && x
|
||||||
|
.getFirstHeader("Authorization").getValue().startsWith("AWS4") && x.getURI()
|
||||||
|
.toString()
|
||||||
|
.equals(
|
||||||
|
"https://foobar.execute-api.us-east-1.amazonaws.com/TEST");
|
||||||
|
})), any(HttpContext.class));
|
||||||
|
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_SUCCESS_REQ, 0);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RESPONSE, 1);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RETRY, 0);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_NO_RETRY, 0);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_FAILURE, 0);
|
||||||
|
|
||||||
|
final List<MockFlowFile> flowFiles = runner
|
||||||
|
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE);
|
||||||
|
final MockFlowFile ff0 = flowFiles.get(0);
|
||||||
|
|
||||||
|
ff0.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
|
||||||
|
ff0.assertContentEquals("test payload");
|
||||||
|
ff0.assertAttributeExists(InvokeAWSGatewayApi.TRANSACTION_ID);
|
||||||
|
ff0.assertAttributeEquals(InvokeAWSGatewayApi.RESOURCE_NAME_ATTR, "/TEST");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendAttributes() throws Exception {
|
||||||
|
|
||||||
|
HttpResponse resp = new BasicHttpResponse(
|
||||||
|
new BasicStatusLine(HttpVersion.HTTP_1_1, 200, "OK"));
|
||||||
|
BasicHttpEntity entity = new BasicHttpEntity();
|
||||||
|
entity.setContent(new ByteArrayInputStream("test payload".getBytes()));
|
||||||
|
resp.setEntity(entity);
|
||||||
|
Mockito.doReturn(resp).when(mockSdkClient)
|
||||||
|
.execute(any(HttpUriRequest.class), any(HttpContext.class));
|
||||||
|
|
||||||
|
// add dynamic property
|
||||||
|
runner.setProperty("dynamicHeader", "yes!");
|
||||||
|
// set the regex
|
||||||
|
runner.setProperty(InvokeAWSGatewayApi.PROP_ATTRIBUTES_TO_SEND, "F.*");
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
|
||||||
|
attributes.put("Foo", "Bar");
|
||||||
|
runner.enqueue("Hello".getBytes("UTF-8"), attributes);
|
||||||
|
// execute
|
||||||
|
runner.assertValid();
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
Mockito.verify(mockSdkClient, times(1))
|
||||||
|
.execute(argThat(new RequestMatcher<HttpUriRequest>(x -> {
|
||||||
|
return x.getMethod().equals("GET") && x.getFirstHeader("x-api-key").getValue()
|
||||||
|
.equals("abcd") && x
|
||||||
|
.getFirstHeader("Authorization").getValue().startsWith("AWS4") && x
|
||||||
|
.getFirstHeader("dynamicHeader").getValue().equals("yes!") && x
|
||||||
|
.getFirstHeader("Foo").getValue().equals("Bar") && x.getURI().toString()
|
||||||
|
.equals(
|
||||||
|
"https://foobar.execute-api.us-east-1.amazonaws.com/TEST");
|
||||||
|
})), any(HttpContext.class));
|
||||||
|
// check
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_SUCCESS_REQ, 1);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RESPONSE, 1);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RETRY, 0);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_NO_RETRY, 0);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_FAILURE, 0);
|
||||||
|
|
||||||
|
final List<MockFlowFile> flowFiles = runner
|
||||||
|
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE);
|
||||||
|
final MockFlowFile ff0 = flowFiles.get(0);
|
||||||
|
|
||||||
|
ff0.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
|
||||||
|
ff0.assertContentEquals("test payload");
|
||||||
|
ff0.assertAttributeExists(InvokeAWSGatewayApi.TRANSACTION_ID);
|
||||||
|
ff0.assertAttributeEquals(InvokeAWSGatewayApi.RESOURCE_NAME_ATTR, "/TEST");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendQueryParams() throws Exception {
|
||||||
|
|
||||||
|
HttpResponse resp = new BasicHttpResponse(
|
||||||
|
new BasicStatusLine(HttpVersion.HTTP_1_1, 200, "OK"));
|
||||||
|
BasicHttpEntity entity = new BasicHttpEntity();
|
||||||
|
entity.setContent(new ByteArrayInputStream("test payload".getBytes()));
|
||||||
|
resp.setEntity(entity);
|
||||||
|
Mockito.doReturn(resp).when(mockSdkClient)
|
||||||
|
.execute(any(HttpUriRequest.class), any(HttpContext.class));
|
||||||
|
|
||||||
|
// add dynamic property
|
||||||
|
runner.setProperty("dynamicHeader", "yes!");
|
||||||
|
runner.setProperty(InvokeAWSGatewayApi.PROP_QUERY_PARAMS, "apples=oranges&dogs=cats");
|
||||||
|
|
||||||
|
// set the regex
|
||||||
|
runner.setProperty(InvokeAWSGatewayApi.PROP_ATTRIBUTES_TO_SEND, "F.*");
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
|
||||||
|
attributes.put("Foo", "Bar");
|
||||||
|
runner.enqueue("Hello".getBytes("UTF-8"), attributes);
|
||||||
|
// execute
|
||||||
|
runner.assertValid();
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
Mockito.verify(mockSdkClient, times(1))
|
||||||
|
.execute(argThat(new RequestMatcher<HttpUriRequest>(x -> {
|
||||||
|
return x.getMethod().equals("GET") && x.getFirstHeader("x-api-key").getValue()
|
||||||
|
.equals("abcd") && x
|
||||||
|
.getFirstHeader("Authorization").getValue().startsWith("AWS4") && x
|
||||||
|
.getFirstHeader("dynamicHeader").getValue().equals("yes!") && x
|
||||||
|
.getFirstHeader("Foo").getValue().equals("Bar") && x.getURI().toString()
|
||||||
|
.equals(
|
||||||
|
"https://foobar.execute-api.us-east-1.amazonaws.com/TEST?dogs=cats&apples=oranges");
|
||||||
|
})), any(HttpContext.class));
|
||||||
|
// check
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_SUCCESS_REQ, 1);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RESPONSE, 1);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_RETRY, 0);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_NO_RETRY, 0);
|
||||||
|
runner.assertTransferCount(InvokeAWSGatewayApi.REL_FAILURE, 0);
|
||||||
|
|
||||||
|
final List<MockFlowFile> flowFiles = runner
|
||||||
|
.getFlowFilesForRelationship(InvokeAWSGatewayApi.REL_RESPONSE);
|
||||||
|
final MockFlowFile ff0 = flowFiles.get(0);
|
||||||
|
|
||||||
|
ff0.assertAttributeEquals(InvokeAWSGatewayApi.STATUS_CODE, "200");
|
||||||
|
ff0.assertContentEquals("test payload");
|
||||||
|
ff0.assertAttributeExists(InvokeAWSGatewayApi.TRANSACTION_ID);
|
||||||
|
ff0.assertAttributeEquals(InvokeAWSGatewayApi.RESOURCE_NAME_ATTR, "/TEST");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,66 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.wag;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.nifi.processors.standard.TestServer;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestInvokeInvokeAmazonGatewayApiWithControllerService extends
|
||||||
|
TestInvokeAWSGatewayApiCommon {
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void beforeClass() throws Exception {
|
||||||
|
// useful for verbose logging output
|
||||||
|
// don't commit this with this property enabled, or any 'mvn test' will be really verbose
|
||||||
|
// System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
|
||||||
|
|
||||||
|
// create a Jetty server on a random port
|
||||||
|
server = createServer();
|
||||||
|
server.startServer();
|
||||||
|
|
||||||
|
// this is the base url with the random port
|
||||||
|
url = server.getUrl();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void afterClass() throws Exception {
|
||||||
|
server.shutdownServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws Exception {
|
||||||
|
runner = TestRunners.newTestRunner(InvokeAWSGatewayApi.class);
|
||||||
|
runner.setValidateExpressionUsage(false);
|
||||||
|
server.clearHandlers();
|
||||||
|
setupControllerService();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() {
|
||||||
|
runner.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TestServer createServer() throws IOException {
|
||||||
|
return new TestServer();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,66 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.wag;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.nifi.processors.standard.TestServer;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestInvokeInvokeAmazonGatewayApiWithCredFile extends
|
||||||
|
TestInvokeAWSGatewayApiCommon {
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void beforeClass() throws Exception {
|
||||||
|
// useful for verbose logging output
|
||||||
|
// don't commit this with this property enabled, or any 'mvn test' will be really verbose
|
||||||
|
// System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
|
||||||
|
|
||||||
|
// create a Jetty server on a random port
|
||||||
|
server = createServer();
|
||||||
|
server.startServer();
|
||||||
|
|
||||||
|
// this is the base url with the random port
|
||||||
|
url = server.getUrl();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void afterClass() throws Exception {
|
||||||
|
server.shutdownServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws Exception {
|
||||||
|
runner = TestRunners.newTestRunner(InvokeAWSGatewayApi.class);
|
||||||
|
runner.setValidateExpressionUsage(false);
|
||||||
|
server.clearHandlers();
|
||||||
|
setupCredFile();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() {
|
||||||
|
runner.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TestServer createServer() throws IOException {
|
||||||
|
return new TestServer();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,66 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.wag;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.nifi.processors.standard.TestServer;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
|
||||||
|
public class TestInvokeInvokeAmazonGatewayApiWithStaticAuth extends
|
||||||
|
TestInvokeAWSGatewayApiCommon {
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void beforeClass() throws Exception {
|
||||||
|
// useful for verbose logging output
|
||||||
|
// don't commit this with this property enabled, or any 'mvn test' will be really verbose
|
||||||
|
// System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard", "debug");
|
||||||
|
|
||||||
|
// create a Jetty server on a random port
|
||||||
|
server = createServer();
|
||||||
|
server.startServer();
|
||||||
|
|
||||||
|
// this is the base url with the random port
|
||||||
|
url = server.getUrl();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void afterClass() throws Exception {
|
||||||
|
server.shutdownServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws Exception {
|
||||||
|
runner = TestRunners.newTestRunner(InvokeAWSGatewayApi.class);
|
||||||
|
runner.setValidateExpressionUsage(false);
|
||||||
|
server.clearHandlers();
|
||||||
|
setupAuth();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() {
|
||||||
|
runner.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TestServer createServer() throws IOException {
|
||||||
|
return new TestServer();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue