NIFI-7761 Allow HandleHttpRequest to add specified form data to FlowFile attributes

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4513.
This commit is contained in:
Otto Fowler 2020-09-07 01:10:31 -04:00 committed by Pierre Villard
parent e7c6bdad42
commit ceb9dff3b9
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 112 additions and 5 deletions

View File

@ -89,6 +89,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -115,6 +116,8 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "http.principal.name", description = "The name of the authenticated user making the request"),
@WritesAttribute(attribute = "http.query.param.XXX", description = "Each of query parameters in the request will be added as an attribute, "
+ "prefixed with \"http.query.param.\""),
@WritesAttribute(attribute = "http.param.XXX", description = "Form parameters in the request that are configured by \"Parameters to Attributes List\" will be added as an attribute, "
+ "prefixed with \"http.param.\". Putting form parameters of large size is not recommended."),
@WritesAttribute(attribute = HTTPUtils.HTTP_SSL_CERT, description = "The Distinguished Name of the requestor. This value will not be populated "
+ "unless the Processor is configured to use an SSLContext Service"),
@WritesAttribute(attribute = "http.issuer.dn", description = "The Distinguished Name of the entity that issued the Subject's certificate. "
@ -122,7 +125,7 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "http.headers.XXX", description = "Each of the HTTP Headers that is received in the request will be added as an "
+ "attribute, prefixed with \"http.headers.\" For example, if the request contains an HTTP Header named \"x-my-header\", then the value "
+ "will be added to an attribute named \"http.headers.x-my-header\""),
@WritesAttribute(attribute = "http.headers.multipart.XXX", description = "Each of the HTTP Headers that is received in the mulipart request will be added as an "
@WritesAttribute(attribute = "http.headers.multipart.XXX", description = "Each of the HTTP Headers that is received in the multipart request will be added as an "
+ "attribute, prefixed with \"http.headers.multipart.\" For example, if the multipart request contains an HTTP Header named \"content-disposition\", then the value "
+ "will be added to an attribute named \"http.headers.multipart.content-disposition\""),
@WritesAttribute(attribute = "http.multipart.size",
@ -247,6 +250,14 @@ public class HandleHttpRequest extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor PARAMETERS_TO_ATTRIBUTES = new PropertyDescriptor.Builder()
.name("parameters-to-attributes")
.displayName("Parameters to Attributes List")
.description("A comma-separated list of HTTP parameters or form data to output as attributes")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Authentication")
.description("Specifies whether or not the Processor should authenticate clients. This value is ignored if the <SSL Context Service> "
@ -304,6 +315,7 @@ public class HandleHttpRequest extends AbstractProcessor {
descriptors.add(CONTAINER_QUEUE_SIZE);
descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
descriptors.add(MULTIPART_READ_BUFFER_SIZE);
descriptors.add(PARAMETERS_TO_ATTRIBUTES);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@ -312,6 +324,7 @@ public class HandleHttpRequest extends AbstractProcessor {
private AtomicBoolean initialized = new AtomicBoolean(false);
private volatile BlockingQueue<HttpRequestContainer> containerQueue;
private AtomicBoolean runOnPrimary = new AtomicBoolean(false);
private AtomicReference<Set<String>> parameterToAttributesReference = new AtomicReference<>(null);
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -427,6 +440,18 @@ public class HandleHttpRequest extends AbstractProcessor {
}
}
final Set<String> parametersToMakeAttributes = new HashSet<>();
final String parametersToAttributesPropertyValue = context.getProperty(PARAMETERS_TO_ATTRIBUTES).getValue();
if (parametersToAttributesPropertyValue != null) {
for (final String paremeterName : parametersToAttributesPropertyValue.split(",")) {
final String trimmed = paremeterName.trim();
if (!trimmed.isEmpty()) {
parametersToMakeAttributes.add(trimmed);
}
}
parameterToAttributesReference.set(parametersToMakeAttributes);
}
final String pathRegex = context.getProperty(PATH_REGEX).getValue();
final Pattern pathPattern = (pathRegex == null) ? null : Pattern.compile(pathRegex);
@ -740,6 +765,17 @@ public class HandleHttpRequest extends AbstractProcessor {
putAttribute(attributes, "http.server.name", request.getServerName());
putAttribute(attributes, HTTPUtils.HTTP_PORT, request.getServerPort());
Set<String> parametersToAttributes = parameterToAttributesReference.get();
if (parametersToAttributes != null && !parametersToAttributes.isEmpty()){
final Enumeration<String> paramEnumeration = request.getParameterNames();
while (paramEnumeration.hasMoreElements()) {
final String paramName = paramEnumeration.nextElement();
if (parametersToAttributes.contains(paramName)){
attributes.put("http.param." + paramName, request.getParameter(paramName));
}
}
}
final Cookie[] cookies = request.getCookies();
if (cookies != null) {
for (final Cookie cookie : cookies) {

View File

@ -19,10 +19,6 @@ package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.google.api.client.util.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -46,6 +42,11 @@ import javax.net.ssl.SSLContext;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.google.api.client.util.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
@ -303,6 +304,76 @@ public class ITestHandleHttpRequest {
mff.assertAttributeExists("http.headers.multipart.content-disposition");
}
@Test(timeout = 30000)
public void testMultipartFormDataRequestCaptureFormAttributes() throws InitializationException, IOException,
InterruptedException {
CountDownLatch serverReady = new CountDownLatch(1);
CountDownLatch requestSent = new CountDownLatch(1);
processor = createProcessor(serverReady, requestSent);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
runner.setProperty(HandleHttpRequest.PARAMETERS_TO_ATTRIBUTES, "p1,p2");
final MockHttpContextMap contextMap = new MockHttpContextMap();
runner.addControllerService("http-context-map", contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
serverReady.await();
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
MultipartBody multipartBody = new MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("p1", "v1")
.addFormDataPart("p2", "v2")
.addFormDataPart("p3", "v3")
.build();
Request request = new Request.Builder()
.url(String.format("http://localhost:%s/my/path", port))
.post(multipartBody).build();
OkHttpClient client =
new OkHttpClient.Builder()
.readTimeout(3000, TimeUnit.MILLISECONDS)
.writeTimeout(3000, TimeUnit.MILLISECONDS)
.build();
sendRequest(client, request, requestSent);
} catch (Exception e) {
// Do nothing as HandleHttpRequest doesn't respond normally
}
}
});
httpThread.start();
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 3);
assertEquals(1, contextMap.size());
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS);
// Part fragments are not processed in the order we submitted them.
// We cannot rely on the order we sent them in.
for (int i = 1; i < 4; i++) {
MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", String.format("p%d", i));
String contextId = mff.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
mff.assertAttributeEquals("http.multipart.name", String.format("p%d", i));
mff.assertAttributeExists("http.param.p1");
mff.assertAttributeEquals("http.param.p1", "v1");
mff.assertAttributeExists("http.param.p2");
mff.assertAttributeEquals("http.param.p2", "v2");
mff.assertAttributeNotExists("http.param.p3");
}
}
@Test(timeout = 30000)
public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, IOException, InterruptedException {
CountDownLatch serverReady = new CountDownLatch(1);