NIFI-8782 Added Rate-Limiting for Access Token Requests

- Added Jetty DoSFilter configured for /access/token
- Added nifi.web.max.access.token.requests.per.second property with default value of 25

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #5215.
This commit is contained in:
exceptionfactory 2021-07-13 20:42:05 -05:00 committed by Nathan Gough
parent 7356332852
commit c668d3df1b
6 changed files with 79 additions and 76 deletions

View File

@ -223,6 +223,7 @@ public class NiFiProperties extends ApplicationProperties {
public static final String WEB_PROXY_HOST = "nifi.web.proxy.host";
public static final String WEB_MAX_CONTENT_SIZE = "nifi.web.max.content.size";
public static final String WEB_MAX_REQUESTS_PER_SECOND = "nifi.web.max.requests.per.second";
public static final String WEB_MAX_ACCESS_TOKEN_REQUESTS_PER_SECOND = "nifi.web.max.access.token.requests.per.second";
public static final String WEB_REQUEST_TIMEOUT = "nifi.web.request.timeout";
public static final String WEB_REQUEST_IP_WHITELIST = "nifi.web.request.ip.whitelist";
public static final String WEB_SHOULD_SEND_SERVER_VERSION = "nifi.web.should.send.server.version";
@ -312,7 +313,8 @@ public class NiFiProperties extends ApplicationProperties {
public static final String DEFAULT_WEB_MAX_HEADER_SIZE = "16 KB";
public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty";
public static final String DEFAULT_WEB_MAX_CONTENT_SIZE = "20 MB";
public static final String DEFAULT_WEB_MAX_REQUESTS_PER_SECOND = "30000";
public static final int DEFAULT_WEB_MAX_REQUESTS_PER_SECOND = 30000;
public static final int DEFAULT_WEB_MAX_ACCESS_TOKEN_REQUESTS_PER_SECOND = 25;
public static final String DEFAULT_WEB_REQUEST_TIMEOUT = "60 secs";
public static final String DEFAULT_NAR_WORKING_DIR = "./work/nar";
public static final String DEFAULT_COMPONENT_DOCS_DIRECTORY = "./work/docs/components";
@ -673,8 +675,12 @@ public class NiFiProperties extends ApplicationProperties {
return getProperty(WEB_MAX_CONTENT_SIZE);
}
public String getMaxWebRequestsPerSecond() {
return getProperty(WEB_MAX_REQUESTS_PER_SECOND, DEFAULT_WEB_MAX_REQUESTS_PER_SECOND);
public Integer getMaxWebRequestsPerSecond() {
return getIntegerProperty(WEB_MAX_REQUESTS_PER_SECOND, DEFAULT_WEB_MAX_REQUESTS_PER_SECOND);
}
public Integer getMaxWebAccessTokenRequestsPerSecond() {
return getIntegerProperty(WEB_MAX_ACCESS_TOKEN_REQUESTS_PER_SECOND, DEFAULT_WEB_MAX_ACCESS_TOKEN_REQUESTS_PER_SECOND);
}
public String getWebRequestTimeout() {

View File

@ -3599,6 +3599,7 @@ host[:port] that NiFi is bound to.
blank meaning all requests containing a proxy context path are rejected. Configuring this property would allow requests where the proxy path is contained in this listing.
|`nifi.web.max.content.size`|The maximum size (HTTP `Content-Length`) for PUT and POST requests. No default value is set for backward compatibility. Providing a value for this property enables the `Content-Length` filter on all incoming API requests (except Site-to-Site and cluster communications). A suggested value is `20 MB`.
|`nifi.web.max.requests.per.second`|The maximum number of requests from a connection per second. Requests in excess of this are first delayed, then throttled.
|`nifi.web.max.access.token.requests.per.second`|The maximum number of requests for login Access Tokens from a connection per second. Requests in excess of this are rejected with HTTP 429.
|`nifi.web.request.ip.whitelist`|A comma separated list of IP addresses. Used to specify the IP addresses of clients which can exceed the maximum requests per second (`nifi.web.max.requests.per.second`). Does not apply to web request timeout.
|`nifi.web.request.timeout`|The request timeout for web requests. Requests running longer than this time will be forced to end with a HTTP 503 Service Unavailable response. Default value is `60 secs`.
|====

View File

@ -145,6 +145,7 @@
<nifi.web.proxy.host />
<nifi.web.max.content.size />
<nifi.web.max.requests.per.second>30000</nifi.web.max.requests.per.second>
<nifi.web.max.access.token.requests.per.second>25</nifi.web.max.access.token.requests.per.second>
<nifi.web.request.timeout>60 secs</nifi.web.request.timeout>
<nifi.web.request.ip.whitelist />
<nifi.web.should.send.server.version>true</nifi.web.should.send.server.version>

View File

@ -166,6 +166,7 @@ nifi.web.proxy.context.path=${nifi.web.proxy.context.path}
nifi.web.proxy.host=${nifi.web.proxy.host}
nifi.web.max.content.size=${nifi.web.max.content.size}
nifi.web.max.requests.per.second=${nifi.web.max.requests.per.second}
nifi.web.max.access.token.requests.per.second=${nifi.web.max.access.token.requests.per.second}
nifi.web.request.timeout=${nifi.web.request.timeout}
nifi.web.request.ip.whitelist=${nifi.web.request.ip.whitelist}
nifi.web.should.send.server.version=${nifi.web.should.send.server.version}

View File

@ -136,6 +136,16 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
private static final String CONTAINER_INCLUDE_PATTERN_KEY = "org.eclipse.jetty.server.webapp.ContainerIncludeJarPattern";
private static final String CONTAINER_INCLUDE_PATTERN_VALUE = ".*/[^/]*servlet-api-[^/]*\\.jar$|.*/javax.servlet.jsp.jstl-.*\\\\.jar$|.*/[^/]*taglibs.*\\.jar$";
private static final String CONTEXT_PATH_ALL = "/*";
private static final String CONTEXT_PATH_ROOT = "/";
private static final String CONTEXT_PATH_NIFI = "/nifi";
private static final String CONTEXT_PATH_NIFI_API = "/nifi-api";
private static final String CONTEXT_PATH_NIFI_CONTENT_VIEWER = "/nifi-content-viewer";
private static final String CONTEXT_PATH_NIFI_DOCS = "/nifi-docs";
private static final String RELATIVE_PATH_ACCESS_TOKEN = "/access/token";
private static final int DOS_FILTER_REJECT_REQUEST = -1;
private static final FileFilter WAR_FILTER = pathname -> {
final String nameToTest = pathname.getName().toLowerCase();
return nameToTest.endsWith(".war") && pathname.isFile();
@ -197,8 +207,8 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
// Only restrict the host header if running in HTTPS mode
if (props.isHTTPSConfigured()) {
// Create a handler for the host header and add it to the server
HostHeaderHandler hostHeaderHandler = new HostHeaderHandler(props);
logger.info("Created HostHeaderHandler [" + hostHeaderHandler.toString() + "]");
final HostHeaderHandler hostHeaderHandler = new HostHeaderHandler(props);
logger.info("Created HostHeaderHandler [{}}]", hostHeaderHandler);
// Add this before the WAR handlers
allHandlers.addHandler(hostHeaderHandler);
@ -283,7 +293,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
final ClassLoader frameworkClassLoader = getClass().getClassLoader();
// load the web ui app
final WebAppContext webUiContext = loadWar(webUiWar, "/nifi", frameworkClassLoader);
final WebAppContext webUiContext = loadWar(webUiWar, CONTEXT_PATH_NIFI, frameworkClassLoader);
webUiContext.getInitParams().put("oidc-supported", String.valueOf(props.isOidcEnabled()));
webUiContext.getInitParams().put("knox-supported", String.valueOf(props.isKnoxSsoEnabled()));
webUiContext.getInitParams().put("saml-supported", String.valueOf(props.isSamlEnabled()));
@ -292,19 +302,16 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
webAppContextHandlers.addHandler(webUiContext);
// load the web api app
webApiContext = loadWar(webApiWar, "/nifi-api", frameworkClassLoader);
webApiContext = loadWar(webApiWar, CONTEXT_PATH_NIFI_API, frameworkClassLoader);
webAppContextHandlers.addHandler(webApiContext);
// load the content viewer app
webContentViewerContext = loadWar(webContentViewerWar, "/nifi-content-viewer", frameworkClassLoader);
webContentViewerContext = loadWar(webContentViewerWar, CONTEXT_PATH_NIFI_CONTENT_VIEWER, frameworkClassLoader);
webContentViewerContext.getInitParams().putAll(extensionUiInfo.getMimeMappings());
webAppContextHandlers.addHandler(webContentViewerContext);
// create a web app for the docs
final String docsContextPath = "/nifi-docs";
// load the documentation war
webDocsContext = loadWar(webDocsWar, docsContextPath, frameworkClassLoader);
webDocsContext = loadWar(webDocsWar, CONTEXT_PATH_NIFI_DOCS, frameworkClassLoader);
// add the servlets which serve the HTML documentation within the documentation web app
addDocsServlets(webDocsContext);
@ -312,7 +319,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
webAppContextHandlers.addHandler(webDocsContext);
// load the web error app
final WebAppContext webErrorContext = loadWar(webErrorWar, "/", frameworkClassLoader);
final WebAppContext webErrorContext = loadWar(webErrorWar, CONTEXT_PATH_ROOT, frameworkClassLoader);
webErrorContext.getInitParams().put("allowedContextPaths", props.getAllowedContextPaths());
webAppContextHandlers.addHandler(webErrorContext);
@ -617,7 +624,6 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
// add HTTP security headers to all responses
// TODO: Allow more granular path configuration (e.g. /nifi-api/site-to-site/ vs. /nifi-api/process-groups)
final String ALL_PATHS = "/*";
ArrayList<Class<? extends Filter>> filters =
new ArrayList<>(Arrays.asList(
XFrameOptionsFilter.class,
@ -628,8 +634,12 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
if(props.isHTTPSConfigured()) {
filters.add(StrictTransportSecurityFilter.class);
}
filters.forEach((filter) -> addFilters(filter, ALL_PATHS, webappContext));
addDenialOfServiceFilters(ALL_PATHS, webappContext, props);
filters.forEach((filter) -> addFilters(filter, webappContext));
addDenialOfServiceFilters(webappContext, props);
if (CONTEXT_PATH_NIFI_API.equals(contextPath)) {
addAccessTokenRequestFilter(webappContext, props);
}
try {
// configure the class loader - webappClassLoader -> jetty nar -> web app's nar -> ...
@ -642,10 +652,10 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
return webappContext;
}
private void addFilters(Class<? extends Filter> clazz, String path, WebAppContext webappContext) {
private void addFilters(Class<? extends Filter> clazz, WebAppContext webappContext) {
FilterHolder holder = new FilterHolder(clazz);
holder.setName(clazz.getSimpleName());
webappContext.addFilter(holder, path, EnumSet.allOf(DispatcherType.class));
webappContext.addFilter(holder, CONTEXT_PATH_ALL, EnumSet.allOf(DispatcherType.class));
}
private void addDocsServlets(WebAppContext docsContext) {
@ -695,40 +705,23 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
* Currently, this implementation adds
* {@link org.eclipse.jetty.servlets.DoSFilter} and {@link ContentLengthFilter} filters.
*
* @param path path spec for filters ({@code /*} by convention in this class)
* @param webAppContext context to which filters will be added
* @param props the {@link NiFiProperties}
*/
private static void addDenialOfServiceFilters(String path, WebAppContext webAppContext, NiFiProperties props) {
// Add the requests rate limiting filter to all requests
final int maxWebRequestsPerSecond = determineMaxWebRequestsPerSecond(props);
final long requestTimeoutInMilliseconds = determineRequestTimeoutInMilliseconds(props);
final String ipWhitelist = props.getWebRequestIpWhitelist();
addWebRequestLimitingFilter(path, webAppContext, maxWebRequestsPerSecond, ipWhitelist, requestTimeoutInMilliseconds);
private static void addDenialOfServiceFilters(final WebAppContext webAppContext, final NiFiProperties props) {
addWebRequestLimitingFilter(webAppContext, props.getMaxWebRequestsPerSecond(), getWebRequestTimeoutMs(props), props.getWebRequestIpWhitelist());
// Only add the ContentLengthFilter if the property is explicitly set (empty by default)
int maxRequestSize = determineMaxRequestSize(props);
final int maxRequestSize = determineMaxRequestSize(props);
if (maxRequestSize > 0) {
addContentLengthFilter(path, webAppContext, maxRequestSize);
addContentLengthFilter(webAppContext, maxRequestSize);
} else {
logger.debug("Not adding content-length filter because {} is not set in nifi.properties", NiFiProperties.WEB_MAX_CONTENT_SIZE);
}
}
private static int determineMaxWebRequestsPerSecond(final NiFiProperties props) {
int defaultMaxRequestsPerSecond = Integer.parseInt(NiFiProperties.DEFAULT_WEB_MAX_REQUESTS_PER_SECOND);
int configuredMaxRequestsPerSecond = 0;
try {
configuredMaxRequestsPerSecond = Integer.parseInt(props.getMaxWebRequestsPerSecond());
} catch (final NumberFormatException e) {
logger.warn("Exception parsing property [{}]; using default value: [{}]", NiFiProperties.WEB_MAX_REQUESTS_PER_SECOND, defaultMaxRequestsPerSecond);
}
return configuredMaxRequestsPerSecond > 0 ? configuredMaxRequestsPerSecond : defaultMaxRequestsPerSecond;
}
private static long determineRequestTimeoutInMilliseconds(final NiFiProperties props) {
long defaultRequestTimeout = Math.round(FormatUtils.getPreciseTimeDuration(NiFiProperties.DEFAULT_WEB_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS));
private static long getWebRequestTimeoutMs(final NiFiProperties props) {
final long defaultRequestTimeout = Math.round(FormatUtils.getPreciseTimeDuration(NiFiProperties.DEFAULT_WEB_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS));
long configuredRequestTimeout = 0L;
try {
configuredRequestTimeout = Math.round(FormatUtils.getPreciseTimeDuration(props.getWebRequestTimeout(), TimeUnit.MILLISECONDS));
@ -744,24 +737,41 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
* In order to allow clients to make more requests than the maximum rate, clients can be added to the {@code ipWhitelist}.
* The {@code requestTimeoutInMilliseconds} value limits requests to the given request timeout amount, and will close connections that run longer than this time.
*
* @param path the path to apply this filter
* @param webAppContext the context to apply this filter
* @param maxWebRequestsPerSecond the maximum number of allowed requests per second
* @param ipWhitelist a whitelist of IP addresses that should not be rate limited. Does not apply to request timeout
* @param requestTimeoutInMilliseconds the amount of time before a connection will be automatically closed
* @param webAppContext Web Application Context where Filter will be added
* @param maxRequestsPerSec Maximum number of allowed requests per second
* @param maxRequestMs Maximum amount of time in milliseconds before a connection will be automatically closed
* @param allowed Comma-separated string of IP addresses that should not be rate limited. Does not apply to request timeout
*/
private static void addWebRequestLimitingFilter(String path, WebAppContext webAppContext, int maxWebRequestsPerSecond, final String ipWhitelist, long requestTimeoutInMilliseconds) {
FilterHolder holder = new FilterHolder(DoSFilter.class);
private static void addWebRequestLimitingFilter(final WebAppContext webAppContext, final int maxRequestsPerSec, final long maxRequestMs, final String allowed) {
final FilterHolder holder = new FilterHolder(DoSFilter.class);
holder.setInitParameters(new HashMap<String, String>() {{
put("maxRequestsPerSec", String.valueOf(maxWebRequestsPerSecond));
put("maxRequestMs", String.valueOf(requestTimeoutInMilliseconds));
put("ipWhitelist", String.valueOf(ipWhitelist));
put("maxRequestsPerSec", Integer.toString(maxRequestsPerSec));
put("maxRequestMs", Long.toString(maxRequestMs));
put("ipWhitelist", allowed);
}});
holder.setName(DoSFilter.class.getSimpleName());
logger.debug("Adding DoSFilter to context at path: [{}] with max req/sec: [{}], request timeout: [{}] ms. Whitelisted IPs not subject to filter: [{}]",
path, maxWebRequestsPerSecond, requestTimeoutInMilliseconds, StringUtils.defaultIfEmpty(ipWhitelist, "none"));
webAppContext.addFilter(holder, path, EnumSet.allOf(DispatcherType.class));
webAppContext.addFilter(holder, CONTEXT_PATH_ALL, EnumSet.allOf(DispatcherType.class));
logger.debug("Added DoSFilter Path [{}] Max Requests Per Second [{}] Request Timeout [{} ms] Allowed [{}]", CONTEXT_PATH_ALL, maxRequestsPerSec, maxRequestMs, allowed);
}
private static void addAccessTokenRequestFilter(final WebAppContext webAppContext, final NiFiProperties properties) {
final int maxRequestsPerSec = properties.getMaxWebAccessTokenRequestsPerSecond();
final long maxRequestMs = getWebRequestTimeoutMs(properties);
final String webRequestAllowed = properties.getWebRequestIpWhitelist();
final FilterHolder holder = new FilterHolder(DoSFilter.class);
holder.setInitParameters(new HashMap<String, String>() {{
put("maxRequestsPerSec", Integer.toString(maxRequestsPerSec));
put("maxRequestMs", Long.toString(maxRequestMs));
put("ipWhitelist", webRequestAllowed);
put("maxWaitMs", Integer.toString(DOS_FILTER_REJECT_REQUEST));
put("delayMs", Integer.toString(DOS_FILTER_REJECT_REQUEST));
}});
holder.setName("AccessTokenRequest-DoSFilter");
webAppContext.addFilter(holder, RELATIVE_PATH_ACCESS_TOKEN, EnumSet.allOf(DispatcherType.class));
logger.debug("Added DoSFilter Path [{}] Max Requests Per Second [{}] Request Timeout [{} ms] Allowed [{}]", RELATIVE_PATH_ACCESS_TOKEN, maxRequestsPerSec, maxRequestMs, webRequestAllowed);
}
private static int determineMaxRequestSize(NiFiProperties props) {
@ -782,14 +792,14 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
return -1;
}
private static void addContentLengthFilter(String path, WebAppContext webAppContext, int maxContentLength) {
FilterHolder holder = new FilterHolder(ContentLengthFilter.class);
private static void addContentLengthFilter(final WebAppContext webAppContext, int maxContentLength) {
final FilterHolder holder = new FilterHolder(ContentLengthFilter.class);
holder.setInitParameters(new HashMap<String, String>() {{
put("maxContentLength", String.valueOf(maxContentLength));
}});
holder.setName(ContentLengthFilter.class.getSimpleName());
logger.debug("Adding ContentLengthFilter to context at path: " + path + " with max request size: " + maxContentLength + "B");
webAppContext.addFilter(holder, path, EnumSet.allOf(DispatcherType.class));
logger.debug("Adding ContentLengthFilter to Path [{}] with Maximum Content Length [{}B]", CONTEXT_PATH_ALL, maxContentLength);
webAppContext.addFilter(holder, CONTEXT_PATH_ALL, EnumSet.allOf(DispatcherType.class));
}
/**

View File

@ -38,9 +38,7 @@ import org.eclipse.jetty.servlet.FilterHolder
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.eclipse.jetty.webapp.WebAppContext
import org.junit.After
import org.junit.AfterClass
import org.junit.Assume
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Rule
import org.junit.Test
@ -116,16 +114,6 @@ class JettyServerGroovyTest extends GroovyTestCase {
TestAppender.reset()
}
@AfterClass
static void tearDownOnce() throws Exception {
}
@Before
void setUp() throws Exception {
}
@After
void tearDown() throws Exception {
// Cleans up the EMH so it can be reinitialized when a new Jetty server starts
@ -483,12 +471,10 @@ class JettyServerGroovyTest extends GroovyTestCase {
JettyServer jettyServer = new JettyServer(new Server(), mockProps)
logger.info("Created JettyServer: ${jettyServer.dump()}")
String path = "/mock"
final int MAX_CONTENT_LENGTH_BYTES = DataUnit.parseDataSize(defaultProps[NiFiProperties.WEB_MAX_CONTENT_SIZE], DataUnit.B).intValue()
// Act
jettyServer.addDenialOfServiceFilters(path, mockWebContext, mockProps)
jettyServer.addDenialOfServiceFilters(mockWebContext, mockProps)
// Assert
assert filters.size() == 2
@ -526,10 +512,8 @@ class JettyServerGroovyTest extends GroovyTestCase {
JettyServer jettyServer = new JettyServer(new Server(), mockProps)
logger.info("Created JettyServer: ${jettyServer.dump()}")
String path = "/mock"
// Act
jettyServer.addDenialOfServiceFilters(path, mockWebContext, mockProps)
jettyServer.addDenialOfServiceFilters(mockWebContext, mockProps)
// Assert
assert filters.size() == 1