mirror of https://github.com/apache/nifi.git
NIFI-201: Supporting arbitrary HTTP headers as flow file attributes in ListenHTTP
This commit is contained in:
parent
ac3c3bb4d5
commit
b233eaf20d
|
@ -106,12 +106,19 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
|
||||
.name("HTTP Headers to receive as Attributes (Regex)")
|
||||
.description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
|
||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final String URI = "/contentListener";
|
||||
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
|
||||
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
|
||||
public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
|
||||
public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern";
|
||||
public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
|
||||
public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
|
||||
public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
|
||||
|
||||
|
@ -131,6 +138,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(AUTHORIZED_DN_PATTERN);
|
||||
descriptors.add(MAX_UNCONFIRMED_TIME);
|
||||
descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
|
||||
this.properties = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
|
@ -236,6 +244,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
|
||||
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
|
||||
|
||||
if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
|
||||
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
|
||||
}
|
||||
server.start();
|
||||
|
||||
this.server = server;
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
|
@ -56,7 +57,6 @@ import org.apache.nifi.util.FlowFileUnpackager;
|
|||
import org.apache.nifi.util.FlowFileUnpackagerV1;
|
||||
import org.apache.nifi.util.FlowFileUnpackagerV2;
|
||||
import org.apache.nifi.util.FlowFileUnpackagerV3;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
|
@ -88,6 +88,7 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
private ProcessorLog logger;
|
||||
private AtomicReference<ProcessSessionFactory> sessionFactoryHolder;
|
||||
private Pattern authorizedPattern;
|
||||
private Pattern headerPattern;
|
||||
private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
|
||||
private StreamThrottler streamThrottler;
|
||||
|
||||
|
@ -103,6 +104,7 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER);
|
||||
this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER);
|
||||
this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN);
|
||||
this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
|
||||
this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
|
||||
this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
|
||||
}
|
||||
|
@ -242,6 +244,16 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
if (StringUtils.isNotBlank(nameVal)) {
|
||||
attributes.put(CoreAttributes.FILENAME.key(), nameVal);
|
||||
}
|
||||
|
||||
// put arbitrary headers on flow file
|
||||
for(Enumeration<String> headerEnum = request.getHeaderNames();
|
||||
headerEnum.hasMoreElements(); ) {
|
||||
String headerName = headerEnum.nextElement();
|
||||
if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
|
||||
String headerValue = request.getHeader(headerName);
|
||||
attributes.put(headerName, headerValue);
|
||||
}
|
||||
}
|
||||
|
||||
String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
|
||||
if (sourceSystemFlowFileIdentifier != null) {
|
||||
|
|
Loading…
Reference in New Issue