mirror of
https://github.com/apache/nifi.git
synced 2025-02-16 15:06:00 +00:00
NIFI-747 This closes #104. PR from Venkatesh Sellappa <VS186031@outlook.com> was modified, then code reviewed by Joe Witt (comments in ticket)
This commit is contained in:
parent
ad73a23aff
commit
88b1b844fb
@ -63,7 +63,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
|
|||||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
|
|
||||||
@Tags({"ingest", "http", "https", "rest", "listen"})
|
@Tags({"ingest", "http", "https", "rest", "listen"})
|
||||||
@CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener")
|
@CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener")
|
||||||
public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
||||||
|
|
||||||
private Set<Relationship> relationships;
|
private Set<Relationship> relationships;
|
||||||
@ -74,6 +74,14 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||||||
.description("Relationship for successfully received FlowFiles")
|
.description("Relationship for successfully received FlowFiles")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder()
|
||||||
|
.name("Base Path")
|
||||||
|
.description("Base path for incoming connections")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("contentListener")
|
||||||
|
.addValidator(StandardValidators.URI_VALIDATOR)
|
||||||
|
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
|
||||||
|
.build();
|
||||||
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
|
||||||
.name("Listening Port")
|
.name("Listening Port")
|
||||||
.description("The Port to listen on for incoming connections")
|
.description("The Port to listen on for incoming connections")
|
||||||
@ -113,7 +121,6 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||||||
.required(false)
|
.required(false)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final String URI = "/contentListener";
|
|
||||||
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
|
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
|
||||||
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
|
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
|
||||||
public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
|
public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
|
||||||
@ -122,6 +129,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||||||
public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
|
public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
|
||||||
public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
|
public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
|
||||||
public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
|
public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
|
||||||
|
public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath";
|
||||||
|
|
||||||
private volatile Server server = null;
|
private volatile Server server = null;
|
||||||
private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
|
||||||
@ -134,6 +142,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||||||
this.relationships = Collections.unmodifiableSet(relationships);
|
this.relationships = Collections.unmodifiableSet(relationships);
|
||||||
|
|
||||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||||
|
descriptors.add(BASE_PATH);
|
||||||
descriptors.add(PORT);
|
descriptors.add(PORT);
|
||||||
descriptors.add(MAX_DATA_RATE);
|
descriptors.add(MAX_DATA_RATE);
|
||||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||||
@ -170,6 +179,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void createHttpServerFromService(final ProcessContext context) throws Exception {
|
private void createHttpServerFromService(final ProcessContext context) throws Exception {
|
||||||
|
final String basePath = context.getProperty(BASE_PATH).getValue();
|
||||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||||
final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
|
final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
|
||||||
final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
|
final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
|
||||||
@ -230,12 +240,17 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||||||
final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
|
final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null));
|
||||||
for (final Class<? extends Servlet> cls : getServerClasses()) {
|
for (final Class<? extends Servlet> cls : getServerClasses()) {
|
||||||
final Path path = cls.getAnnotation(Path.class);
|
final Path path = cls.getAnnotation(Path.class);
|
||||||
if (path == null) {
|
// Note: servlets must have a path annotation - this will NPE otherwise
|
||||||
contextHandler.addServlet(cls, "/*");
|
// also, servlets other than ListenHttpServlet must have a path starting with /
|
||||||
} else {
|
if(basePath.isEmpty() && !path.value().isEmpty()){
|
||||||
|
// Note: this is to handle the condition of an empty uri, otherwise pathSpec would start with //
|
||||||
contextHandler.addServlet(cls, path.value());
|
contextHandler.addServlet(cls, path.value());
|
||||||
}
|
}
|
||||||
|
else{
|
||||||
|
contextHandler.addServlet(cls, "/" + basePath + path.value());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this);
|
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this);
|
||||||
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger());
|
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger());
|
||||||
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference);
|
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference);
|
||||||
@ -243,6 +258,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||||||
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap);
|
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap);
|
||||||
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
|
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
|
||||||
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
|
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
|
||||||
|
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath);
|
||||||
|
|
||||||
if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
|
if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
|
||||||
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
|
contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
|
||||||
@ -259,6 +275,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||||||
|
|
||||||
protected Set<Class<? extends Servlet>> getServerClasses() {
|
protected Set<Class<? extends Servlet>> getServerClasses() {
|
||||||
final Set<Class<? extends Servlet>> s = new HashSet<>();
|
final Set<Class<? extends Servlet>> s = new HashSet<>();
|
||||||
|
// NOTE: Servlets added below MUST have a Path annotation
|
||||||
|
// any servlets other than ListenHTTPServlet must have a Path annotation start with /
|
||||||
s.add(ListenHTTPServlet.class);
|
s.add(ListenHTTPServlet.class);
|
||||||
s.add(ContentAcknowledgmentServlet.class);
|
s.add(ContentAcknowledgmentServlet.class);
|
||||||
return s;
|
return s;
|
||||||
|
@ -38,10 +38,9 @@ import org.apache.nifi.processors.standard.ListenHTTP;
|
|||||||
import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
|
import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
|
||||||
import org.apache.nifi.util.FormatUtils;
|
import org.apache.nifi.util.FormatUtils;
|
||||||
|
|
||||||
@Path(ContentAcknowledgmentServlet.URI)
|
@Path("/holds/*")
|
||||||
public class ContentAcknowledgmentServlet extends HttpServlet {
|
public class ContentAcknowledgmentServlet extends HttpServlet {
|
||||||
|
|
||||||
public static final String URI = ListenHTTP.URI + "/holds/*";
|
|
||||||
public static final String DEFAULT_FOUND_SUBJECT = "none";
|
public static final String DEFAULT_FOUND_SUBJECT = "none";
|
||||||
private static final long serialVersionUID = -2675148117984902978L;
|
private static final long serialVersionUID = -2675148117984902978L;
|
||||||
|
|
||||||
|
@ -61,7 +61,8 @@ import org.apache.nifi.util.FlowFileUnpackagerV3;
|
|||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
@Path(ListenHTTP.URI)
|
|
||||||
|
@Path("")
|
||||||
public class ListenHTTPServlet extends HttpServlet {
|
public class ListenHTTPServlet extends HttpServlet {
|
||||||
|
|
||||||
private static final long serialVersionUID = 5329940480987723163L;
|
private static final long serialVersionUID = 5329940480987723163L;
|
||||||
@ -93,6 +94,7 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||||||
private Pattern headerPattern;
|
private Pattern headerPattern;
|
||||||
private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
|
private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
|
||||||
private StreamThrottler streamThrottler;
|
private StreamThrottler streamThrottler;
|
||||||
|
private String basePath;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
@ -105,6 +107,7 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||||||
this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
|
this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
|
||||||
this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
|
this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
|
||||||
this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
|
this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
|
||||||
|
this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -291,7 +294,7 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||||||
} while (previousWrapper != null);
|
} while (previousWrapper != null);
|
||||||
|
|
||||||
response.setStatus(HttpServletResponse.SC_SEE_OTHER);
|
response.setStatus(HttpServletResponse.SC_SEE_OTHER);
|
||||||
final String ackUri = ListenHTTP.URI + "/holds/" + uuid;
|
final String ackUri = "/" + basePath + "/holds/" + uuid;
|
||||||
response.addHeader(LOCATION_HEADER_NAME, ackUri);
|
response.addHeader(LOCATION_HEADER_NAME, ackUri);
|
||||||
response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
|
response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
|
||||||
response.getOutputStream().write(ackUri.getBytes("UTF-8"));
|
response.getOutputStream().write(ackUri.getBytes("UTF-8"));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user