NIFI-7304 Removed default value for nifi.web.max.content.size.

Added Bundle#toString() method.
Refactored implementation of filter addition logic.
Added logging.
Added unit tests to check for filter enablement.
Introduced content-length exception handling in StandardPublicPort.
Added filter bypass functionality for framework requests in ContentLengthFilter.
Updated property documentation in Admin Guide.
Renamed methods & added Javadoc to clarify purpose of filters in JettyServer.
Cleaned up conditional logic in StandardPublicPort.
Moved ContentLengthFilterTest to correct module.
Refactored unit tests for accuracy and clarity.
Fixed remaining merge conflict due to method renaming.

Signed-off-by: Joe Witt <joe.witt@gmail.com>
This commit is contained in:
Andy LoPresto 2020-04-01 20:20:38 -07:00
parent 0fa8776f4d
commit dbee774c5b
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
15 changed files with 708 additions and 259 deletions

View File

@ -653,8 +653,15 @@ public abstract class NiFiProperties {
return getProperty(WEB_MAX_HEADER_SIZE, DEFAULT_WEB_MAX_HEADER_SIZE); return getProperty(WEB_MAX_HEADER_SIZE, DEFAULT_WEB_MAX_HEADER_SIZE);
} }
/**
* Returns the {@code nifi.web.max.content.size} value from {@code nifi.properties}.
* Does not provide a default value because the presence of any value here enables the
* {@code ContentLengthFilter}.
*
* @return the specified max content-length and units for an incoming HTTP request
*/
public String getWebMaxContentSize() { public String getWebMaxContentSize() {
return getProperty(WEB_MAX_CONTENT_SIZE, DEFAULT_WEB_MAX_CONTENT_SIZE); return getProperty(WEB_MAX_CONTENT_SIZE);
} }
public String getMaxWebRequestsPerSecond() { public String getMaxWebRequestsPerSecond() {

View File

@ -17,8 +17,9 @@
package org.apache.nifi.util; package org.apache.nifi.util;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -269,7 +270,7 @@ public class NiFiPropertiesTest {
}}); }});
// Assert defaults match expectations: // Assert defaults match expectations:
assertEquals(properties.getWebMaxContentSize(), "20 MB"); assertNull(properties.getWebMaxContentSize());
// Re-arrange with specific values: // Re-arrange with specific values:
final String size = "size value"; final String size = "size value";

View File

@ -3275,7 +3275,7 @@ For example, when running in a Docker container or behind a proxy (e.g. localhos
host[:port] that NiFi is bound to. host[:port] that NiFi is bound to.
|`nifi.web.proxy.context.path`|A comma separated list of allowed HTTP X-ProxyContextPath, X-Forwarded-Context, or X-Forwarded-Prefix header values to consider. By default, this value is |`nifi.web.proxy.context.path`|A comma separated list of allowed HTTP X-ProxyContextPath, X-Forwarded-Context, or X-Forwarded-Prefix header values to consider. By default, this value is
blank meaning all requests containing a proxy context path are rejected. Configuring this property would allow requests where the proxy path is contained in this listing. blank meaning all requests containing a proxy context path are rejected. Configuring this property would allow requests where the proxy path is contained in this listing.
|`nifi.web.max.content.size`|The maximum size for PUT and POST requests. The default value is `20 MB`. |`nifi.web.max.content.size`|The maximum size (HTTP `Content-Length`) for PUT and POST requests. No default value is set for backward compatibility. Providing a value for this property enables the `Content-Length` filter on all incoming API requests (except Site-to-Site and cluster communications). A suggested value is `20 MB`.
|`nifi.web.max.requests.per.second`|The maximum number of requests from a connection per second. Requests in excess of this are first delayed, then throttled. |`nifi.web.max.requests.per.second`|The maximum number of requests from a connection per second. Requests in excess of this are first delayed, then throttled.
|==== |====

View File

@ -45,4 +45,9 @@ public class Bundle {
public ClassLoader getClassLoader() { public ClassLoader getClassLoader() {
return classLoader; return classLoader;
} }
@Override
public String toString() {
return bundleDetails.toString();
}
} }

View File

@ -991,4 +991,20 @@ class StandardNiFiPropertiesGroovyTest extends GroovyTestCase {
assert niFiProperties.size() == 1 assert niFiProperties.size() == 1
assert niFiProperties.getPropertyKeys() == ["key"] as Set assert niFiProperties.getPropertyKeys() == ["key"] as Set
} }
@Test
void testWebMaxContentSizeShouldDefaultToEmpty() {
// Arrange
Properties rawProps = new Properties(["nifi.web.max.content.size": ""])
NiFiProperties props = new StandardNiFiProperties(rawProps)
logger.info("Created a NiFiProperties instance with empty web max content size property")
// Act
String webMaxContentSize = props.getWebMaxContentSize()
logger.info("Read from NiFiProperties instance: ${webMaxContentSize}")
// Assert
assert webMaxContentSize == ""
}
} }

View File

@ -144,7 +144,7 @@
<nifi.web.max.header.size>16 KB</nifi.web.max.header.size> <nifi.web.max.header.size>16 KB</nifi.web.max.header.size>
<nifi.web.proxy.context.path /> <nifi.web.proxy.context.path />
<nifi.web.proxy.host /> <nifi.web.proxy.host />
<nifi.web.max.content.size>20 MB</nifi.web.max.content.size> <nifi.web.max.content.size/>
<nifi.web.max.requests.per.second>30000</nifi.web.max.requests.per.second> <nifi.web.max.requests.per.second>30000</nifi.web.max.requests.per.second>
<nifi.web.should.send.server.version>true</nifi.web.should.send.server.version> <nifi.web.should.send.server.version>true</nifi.web.should.send.server.version>
<!-- nifi.properties: security properties --> <!-- nifi.properties: security properties -->

View File

