diff --git a/jetty-server/src/main/config/etc/jetty-threadlimit.xml b/jetty-server/src/main/config/etc/jetty-threadlimit.xml new file mode 100644 index 00000000000..20faf163287 --- /dev/null +++ b/jetty-server/src/main/config/etc/jetty-threadlimit.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + + + + + diff --git a/jetty-server/src/main/config/modules/threadlimit.mod b/jetty-server/src/main/config/modules/threadlimit.mod new file mode 100644 index 00000000000..1a2615bd9b9 --- /dev/null +++ b/jetty-server/src/main/config/modules/threadlimit.mod @@ -0,0 +1,25 @@ +# +# Thread Limit module +# Applies ThreadLimiteHandler to entire server +# + +[depend] +server + +[xml] +etc/jetty-threadlimit.xml + +[ini-template] +## Select style of proxy forwarded header +#jetty.threadlimit.forwardedHeader=X-Forwarded-For +#jetty.threadlimit.forwardedHeader=Forwarded + +## Enabled by default? +#jetty.threadlimit.enabled=true + +## MS to block for waiting for available thread +#jetty.threadlimit.blockForMs=0 + +## Thread limit per remote IP +#jetty.threadlimit.threadLimit=10 + diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ThreadLimitHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ThreadLimitHandler.java new file mode 100644 index 00000000000..9684845e331 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ThreadLimitHandler.java @@ -0,0 +1,440 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server.handler; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; + +import javax.servlet.AsyncContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HostPortHttpField; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.QuotedCSV; +import org.eclipse.jetty.server.ForwardedRequestCustomizer; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.IncludeExcludeSet; +import org.eclipse.jetty.util.InetAddressSet; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedOperation; +import org.eclipse.jetty.util.annotation.Name; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Locker; + + +/** + *

Handler to limit the threads per IP address for DOS protection

+ *

The ThreadLimitHandler applies a limit to the number of Threads + * that can be used simultaneously per remote IP address. + *

+ *

The handler makes a determination of the remote IP separately to + * any that may be made by the {@link ForwardedRequestCustomizer} or similar: + *

+ * Requests in excess of the limit will be asynchronously suspended until + * a thread is available. + *

This is a simpler alternative to DosFilter

