* Issue #844 DoS Handler First cut at implementation of Thread limiting handler for #844 * Added modules, simplified API, added IP exemptions * fixed xml * fixed preallocation * simplified code
This commit is contained in:
parent
6d485b2777
commit
28ca70378e
|
@ -0,0 +1,21 @@
|
||||||
|
<?xml version="1.0"?>
|
||||||
|
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
|
||||||
|
|
||||||
|
<!-- =============================================================== -->
|
||||||
|
<!-- Mixin the Thread Limit Handler to the entire server -->
|
||||||
|
<!-- =============================================================== -->
|
||||||
|
|
||||||
|
<Configure id="Server" class="org.eclipse.jetty.server.Server">
|
||||||
|
<Call name="insertHandler">
|
||||||
|
<Arg>
|
||||||
|
<New id="ThreadLimitHandler" class="org.eclipse.jetty.server.handler.ThreadLimitHandler">
|
||||||
|
<Arg name="forwardedHeader"><Property name="jetty.threadlimit.forwardedHeader"/></Arg>
|
||||||
|
<Set name="enabled"><Property name="jetty.threadlimit.enabled" default="true"/></Set>
|
||||||
|
<Set name="blockForMs"><Property name="jetty.threadlimit.blockForMs" default="0"/></Set>
|
||||||
|
<Set name="threadLimit"><Property name="jetty.threadlimit.threadLimit" default="10"/></Set>
|
||||||
|
</New>
|
||||||
|
</Arg>
|
||||||
|
</Call>
|
||||||
|
</Configure>
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
#
|
||||||
|
# Thread Limit module
|
||||||
|
# Applies ThreadLimiteHandler to entire server
|
||||||
|
#
|
||||||
|
|
||||||
|
[depend]
|
||||||
|
server
|
||||||
|
|
||||||
|
[xml]
|
||||||
|
etc/jetty-threadlimit.xml
|
||||||
|
|
||||||
|
[ini-template]
|
||||||
|
## Select style of proxy forwarded header
|
||||||
|
#jetty.threadlimit.forwardedHeader=X-Forwarded-For
|
||||||
|
#jetty.threadlimit.forwardedHeader=Forwarded
|
||||||
|
|
||||||
|
## Enabled by default?
|
||||||
|
#jetty.threadlimit.enabled=true
|
||||||
|
|
||||||
|
## MS to block for waiting for available thread
|
||||||
|
#jetty.threadlimit.blockForMs=0
|
||||||
|
|
||||||
|
## Thread limit per remote IP
|
||||||
|
#jetty.threadlimit.threadLimit=10
|
||||||
|
|
|
@ -0,0 +1,440 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.server.handler;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayDeque;
|
||||||
|
import java.util.Deque;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
import javax.servlet.AsyncContext;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.http.HostPortHttpField;
|
||||||
|
import org.eclipse.jetty.http.HttpField;
|
||||||
|
import org.eclipse.jetty.http.HttpFields;
|
||||||
|
import org.eclipse.jetty.http.HttpHeader;
|
||||||
|
import org.eclipse.jetty.http.QuotedCSV;
|
||||||
|
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
|
||||||
|
import org.eclipse.jetty.server.Request;
|
||||||
|
import org.eclipse.jetty.util.IncludeExcludeSet;
|
||||||
|
import org.eclipse.jetty.util.InetAddressSet;
|
||||||
|
import org.eclipse.jetty.util.StringUtil;
|
||||||
|
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||||
|
import org.eclipse.jetty.util.annotation.ManagedOperation;
|
||||||
|
import org.eclipse.jetty.util.annotation.Name;
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
|
import org.eclipse.jetty.util.thread.Locker;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Handler to limit the threads per IP address for DOS protection</p>
|
||||||
|
* <p>The ThreadLimitHandler applies a limit to the number of Threads
|
||||||
|
* that can be used simultaneously per remote IP address.
|
||||||
|
* </p>
|
||||||
|
* <p>The handler makes a determination of the remote IP separately to
|
||||||
|
* any that may be made by the {@link ForwardedRequestCustomizer} or similar:
|
||||||
|
* <ul>
|
||||||
|
* <li>This handler will use either only a single style
|
||||||
|
* of forwarded header. This is on the assumption that a trusted local proxy
|
||||||
|
* will produce only a single forwarded header and that any additional
|
||||||
|
* headers are likely from untrusted client side proxies.</li>
|
||||||
|
* <li>If multiple instances of a forwarded header are provided, this
|
||||||
|
* handler will use the right-most instance, which will have been set from
|
||||||
|
* the trusted local proxy</li>
|
||||||
|
* </ul>
|
||||||
|
* Requests in excess of the limit will be asynchronously suspended until
|
||||||
|
* a thread is available.
|
||||||
|
* <p>This is a simpler alternative to DosFilter</p>
|
||||||
|
*/
|
||||||
|
public class ThreadLimitHandler extends HandlerWrapper
|
||||||
|
{
|
||||||
|
private static final Logger LOG = Log.getLogger(ThreadLimitHandler.class);
|
||||||
|
|
||||||
|
private final static String REMOTE = "o.e.j.s.h.TLH.REMOTE";
|
||||||
|
private final static String PERMIT = "o.e.j.s.h.TLH.PASS";
|
||||||
|
private final boolean _rfc7239;
|
||||||
|
private final String _forwardedHeader;
|
||||||
|
private final IncludeExcludeSet<String, InetAddress> _includeExcludeSet = new IncludeExcludeSet<>(InetAddressSet.class);
|
||||||
|
private final ConcurrentMap<String, Remote> _remotes = new ConcurrentHashMap<>();
|
||||||
|
private volatile boolean _enabled;
|
||||||
|
private int _threadLimit=10;
|
||||||
|
|
||||||
|
public ThreadLimitHandler()
|
||||||
|
{
|
||||||
|
this(null,false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ThreadLimitHandler(@Name("forwardedHeader") String forwardedHeader)
|
||||||
|
{
|
||||||
|
this(forwardedHeader,HttpHeader.FORWARDED.is(forwardedHeader));
|
||||||
|
}
|
||||||
|
|
||||||
|
public ThreadLimitHandler(@Name("forwardedHeader") String forwardedHeader, @Name("rfc7239") boolean rfc7239)
|
||||||
|
{
|
||||||
|
super();
|
||||||
|
_rfc7239 = rfc7239;
|
||||||
|
_forwardedHeader = forwardedHeader;
|
||||||
|
_enabled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doStart() throws Exception
|
||||||
|
{
|
||||||
|
super.doStart();
|
||||||
|
LOG.info(String.format("ThreadLimitHandler enable=%b limit=%d include=%s",_enabled,_threadLimit,_includeExcludeSet));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ManagedAttribute("true if this handler is enabled")
|
||||||
|
public boolean isEnabled()
|
||||||
|
{
|
||||||
|
return _enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnabled(boolean enabled)
|
||||||
|
{
|
||||||
|
_enabled = enabled;
|
||||||
|
LOG.info(String.format("ThreadLimitHandler enable=%b limit=%d include=%s",_enabled,_threadLimit,_includeExcludeSet));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ManagedAttribute("The maximum threads that can be dispatched per remote IP")
|
||||||
|
public int getThreadLimit()
|
||||||
|
{
|
||||||
|
return _threadLimit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setThreadLimit(int threadLimit)
|
||||||
|
{
|
||||||
|
if (threadLimit<=0)
|
||||||
|
throw new IllegalArgumentException("limit must be >0");
|
||||||
|
_threadLimit = threadLimit;
|
||||||
|
}
|
||||||
|
|
||||||
|
@ManagedOperation("Include IP in thread limits")
|
||||||
|
public void include(String inetAddressPattern)
|
||||||
|
{
|
||||||
|
_includeExcludeSet.include(inetAddressPattern);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ManagedOperation("Exclude IP from thread limits")
|
||||||
|
public void exclude(String inetAddressPattern)
|
||||||
|
{
|
||||||
|
_includeExcludeSet.exclude(inetAddressPattern);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
// Allow ThreadLimit to be enabled dynamically without restarting server
|
||||||
|
if (!_enabled)
|
||||||
|
{
|
||||||
|
// if disabled, handle normally
|
||||||
|
super.handle(target,baseRequest,request,response);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Get the remote address of the request
|
||||||
|
Remote remote = getRemote(baseRequest);
|
||||||
|
if (remote==null)
|
||||||
|
{
|
||||||
|
// if remote is not known, handle normally
|
||||||
|
super.handle(target,baseRequest,request,response);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Do we already have a future permit from a previous invocation?
|
||||||
|
Closeable permit = (Closeable)baseRequest.getAttribute(PERMIT);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (permit!=null)
|
||||||
|
{
|
||||||
|
// Yes, remove it from any future async cycles.
|
||||||
|
baseRequest.removeAttribute(PERMIT);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// No, then lets try to acquire one
|
||||||
|
CompletableFuture<Closeable> future_permit=remote.acquire();
|
||||||
|
|
||||||
|
// Did we get a permit?
|
||||||
|
if (future_permit.isDone())
|
||||||
|
{
|
||||||
|
// yes
|
||||||
|
permit=future_permit.get();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Threadlimited {} {}",remote,target);
|
||||||
|
// No, lets asynchronously suspend the request
|
||||||
|
AsyncContext async = baseRequest.startAsync();
|
||||||
|
// let's never timeout the async. If this is a DOS, then good to make them wait, if this is not
|
||||||
|
// then give them maximum time to get a thread.
|
||||||
|
async.setTimeout(0);
|
||||||
|
|
||||||
|
// dispatch the request when we do eventually get a pass
|
||||||
|
future_permit.thenAccept(c->
|
||||||
|
{
|
||||||
|
baseRequest.setAttribute(PERMIT,c);
|
||||||
|
async.dispatch();
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the permit
|
||||||
|
super.handle(target,baseRequest,request,response);
|
||||||
|
}
|
||||||
|
catch (InterruptedException | ExecutionException e)
|
||||||
|
{
|
||||||
|
throw new ServletException(e);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (permit!=null)
|
||||||
|
permit.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int getThreadLimit(String ip)
|
||||||
|
{
|
||||||
|
if (!_includeExcludeSet.isEmpty())
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!_includeExcludeSet.test(InetAddress.getByName(ip)))
|
||||||
|
{
|
||||||
|
LOG.debug("excluded {}",ip);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch(Exception e)
|
||||||
|
{
|
||||||
|
LOG.ignore(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return _threadLimit;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Remote getRemote(Request baseRequest)
|
||||||
|
{
|
||||||
|
Remote remote = (Remote)baseRequest.getAttribute(REMOTE);
|
||||||
|
if (remote!=null)
|
||||||
|
return remote;
|
||||||
|
|
||||||
|
String ip=getRemoteIP(baseRequest);
|
||||||
|
LOG.debug("ip={}",ip);
|
||||||
|
if (ip==null)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
int limit = getThreadLimit(ip);
|
||||||
|
if (limit<=0)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
remote = _remotes.get(ip);
|
||||||
|
if (remote==null)
|
||||||
|
{
|
||||||
|
Remote r = new Remote(ip,limit);
|
||||||
|
remote = _remotes.putIfAbsent(ip,r);
|
||||||
|
if (remote==null)
|
||||||
|
remote = r;
|
||||||
|
}
|
||||||
|
|
||||||
|
baseRequest.setAttribute(REMOTE,remote);
|
||||||
|
|
||||||
|
return remote;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected String getRemoteIP(Request baseRequest)
|
||||||
|
{
|
||||||
|
// Do we have a forwarded header set?
|
||||||
|
if (_forwardedHeader!=null && !_forwardedHeader.isEmpty())
|
||||||
|
{
|
||||||
|
// Yes, then try to get the remote IP from the header
|
||||||
|
String remote = _rfc7239?getForwarded(baseRequest):getXForwardedFor(baseRequest);
|
||||||
|
if (remote!=null && !remote.isEmpty())
|
||||||
|
return remote;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no remote IP from a header, determine it directly from the channel
|
||||||
|
// Do not use the request methods, as they may have been lied to by the
|
||||||
|
// RequestCustomizer!
|
||||||
|
InetSocketAddress inet_addr = baseRequest.getHttpChannel().getRemoteAddress();
|
||||||
|
if (inet_addr!=null && inet_addr.getAddress()!=null)
|
||||||
|
return inet_addr.getAddress().getHostAddress();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getForwarded(Request request)
|
||||||
|
{
|
||||||
|
// Get the right most Forwarded for value.
|
||||||
|
// This is the value from the closest proxy and the only one that
|
||||||
|
// can be trusted.
|
||||||
|
RFC7239 rfc7239 = new RFC7239();
|
||||||
|
HttpFields httpFields = request.getHttpFields();
|
||||||
|
for (HttpField field : httpFields)
|
||||||
|
if (_forwardedHeader.equalsIgnoreCase(field.getName()))
|
||||||
|
rfc7239.addValue(field.getValue());
|
||||||
|
|
||||||
|
if (rfc7239.getFor()!=null)
|
||||||
|
return new HostPortHttpField(rfc7239.getFor()).getHost();
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getXForwardedFor(Request request)
|
||||||
|
{
|
||||||
|
// Get the right most XForwarded-For for value.
|
||||||
|
// This is the value from the closest proxy and the only one that
|
||||||
|
// can be trusted.
|
||||||
|
String forwarded_for = null;
|
||||||
|
HttpFields httpFields = request.getHttpFields();
|
||||||
|
for (HttpField field : httpFields)
|
||||||
|
if (_forwardedHeader.equalsIgnoreCase(field.getName()))
|
||||||
|
forwarded_for = field.getValue();
|
||||||
|
|
||||||
|
if (forwarded_for==null || forwarded_for.isEmpty())
|
||||||
|
return null;
|
||||||
|
|
||||||
|
int comma = forwarded_for.lastIndexOf(',');
|
||||||
|
return (comma>=0)?forwarded_for.substring(comma+1).trim():forwarded_for;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private final class Remote implements Closeable
|
||||||
|
{
|
||||||
|
private final String _ip;
|
||||||
|
private final int _limit;
|
||||||
|
private final Locker _locker = new Locker();
|
||||||
|
private int _permits;
|
||||||
|
private Deque<CompletableFuture<Closeable>> _queue = new ArrayDeque<>();
|
||||||
|
private final CompletableFuture<Closeable> _permitted = CompletableFuture.completedFuture(this);
|
||||||
|
|
||||||
|
public Remote(String ip, int limit)
|
||||||
|
{
|
||||||
|
_ip=ip;
|
||||||
|
_limit=limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CompletableFuture<Closeable> acquire()
|
||||||
|
{
|
||||||
|
try(Locker.Lock lock = _locker.lock())
|
||||||
|
{
|
||||||
|
// Do we have available passes?
|
||||||
|
if (_permits<_limit)
|
||||||
|
{
|
||||||
|
// Yes - increment the allocated passes
|
||||||
|
_permits++;
|
||||||
|
// return the already completed future
|
||||||
|
return _permitted; // TODO is it OK to share/reuse this?
|
||||||
|
}
|
||||||
|
|
||||||
|
// No pass available, so queue a new future
|
||||||
|
CompletableFuture<Closeable> pass = new CompletableFuture<Closeable>();
|
||||||
|
_queue.addLast(pass);
|
||||||
|
return pass;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
try(Locker.Lock lock = _locker.lock())
|
||||||
|
{
|
||||||
|
// reduce the allocated passes
|
||||||
|
_permits--;
|
||||||
|
while(true)
|
||||||
|
{
|
||||||
|
// Are there any future passes waiting?
|
||||||
|
CompletableFuture<Closeable> permit = _queue.pollFirst();
|
||||||
|
|
||||||
|
// No - we are done
|
||||||
|
if (permit==null)
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Yes - if we can complete them, we are done
|
||||||
|
if (permit.complete(this))
|
||||||
|
{
|
||||||
|
_permits++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Somebody else must have completed/failed that future pass,
|
||||||
|
// so let's try for another.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
try(Locker.Lock lock = _locker.lock())
|
||||||
|
{
|
||||||
|
return String.format("R[ip=%s,p=%d,l=%d,q=%d]",_ip,_permits,_limit,_queue.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class RFC7239 extends QuotedCSV
|
||||||
|
{
|
||||||
|
String _for;
|
||||||
|
|
||||||
|
private RFC7239()
|
||||||
|
{
|
||||||
|
super(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
String getFor()
|
||||||
|
{
|
||||||
|
return _for;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void parsedParam(StringBuffer buffer, int valueLength, int paramName, int paramValue)
|
||||||
|
{
|
||||||
|
if (valueLength==0 && paramValue>paramName)
|
||||||
|
{
|
||||||
|
String name=StringUtil.asciiToLowerCase(buffer.substring(paramName,paramValue-1));
|
||||||
|
if ("for".equalsIgnoreCase(name))
|
||||||
|
{
|
||||||
|
String value=buffer.substring(paramValue);
|
||||||
|
|
||||||
|
// if unknown, clear any leftward values
|
||||||
|
if ("unknown".equalsIgnoreCase(value))
|
||||||
|
_for = null;
|
||||||
|
// Otherwise accept IP or token(starting with '_') as remote keys
|
||||||
|
else
|
||||||
|
_for=value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,248 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.server.handler;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.http.HttpStatus;
|
||||||
|
import org.eclipse.jetty.server.Connector;
|
||||||
|
import org.eclipse.jetty.server.LocalConnector;
|
||||||
|
import org.eclipse.jetty.server.NetworkConnector;
|
||||||
|
import org.eclipse.jetty.server.Request;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.server.ServerConnector;
|
||||||
|
import org.hamcrest.Matchers;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class ThreadLimitHandlerTest
|
||||||
|
{
|
||||||
|
private Server _server;
|
||||||
|
private NetworkConnector _connector;
|
||||||
|
private LocalConnector _local;
|
||||||
|
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before()
|
||||||
|
throws Exception
|
||||||
|
{
|
||||||
|
_server = new Server();
|
||||||
|
_connector = new ServerConnector(_server);
|
||||||
|
_local = new LocalConnector(_server);
|
||||||
|
_server.setConnectors(new Connector[] { _local,_connector });
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after()
|
||||||
|
throws Exception
|
||||||
|
{
|
||||||
|
_server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoForwardHeaders() throws Exception
|
||||||
|
{
|
||||||
|
AtomicReference<String> last = new AtomicReference<>();
|
||||||
|
ThreadLimitHandler handler = new ThreadLimitHandler(null,false)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected int getThreadLimit(String ip)
|
||||||
|
{
|
||||||
|
last.set(ip);
|
||||||
|
return super.getThreadLimit(ip);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
handler.setHandler(new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
baseRequest.setHandled(true);
|
||||||
|
response.setStatus(HttpStatus.OK_200);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
_server.setHandler(handler);
|
||||||
|
_server.start();
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("0.0.0.0"));
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.2.3.4\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("0.0.0.0"));
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\nForwarded: for=1.2.3.4\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("0.0.0.0"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testXForwardForHeaders() throws Exception
|
||||||
|
{
|
||||||
|
AtomicReference<String> last = new AtomicReference<>();
|
||||||
|
ThreadLimitHandler handler = new ThreadLimitHandler("X-Forwarded-For")
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected int getThreadLimit(String ip)
|
||||||
|
{
|
||||||
|
last.set(ip);
|
||||||
|
return super.getThreadLimit(ip);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
_server.setHandler(handler);
|
||||||
|
_server.start();
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("0.0.0.0"));
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.2.3.4\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("1.2.3.4"));
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\nForwarded: for=1.2.3.4\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("0.0.0.0"));
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.1.1.1\r\nX-Forwarded-For: 6.6.6.6,1.2.3.4\r\nForwarded: for=1.2.3.4\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("1.2.3.4"));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testForwardHeaders() throws Exception
|
||||||
|
{
|
||||||
|
AtomicReference<String> last = new AtomicReference<>();
|
||||||
|
ThreadLimitHandler handler = new ThreadLimitHandler("Forwarded")
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected int getThreadLimit(String ip)
|
||||||
|
{
|
||||||
|
last.set(ip);
|
||||||
|
return super.getThreadLimit(ip);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
_server.setHandler(handler);
|
||||||
|
_server.start();
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("0.0.0.0"));
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.2.3.4\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("0.0.0.0"));
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\nForwarded: for=1.2.3.4\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("1.2.3.4"));
|
||||||
|
|
||||||
|
last.set(null);
|
||||||
|
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.1.1.1\r\nForwarded: for=6.6.6.6; for=1.2.3.4\r\nX-Forwarded-For: 6.6.6.6\r\nForwarded: proto=https\r\n\r\n");
|
||||||
|
Assert.assertThat(last.get(),Matchers.is("1.2.3.4"));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLimit() throws Exception
|
||||||
|
{
|
||||||
|
ThreadLimitHandler handler = new ThreadLimitHandler("Forwarded");
|
||||||
|
|
||||||
|
handler.setThreadLimit(4);
|
||||||
|
|
||||||
|
AtomicInteger count = new AtomicInteger(0);
|
||||||
|
AtomicInteger total = new AtomicInteger(0);
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
handler.setHandler(new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
baseRequest.setHandled(true);
|
||||||
|
response.setStatus(HttpStatus.OK_200);
|
||||||
|
if ("/other".equals(target))
|
||||||
|
return;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
count.incrementAndGet();
|
||||||
|
total.incrementAndGet();
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
throw new ServletException(e);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
count.decrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
_server.setHandler(handler);
|
||||||
|
_server.start();
|
||||||
|
|
||||||
|
Socket[] client = new Socket[10];
|
||||||
|
for (int i=0;i<client.length;i++)
|
||||||
|
{
|
||||||
|
client[i]=new Socket("127.0.0.1",_connector.getLocalPort());
|
||||||
|
client[i].getOutputStream().write(("GET /"+i+" HTTP/1.0\r\nForwarded: for=1.2.3.4\r\n\r\n").getBytes());
|
||||||
|
client[i].getOutputStream().flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
long wait = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
|
||||||
|
while(count.get()<4 && System.nanoTime()<wait)
|
||||||
|
Thread.sleep(1);
|
||||||
|
assertThat(count.get(),is(4));
|
||||||
|
|
||||||
|
// check that other requests are not blocked
|
||||||
|
assertThat(_local.getResponse("GET /other HTTP/1.0\r\nForwarded: for=6.6.6.6\r\n\r\n"),Matchers.containsString(" 200 OK"));
|
||||||
|
|
||||||
|
// let the other requests go
|
||||||
|
latch.countDown();
|
||||||
|
|
||||||
|
while(total.get()<10 && System.nanoTime()<wait)
|
||||||
|
Thread.sleep(1);
|
||||||
|
|
||||||
|
assertThat(count.get(),is(0));
|
||||||
|
assertThat(total.get(),is(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -183,4 +183,9 @@ public class IncludeExcludeSet<P,T> implements Predicate<T>
|
||||||
{
|
{
|
||||||
return String.format("%s@%x{i=%s,ip=%s,e=%s,ep=%s}",this.getClass().getSimpleName(),hashCode(),_includes,_includePredicate,_excludes,_excludePredicate);
|
return String.format("%s@%x{i=%s,ip=%s,e=%s,ep=%s}",this.getClass().getSimpleName(),hashCode(),_includes,_includePredicate,_excludes,_excludePredicate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isEmpty()
|
||||||
|
{
|
||||||
|
return _includes.isEmpty() && _excludes.isEmpty();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,9 +25,9 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||||
* Convenience Lock Wrapper.
|
* Convenience Lock Wrapper.
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
* try(SpinLock.Lock lock = locker.lock())
|
* try(Locker.Lock lock = locker.lock())
|
||||||
* {
|
* {
|
||||||
* // something very quick and non blocking
|
* // something
|
||||||
* }
|
* }
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue