Fixes #12289 - Improve ConcurrentPool concurrency. (#12290)

A call to `sweep()`, although protected by the lock for concurrent calls to `reserve()`, may be concurrent with `remove(Entry)`.

`remove(Entry)` in turn calls `entries.remove(Object)`, so that the concurrent iteration in `sweep()` over `entries` fails with an `ArrayIndexOutOfBoundsException`.

Now using the bulk `entries.removeIf(Predicate)` method in `sweep()`, so that sweeping is atomic with respect to `entries.remove(Object)`.

Fixed other occurrences of manual iteration over CopyOnWriteArrayList that may be concurrent with removals.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2024-09-20 16:50:39 +03:00 committed by GitHub
parent 9c342637cc
commit 115ee1cf39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 48 additions and 36 deletions

View File

@ -653,11 +653,11 @@ public class ContextHandler extends Handler.Wrapper implements Attributes, Alias
*/ */
protected void notifyExitScope(Request request) protected void notifyExitScope(Request request)
{ {
for (int i = _contextListeners.size(); i-- > 0; ) for (ContextScopeListener listener : TypeUtil.reverse(_contextListeners))
{ {
try try
{ {
_contextListeners.get(i).exitScope(_context, request); listener.exitScope(_context, request);
} }
catch (Throwable e) catch (Throwable e)
{ {

View File

@ -87,7 +87,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
* @param cache whether a {@link ThreadLocal} cache should be used for the most recently released entry * @param cache whether a {@link ThreadLocal} cache should be used for the most recently released entry
* @deprecated cache is no longer supported. Use {@link StrategyType#THREAD_ID} * @deprecated cache is no longer supported. Use {@link StrategyType#THREAD_ID}
*/ */
@Deprecated @Deprecated(since = "12.0.4", forRemoval = true)
public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache) public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache)
{ {
this(strategyType, maxSize, pooled -> 1); this(strategyType, maxSize, pooled -> 1);
@ -103,7 +103,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
* @param maxMultiplex a function that given the pooled object returns the max multiplex count * @param maxMultiplex a function that given the pooled object returns the max multiplex count
* @deprecated cache is no longer supported. Use {@link StrategyType#THREAD_ID} * @deprecated cache is no longer supported. Use {@link StrategyType#THREAD_ID}
*/ */
@Deprecated @Deprecated(since = "12.0.4", forRemoval = true)
public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToIntFunction<P> maxMultiplex) public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToIntFunction<P> maxMultiplex)
{ {
this(strategyType, maxSize, maxMultiplex); this(strategyType, maxSize, maxMultiplex);
@ -148,7 +148,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
{ {
leaked.increment(); leaked.increment();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Leaked " + holder); LOG.debug("Leaked {}", holder);
leaked(); leaked();
} }
@ -195,15 +195,14 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
void sweep() void sweep()
{ {
for (int i = 0; i < entries.size(); i++) // Remove entries atomically with respect to remove(Entry).
entries.removeIf(holder ->
{ {
Holder<P> holder = entries.get(i); boolean remove = holder.getEntry() == null;
if (holder.getEntry() == null) if (remove)
{
entries.remove(i--);
leaked(holder); leaked(holder);
} return remove;
} });
} }
@Override @Override
@ -285,8 +284,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
if (!removed) if (!removed)
return false; return false;
// No need to lock, no race with reserve() // In a harmless race with reserve()/sweep()/terminate().
// and the race with terminate() is harmless.
Holder<P> holder = ((ConcurrentEntry<P>)entry).getHolder(); Holder<P> holder = ((ConcurrentEntry<P>)entry).getHolder();
boolean evicted = entries.remove(holder); boolean evicted = entries.remove(holder);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -313,10 +311,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
// Field this.terminated must be modified with the lock held // Field this.terminated must be modified with the lock held
// because the list of entries is modified, see reserve(). // because the list of entries is modified, see reserve().
terminated = true; terminated = true;
copy = entries.stream() copy = stream().toList();
.map(Holder::getEntry)
.filter(Objects::nonNull)
.toList();
entries.clear(); entries.clear();
} }

View File

@ -173,6 +173,21 @@ public class TypeUtil
return Arrays.asList(a); return Arrays.asList(a);
} }
/**
* <p>Returns a new list with the elements of the specified list in reverse order.</p>
* <p>The specified list is not modified, differently from {@link Collections#reverse(List)}.</p>
*
* @param list the list whose elements are to be reversed
* @return a new list with the elements in reverse order
* @param <T> the element type
*/
public static <T> List<T> reverse(List<T> list)
{
List<T> result = new ArrayList<>(list);
Collections.reverse(result);
return result;
}
/** /**
* Class from a canonical name for a type. * Class from a canonical name for a type.
* *

View File

@ -95,6 +95,7 @@ import org.eclipse.jetty.util.DeprecationWarning;
import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.Loader; import org.eclipse.jetty.util.Loader;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
@ -518,8 +519,7 @@ public class ServletContextHandler extends ContextHandler
//Call context listeners //Call context listeners
Throwable multiException = null; Throwable multiException = null;
ServletContextEvent event = new ServletContextEvent(getServletContext()); ServletContextEvent event = new ServletContextEvent(getServletContext());
Collections.reverse(_destroyServletContextListeners); for (ServletContextListener listener : TypeUtil.reverse(_destroyServletContextListeners))
for (ServletContextListener listener : _destroyServletContextListeners)
{ {
try try
{ {
@ -574,17 +574,17 @@ public class ServletContextHandler extends ContextHandler
if (!_servletRequestListeners.isEmpty()) if (!_servletRequestListeners.isEmpty())
{ {
final ServletRequestEvent sre = new ServletRequestEvent(getServletContext(), request); final ServletRequestEvent sre = new ServletRequestEvent(getServletContext(), request);
for (int i = _servletRequestListeners.size(); i-- > 0; ) for (ServletRequestListener listener : TypeUtil.reverse(_servletRequestListeners))
{ {
_servletRequestListeners.get(i).requestDestroyed(sre); listener.requestDestroyed(sre);
} }
} }
if (!_servletRequestAttributeListeners.isEmpty()) if (!_servletRequestAttributeListeners.isEmpty())
{ {
for (int i = _servletRequestAttributeListeners.size(); i-- > 0; ) for (ServletRequestAttributeListener listener : TypeUtil.reverse(_servletRequestAttributeListeners))
{ {
scopedRequest.removeEventListener(_servletRequestAttributeListeners.get(i)); scopedRequest.removeEventListener(listener);
} }
} }
} }
@ -1223,11 +1223,11 @@ public class ServletContextHandler extends ContextHandler
ServletContextRequest scopedRequest = Request.as(request, ServletContextRequest.class); ServletContextRequest scopedRequest = Request.as(request, ServletContextRequest.class);
if (!_contextListeners.isEmpty()) if (!_contextListeners.isEmpty())
{ {
for (int i = _contextListeners.size(); i-- > 0; ) for (ServletContextScopeListener listener : TypeUtil.reverse(_contextListeners))
{ {
try try
{ {
_contextListeners.get(i).exitScope(getContext(), scopedRequest); listener.exitScope(getContext(), scopedRequest);
} }
catch (Throwable e) catch (Throwable e)
{ {

View File

@ -47,6 +47,7 @@ import org.eclipse.jetty.session.AbstractSessionManager;
import org.eclipse.jetty.session.ManagedSession; import org.eclipse.jetty.session.ManagedSession;
import org.eclipse.jetty.session.SessionConfig; import org.eclipse.jetty.session.SessionConfig;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
public class SessionHandler extends AbstractSessionManager implements Handler.Singleton public class SessionHandler extends AbstractSessionManager implements Handler.Singleton
{ {
@ -569,9 +570,9 @@ public class SessionHandler extends AbstractSessionManager implements Handler.Si
getSessionContext().run(() -> getSessionContext().run(() ->
{ {
HttpSessionEvent event = new HttpSessionEvent(session.getApi()); HttpSessionEvent event = new HttpSessionEvent(session.getApi());
for (int i = _sessionListeners.size() - 1; i >= 0; i--) for (HttpSessionListener listener : TypeUtil.reverse(_sessionListeners))
{ {
_sessionListeners.get(i).sessionDestroyed(event); listener.sessionDestroyed(event);
} }
}); });
} }

View File

@ -1000,17 +1000,17 @@ public class ContextHandler extends ScopedHandler implements Attributes, Supplie
if (!_servletRequestListeners.isEmpty()) if (!_servletRequestListeners.isEmpty())
{ {
final ServletRequestEvent sre = new ServletRequestEvent(_apiContext, request); final ServletRequestEvent sre = new ServletRequestEvent(_apiContext, request);
for (int i = _servletRequestListeners.size(); i-- > 0; ) for (ServletRequestListener listener : TypeUtil.reverse(_servletRequestListeners))
{ {
_servletRequestListeners.get(i).requestDestroyed(sre); listener.requestDestroyed(sre);
} }
} }
if (!_servletRequestAttributeListeners.isEmpty()) if (!_servletRequestAttributeListeners.isEmpty())
{ {
for (int i = _servletRequestAttributeListeners.size(); i-- > 0; ) for (ServletRequestAttributeListener listener : TypeUtil.reverse(_servletRequestAttributeListeners))
{ {
baseRequest.removeEventListener(_servletRequestAttributeListeners.get(i)); baseRequest.removeEventListener(listener);
} }
} }
} }
@ -1070,11 +1070,11 @@ public class ContextHandler extends ScopedHandler implements Attributes, Supplie
{ {
if (!_contextListeners.isEmpty()) if (!_contextListeners.isEmpty())
{ {
for (int i = _contextListeners.size(); i-- > 0; ) for (ContextScopeListener listener : TypeUtil.reverse(_contextListeners))
{ {
try try
{ {
_contextListeners.get(i).exitScope(_apiContext, request); listener.exitScope(_apiContext, request);
} }
catch (Throwable e) catch (Throwable e)
{ {

View File

@ -54,6 +54,7 @@ import org.eclipse.jetty.session.SessionIdManager;
import org.eclipse.jetty.session.SessionManager; import org.eclipse.jetty.session.SessionManager;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -834,9 +835,9 @@ public class SessionHandler extends ScopedHandler implements SessionConfig.Mutab
Runnable r = () -> Runnable r = () ->
{ {
HttpSessionEvent event = new HttpSessionEvent(session.getApi()); HttpSessionEvent event = new HttpSessionEvent(session.getApi());
for (int i = _sessionListeners.size() - 1; i >= 0; i--) for (HttpSessionListener listener : TypeUtil.reverse(_sessionListeners))
{ {
_sessionListeners.get(i).sessionDestroyed(event); listener.sessionDestroyed(event);
} }
}; };
_contextHandler.getCoreContextHandler().getContext().run(r); _contextHandler.getCoreContextHandler().getContext().run(r);