@ -16,45 +16,7 @@
*/ */
package org.apache.nifi.remote; package org.apache.nifi.remote;
import org.apache.commons.lang3.builder.ToStringBuilder; import static java.util.Objects.requireNonNull;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.DataTransferAuthorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.StandardNiFiUser.Builder;
import org.apache.nifi.authorization.util.IdentityMapping;
import org.apache.nifi.authorization.util.IdentityMappingUtil;
import org.apache.nifi.authorization.util.UserGroupUtil;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.NotAuthorizedException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.RequestExpiredException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
@ -71,8 +33,48 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.builder.ToStringBuilder;
import static java.util.Objects.requireNonNull; import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.DataTransferAuthorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.StandardNiFiUser.Builder;
import org.apache.nifi.authorization.util.IdentityMapping;
import org.apache.nifi.authorization.util.IdentityMappingUtil;
import org.apache.nifi.authorization.util.UserGroupUtil;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.NotAuthorizedException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.RequestExpiredException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardPublicPort extends AbstractPort implements PublicPort { public class StandardPublicPort extends AbstractPort implements PublicPort {
@ -262,9 +264,20 @@ public class StandardPublicPort extends AbstractPort implements PublicPort {
return receiveRequest.getProtocol().receiveFlowFiles(receiveRequest.getPeer(), context, session, codec); return receiveRequest.getProtocol().receiveFlowFiles(receiveRequest.getPeer(), context, session, codec);
} }
/**
* Returns {@code true} if the port is not a <em>local</em> input port (remote input ports are
* handled by {@link StandardRemoteGroupPort}), or if the local input port has at least one
* available connection.
*
* @return true if this port is valid
*/
@Override @Override
public boolean isValid() { public boolean isValid() {
return getConnectableType() == ConnectableType.INPUT_PORT ? !getConnections(Relationship.ANONYMOUS).isEmpty() : true; if (getConnectableType() == ConnectableType.INPUT_PORT) {
Set<Connection> availableConnections = getConnections(Relationship.ANONYMOUS);
return !availableConnections.isEmpty();
}
return true;
} }
@Override @Override
@ -561,6 +574,16 @@ public class StandardPublicPort extends AbstractPort implements PublicPort {
throw e; throw e;
} catch (final ProtocolException e) { } catch (final ProtocolException e) {
throw new BadRequestException(e); throw new BadRequestException(e);
} catch (final IOException | FlowFileAccessException e) {
// The content length filter might be blocking the transmission
final String REQUEST_TOO_LONG_MSG = "Request input stream longer than";
if (e.getMessage() != null && e.getMessage().contains(REQUEST_TOO_LONG_MSG)) {
logger.error("The content length filter (configured with {}) is blocking the site-to-site connection: {}", NiFiProperties.WEB_MAX_CONTENT_SIZE, e.getMessage());
// Perhaps BRE causes the sender to back off?
throw new BadRequestException(e);
} else {
throw new ProcessException(e);
}
} catch (final Exception e) { } catch (final Exception e) {
throw new ProcessException(e); throw new ProcessException(e);
} }

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote
import org.apache.nifi.authorization.Authorizer
import org.apache.nifi.connectable.Connectable
import org.apache.nifi.connectable.ConnectableType
import org.apache.nifi.controller.ProcessScheduler
import org.apache.nifi.properties.StandardNiFiProperties
import org.apache.nifi.remote.protocol.CommunicationsSession
import org.apache.nifi.remote.protocol.ServerProtocol
import org.apache.nifi.reporting.BulletinRepository
import org.apache.nifi.util.NiFiProperties
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Ignore
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@RunWith(JUnit4.class)
class StandardPublicPortGroovyTest extends GroovyTestCase {
private static final Logger logger = LoggerFactory.getLogger(StandardPublicPortGroovyTest.class)
@BeforeClass
static void setUpOnce() throws Exception {
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
void setUp() {
}
@After
void tearDown() {
}
private static PublicPort createPublicPort(NiFiProperties niFiProperties) {
Authorizer mockAuthorizer = [:] as Authorizer
BulletinRepository mockBulletinRepository = [:] as BulletinRepository
ProcessScheduler mockProcessScheduler = [registerEvent: { Connectable worker ->
logger.mock("Registered event for worker: ${worker}")
}] as ProcessScheduler
StandardPublicPort spp = new StandardPublicPort("id", "name", TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, mockAuthorizer, mockBulletinRepository, mockProcessScheduler, false, niFiProperties.getBoredYieldDuration(), [])
logger.info("Created SPP with mocked collaborators: ${spp}")
spp
}
// TODO: Implement test
@Ignore("Not yet implemented")
@Test
void testReceiveFlowFilesShouldHandleBlockedRequestDueToContentLength() {
// Arrange
Map badProps = [
(NiFiProperties.WEB_HTTP_HOST) : "localhost",
(NiFiProperties.WEB_HTTPS_HOST): "secure.host.com",
(NiFiProperties.WEB_THREADS) : NiFiProperties.DEFAULT_WEB_THREADS
]
NiFiProperties mockProps = [
getPort : { -> 8080 },
getSslPort : { -> 8443 },
getProperty: { String prop ->
String value = badProps[prop] ?: "no_value"
logger.mock("getProperty(${prop}) -> ${value}")
value
},
] as StandardNiFiProperties
StandardPublicPort port = createPublicPort(mockProps)
final int LISTEN_SECS = 5
PeerDescription peerDescription = new PeerDescription("localhost", 8080, false)
CommunicationsSession mockCommunicationsSession = [:] as CommunicationsSession
Peer peer = new Peer(peerDescription, mockCommunicationsSession, "http://localhost", "")
ServerProtocol mockServerProtocol = [getRequestExpiration: { -> 500L }] as ServerProtocol
// Act
port.onSchedulingStart()
logger.info("Listening on port for ${LISTEN_SECS} seconds")
long end = System.nanoTime() + LISTEN_SECS * 1_000_000_000
def responses = []
while (System.nanoTime() < end) {
responses << port.receiveFlowFiles(peer, mockServerProtocol)
logger.info("Received ${responses[-1]} flowfiles")
}
logger.info("Stopped listening on port")
logger.info("Received ${responses.sum()} total flowfiles")
// Assert
assert !responses.isEmpty()
}
}

View File

@ -209,7 +209,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
/** /**
* Instantiates this object but does not perform any configuration. Used for unit testing. * Instantiates this object but does not perform any configuration. Used for unit testing.
*/ */
JettyServer(Server server, NiFiProperties properties) { JettyServer(Server server, NiFiProperties properties) {
this.server = server; this.server = server;
this.props = properties; this.props = properties;
} }
@ -226,7 +226,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
File webDocsWar = null; File webDocsWar = null;
File webContentViewerWar = null; File webContentViewerWar = null;
Map<File, Bundle> otherWars = new HashMap<>(); Map<File, Bundle> otherWars = new HashMap<>();
for (Map.Entry<File,Bundle> warBundleEntry : warToBundleLookup.entrySet()) { for (Map.Entry<File, Bundle> warBundleEntry : warToBundleLookup.entrySet()) {
final File war = warBundleEntry.getKey(); final File war = warBundleEntry.getKey();
final Bundle warBundle = warBundleEntry.getValue(); final Bundle warBundle = warBundleEntry.getValue();
@ -308,7 +308,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
@Override @Override
public void loadExtensionUis(final Set<Bundle> bundles) { public void loadExtensionUis(final Set<Bundle> bundles) {
// Find and load any WARs contained within the set of bundles... // Find and load any WARs contained within the set of bundles...
final Map<File, Bundle> warToBundleLookup = findWars(bundles); final Map<File, Bundle> warToBundleLookup = findWars(bundles);
final ExtensionUiInfo extensionUiInfo = loadWars(warToBundleLookup); final ExtensionUiInfo extensionUiInfo = loadWars(warToBundleLookup);
@ -350,7 +350,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
} }
private ExtensionUiInfo loadWars(final Map<File, Bundle> warToBundleLookup) { private ExtensionUiInfo loadWars(final Map<File, Bundle> warToBundleLookup) {
// handlers for each war and init params for the web api // handlers for each war and init params for the web api
final List<WebAppContext> webAppContexts = new ArrayList<>(); final List<WebAppContext> webAppContexts = new ArrayList<>();
final Map<String, String> mimeMappings = new HashMap<>(); final Map<String, String> mimeMappings = new HashMap<>();
final Collection<WebAppContext> componentUiExtensionWebContexts = new ArrayList<>(); final Collection<WebAppContext> componentUiExtensionWebContexts = new ArrayList<>();
@ -363,7 +363,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
// deploy the other wars // deploy the other wars
if (!warToBundleLookup.isEmpty()) { if (!warToBundleLookup.isEmpty()) {
// ui extension organized by component type // ui extension organized by component type
for (Map.Entry<File,Bundle> warBundleEntry : warToBundleLookup.entrySet()) { for (Map.Entry<File, Bundle> warBundleEntry : warToBundleLookup.entrySet()) {
final File war = warBundleEntry.getKey(); final File war = warBundleEntry.getKey();
final Bundle warBundle = warBundleEntry.getValue(); final Bundle warBundle = warBundleEntry.getValue();
@ -477,6 +477,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
bundles.forEach(bundle -> { bundles.forEach(bundle -> {
final BundleDetails details = bundle.getBundleDetails(); final BundleDetails details = bundle.getBundleDetails();
final File narDependencies = new File(details.getWorkingDirectory(), "NAR-INF/bundled-dependencies"); final File narDependencies = new File(details.getWorkingDirectory(), "NAR-INF/bundled-dependencies");
logger.debug("Attempting to load bundle {} from {}", details, narDependencies.getAbsolutePath());
if (narDependencies.isDirectory()) { if (narDependencies.isDirectory()) {
// list the wars from this nar // list the wars from this nar
final File[] narDependencyDirs = narDependencies.listFiles(WAR_FILTER); final File[] narDependencyDirs = narDependencies.listFiles(WAR_FILTER);
@ -484,6 +485,13 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
throw new IllegalStateException(String.format("Unable to access working directory for NAR dependencies in: %s", narDependencies.getAbsolutePath())); throw new IllegalStateException(String.format("Unable to access working directory for NAR dependencies in: %s", narDependencies.getAbsolutePath()));
} }
if (logger.isDebugEnabled()) {
logger.debug("Found {} available WARs in {}", narDependencyDirs.length, narDependencies.getAbsolutePath());
for (File f : narDependencyDirs) {
logger.debug("\t" + f.getAbsolutePath());
}
}
// add each war // add each war
for (final File war : narDependencyDirs) { for (final File war : narDependencyDirs) {
wars.put(war, bundle); wars.put(war, bundle);
@ -593,6 +601,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
webappContext.setMaxFormContentSize(600000); webappContext.setMaxFormContentSize(600000);
// add HTTP security headers to all responses // add HTTP security headers to all responses
// TODO: Allow more granular path configuration (e.g. /nifi-api/site-to-site/ vs. /nifi-api/process-groups)
final String ALL_PATHS = "/*"; final String ALL_PATHS = "/*";
ArrayList<Class<? extends Filter>> filters = ArrayList<Class<? extends Filter>> filters =
new ArrayList<>(Arrays.asList( new ArrayList<>(Arrays.asList(
@ -604,8 +613,8 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
if(props.isHTTPSConfigured()) { if(props.isHTTPSConfigured()) {
filters.add(StrictTransportSecurityFilter.class); filters.add(StrictTransportSecurityFilter.class);
} }
filters.forEach( (filter) -> addFilters(filter, ALL_PATHS, webappContext)); filters.forEach((filter) -> addFilters(filter, ALL_PATHS, webappContext));
addFiltersWithProps(ALL_PATHS, webappContext); addDenialOfServiceFilters(ALL_PATHS, webappContext, props);
try { try {
// configure the class loader - webappClassLoader -> jetty nar -> web app's nar -> ... // configure the class loader - webappClassLoader -> jetty nar -> web app's nar -> ...
@ -667,11 +676,27 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
} }
/** /**
* Adds configurable filters to the given context. Currently, this implementation adds `DosFilter` and `ContentLengthFilter` filters. * Adds configurable filters relating to preventing denial of service attacks to the given context. Currently, this implementation adds {@link org.eclipse.jetty.servlets.DoSFilter} and {@link ContentLengthFilter} filters.
* @param path path spec for filters *
* @param webappContext context to which filters will be added * @param path path spec for filters ({@code /*} by convention in this class)
* @param webAppContext context to which filters will be added
* @param props the {@link NiFiProperties}
*/ */
private void addFiltersWithProps(String path, WebAppContext webappContext) { private static void addDenialOfServiceFilters(String path, WebAppContext webAppContext, NiFiProperties props) {
// Add the requests rate limiting filter to all requests
int maxWebRequestsPerSecond = determineMaxWebRequestsPerSecond(props);
addWebRequestRateLimitingFilter(path, webAppContext, maxWebRequestsPerSecond);
// Only add the ContentLengthFilter if the property is explicitly set (empty by default)
int maxRequestSize = determineMaxRequestSize(props);
if (maxRequestSize > 0) {
addContentLengthFilter(path, webAppContext, maxRequestSize);
} else {
logger.info("Not adding content-length filter because {} is not set in nifi.properties", NiFiProperties.WEB_MAX_CONTENT_SIZE);
}
}
private static int determineMaxWebRequestsPerSecond(NiFiProperties props) {
int defaultMaxRequestsPerSecond = Integer.parseInt(NiFiProperties.DEFAULT_WEB_MAX_REQUESTS_PER_SECOND); int defaultMaxRequestsPerSecond = Integer.parseInt(NiFiProperties.DEFAULT_WEB_MAX_REQUESTS_PER_SECOND);
int configuredMaxRequestsPerSecond = 0; int configuredMaxRequestsPerSecond = 0;
try { try {
@ -680,31 +705,52 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
logger.warn("Exception parsing property " + NiFiProperties.WEB_MAX_REQUESTS_PER_SECOND + "; using default value: " + defaultMaxRequestsPerSecond); logger.warn("Exception parsing property " + NiFiProperties.WEB_MAX_REQUESTS_PER_SECOND + "; using default value: " + defaultMaxRequestsPerSecond);
} }
int maxRequestsPerSecond = configuredMaxRequestsPerSecond > 0 ? configuredMaxRequestsPerSecond : defaultMaxRequestsPerSecond; return configuredMaxRequestsPerSecond > 0 ? configuredMaxRequestsPerSecond : defaultMaxRequestsPerSecond;
}
/**
* Adds the {@link org.eclipse.jetty.servlets.DoSFilter} to the specified context and path. Limits incoming web requests to {@code maxWebRequestsPerSecond} per second.
*
* @param path the path to apply this filter
* @param webAppContext the context to apply this filter
* @param maxWebRequestsPerSecond the maximum number of allowed requests per second
*/
private static void addWebRequestRateLimitingFilter(String path, WebAppContext webAppContext, int maxWebRequestsPerSecond) {
FilterHolder holder = new FilterHolder(DoSFilter.class); FilterHolder holder = new FilterHolder(DoSFilter.class);
holder.setInitParameters(new HashMap<String, String>(){{ holder.setInitParameters(new HashMap<String, String>() {{
put("maxRequestsPerSec", String.valueOf(maxRequestsPerSecond)); put("maxRequestsPerSec", String.valueOf(maxWebRequestsPerSecond));
}}); }});
holder.setName(DoSFilter.class.getSimpleName()); holder.setName(DoSFilter.class.getSimpleName());
logger.debug("Adding DoSFilter to context at path: " + path + " with max req/sec: " + configuredMaxRequestsPerSecond); logger.debug("Adding DoSFilter to context at path: " + path + " with max req/sec: " + maxWebRequestsPerSecond);
webappContext.addFilter(holder, path, EnumSet.allOf(DispatcherType.class)); webAppContext.addFilter(holder, path, EnumSet.allOf(DispatcherType.class));
}
int defaultMaxRequestSize = DataUnit.parseDataSize(NiFiProperties.DEFAULT_WEB_MAX_CONTENT_SIZE, DataUnit.B).intValue(); private static int determineMaxRequestSize(NiFiProperties props) {
int configuredMaxRequestSize = 0;
try { try {
configuredMaxRequestSize = DataUnit.parseDataSize(props.getWebMaxContentSize(), DataUnit.B).intValue(); final String webMaxContentSize = props.getWebMaxContentSize();
logger.debug("Read {} as {}", NiFiProperties.WEB_MAX_CONTENT_SIZE, webMaxContentSize);
if (StringUtils.isNotBlank(webMaxContentSize)) {
int configuredMaxRequestSize = DataUnit.parseDataSize(webMaxContentSize, DataUnit.B).intValue();
logger.debug("Parsed max content length as {} bytes", configuredMaxRequestSize);
return configuredMaxRequestSize;
} else {
logger.info("Can't parse valid max content length from {}", webMaxContentSize);
}
} catch (final IllegalArgumentException e) { } catch (final IllegalArgumentException e) {
logger.warn("Exception parsing property " + NiFiProperties.WEB_MAX_CONTENT_SIZE + "; using default value: " + defaultMaxRequestSize); logger.warn("Exception parsing property {}; disabling content length filter", NiFiProperties.WEB_MAX_CONTENT_SIZE);
logger.debug("Error during parsing: ", e);
} }
return -1;
}
int maxRequestSize = configuredMaxRequestSize > 0 ? configuredMaxRequestSize : defaultMaxRequestSize; private static void addContentLengthFilter(String path, WebAppContext webAppContext, int maxContentLength) {
holder = new FilterHolder(ContentLengthFilter.class); FilterHolder holder = new FilterHolder(ContentLengthFilter.class);
holder.setInitParameters(new HashMap<String, String>() {{ holder.setInitParameters(new HashMap<String, String>() {{
put("maxContentLength", String.valueOf(maxRequestSize)); put("maxContentLength", String.valueOf(maxContentLength));
}}); }});
holder.setName(FilterHolder.class.getSimpleName()); holder.setName(ContentLengthFilter.class.getSimpleName());
logger.debug("Adding ContentLengthFilter to context at path: " + path + " with max request size: " + maxRequestSize + "B"); logger.debug("Adding ContentLengthFilter to context at path: " + path + " with max request size: " + maxContentLength + "B");
webappContext.addFilter(holder, path, EnumSet.allOf(DispatcherType.class)); webAppContext.addFilter(holder, path, EnumSet.allOf(DispatcherType.class));
} }
/** /**

View File

@ -19,6 +19,7 @@ package org.apache.nifi.web.server
import org.apache.log4j.AppenderSkeleton import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.spi.LoggingEvent import org.apache.log4j.spi.LoggingEvent
import org.apache.nifi.bundle.Bundle import org.apache.nifi.bundle.Bundle
import org.apache.nifi.processor.DataUnit
import org.apache.nifi.properties.StandardNiFiProperties import org.apache.nifi.properties.StandardNiFiProperties
import org.apache.nifi.security.util.CertificateUtils import org.apache.nifi.security.util.CertificateUtils
import org.apache.nifi.security.util.TlsConfiguration import org.apache.nifi.security.util.TlsConfiguration
@ -29,7 +30,9 @@ import org.eclipse.jetty.server.HttpConfiguration
import org.eclipse.jetty.server.Server import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.ServerConnector import org.eclipse.jetty.server.ServerConnector
import org.eclipse.jetty.server.SslConnectionFactory import org.eclipse.jetty.server.SslConnectionFactory
import org.eclipse.jetty.servlet.FilterHolder
import org.eclipse.jetty.util.ssl.SslContextFactory import org.eclipse.jetty.util.ssl.SslContextFactory
import org.eclipse.jetty.webapp.WebAppContext
import org.junit.After import org.junit.After
import org.junit.AfterClass import org.junit.AfterClass
import org.junit.Assume import org.junit.Assume
@ -48,6 +51,7 @@ import org.slf4j.LoggerFactory
import javax.net.ssl.SSLSocket import javax.net.ssl.SSLSocket
import javax.net.ssl.SSLSocketFactory import javax.net.ssl.SSLSocketFactory
import javax.servlet.DispatcherType
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets
import java.security.Security import java.security.Security
@ -388,6 +392,83 @@ class JettyServerGroovyTest extends GroovyTestCase {
assert (sslContextFactory._excludeProtocols as List<String>).containsAll(LEGACY_TLS_PROTOCOLS) assert (sslContextFactory._excludeProtocols as List<String>).containsAll(LEGACY_TLS_PROTOCOLS)
assert sslContextFactory._selectedProtocols == EXPECTED_SELECTED_PROTOCOLS as String[] assert sslContextFactory._selectedProtocols == EXPECTED_SELECTED_PROTOCOLS as String[]
} }
@Test
void testShouldEnableContentLengthFilterIfWebMaxContentSizeSet() {
// Arrange
Map defaultProps = [
(NiFiProperties.WEB_HTTP_PORT) : "8080",
(NiFiProperties.WEB_HTTP_HOST) : "localhost",
(NiFiProperties.WEB_MAX_CONTENT_SIZE): "1 MB",
]
NiFiProperties mockProps = new StandardNiFiProperties(new Properties(defaultProps))
List<FilterHolder> filters = []
def mockWebContext = [
addFilter: { FilterHolder fh, String path, EnumSet<DispatcherType> d ->
logger.mock("Called addFilter(${fh.name}, ${path}, ${d})")
filters.add(fh)
fh
}] as WebAppContext
JettyServer jettyServer = new JettyServer(new Server(), mockProps)
logger.info("Created JettyServer: ${jettyServer.dump()}")
String path = "/mock"
final int MAX_CONTENT_LENGTH_BYTES = DataUnit.parseDataSize(defaultProps[NiFiProperties.WEB_MAX_CONTENT_SIZE], DataUnit.B).intValue()
// Act
jettyServer.addDenialOfServiceFilters(path, mockWebContext, mockProps)
// Assert
assert filters.size() == 2
def filterNames = filters*.name
logger.info("Web API Context has ${filters.size()} filters: ${filterNames.join(", ")}".toString())
assert filterNames.contains("DoSFilter")
assert filterNames.contains("ContentLengthFilter")
FilterHolder clfHolder = filters.find { it.name == "ContentLengthFilter" }
String maxContentLength = clfHolder.getInitParameter("maxContentLength")
assert maxContentLength == MAX_CONTENT_LENGTH_BYTES as String
// Filter is not instantiated just by adding it
// ContentLengthFilter clf = filters?.find { it.className == "ContentLengthFilter" }?.filter as ContentLengthFilter
// assert clf.getMaxContentLength() == MAX_CONTENT_LENGTH_BYTES
}
@Test
void testShouldNotEnableContentLengthFilterIfWebMaxContentSizeEmpty() {
// Arrange
Map defaultProps = [
(NiFiProperties.WEB_HTTP_PORT): "8080",
(NiFiProperties.WEB_HTTP_HOST): "localhost",
]
NiFiProperties mockProps = new StandardNiFiProperties(new Properties(defaultProps))
List<FilterHolder> filters = []
def mockWebContext = [
addFilter: { FilterHolder fh, String path, EnumSet<DispatcherType> d ->
logger.mock("Called addFilter(${fh.name}, ${path}, ${d})")
filters.add(fh)
fh
}] as WebAppContext
JettyServer jettyServer = new JettyServer(new Server(), mockProps)
logger.info("Created JettyServer: ${jettyServer.dump()}")
String path = "/mock"
// Act
jettyServer.addDenialOfServiceFilters(path, mockWebContext, mockProps)
// Assert
assert filters.size() == 1
def filterNames = filters*.name
logger.info("Web API Context has ${filters.size()} filters: ${filterNames.join(", ")}".toString())
assert filterNames.contains("DoSFilter")
assert !filterNames.contains("ContentLengthFilter")
}
} }
class TestAppender extends AppenderSkeleton { class TestAppender extends AppenderSkeleton {

View File

@ -1,185 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.filter;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.TimeUnit;
import javax.servlet.DispatcherType;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.web.security.requests.ContentLengthFilter;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ContentLengthFilterTest {
private static final Logger logger = LoggerFactory.getLogger(ContentLengthFilterTest.class);
private static final int MAX_CONTENT_LENGTH = 1000;
private static final int SERVER_IDLE_TIMEOUT = 2500; // only one request needed + value large enough for slow systems
private static final String POST_REQUEST = "POST / HTTP/1.1\r\nContent-Length: %d\r\nHost: h\r\n\r\n%s";
private static final String FORM_REQUEST = "POST / HTTP/1.1\r\nContent-Length: %d\r\nHost: h\r\nContent-Type: application/x-www-form-urlencoded\r\nAccept-Charset: UTF-8\r\n\r\n%s";
public static final int FORM_CONTENT_SIZE = 128;
private Server serverUnderTest;
private LocalConnector localConnector;
private ServletContextHandler contextUnderTest;
@After
public void stopServer() throws Exception {
if (serverUnderTest != null && serverUnderTest.isRunning()) {
serverUnderTest.stop();
}
}
private void configureAndStartServer(HttpServlet servlet, int maxFormContentSize) throws Exception {
serverUnderTest = new Server();
localConnector = new LocalConnector(serverUnderTest);
localConnector.setIdleTimeout(SERVER_IDLE_TIMEOUT);
serverUnderTest.addConnector(localConnector);
contextUnderTest = new ServletContextHandler(serverUnderTest, "/");
if (maxFormContentSize > 0) {
contextUnderTest.setMaxFormContentSize(maxFormContentSize);
}
contextUnderTest.addServlet(new ServletHolder(servlet), "/*");
// This only adds the ContentLengthFilter if a valid maxFormContentSize is not provided
if (maxFormContentSize < 0) {
FilterHolder holder = contextUnderTest.addFilter(ContentLengthFilter.class, "/*", EnumSet.of(DispatcherType.REQUEST));
holder.setInitParameter(ContentLengthFilter.MAX_LENGTH_INIT_PARAM, String.valueOf(MAX_CONTENT_LENGTH));
}
serverUnderTest.start();
}
@Test
public void testRequestsWithMissingContentLengthHeader() throws Exception {
configureAndStartServer(new HttpServlet() {
@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
ServletInputStream input = req.getInputStream();
while (!input.isFinished()) {
input.read();
}
resp.setStatus(HttpServletResponse.SC_OK);
}
}, -1);
// This shows that the ContentLengthFilter allows a request that does not have a content-length header.
String response = localConnector.getResponse("POST / HTTP/1.0\r\n\r\n");
Assert.assertFalse(StringUtils.containsIgnoreCase(response, "411 Length Required"));
}
@Test
public void testRequestsWithContentLengthHeader() throws Exception {
configureAndStartServer(new HttpServlet() {
@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
ServletInputStream input = req.getInputStream();
while (!input.isFinished()) {
input.read();
}
resp.setStatus(HttpServletResponse.SC_OK);
}
}, -1);
int smallClaim = 150;
int largeClaim = 2000;
String incompletePayload = StringUtils.repeat("1", 10);
String largePayload = StringUtils.repeat("1", largeClaim + 200);
// This shows that the ContentLengthFilter rejects a request when the client claims more than the max + sends more than the max:
String response = localConnector.getResponse(String.format(POST_REQUEST, largeClaim, largePayload));
Assert.assertTrue(StringUtils.containsIgnoreCase(response, "413 Payload Too Large"));
// This shows that the ContentLengthFilter rejects a request when the client claims more than the max + sends less the max:
response = localConnector.getResponse(String.format(POST_REQUEST, largeClaim, incompletePayload));
Assert.assertTrue(StringUtils.containsIgnoreCase(response, "413 Payload Too Large"));
// This shows that the ContentLengthFilter allows a request when it claims less than the max + sends more than the max:
response = localConnector.getResponse(String.format(POST_REQUEST, smallClaim, largePayload));
Assert.assertTrue(StringUtils.containsIgnoreCase(response, "200 OK"));
// This shows that the server times out when the client claims less than the max + sends less than the max + sends less than it claims to send:
response = localConnector.getResponse(String.format(POST_REQUEST, smallClaim, incompletePayload), 500, TimeUnit.MILLISECONDS);
Assert.assertTrue(StringUtils.containsIgnoreCase(response, "500 Server Error"));
Assert.assertTrue(StringUtils.containsIgnoreCase(response, "Timeout"));
}
@Test
public void testJettyMaxFormSize() throws Exception {
// This shows that the jetty server option for 'maxFormContentSize' is insufficient for our needs because it
// catches requests like this:
// Configure the server but do not apply the CLF because the FORM_CONTENT_SIZE > 0
configureAndStartServer(new HttpServlet() {
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try {
req.getParameterMap();
ServletInputStream input = req.getInputStream();
int count = 0;
while (!input.isFinished()) {
input.read();
count += 1;
}
final int FORM_LIMIT_BYTES = FORM_CONTENT_SIZE + "a=\n".length();
if (count > FORM_LIMIT_BYTES) {
logger.warn("Bytes read ({}) is larger than the limit ({})", count, FORM_LIMIT_BYTES);
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Should not reach this code.");
} else {
logger.warn("Bytes read ({}) is less than or equal to the limit ({})", count, FORM_LIMIT_BYTES);
resp.sendError(HttpServletResponse.SC_EXPECTATION_FAILED, "Read Too Many Bytes");
}
} catch (final Exception e) {
// This is the jetty context returning a 400 from the maxFormContentSize setting:
if (StringUtils.containsIgnoreCase(e.getCause().toString(), "Form is larger than max length " + FORM_CONTENT_SIZE)) {
logger.warn("Exception thrown by input stream: ", e);
resp.sendError(HttpServletResponse.SC_REQUEST_ENTITY_TOO_LARGE, "Payload Too Large");
} else {
logger.warn("Exception thrown by input stream: ", e);
resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Should not reach this code, either.");
}
}
}
}, FORM_CONTENT_SIZE);
// Test to catch a form submission that exceeds the FORM_CONTENT_SIZE limit
String form = "a=" + StringUtils.repeat("1", FORM_CONTENT_SIZE);
String response = localConnector.getResponse(String.format(FORM_REQUEST, form.length(), form));
logger.info("Response: " + response);
Assert.assertTrue(StringUtils.containsIgnoreCase(response, "413 Payload Too Large"));
// But it does not catch requests like this:
response = localConnector.getResponse(String.format(POST_REQUEST, form.length(), form+form));
Assert.assertTrue(StringUtils.containsIgnoreCase(response, "417 Read Too Many Bytes"));
}
}

View File

@ -31,6 +31,8 @@
<logger name="org.apache.nifi" level="INFO"/> <logger name="org.apache.nifi" level="INFO"/>
<logger name="org.apache.nifi.web.api" level="DEBUG"/> <logger name="org.apache.nifi.web.api" level="DEBUG"/>
<logger name="org.apache.nifi.web.server" level="DEBUG"/>
<logger name="org.apache.nifi.web.security.requests" level="TRACE"/>
<root level="INFO"> <root level="INFO">
<appender-ref ref="CONSOLE"/> <appender-ref ref="CONSOLE"/>
</root> </root>

View File

@ -17,6 +17,8 @@
package org.apache.nifi.web.security.requests; package org.apache.nifi.web.security.requests;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import javax.servlet.Filter; import javax.servlet.Filter;
import javax.servlet.FilterChain; import javax.servlet.FilterChain;
import javax.servlet.FilterConfig; import javax.servlet.FilterConfig;
@ -40,6 +42,8 @@ public class ContentLengthFilter implements Filter {
public final static int MAX_LENGTH_DEFAULT = 10_000_000; public final static int MAX_LENGTH_DEFAULT = 10_000_000;
private int maxContentLength; private int maxContentLength;
private static final List<String> BYPASS_URI_PREFIXES = Arrays.asList("/nifi-api/data-transfer", "/nifi-api/site-to-site");
public void init() { public void init() {
maxContentLength = MAX_LENGTH_DEFAULT; maxContentLength = MAX_LENGTH_DEFAULT;
logger.debug("Filter initialized without configuration and set max content length: " + formatSize(maxContentLength)); logger.debug("Filter initialized without configuration and set max content length: " + formatSize(maxContentLength));
@ -61,6 +65,13 @@ public class ContentLengthFilter implements Filter {
HttpServletRequest httpRequest = (HttpServletRequest) request; HttpServletRequest httpRequest = (HttpServletRequest) request;
String httpMethod = httpRequest.getMethod(); String httpMethod = httpRequest.getMethod();
// If the request is in the framework allow list, do not evaluate or block based on content length
if (!isSubjectToFilter(httpRequest)) {
logger.trace("Request {} is not subject to content length checks", httpRequest.getRequestURI());
chain.doFilter(request, response);
return;
}
// Check the HTTP method because the spec says clients don't have to send a content-length header for methods // Check the HTTP method because the spec says clients don't have to send a content-length header for methods
// that don't use it. So even though an attacker may provide a large body in a GET request, the body should go // that don't use it. So even though an attacker may provide a large body in a GET request, the body should go
// unread and a size filter is unneeded at best. See RFC 2616 section 14.13, and RFC 1945 section 10.4. // unread and a size filter is unneeded at best. See RFC 2616 section 14.13, and RFC 1945 section 10.4.
@ -90,6 +101,31 @@ public class ContentLengthFilter implements Filter {
public void destroy() { public void destroy() {
} }
/**
* Returns the currently configured max content length in bytes.
*
* @return the max content length
*/
public int getMaxContentLength() {
return maxContentLength;
}
/**
* Returns {@code true} if this request is subject to the filter operation, {@code false} if not.
*
* @param request the incoming request
* @return true if this request should be filtered
*/
private boolean isSubjectToFilter(HttpServletRequest request) {
for (String uriPrefix : BYPASS_URI_PREFIXES) {
if (request.getRequestURI().startsWith(uriPrefix)) {
logger.debug("Incoming request {} matches filter bypass prefix {}; content length filter is not applied", request.getRequestURI(), uriPrefix);
return false;
}
}
return true;
}
/** /**
* Formats a value like {@code 1048576} to {@code 1 MB} for easier human consumption. * Formats a value like {@code 1048576} to {@code 1 MB} for easier human consumption.
* *

View File

@ -0,0 +1,297 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.security.requests
import org.apache.commons.lang3.StringUtils
import org.apache.nifi.stream.io.StreamUtils
import org.eclipse.jetty.server.LocalConnector
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.FilterHolder
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import javax.servlet.DispatcherType
import javax.servlet.ServletException
import javax.servlet.ServletInputStream
import javax.servlet.http.HttpServlet
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import java.util.concurrent.TimeUnit
@RunWith(JUnit4.class)
class ContentLengthFilterTest extends GroovyTestCase {
private static final Logger logger = LoggerFactory.getLogger(ContentLengthFilterTest.class)
private static final int MAX_CONTENT_LENGTH = 1000
private static final int SERVER_IDLE_TIMEOUT = 2500 // only one request needed + value large enough for slow systems
private static final String POST_REQUEST = "POST / HTTP/1.1\r\nContent-Length: %d\r\nHost: h\r\n\r\n%s"
private static final String FORM_REQUEST = "POST / HTTP/1.1\r\nContent-Length: %d\r\nHost: h\r\nContent-Type: application/x-www-form-urlencoded\r\nAccept-Charset: UTF-8\r\n\r\n%s"
public static final int FORM_CONTENT_SIZE = 128
// These variables hold data for content small enough to be allowed
private static final int SMALL_CLAIM_SIZE_BYTES = 150
private static final String SMALL_PAYLOAD = "1" * SMALL_CLAIM_SIZE_BYTES
// These variables hold data for content too large to be allowed
private static final int LARGE_CLAIM_SIZE_BYTES = 2000
private static final String LARGE_PAYLOAD = "1" * LARGE_CLAIM_SIZE_BYTES
private Server serverUnderTest
private LocalConnector localConnector
private ServletContextHandler contextUnderTest
@BeforeClass
static void setUpOnce() {
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
void setUp() {
}
@After
void tearDown() {
stopServer()
}
void stopServer() throws Exception {
if (serverUnderTest && serverUnderTest.isRunning()) {
serverUnderTest.stop()
}
}
private void configureAndStartServer(HttpServlet servlet, int maxFormContentSize) throws Exception {
serverUnderTest = new Server()
localConnector = new LocalConnector(serverUnderTest)
localConnector.setIdleTimeout(SERVER_IDLE_TIMEOUT)
serverUnderTest.addConnector(localConnector)
contextUnderTest = new ServletContextHandler(serverUnderTest, "/")
if (maxFormContentSize > 0) {
contextUnderTest.setMaxFormContentSize(maxFormContentSize)
}
contextUnderTest.addServlet(new ServletHolder(servlet), "/*")
// This only adds the ContentLengthFilter if a valid maxFormContentSize is not provided
if (maxFormContentSize < 0) {
FilterHolder holder = contextUnderTest.addFilter(ContentLengthFilter.class, "/*", EnumSet.of(DispatcherType.REQUEST) as EnumSet<DispatcherType>)
holder.setInitParameter(ContentLengthFilter.MAX_LENGTH_INIT_PARAM, String.valueOf(MAX_CONTENT_LENGTH))
}
serverUnderTest.start()
}
/**
* Initializes a server which consumes any provided request input stream and returns HTTP 200. It has no
* {@code maxFormContentSize}, so the {@link ContentLengthFilter} is applied. The response contains a header and the
* response body indicating the total number of request content bytes read.
*
* @throws Exception if there is a problem setting up the server
*/
private void createSimpleReadServer() throws Exception {
HttpServlet mockServlet = [
doPost: { HttpServletRequest req, HttpServletResponse resp ->
byte[] byteBuffer = new byte[2048]
int bytesRead = StreamUtils.fillBuffer(req.getInputStream(), byteBuffer, false)
resp.setHeader("Bytes-Read", bytesRead as String)
resp.setStatus(HttpServletResponse.SC_OK)
resp.getWriter().write("Read ${bytesRead} bytes of request input")
}
] as HttpServlet
configureAndStartServer(mockServlet, -1)
}
private static void logResponse(String response, String s = "Response: ") {
String responseId = String.valueOf(System.currentTimeMillis() % 100)
final String delimiterLine = "\n-----" + responseId + "-----\n"
String formattedResponse = s + delimiterLine + response + delimiterLine
logger.info(formattedResponse)
}
@Test
void testRequestsWithMissingContentLengthHeader() throws Exception {
createSimpleReadServer()
// This shows that the ContentLengthFilter allows a request that does not have a content-length header.
String response = localConnector.getResponse("POST / HTTP/1.0\r\n\r\n")
Assert.assertFalse(StringUtils.containsIgnoreCase(response, "411 Length Required"))
}
/**
* This shows that the ContentLengthFilter rejects a request when the client claims more than the max + sends more than
* the max.
*/
@Test
void testShouldRejectRequestWithLongContentLengthHeader() throws Exception {
// Arrange
createSimpleReadServer()
final String requestBody = String.format(POST_REQUEST, LARGE_CLAIM_SIZE_BYTES, LARGE_PAYLOAD)
logger.info("Making request with CL: ${LARGE_CLAIM_SIZE_BYTES} and actual length: ${LARGE_PAYLOAD.length()}")
// Act
String response = localConnector.getResponse(requestBody)
logResponse(response)
// Assert
assert response =~ "413 Payload Too Large"
}
/**
* This shows that the ContentLengthFilter rejects a request when the client claims more than the max + sends less than
* the claim.
*/
@Test
void testShouldRejectRequestWithLongContentLengthHeaderAndSmallPayload() throws Exception {
// Arrange
createSimpleReadServer()
String incompletePayload = "1" * (SMALL_CLAIM_SIZE_BYTES / 2)
final String requestBody = String.format(POST_REQUEST, LARGE_CLAIM_SIZE_BYTES, incompletePayload)
logger.info("Making request with CL: ${LARGE_CLAIM_SIZE_BYTES} and actual length: ${incompletePayload.length()}")
// Act
String response = localConnector.getResponse(requestBody)
logResponse(response)
// Assert
assert response =~ "413 Payload Too Large"
}
/**
* This shows that the ContentLengthFilter <em>allows</em> a request when the client claims less
* than the max + sends more than the max, but restricts the request body to the stated content
* length size.
*/
@Test
void testShouldRejectRequestWithSmallContentLengthHeaderAndLargePayload() throws Exception {
// Arrange
createSimpleReadServer()
final String requestBody = String.format(POST_REQUEST, SMALL_CLAIM_SIZE_BYTES, LARGE_PAYLOAD)
logger.info("Making request with CL: ${SMALL_CLAIM_SIZE_BYTES} and actual length: ${LARGE_PAYLOAD.length()}")
// Act
String response = localConnector.getResponse(requestBody)
logResponse(response)
// Assert
assert response =~ "200"
assert response =~ "Bytes-Read: ${SMALL_CLAIM_SIZE_BYTES}"
assert response =~ "Read ${SMALL_CLAIM_SIZE_BYTES} bytes"
}
/**
* This shows that the server times out when the client claims less than the max + sends less than the max + sends
* less than it claims to send.
*/
@Test
void testShouldTimeoutRequestWithSmallContentLengthHeaderAndSmallerPayload() throws Exception {
// Arrange
createSimpleReadServer()
String smallerPayload = SMALL_PAYLOAD[0..(SMALL_PAYLOAD.length() / 2)]
final String requestBody = String.format(POST_REQUEST, SMALL_CLAIM_SIZE_BYTES, smallerPayload)
logger.info("Making request with CL: ${SMALL_CLAIM_SIZE_BYTES} and actual length: ${smallerPayload.length()}")
// Act
String response = localConnector.getResponse(requestBody, 500, TimeUnit.MILLISECONDS)
logResponse(response)
// Assert
assert response =~ "500 Server Error"
assert response =~ "Timeout"
}
@Test
void testFilterShouldAllowSiteToSiteTransfer() throws Exception {
// Arrange
createSimpleReadServer()
final String SITE_TO_SITE_POST_REQUEST = "POST /nifi-api/data-transfer/input-ports HTTP/1.1\r\nContent-Length: %d\r\nHost: h\r\n\r\n%s"
final String siteToSiteRequest = String.format(SITE_TO_SITE_POST_REQUEST, LARGE_CLAIM_SIZE_BYTES, LARGE_PAYLOAD)
logResponse(siteToSiteRequest, "Request: ")
// Act
String response = localConnector.getResponse(siteToSiteRequest)
logResponse(response)
// Assert
assert response =~ "200 OK"
}
@Test
void testJettyMaxFormSize() throws Exception {
// This shows that the jetty server option for 'maxFormContentSize' is insufficient for our needs because it
// catches requests like this:
// Configure the server but do not apply the CLF because the FORM_CONTENT_SIZE > 0
configureAndStartServer(new HttpServlet() {
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try {
req.getParameterMap()
ServletInputStream input = req.getInputStream()
int count = 0
while (!input.isFinished()) {
input.read()
count += 1
}
final int FORM_LIMIT_BYTES = FORM_CONTENT_SIZE + "a=\n".length()
if (count > FORM_LIMIT_BYTES) {
logger.warn("Bytes read ({}) is larger than the limit ({})", count, FORM_LIMIT_BYTES)
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Should not reach this code.")
} else {
logger.warn("Bytes read ({}) is less than or equal to the limit ({})", count, FORM_LIMIT_BYTES)
resp.sendError(HttpServletResponse.SC_EXPECTATION_FAILED, "Read Too Many Bytes")
}
} catch (final Exception e) {
// This is the jetty context returning a 400 from the maxFormContentSize setting:
if (StringUtils.containsIgnoreCase(e.getCause().toString(), "Form is larger than max length " + FORM_CONTENT_SIZE)) {
logger.warn("Exception thrown by input stream: ", e)
resp.sendError(HttpServletResponse.SC_REQUEST_ENTITY_TOO_LARGE, "Payload Too Large")
} else {
logger.warn("Exception thrown by input stream: ", e)
resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Should not reach this code, either.")
}
}
}
}, FORM_CONTENT_SIZE)
// Test to catch a form submission that exceeds the FORM_CONTENT_SIZE limit
String form = "a=" + "1" * FORM_CONTENT_SIZE
String response = localConnector.getResponse(String.format(FORM_REQUEST, form.length(), form))
logResponse(response)
assert response =~ "413 Payload Too Large"
// But it does not catch requests like this:
response = localConnector.getResponse(String.format(POST_REQUEST, form.length(), form + form))
assert response =~ "417 Read Too Many Bytes"
}
}

View File

@ -29,7 +29,11 @@
</appender> </appender>
<logger name="org.apache.nifi" level="TRACE"/> <logger name="org.apache.nifi" level="INFO"/>
<logger name="org.apache.nifi.web.api" level="DEBUG"/>
<logger name="org.apache.nifi.web.server" level="DEBUG"/>
<logger name="org.apache.nifi.web.security.requests" level="DEBUG"/>
<root level="INFO"> <root level="INFO">
<appender-ref ref="CONSOLE"/> <appender-ref ref="CONSOLE"/>
</root> </root>