+ */ +public class ThreadLimitHandler extends HandlerWrapper +{ + private static final Logger LOG = Log.getLogger(ThreadLimitHandler.class); + + private final static String REMOTE = "o.e.j.s.h.TLH.REMOTE"; + private final static String PERMIT = "o.e.j.s.h.TLH.PASS"; + private final boolean _rfc7239; + private final String _forwardedHeader; + private final IncludeExcludeSet _includeExcludeSet = new IncludeExcludeSet<>(InetAddressSet.class); + private final ConcurrentMap _remotes = new ConcurrentHashMap<>(); + private volatile boolean _enabled; + private int _threadLimit=10; + + public ThreadLimitHandler() + { + this(null,false); + } + + public ThreadLimitHandler(@Name("forwardedHeader") String forwardedHeader) + { + this(forwardedHeader,HttpHeader.FORWARDED.is(forwardedHeader)); + } + + public ThreadLimitHandler(@Name("forwardedHeader") String forwardedHeader, @Name("rfc7239") boolean rfc7239) + { + super(); + _rfc7239 = rfc7239; + _forwardedHeader = forwardedHeader; + _enabled = true; + } + + @Override + protected void doStart() throws Exception + { + super.doStart(); + LOG.info(String.format("ThreadLimitHandler enable=%b limit=%d include=%s",_enabled,_threadLimit,_includeExcludeSet)); + } + + @ManagedAttribute("true if this handler is enabled") + public boolean isEnabled() + { + return _enabled; + } + + public void setEnabled(boolean enabled) + { + _enabled = enabled; + LOG.info(String.format("ThreadLimitHandler enable=%b limit=%d include=%s",_enabled,_threadLimit,_includeExcludeSet)); + } + + @ManagedAttribute("The maximum threads that can be dispatched per remote IP") + public int getThreadLimit() + { + return _threadLimit; + } + + public void setThreadLimit(int threadLimit) + { + if (threadLimit<=0) + throw new IllegalArgumentException("limit must be >0"); + _threadLimit = threadLimit; + } + + @ManagedOperation("Include IP in thread limits") + public void include(String inetAddressPattern) + { + _includeExcludeSet.include(inetAddressPattern); + } + + @ManagedOperation("Exclude IP from thread limits") + public void exclude(String inetAddressPattern) + { + _includeExcludeSet.exclude(inetAddressPattern); + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + // Allow ThreadLimit to be enabled dynamically without restarting server + if (!_enabled) + { + // if disabled, handle normally + super.handle(target,baseRequest,request,response); + } + else + { + // Get the remote address of the request + Remote remote = getRemote(baseRequest); + if (remote==null) + { + // if remote is not known, handle normally + super.handle(target,baseRequest,request,response); + } + else + { + // Do we already have a future permit from a previous invocation? + Closeable permit = (Closeable)baseRequest.getAttribute(PERMIT); + try + { + if (permit!=null) + { + // Yes, remove it from any future async cycles. + baseRequest.removeAttribute(PERMIT); + } + else + { + // No, then lets try to acquire one + CompletableFuture future_permit=remote.acquire(); + + // Did we get a permit? + if (future_permit.isDone()) + { + // yes + permit=future_permit.get(); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Threadlimited {} {}",remote,target); + // No, lets asynchronously suspend the request + AsyncContext async = baseRequest.startAsync(); + // let's never timeout the async. If this is a DOS, then good to make them wait, if this is not + // then give them maximum time to get a thread. + async.setTimeout(0); + + // dispatch the request when we do eventually get a pass + future_permit.thenAccept(c-> + { + baseRequest.setAttribute(PERMIT,c); + async.dispatch(); + }); + return; + } + } + + // Use the permit + super.handle(target,baseRequest,request,response); + } + catch (InterruptedException | ExecutionException e) + { + throw new ServletException(e); + } + finally + { + if (permit!=null) + permit.close(); + } + } + } + } + + protected int getThreadLimit(String ip) + { + if (!_includeExcludeSet.isEmpty()) + { + try + { + if (!_includeExcludeSet.test(InetAddress.getByName(ip))) + { + LOG.debug("excluded {}",ip); + return 0; + } + } + catch(Exception e) + { + LOG.ignore(e); + } + } + return _threadLimit; + } + + protected Remote getRemote(Request baseRequest) + { + Remote remote = (Remote)baseRequest.getAttribute(REMOTE); + if (remote!=null) + return remote; + + String ip=getRemoteIP(baseRequest); + LOG.debug("ip={}",ip); + if (ip==null) + return null; + + int limit = getThreadLimit(ip); + if (limit<=0) + return null; + + remote = _remotes.get(ip); + if (remote==null) + { + Remote r = new Remote(ip,limit); + remote = _remotes.putIfAbsent(ip,r); + if (remote==null) + remote = r; + } + + baseRequest.setAttribute(REMOTE,remote); + + return remote; + } + + + protected String getRemoteIP(Request baseRequest) + { + // Do we have a forwarded header set? + if (_forwardedHeader!=null && !_forwardedHeader.isEmpty()) + { + // Yes, then try to get the remote IP from the header + String remote = _rfc7239?getForwarded(baseRequest):getXForwardedFor(baseRequest); + if (remote!=null && !remote.isEmpty()) + return remote; + } + + // If no remote IP from a header, determine it directly from the channel + // Do not use the request methods, as they may have been lied to by the + // RequestCustomizer! + InetSocketAddress inet_addr = baseRequest.getHttpChannel().getRemoteAddress(); + if (inet_addr!=null && inet_addr.getAddress()!=null) + return inet_addr.getAddress().getHostAddress(); + return null; + } + + private String getForwarded(Request request) + { + // Get the right most Forwarded for value. + // This is the value from the closest proxy and the only one that + // can be trusted. + RFC7239 rfc7239 = new RFC7239(); + HttpFields httpFields = request.getHttpFields(); + for (HttpField field : httpFields) + if (_forwardedHeader.equalsIgnoreCase(field.getName())) + rfc7239.addValue(field.getValue()); + + if (rfc7239.getFor()!=null) + return new HostPortHttpField(rfc7239.getFor()).getHost(); + + return null; + } + + private String getXForwardedFor(Request request) + { + // Get the right most XForwarded-For for value. + // This is the value from the closest proxy and the only one that + // can be trusted. + String forwarded_for = null; + HttpFields httpFields = request.getHttpFields(); + for (HttpField field : httpFields) + if (_forwardedHeader.equalsIgnoreCase(field.getName())) + forwarded_for = field.getValue(); + + if (forwarded_for==null || forwarded_for.isEmpty()) + return null; + + int comma = forwarded_for.lastIndexOf(','); + return (comma>=0)?forwarded_for.substring(comma+1).trim():forwarded_for; + } + + + private final class Remote implements Closeable + { + private final String _ip; + private final int _limit; + private final Locker _locker = new Locker(); + private int _permits; + private Deque> _queue = new ArrayDeque<>(); + private final CompletableFuture _permitted = CompletableFuture.completedFuture(this); + + public Remote(String ip, int limit) + { + _ip=ip; + _limit=limit; + } + + public CompletableFuture acquire() + { + try(Locker.Lock lock = _locker.lock()) + { + // Do we have available passes? + if (_permits<_limit) + { + // Yes - increment the allocated passes + _permits++; + // return the already completed future + return _permitted; // TODO is it OK to share/reuse this? + } + + // No pass available, so queue a new future + CompletableFuture pass = new CompletableFuture(); + _queue.addLast(pass); + return pass; + } + } + + @Override + public void close() throws IOException + { + try(Locker.Lock lock = _locker.lock()) + { + // reduce the allocated passes + _permits--; + while(true) + { + // Are there any future passes waiting? + CompletableFuture permit = _queue.pollFirst(); + + // No - we are done + if (permit==null) + break; + + // Yes - if we can complete them, we are done + if (permit.complete(this)) + { + _permits++; + break; + } + + // Somebody else must have completed/failed that future pass, + // so let's try for another. + } + } + } + + @Override + public String toString() + { + try(Locker.Lock lock = _locker.lock()) + { + return String.format("R[ip=%s,p=%d,l=%d,q=%d]",_ip,_permits,_limit,_queue.size()); + } + } + } + + private final class RFC7239 extends QuotedCSV + { + String _for; + + private RFC7239() + { + super(false); + } + + String getFor() + { + return _for; + } + + @Override + protected void parsedParam(StringBuffer buffer, int valueLength, int paramName, int paramValue) + { + if (valueLength==0 && paramValue>paramName) + { + String name=StringUtil.asciiToLowerCase(buffer.substring(paramName,paramValue-1)); + if ("for".equalsIgnoreCase(name)) + { + String value=buffer.substring(paramValue); + + // if unknown, clear any leftward values + if ("unknown".equalsIgnoreCase(value)) + _for = null; + // Otherwise accept IP or token(starting with '_') as remote keys + else + _for=value; + } + } + } + } +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ThreadLimitHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ThreadLimitHandlerTest.java new file mode 100644 index 00000000000..a1436a503f2 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ThreadLimitHandlerTest.java @@ -0,0 +1,248 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server.handler; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.LocalConnector; +import org.eclipse.jetty.server.NetworkConnector; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ThreadLimitHandlerTest +{ + private Server _server; + private NetworkConnector _connector; + private LocalConnector _local; + + + @Before + public void before() + throws Exception + { + _server = new Server(); + _connector = new ServerConnector(_server); + _local = new LocalConnector(_server); + _server.setConnectors(new Connector[] { _local,_connector }); + + } + + @After + public void after() + throws Exception + { + _server.stop(); + } + + + @Test + public void testNoForwardHeaders() throws Exception + { + AtomicReference last = new AtomicReference<>(); + ThreadLimitHandler handler = new ThreadLimitHandler(null,false) + { + @Override + protected int getThreadLimit(String ip) + { + last.set(ip); + return super.getThreadLimit(ip); + } + }; + handler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + response.setStatus(HttpStatus.OK_200); + } + }); + _server.setHandler(handler); + _server.start(); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("0.0.0.0")); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.2.3.4\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("0.0.0.0")); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\nForwarded: for=1.2.3.4\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("0.0.0.0")); + } + + @Test + public void testXForwardForHeaders() throws Exception + { + AtomicReference last = new AtomicReference<>(); + ThreadLimitHandler handler = new ThreadLimitHandler("X-Forwarded-For") + { + @Override + protected int getThreadLimit(String ip) + { + last.set(ip); + return super.getThreadLimit(ip); + } + }; + _server.setHandler(handler); + _server.start(); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("0.0.0.0")); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.2.3.4\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("1.2.3.4")); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\nForwarded: for=1.2.3.4\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("0.0.0.0")); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.1.1.1\r\nX-Forwarded-For: 6.6.6.6,1.2.3.4\r\nForwarded: for=1.2.3.4\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("1.2.3.4")); + + } + + @Test + public void testForwardHeaders() throws Exception + { + AtomicReference last = new AtomicReference<>(); + ThreadLimitHandler handler = new ThreadLimitHandler("Forwarded") + { + @Override + protected int getThreadLimit(String ip) + { + last.set(ip); + return super.getThreadLimit(ip); + } + }; + _server.setHandler(handler); + _server.start(); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("0.0.0.0")); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.2.3.4\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("0.0.0.0")); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\nForwarded: for=1.2.3.4\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("1.2.3.4")); + + last.set(null); + _local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.1.1.1\r\nForwarded: for=6.6.6.6; for=1.2.3.4\r\nX-Forwarded-For: 6.6.6.6\r\nForwarded: proto=https\r\n\r\n"); + Assert.assertThat(last.get(),Matchers.is("1.2.3.4")); + } + + + + @Test + public void testLimit() throws Exception + { + ThreadLimitHandler handler = new ThreadLimitHandler("Forwarded"); + + handler.setThreadLimit(4); + + AtomicInteger count = new AtomicInteger(0); + AtomicInteger total = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(1); + handler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + response.setStatus(HttpStatus.OK_200); + if ("/other".equals(target)) + return; + + try + { + count.incrementAndGet(); + total.incrementAndGet(); + latch.await(); + } + catch (InterruptedException e) + { + throw new ServletException(e); + } + finally + { + count.decrementAndGet(); + } + + } + }); + _server.setHandler(handler); + _server.start(); + + Socket[] client = new Socket[10]; + for (int i=0;i implements Predicate { return String.format("%s@%x{i=%s,ip=%s,e=%s,ep=%s}",this.getClass().getSimpleName(),hashCode(),_includes,_includePredicate,_excludes,_excludePredicate); } + + public boolean isEmpty() + { + return _includes.isEmpty() && _excludes.isEmpty(); + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Locker.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Locker.java index 9348f3f3806..32a06fffd77 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Locker.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Locker.java @@ -25,9 +25,9 @@ import java.util.concurrent.locks.ReentrantLock; * Convenience Lock Wrapper. * *
- * try(SpinLock.Lock lock = locker.lock())
+ * try(Locker.Lock lock = locker.lock())
  * {
- *   // something very quick and non blocking
+ *   // something 
  * }
  * 
*/