From b233eaf20d36cd17de9d79a36a4fe3bafe8f99ba Mon Sep 17 00:00:00 2001 From: gresockj Date: Fri, 26 Dec 2014 22:41:12 -0500 Subject: [PATCH] NIFI-201: Supporting arbitrary HTTP headers as flow file attributes in ListenHTTP --- .../nifi/processors/standard/ListenHTTP.java | 11 +++++++++++ .../standard/servlets/ListenHTTPServlet.java | 14 +++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 5e7ce56e03..2b0b437140 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -106,12 +106,19 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { .required(false) .identifiesControllerService(SSLContextService.class) .build(); + public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder() + .name("HTTP Headers to receive as Attributes (Regex)") + .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .required(false) + .build(); public static final String URI = "/contentListener"; public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor"; public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger"; public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder"; public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern"; + public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern"; public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap"; public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler"; @@ -131,6 +138,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(AUTHORIZED_DN_PATTERN); descriptors.add(MAX_UNCONFIRMED_TIME); + descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX); this.properties = Collections.unmodifiableList(descriptors); } @@ -236,6 +244,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue())); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); + if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) { + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue())); + } server.start(); this.server = server; diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index fb52b80734..cae61f0ff3 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.security.cert.X509Certificate; +import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -56,7 +57,6 @@ import org.apache.nifi.util.FlowFileUnpackager; import org.apache.nifi.util.FlowFileUnpackagerV1; import org.apache.nifi.util.FlowFileUnpackagerV2; import org.apache.nifi.util.FlowFileUnpackagerV3; - import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -88,6 +88,7 @@ public class ListenHTTPServlet extends HttpServlet { private ProcessorLog logger; private AtomicReference sessionFactoryHolder; private Pattern authorizedPattern; + private Pattern headerPattern; private ConcurrentMap flowFileMap; private StreamThrottler streamThrottler; @@ -103,6 +104,7 @@ public class ListenHTTPServlet extends HttpServlet { this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); this.sessionFactoryHolder = (AtomicReference) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER); this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); + this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); this.flowFileMap = (ConcurrentMap) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); } @@ -242,6 +244,16 @@ public class ListenHTTPServlet extends HttpServlet { if (StringUtils.isNotBlank(nameVal)) { attributes.put(CoreAttributes.FILENAME.key(), nameVal); } + + // put arbitrary headers on flow file + for(Enumeration headerEnum = request.getHeaderNames(); + headerEnum.hasMoreElements(); ) { + String headerName = headerEnum.nextElement(); + if (headerPattern != null && headerPattern.matcher(headerName).matches()) { + String headerValue = request.getHeader(headerName); + attributes.put(headerName, headerValue); + } + } String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key()); if (sourceSystemFlowFileIdentifier != null) {