Merge branch 'NIFI-201' of https://github.com/gresockj/incubator-nifi into develop

This commit is contained in:
joewitt 2015-01-03 01:31:17 -05:00
commit 5b4211d9af
2 changed files with 24 additions and 1 deletions

View File

@ -106,12 +106,19 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
.required(false) .required(false)
.identifiesControllerService(SSLContextService.class) .identifiesControllerService(SSLContextService.class)
.build(); .build();
public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
.name("HTTP Headers to receive as Attributes (Regex)")
.description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.required(false)
.build();
public static final String URI = "/contentListener"; public static final String URI = "/contentListener";
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor"; public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger"; public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder"; public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern"; public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern";
public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap"; public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler"; public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
@ -131,6 +138,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(AUTHORIZED_DN_PATTERN); descriptors.add(AUTHORIZED_DN_PATTERN);
descriptors.add(MAX_UNCONFIRMED_TIME); descriptors.add(MAX_UNCONFIRMED_TIME);
descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
this.properties = Collections.unmodifiableList(descriptors); this.properties = Collections.unmodifiableList(descriptors);
} }
@ -236,6 +244,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue())); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
}
server.start(); server.start();
this.server = server; this.server = server;

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -56,7 +57,6 @@ import org.apache.nifi.util.FlowFileUnpackager;
import org.apache.nifi.util.FlowFileUnpackagerV1; import org.apache.nifi.util.FlowFileUnpackagerV1;
import org.apache.nifi.util.FlowFileUnpackagerV2; import org.apache.nifi.util.FlowFileUnpackagerV2;
import org.apache.nifi.util.FlowFileUnpackagerV3; import org.apache.nifi.util.FlowFileUnpackagerV3;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -88,6 +88,7 @@ public class ListenHTTPServlet extends HttpServlet {
private ProcessorLog logger; private ProcessorLog logger;
private AtomicReference<ProcessSessionFactory> sessionFactoryHolder; private AtomicReference<ProcessSessionFactory> sessionFactoryHolder;
private Pattern authorizedPattern; private Pattern authorizedPattern;
private Pattern headerPattern;
private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap; private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
private StreamThrottler streamThrottler; private StreamThrottler streamThrottler;
@ -103,6 +104,7 @@ public class ListenHTTPServlet extends HttpServlet {
this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER);
this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER); this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER);
this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN);
this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
} }
@ -243,6 +245,16 @@ public class ListenHTTPServlet extends HttpServlet {
attributes.put(CoreAttributes.FILENAME.key(), nameVal); attributes.put(CoreAttributes.FILENAME.key(), nameVal);
} }
// put arbitrary headers on flow file
for(Enumeration<String> headerEnum = request.getHeaderNames();
headerEnum.hasMoreElements(); ) {
String headerName = headerEnum.nextElement();
if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
String headerValue = request.getHeader(headerName);
attributes.put(headerName, headerValue);
}
}
String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key()); String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
if (sourceSystemFlowFileIdentifier != null) { if (sourceSystemFlowFileIdentifier != null) {
sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier; sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;