Improve ThreadLimitHandler (#11723)
* Improve ThreadLimitHandler Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
d7e66dd818
commit
45e474b7d5
|
@ -21,7 +21,6 @@ import java.util.Deque;
|
|||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -31,6 +30,7 @@ import org.eclipse.jetty.http.HttpField;
|
|||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.QuotedCSV;
|
||||
import org.eclipse.jetty.io.Retainable;
|
||||
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
|
@ -68,7 +68,7 @@ public class ThreadLimitHandler extends ConditionalHandler.Abstract
|
|||
|
||||
private final boolean _rfc7239;
|
||||
private final String _forwardedHeader;
|
||||
private final ConcurrentMap<String, Remote> _remotes = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, Remote> _remotes = new ConcurrentHashMap<>();
|
||||
private volatile boolean _enabled;
|
||||
private int _threadLimit = 10;
|
||||
|
||||
|
@ -163,7 +163,10 @@ public class ThreadLimitHandler extends ConditionalHandler.Abstract
|
|||
}
|
||||
|
||||
// We accept the request and will always handle it.
|
||||
LimitedRequest limitedRequest = new LimitedRequest(remote, next, request, response, callback);
|
||||
// Use a compute method to remove the Remote instance as it is necessary for
|
||||
// the ref counter release and the removal to be atomic.
|
||||
LimitedRequest limitedRequest = new LimitedRequest(remote, next, request, response, Callback.from(callback, () ->
|
||||
_remotes.computeIfPresent(remote._ip, (k, v) -> v._referenceCounter.release() ? null : v)));
|
||||
limitedRequest.handle();
|
||||
return true;
|
||||
}
|
||||
|
@ -177,6 +180,7 @@ public class ThreadLimitHandler extends ConditionalHandler.Abstract
|
|||
private Remote getRemote(Request baseRequest)
|
||||
{
|
||||
String ip = getRemoteIP(baseRequest);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("ip={}", ip);
|
||||
if (ip == null)
|
||||
return null;
|
||||
|
@ -185,15 +189,18 @@ public class ThreadLimitHandler extends ConditionalHandler.Abstract
|
|||
if (limit <= 0)
|
||||
return null;
|
||||
|
||||
Remote remote = _remotes.get(ip);
|
||||
if (remote == null)
|
||||
// Use a compute method to create or retain the Remote instance as it is necessary for
|
||||
// the ref counter increment or the instance creation to be mutually exclusive.
|
||||
// The map MUST be a CHM as it guarantees the remapping function is only called once.
|
||||
return _remotes.compute(ip, (k, v) ->
|
||||
{
|
||||
Remote r = new Remote(baseRequest.getContext(), ip, limit);
|
||||
remote = _remotes.putIfAbsent(ip, r);
|
||||
if (remote == null)
|
||||
remote = r;
|
||||
if (v != null)
|
||||
{
|
||||
v._referenceCounter.retain();
|
||||
return v;
|
||||
}
|
||||
return remote;
|
||||
return new Remote(baseRequest.getContext(), k, limit);
|
||||
});
|
||||
}
|
||||
|
||||
protected String getRemoteIP(Request baseRequest)
|
||||
|
@ -256,6 +263,11 @@ public class ThreadLimitHandler extends ConditionalHandler.Abstract
|
|||
return (comma >= 0) ? forwardedFor.substring(comma + 1).trim() : forwardedFor;
|
||||
}
|
||||
|
||||
int getRemoteCount()
|
||||
{
|
||||
return _remotes.size();
|
||||
}
|
||||
|
||||
private static class LimitedRequest extends Request.Wrapper
|
||||
{
|
||||
private final Remote _remote;
|
||||
|
@ -517,6 +529,7 @@ public class ThreadLimitHandler extends ConditionalHandler.Abstract
|
|||
private static final class Remote
|
||||
{
|
||||
private final Executor _executor;
|
||||
private final Retainable.ReferenceCounter _referenceCounter = new Retainable.ReferenceCounter();
|
||||
private final String _ip;
|
||||
private final int _limit;
|
||||
private final AutoLock _lock = new AutoLock();
|
||||
|
|
|
@ -104,6 +104,8 @@ public class ThreadLimitHandlerTest
|
|||
last.set(null);
|
||||
_local.getResponse("GET / HTTP/1.0\r\nForwarded: for=1.2.3.4\r\n\r\n");
|
||||
assertThat(last.get(), is("0.0.0.0"));
|
||||
|
||||
await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -147,6 +149,8 @@ public class ThreadLimitHandlerTest
|
|||
last.set(null);
|
||||
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.1.1.1\r\nX-Forwarded-For: 6.6.6.6,1.2.3.4\r\nForwarded: for=1.2.3.4\r\n\r\n");
|
||||
assertThat(last.get(), is("1.2.3.4"));
|
||||
|
||||
await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -190,6 +194,8 @@ public class ThreadLimitHandlerTest
|
|||
last.set(null);
|
||||
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.1.1.1\r\nForwarded: for=6.6.6.6; for=1.2.3.4\r\nX-Forwarded-For: 6.6.6.6\r\nForwarded: proto=https\r\n\r\n");
|
||||
assertThat(last.get(), is("1.2.3.4"));
|
||||
|
||||
await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -248,6 +254,8 @@ public class ThreadLimitHandlerTest
|
|||
|
||||
await().atMost(10, TimeUnit.SECONDS).until(total::get, is(10));
|
||||
await().atMost(10, TimeUnit.SECONDS).until(count::get, is(0));
|
||||
|
||||
await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -367,5 +375,7 @@ public class ThreadLimitHandlerTest
|
|||
assertThat(response, containsString(" 200 OK"));
|
||||
assertThat(response, containsString(" read 2"));
|
||||
}
|
||||
|
||||
await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,17 +21,19 @@ import java.util.ArrayDeque;
|
|||
import java.util.Deque;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import jakarta.servlet.AsyncContext;
|
||||
import jakarta.servlet.ServletException;
|
||||
import jakarta.servlet.ServletRequestEvent;
|
||||
import jakarta.servlet.ServletRequestListener;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import org.eclipse.jetty.http.HostPortHttpField;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.QuotedCSV;
|
||||
import org.eclipse.jetty.io.Retainable;
|
||||
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
|
||||
import org.eclipse.jetty.util.IncludeExcludeSet;
|
||||
import org.eclipse.jetty.util.InetAddressSet;
|
||||
|
@ -72,7 +74,7 @@ public class ThreadLimitHandler extends HandlerWrapper
|
|||
private final boolean _rfc7239;
|
||||
private final String _forwardedHeader;
|
||||
private final IncludeExcludeSet<String, InetAddress> _includeExcludeSet = new IncludeExcludeSet<>(InetAddressSet.class);
|
||||
private final ConcurrentMap<String, Remote> _remotes = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, Remote> _remotes = new ConcurrentHashMap<>();
|
||||
private volatile boolean _enabled;
|
||||
private int _threadLimit = 10;
|
||||
|
||||
|
@ -178,6 +180,17 @@ public class ThreadLimitHandler extends HandlerWrapper
|
|||
}
|
||||
else
|
||||
{
|
||||
baseRequest.addEventListener(new ServletRequestListener()
|
||||
{
|
||||
@Override
|
||||
public void requestDestroyed(ServletRequestEvent sre)
|
||||
{
|
||||
// Use a compute method to remove the Remote instance as it is necessary for
|
||||
// the ref counter release and the removal to be atomic.
|
||||
_remotes.computeIfPresent(remote._ip, (k, v) -> v._referenceCounter.release() ? null : v);
|
||||
}
|
||||
});
|
||||
|
||||
// Do we already have a future permit from a previous invocation?
|
||||
Closeable permit = (Closeable)baseRequest.getAttribute(PERMIT);
|
||||
try
|
||||
|
@ -249,14 +262,18 @@ public class ThreadLimitHandler extends HandlerWrapper
|
|||
if (limit <= 0)
|
||||
return null;
|
||||
|
||||
remote = _remotes.get(ip);
|
||||
if (remote == null)
|
||||
// Use a compute method to create or retain the Remote instance as it is necessary for
|
||||
// the ref counter increment or the instance creation to be mutually exclusive.
|
||||
// The map MUST be a CHM as it guarantees the remapping function is only called once.
|
||||
remote = _remotes.compute(ip, (k, v) ->
|
||||
{
|
||||
Remote r = new Remote(ip, limit);
|
||||
remote = _remotes.putIfAbsent(ip, r);
|
||||
if (remote == null)
|
||||
remote = r;
|
||||
if (v != null)
|
||||
{
|
||||
v._referenceCounter.retain();
|
||||
return v;
|
||||
}
|
||||
return new Remote(k, limit);
|
||||
});
|
||||
|
||||
baseRequest.setAttribute(REMOTE, remote);
|
||||
|
||||
|
@ -325,6 +342,7 @@ public class ThreadLimitHandler extends HandlerWrapper
|
|||
private final String _ip;
|
||||
private final int _limit;
|
||||
private final AutoLock _lock = new AutoLock();
|
||||
private final Retainable.ReferenceCounter _referenceCounter = new Retainable.ReferenceCounter();
|
||||
private int _permits;
|
||||
private Deque<CompletableFuture<Closeable>> _queue = new ArrayDeque<>();
|
||||
private final CompletableFuture<Closeable> _permitted = CompletableFuture.completedFuture(this);
|
||||
|
|
Loading…
Reference in New Issue