diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index c7842d968a..a446eb68a2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -63,7 +63,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; @Tags({"ingest", "http", "https", "rest", "listen"}) -@CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener") +@CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener") public class ListenHTTP extends AbstractSessionFactoryProcessor { private Set relationships; @@ -74,6 +74,14 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { .description("Relationship for successfully received FlowFiles") .build(); + public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder() + .name("Base Path") + .description("Base path for incoming connections") + .required(true) + .defaultValue("contentListener") + .addValidator(StandardValidators.URI_VALIDATOR) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with / + .build(); public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() .name("Listening Port") .description("The Port to listen on for incoming connections") @@ -113,7 +121,6 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { .required(false) .build(); - public static final String URI = "/contentListener"; public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor"; public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger"; public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder"; @@ -122,6 +129,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern"; public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap"; public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler"; + public static final String CONTEXT_ATTRIBUTE_BASE_PATH = "basePath"; private volatile Server server = null; private final ConcurrentMap flowFileMap = new ConcurrentHashMap<>(); @@ -134,6 +142,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { this.relationships = Collections.unmodifiableSet(relationships); final List descriptors = new ArrayList<>(); + descriptors.add(BASE_PATH); descriptors.add(PORT); descriptors.add(MAX_DATA_RATE); descriptors.add(SSL_CONTEXT_SERVICE); @@ -170,6 +179,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { } private void createHttpServerFromService(final ProcessContext context) throws Exception { + final String basePath = context.getProperty(BASE_PATH).getValue(); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); @@ -230,12 +240,17 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null)); for (final Class cls : getServerClasses()) { final Path path = cls.getAnnotation(Path.class); - if (path == null) { - contextHandler.addServlet(cls, "/*"); - } else { + // Note: servlets must have a path annotation - this will NPE otherwise + // also, servlets other than ListenHttpServlet must have a path starting with / + if(basePath.isEmpty() && !path.value().isEmpty()){ + // Note: this is to handle the condition of an empty uri, otherwise pathSpec would start with // contextHandler.addServlet(cls, path.value()); } + else{ + contextHandler.addServlet(cls, "/" + basePath + path.value()); + } } + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger()); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference); @@ -243,6 +258,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue())); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_BASE_PATH, basePath); if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue())); @@ -259,6 +275,8 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { protected Set> getServerClasses() { final Set> s = new HashSet<>(); + // NOTE: Servlets added below MUST have a Path annotation + // any servlets other than ListenHTTPServlet must have a Path annotation start with / s.add(ListenHTTPServlet.class); s.add(ContentAcknowledgmentServlet.class); return s; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java index 7dd6797148..3252aea7f0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java @@ -38,10 +38,9 @@ import org.apache.nifi.processors.standard.ListenHTTP; import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper; import org.apache.nifi.util.FormatUtils; -@Path(ContentAcknowledgmentServlet.URI) +@Path("/holds/*") public class ContentAcknowledgmentServlet extends HttpServlet { - public static final String URI = ListenHTTP.URI + "/holds/*"; public static final String DEFAULT_FOUND_SUBJECT = "none"; private static final long serialVersionUID = -2675148117984902978L; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 6a8f32fc5c..79d38877d0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -61,7 +61,8 @@ import org.apache.nifi.util.FlowFileUnpackagerV3; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -@Path(ListenHTTP.URI) + +@Path("") public class ListenHTTPServlet extends HttpServlet { private static final long serialVersionUID = 5329940480987723163L; @@ -93,6 +94,7 @@ public class ListenHTTPServlet extends HttpServlet { private Pattern headerPattern; private ConcurrentMap flowFileMap; private StreamThrottler streamThrottler; + private String basePath; @SuppressWarnings("unchecked") @Override @@ -105,6 +107,7 @@ public class ListenHTTPServlet extends HttpServlet { this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN); this.flowFileMap = (ConcurrentMap) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER); + this.basePath = (String) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_BASE_PATH); } @Override @@ -291,7 +294,7 @@ public class ListenHTTPServlet extends HttpServlet { } while (previousWrapper != null); response.setStatus(HttpServletResponse.SC_SEE_OTHER); - final String ackUri = ListenHTTP.URI + "/holds/" + uuid; + final String ackUri = "/" + basePath + "/holds/" + uuid; response.addHeader(LOCATION_HEADER_NAME, ackUri); response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE); response.getOutputStream().write(ackUri.getBytes("UTF-8"));