Improve ThreadLimitHandler
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
79f473ee4b
commit
d1a9ca1156
|
@ -21,10 +21,12 @@ import java.util.ArrayDeque;
|
||||||
import java.util.Deque;
|
import java.util.Deque;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import javax.servlet.AsyncContext;
|
import javax.servlet.AsyncContext;
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.ServletRequestEvent;
|
||||||
|
import javax.servlet.ServletRequestListener;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
@ -73,7 +75,7 @@ public class ThreadLimitHandler extends HandlerWrapper
|
||||||
private final boolean _rfc7239;
|
private final boolean _rfc7239;
|
||||||
private final String _forwardedHeader;
|
private final String _forwardedHeader;
|
||||||
private final IncludeExcludeSet<String, InetAddress> _includeExcludeSet = new IncludeExcludeSet<>(InetAddressSet.class);
|
private final IncludeExcludeSet<String, InetAddress> _includeExcludeSet = new IncludeExcludeSet<>(InetAddressSet.class);
|
||||||
private final ConcurrentMap<String, Remote> _remotes = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, Remote> _remotes = new ConcurrentHashMap<>();
|
||||||
private volatile boolean _enabled;
|
private volatile boolean _enabled;
|
||||||
private int _threadLimit = 10;
|
private int _threadLimit = 10;
|
||||||
|
|
||||||
|
@ -179,6 +181,17 @@ public class ThreadLimitHandler extends HandlerWrapper
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
request.getServletContext().addListener(new ServletRequestListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void requestDestroyed(ServletRequestEvent sre)
|
||||||
|
{
|
||||||
|
// Use a compute method to remove the Remote instance as it is necessary for
|
||||||
|
// the ref counter release and the removal to be atomic.
|
||||||
|
_remotes.computeIfPresent(remote._ip, (k, v) -> v._referenceCounter.release() ? null : v);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Do we already have a future permit from a previous invocation?
|
// Do we already have a future permit from a previous invocation?
|
||||||
Closeable permit = (Closeable)baseRequest.getAttribute(PERMIT);
|
Closeable permit = (Closeable)baseRequest.getAttribute(PERMIT);
|
||||||
try
|
try
|
||||||
|
@ -250,14 +263,18 @@ public class ThreadLimitHandler extends HandlerWrapper
|
||||||
if (limit <= 0)
|
if (limit <= 0)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
remote = _remotes.get(ip);
|
// Use a compute method to create or retain the Remote instance as it is necessary for
|
||||||
if (remote == null)
|
// the ref counter increment or the instance creation to be mutually exclusive.
|
||||||
|
// The map MUST be a CHM as it guarantees the remapping function is only called once.
|
||||||
|
remote = _remotes.compute(ip, (k, v) ->
|
||||||
{
|
{
|
||||||
Remote r = new Remote(ip, limit);
|
if (v != null)
|
||||||
remote = _remotes.putIfAbsent(ip, r);
|
{
|
||||||
if (remote == null)
|
v._referenceCounter.retain();
|
||||||
remote = r;
|
return v;
|
||||||
}
|
}
|
||||||
|
return new Remote(k, limit);
|
||||||
|
});
|
||||||
|
|
||||||
baseRequest.setAttribute(REMOTE, remote);
|
baseRequest.setAttribute(REMOTE, remote);
|
||||||
|
|
||||||
|
@ -276,7 +293,7 @@ public class ThreadLimitHandler extends HandlerWrapper
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no remote IP from a header, determine it directly from the channel
|
// If no remote IP from a header, determine it directly from the channel
|
||||||
// Do not use the request methods, as they may have been lied to by the
|
// Do not use the request methods, as they may have been lied to by the
|
||||||
// RequestCustomizer!
|
// RequestCustomizer!
|
||||||
InetSocketAddress inetAddr = baseRequest.getHttpChannel().getRemoteAddress();
|
InetSocketAddress inetAddr = baseRequest.getHttpChannel().getRemoteAddress();
|
||||||
if (inetAddr != null && inetAddr.getAddress() != null)
|
if (inetAddr != null && inetAddr.getAddress() != null)
|
||||||
|
@ -321,11 +338,17 @@ public class ThreadLimitHandler extends HandlerWrapper
|
||||||
return (comma >= 0) ? forwardedFor.substring(comma + 1).trim() : forwardedFor;
|
return (comma >= 0) ? forwardedFor.substring(comma + 1).trim() : forwardedFor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int getRemoteCount()
|
||||||
|
{
|
||||||
|
return _remotes.size();
|
||||||
|
}
|
||||||
|
|
||||||
private static final class Remote implements Closeable
|
private static final class Remote implements Closeable
|
||||||
{
|
{
|
||||||
private final String _ip;
|
private final String _ip;
|
||||||
private final int _limit;
|
private final int _limit;
|
||||||
private final AutoLock _lock = new AutoLock();
|
private final AutoLock _lock = new AutoLock();
|
||||||
|
private final ReferenceCounter _referenceCounter = new ReferenceCounter();
|
||||||
private int _permits;
|
private int _permits;
|
||||||
private Deque<CompletableFuture<Closeable>> _queue = new ArrayDeque<>();
|
private Deque<CompletableFuture<Closeable>> _queue = new ArrayDeque<>();
|
||||||
private final CompletableFuture<Closeable> _permitted = CompletableFuture.completedFuture(this);
|
private final CompletableFuture<Closeable> _permitted = CompletableFuture.completedFuture(this);
|
||||||
|
@ -349,7 +372,7 @@ public class ThreadLimitHandler extends HandlerWrapper
|
||||||
return _permitted; // TODO is it OK to share/reuse this?
|
return _permitted; // TODO is it OK to share/reuse this?
|
||||||
}
|
}
|
||||||
|
|
||||||
// No pass available, so queue a new future
|
// No pass available, so queue a new future
|
||||||
CompletableFuture<Closeable> pass = new CompletableFuture<>();
|
CompletableFuture<Closeable> pass = new CompletableFuture<>();
|
||||||
_queue.addLast(pass);
|
_queue.addLast(pass);
|
||||||
return pass;
|
return pass;
|
||||||
|
@ -429,4 +452,26 @@ public class ThreadLimitHandler extends HandlerWrapper
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class ReferenceCounter
|
||||||
|
{
|
||||||
|
private final AtomicInteger references = new AtomicInteger(1);
|
||||||
|
|
||||||
|
public void retain()
|
||||||
|
{
|
||||||
|
if (references.getAndUpdate(c -> c == 0 ? 0 : c + 1) == 0)
|
||||||
|
throw new IllegalStateException("released " + this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean release()
|
||||||
|
{
|
||||||
|
int ref = references.updateAndGet(c ->
|
||||||
|
{
|
||||||
|
if (c == 0)
|
||||||
|
throw new IllegalStateException("already released " + this);
|
||||||
|
return c - 1;
|
||||||
|
});
|
||||||
|
return ref == 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.server.handler;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
|
@ -35,6 +36,7 @@ import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
|
@ -83,7 +85,9 @@ public class ThreadLimitHandlerTest
|
||||||
response.setStatus(HttpStatus.OK_200);
|
response.setStatus(HttpStatus.OK_200);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
_server.setHandler(handler);
|
ContextHandler contextHandler = new ContextHandler("/");
|
||||||
|
contextHandler.setHandler(handler);
|
||||||
|
_server.setHandler(contextHandler);
|
||||||
_server.start();
|
_server.start();
|
||||||
|
|
||||||
last.set(null);
|
last.set(null);
|
||||||
|
@ -97,6 +101,8 @@ public class ThreadLimitHandlerTest
|
||||||
last.set(null);
|
last.set(null);
|
||||||
_local.getResponse("GET / HTTP/1.0\r\nForwarded: for=1.2.3.4\r\n\r\n");
|
_local.getResponse("GET / HTTP/1.0\r\nForwarded: for=1.2.3.4\r\n\r\n");
|
||||||
assertThat(last.get(), Matchers.is("0.0.0.0"));
|
assertThat(last.get(), Matchers.is("0.0.0.0"));
|
||||||
|
|
||||||
|
await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -112,7 +118,9 @@ public class ThreadLimitHandlerTest
|
||||||
return super.getThreadLimit(ip);
|
return super.getThreadLimit(ip);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
_server.setHandler(handler);
|
ContextHandler contextHandler = new ContextHandler("/");
|
||||||
|
contextHandler.setHandler(handler);
|
||||||
|
_server.setHandler(contextHandler);
|
||||||
_server.start();
|
_server.start();
|
||||||
|
|
||||||
last.set(null);
|
last.set(null);
|
||||||
|
@ -130,6 +138,8 @@ public class ThreadLimitHandlerTest
|
||||||
last.set(null);
|
last.set(null);
|
||||||
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.1.1.1\r\nX-Forwarded-For: 6.6.6.6,1.2.3.4\r\nForwarded: for=1.2.3.4\r\n\r\n");
|
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.1.1.1\r\nX-Forwarded-For: 6.6.6.6,1.2.3.4\r\nForwarded: for=1.2.3.4\r\n\r\n");
|
||||||
assertThat(last.get(), Matchers.is("1.2.3.4"));
|
assertThat(last.get(), Matchers.is("1.2.3.4"));
|
||||||
|
|
||||||
|
await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -145,7 +155,9 @@ public class ThreadLimitHandlerTest
|
||||||
return super.getThreadLimit(ip);
|
return super.getThreadLimit(ip);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
_server.setHandler(handler);
|
ContextHandler contextHandler = new ContextHandler("/");
|
||||||
|
contextHandler.setHandler(handler);
|
||||||
|
_server.setHandler(contextHandler);
|
||||||
_server.start();
|
_server.start();
|
||||||
|
|
||||||
last.set(null);
|
last.set(null);
|
||||||
|
@ -163,6 +175,8 @@ public class ThreadLimitHandlerTest
|
||||||
last.set(null);
|
last.set(null);
|
||||||
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.1.1.1\r\nForwarded: for=6.6.6.6; for=1.2.3.4\r\nX-Forwarded-For: 6.6.6.6\r\nForwarded: proto=https\r\n\r\n");
|
_local.getResponse("GET / HTTP/1.0\r\nX-Forwarded-For: 1.1.1.1\r\nForwarded: for=6.6.6.6; for=1.2.3.4\r\nX-Forwarded-For: 6.6.6.6\r\nForwarded: proto=https\r\n\r\n");
|
||||||
assertThat(last.get(), Matchers.is("1.2.3.4"));
|
assertThat(last.get(), Matchers.is("1.2.3.4"));
|
||||||
|
|
||||||
|
await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -201,7 +215,9 @@ public class ThreadLimitHandlerTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
_server.setHandler(handler);
|
ContextHandler contextHandler = new ContextHandler("/");
|
||||||
|
contextHandler.setHandler(handler);
|
||||||
|
_server.setHandler(contextHandler);
|
||||||
_server.start();
|
_server.start();
|
||||||
|
|
||||||
Socket[] client = new Socket[10];
|
Socket[] client = new Socket[10];
|
||||||
|
@ -237,5 +253,7 @@ public class ThreadLimitHandlerTest
|
||||||
Thread.sleep(10);
|
Thread.sleep(10);
|
||||||
}
|
}
|
||||||
assertThat(count.get(), is(0));
|
assertThat(count.get(), is(0));
|
||||||
|
|
||||||
|
await().atMost(5, TimeUnit.SECONDS).until(handler::getRemoteCount, is(0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue