Merge pull request #3372 from eclipse/jetty-9.4.x-3361-thread-safe-setHandlers

Issue #3361 thread safe setHandlers
This commit is contained in:
Simone Bordet 2019-04-05 09:01:26 +02:00 committed by GitHub
commit 1ef851523f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 492 additions and 170 deletions

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.deploy.App;
import org.eclipse.jetty.deploy.AppLifeCycle;
import org.eclipse.jetty.deploy.graph.Node;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
public class StandardDeployer implements AppLifeCycle.Binding
{
@ -37,9 +38,10 @@ public class StandardDeployer implements AppLifeCycle.Binding
{
ContextHandler handler = app.getContextHandler();
if (handler == null)
{
throw new NullPointerException("No Handler created for App: " + app);
}
app.getDeploymentManager().getContexts().addHandler(handler);
Callback.Completable blocker = new Callback.Completable();
app.getDeploymentManager().getContexts().deployHandler(handler, blocker);
blocker.get();
}
}

View File

@ -18,20 +18,16 @@
package org.eclipse.jetty.deploy.bindings;
import org.eclipse.jetty.deploy.App;
import org.eclipse.jetty.deploy.AppLifeCycle;
import org.eclipse.jetty.deploy.graph.Node;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.Callback;
public class StandardUndeployer implements AppLifeCycle.Binding
{
private static final Logger LOG = Log.getLogger(StandardUndeployer.class);
@Override
public String[] getBindingTargets()
{
@ -42,33 +38,11 @@ public class StandardUndeployer implements AppLifeCycle.Binding
@Override
public void processBinding(Node node, App app) throws Exception
{
ContextHandler handler = app.getContextHandler();
ContextHandlerCollection chcoll = app.getDeploymentManager().getContexts();
recursiveRemoveContext(chcoll,handler);
}
private void recursiveRemoveContext(HandlerCollection coll, ContextHandler context)
{
Handler children[] = coll.getHandlers();
int originalCount = children.length;
for (int i = 0, n = children.length; i < n; i++)
{
Handler child = children[i];
LOG.debug("Child handler {}",child);
if (child.equals(context))
{
LOG.debug("Removing handler {}",child);
coll.removeHandler(child);
child.destroy();
if (LOG.isDebugEnabled())
LOG.debug("After removal: {} (originally {})",coll.getHandlers().length,originalCount);
}
else if (child instanceof HandlerCollection)
{
recursiveRemoveContext((HandlerCollection)child,context);
}
}
ContextHandlerCollection contexts = app.getDeploymentManager().getContexts();
ContextHandler context = app.getContextHandler();
Callback.Completable blocker = new Callback.Completable();
contexts.undeployHandler(context, blocker);
blocker.get();
context.destroy();
}
}

View File

@ -1605,9 +1605,10 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
if (getServer() != null && (getServer().isStarting() || getServer().isStarted()))
{
Handler[] contextCollections = getServer().getChildHandlersByClass(ContextHandlerCollection.class);
ContextHandlerCollection[] contextCollections =
(ContextHandlerCollection[])getServer().getChildHandlersByClass(ContextHandlerCollection.class);
for (int h = 0; contextCollections != null && h < contextCollections.length; h++)
((ContextHandlerCollection)contextCollections[h]).mapContexts();
contextCollections[h].mapContexts();
}
}

View File

@ -24,8 +24,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@ -37,15 +35,16 @@ import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.ArrayTernaryTrie;
import org.eclipse.jetty.util.ArrayUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Trie;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.SerializedExecutor;
/* ------------------------------------------------------------ */
/** ContextHandlerCollection.
*
/**
* This {@link org.eclipse.jetty.server.handler.HandlerCollection} is creates a
* Map of contexts to it's contained handlers based
* on the context path and virtual hosts of any contained {@link org.eclipse.jetty.server.handler.ContextHandler}s.
@ -58,9 +57,9 @@ import org.eclipse.jetty.util.log.Logger;
public class ContextHandlerCollection extends HandlerCollection
{
private static final Logger LOG = Log.getLogger(ContextHandlerCollection.class);
private final SerializedExecutor _serializedExecutor = new SerializedExecutor();
private final ConcurrentMap<ContextHandler,Handler> _contextBranches = new ConcurrentHashMap<>();
private volatile Trie<Map.Entry<String,Branch[]>> _pathBranches;
@Deprecated
private Class<? extends ContextHandler> _contextClass = ContextHandler.class;
/* ------------------------------------------------------------ */
@ -72,43 +71,57 @@ public class ContextHandlerCollection extends HandlerCollection
/* ------------------------------------------------------------ */
public ContextHandlerCollection(ContextHandler... contexts)
{
super(true,contexts);
super(true);
setHandlers(contexts);
}
/* ------------------------------------------------------------ */
/**
* Remap the context paths.
* Remap the contexts. Normally this is not required as context
* mapping is maintained as a side effect of {@link #setHandlers(Handler[])}
* However, if configuration changes in the deep handler structure (eg contextpath is changed), then
* this call will trigger a remapping.
* This method is mutually excluded from {@link #deployHandler(Handler, Callback)} and
* {@link #undeployHandler(Handler, Callback)}
*/
@ManagedOperation("update the mapping of context path to context")
@ManagedOperation("Update the mapping of context path to context")
public void mapContexts()
{
_contextBranches.clear();
Handler[] handlers = getHandlers();
if (handlers==null)
_serializedExecutor.execute(()->
{
_pathBranches=new ArrayTernaryTrie<>(false,16);
return;
}
while(true)
{
Handlers handlers = _handlers.get();
if (handlers==null)
break;
if (updateHandlers(handlers, newHandlers(handlers.getHandlers())))
break;
}
});
}
/* ------------------------------------------------------------ */
@Override
protected Handlers newHandlers(Handler[] handlers)
{
if (handlers==null || handlers.length==0)
return null;
// Create map of contextPath to handler Branch
Map<String,Branch[]> map = new HashMap<>();
// A branch is a Handler that could contain 0 or more ContextHandlers
Map<String,Branch[]> path2Branches = new HashMap<>();
for (Handler handler:handlers)
{
Branch branch=new Branch(handler);
for (String contextPath : branch.getContextPaths())
{
Branch[] branches=map.get(contextPath);
map.put(contextPath, ArrayUtil.addToArray(branches, branch, Branch.class));
Branch[] branches=path2Branches.get(contextPath);
path2Branches.put(contextPath, ArrayUtil.addToArray(branches, branch, Branch.class));
}
for (ContextHandler context : branch.getContextHandlers())
_contextBranches.putIfAbsent(context, branch.getHandler());
}
// Sort the branches so those with virtual hosts are considered before those without
for (Map.Entry<String,Branch[]> entry: map.entrySet())
// Sort the branches for each contextPath so those with virtual hosts are considered before those without
for (Map.Entry<String,Branch[]> entry: path2Branches.entrySet())
{
Branch[] branches=entry.getValue();
Branch[] sorted=new Branch[branches.length];
@ -124,69 +137,56 @@ public class ContextHandlerCollection extends HandlerCollection
// Loop until we have a big enough trie to hold all the context paths
int capacity=512;
Trie<Map.Entry<String,Branch[]>> trie;
Mapping mapping;
loop: while(true)
{
trie=new ArrayTernaryTrie<>(false,capacity);
for (Map.Entry<String,Branch[]> entry: map.entrySet())
mapping = new Mapping(handlers, capacity);
for (Map.Entry<String,Branch[]> entry: path2Branches.entrySet())
{
if (!trie.put(entry.getKey().substring(1),entry))
if (!mapping._pathBranches.put(entry.getKey().substring(1),entry))
{
capacity+=512;
continue loop;
}
}
break loop;
break;
}
if (LOG.isDebugEnabled())
{
for (String ctx : trie.keySet())
LOG.debug("{}->{}",ctx,Arrays.asList(trie.get(ctx).getValue()));
for (String ctx : mapping._pathBranches.keySet())
LOG.debug("{}->{}",ctx,Arrays.asList(mapping._pathBranches.get(ctx).getValue()));
}
_pathBranches=trie;
// add new context branches to concurrent map
for (Branch[] branches: path2Branches.values())
{
for (Branch branch : branches)
{
for (ContextHandler context : branch.getContextHandlers())
mapping._contextBranches.put(context, branch.getHandler());
}
}
return mapping;
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.jetty.server.server.handler.HandlerCollection#setHandlers(org.eclipse.jetty.server.server.Handler[])
*/
@Override
public void setHandlers(Handler[] handlers)
{
super.setHandlers(handlers);
if (isStarted())
mapContexts();
}
/* ------------------------------------------------------------ */
@Override
protected void doStart() throws Exception
{
mapContexts();
super.doStart();
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.jetty.server.server.Handler#handle(java.lang.String, javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse, int)
*/
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
Handler[] handlers = getHandlers();
if (handlers==null || handlers.length==0)
Handlers handlers = _handlers.get();
if (handlers==null)
return;
Mapping mapping = (Mapping)handlers;
HttpChannelState async = baseRequest.getHttpChannelState();
if (async.isAsync())
{
ContextHandler context=async.getContextHandler();
if (context!=null)
{
Handler branch = _contextBranches.get(context);
Handler branch = mapping._contextBranches.get(context);
if (branch==null)
context.handle(target,baseRequest,request, response);
@ -196,19 +196,19 @@ public class ContextHandlerCollection extends HandlerCollection
}
}
// data structure which maps a request to a context; first-best match wins
// { context path => [ context ] }
// }
if (target.startsWith("/"))
{
Trie<Map.Entry<String,Branch[]>> pathBranches = mapping._pathBranches;
if (pathBranches==null)
return;
int limit = target.length()-1;
while (limit>=0)
{
// Get best match
Map.Entry<String,Branch[]> branches = _pathBranches.getBest(target,1,limit);
Map.Entry<String,Branch[]> branches = pathBranches.getBest(target,1,limit);
if (branches==null)
break;
@ -228,10 +228,11 @@ public class ContextHandlerCollection extends HandlerCollection
}
else
{
// This may not work in all circumstances... but then I think it should never be called
for (int i=0;i<handlers.length;i++)
if (mapping.getHandlers()==null)
return;
for (int i=0;i<mapping.getHandlers().length;i++)
{
handlers[i].handle(target,baseRequest, request, response);
mapping.getHandlers()[i].handle(target,baseRequest, request, response);
if ( baseRequest.isHandled())
return;
}
@ -239,11 +240,15 @@ public class ContextHandlerCollection extends HandlerCollection
}
/* ------------------------------------------------------------ */
/** Add a context handler.
/**
* Adds a context handler.
*
* @param contextPath The context path to add
* @param resourceBase the base (root) Resource
* @return the ContextHandler just added
* @deprecated Unused convenience method no longer supported.
*/
@Deprecated
public ContextHandler addContext(String contextPath,String resourceBase)
{
try
@ -261,22 +266,90 @@ public class ContextHandlerCollection extends HandlerCollection
}
}
/* ------------------------------------------------------------ */
/**
* Thread safe deploy of a Handler.
* <p>
* This method is the equivalent of {@link #addHandler(Handler)},
* but its execution is non-block and mutually excluded from all
* other calls to {@link #deployHandler(Handler, Callback)} and
* {@link #undeployHandler(Handler, Callback)}.
* The handler may be added after this call returns.
* </p>
* @param handler the handler to deploy
* @param callback Called after handler has been added
*/
public void deployHandler(Handler handler, Callback callback)
{
if (handler.getServer()!=getServer())
handler.setServer(getServer());
_serializedExecutor.execute(new SerializedExecutor.ErrorHandlingTask()
{
@Override
public void run()
{
addHandler(handler);
callback.succeeded();
}
@Override
public void accept(Throwable throwable)
{
callback.failed(throwable);
}
});
}
/* ------------------------------------------------------------ */
/**
* Thread safe undeploy of a Handler.
* <p>
* This method is the equivalent of {@link #removeHandler(Handler)},
* but its execution is non-block and mutually excluded from all
* other calls to {@link #deployHandler(Handler,Callback)} and
* {@link #undeployHandler(Handler,Callback)}.
* The handler may be removed after this call returns.
* </p>
* @param handler The handler to undeploy
* @param callback Called after handler has been removed
*/
public void undeployHandler(Handler handler, Callback callback)
{
_serializedExecutor.execute(new SerializedExecutor.ErrorHandlingTask()
{
@Override
public void run()
{
removeHandler(handler);
callback.succeeded();
}
@Override
public void accept(Throwable throwable)
{
callback.failed(throwable);
}
});
}
/* ------------------------------------------------------------ */
/**
* @return The class to use to add new Contexts
* @deprecated Unused convenience mechanism not used.
*/
@Deprecated
public Class<?> getContextClass()
{
return _contextClass;
}
/* ------------------------------------------------------------ */
/**
* @param contextClass The class to use to add new Contexts
* @deprecated Unused convenience mechanism not used.
*/
@Deprecated
public void setContextClass(Class<? extends ContextHandler> contextClass)
{
if (contextClass ==null || !(ContextHandler.class.isAssignableFrom(contextClass)))
@ -312,7 +385,7 @@ public class ContextHandlerCollection extends HandlerCollection
Set<String> getContextPaths()
{
Set<String> set = new HashSet<String>();
Set<String> set = new HashSet<>();
for (ContextHandler context:_contexts)
set.add(context.getContextPath());
return set;
@ -343,5 +416,18 @@ public class ContextHandlerCollection extends HandlerCollection
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private static class Mapping extends Handlers
{
private final Map<ContextHandler,Handler> _contextBranches = new HashMap<>();
private final Trie<Map.Entry<String,Branch[]>> _pathBranches;
private Mapping(Handler[] handlers, int capacity)
{
super(handlers);
_pathBranches = new ArrayTernaryTrie<>(false, capacity);
}
}
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.server.handler;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@ -47,7 +48,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
public class HandlerCollection extends AbstractHandlerContainer
{
private final boolean _mutableWhenRunning;
private volatile Handler[] _handlers;
protected final AtomicReference<Handlers> _handlers = new AtomicReference<>();
/* ------------------------------------------------------------ */
public HandlerCollection()
@ -71,72 +72,93 @@ public class HandlerCollection extends AbstractHandlerContainer
/* ------------------------------------------------------------ */
/**
* @return Returns the handlers.
* @return the array of handlers.
*/
@Override
@ManagedAttribute(value="Wrapped handlers", readonly=true)
public Handler[] getHandlers()
{
return _handlers;
Handlers handlers = _handlers.get();
return handlers==null ? null : handlers._handlers;
}
/* ------------------------------------------------------------ */
/**
* @param handlers The handlers to set.
* @param handlers the array of handlers to set.
*/
public void setHandlers(Handler[] handlers)
{
if (!_mutableWhenRunning && isStarted())
throw new IllegalStateException(STARTED);
if (handlers!=null)
while(true)
{
// check for loops
for (Handler handler:handlers)
if (handler == this || (handler instanceof HandlerContainer &&
Arrays.asList(((HandlerContainer)handler).getChildHandlers()).contains(this)))
throw new IllegalStateException("setHandler loop");
// Set server
for (Handler handler:handlers)
if (handler.getServer()!=getServer())
handler.setServer(getServer());
if (updateHandlers(_handlers.get(), newHandlers(handlers)))
break;
}
Handler[] old=_handlers;;
_handlers = handlers;
updateBeans(old, handlers);
}
/* ------------------------------------------------------------ */
/**
* @see Handler#handle(String, Request, HttpServletRequest, HttpServletResponse)
*/
protected Handlers newHandlers(Handler[] handlers)
{
if (handlers==null || handlers.length==0)
return null;
return new Handlers(handlers);
}
/* ------------------------------------------------------------ */
protected boolean updateHandlers(Handlers old, Handlers handlers)
{
if (handlers!=null)
{
// check for loops
for (Handler handler:handlers._handlers)
if (handler == this || (handler instanceof HandlerContainer &&
Arrays.asList(((HandlerContainer)handler).getChildHandlers()).contains(this)))
throw new IllegalStateException("setHandler loop");
// Set server
for (Handler handler:handlers._handlers)
if (handler.getServer()!=getServer())
handler.setServer(getServer());
}
if (_handlers.compareAndSet(old, handlers))
{
Handler[] oldBeans = old == null ? null : old._handlers;
Handler[] newBeans = handlers == null ? null : handlers._handlers;
updateBeans(oldBeans, newBeans);
return true;
}
return false;
}
/* ------------------------------------------------------------ */
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
{
if (_handlers!=null && isStarted())
if (isStarted())
{
MultiException mex=null;
Handlers handlers = _handlers.get();
if (handlers==null)
return;
for (int i=0;i<_handlers.length;i++)
MultiException mex=null;
for (Handler handler : handlers._handlers)
{
try
{
_handlers[i].handle(target,baseRequest, request, response);
handler.handle(target, baseRequest, request, response);
}
catch(IOException e)
catch (IOException | RuntimeException e)
{
throw e;
}
catch(RuntimeException e)
catch (Exception e)
{
throw e;
}
catch(Exception e)
{
if (mex==null)
mex=new MultiException();
if (mex == null)
mex = new MultiException();
mex.add(e);
}
}
@ -147,37 +169,54 @@ public class HandlerCollection extends AbstractHandlerContainer
else
throw new ServletException(mex);
}
}
}
/* ------------------------------------------------------------ */
/* Add a handler.
/**
* Adds a handler.
* This implementation adds the passed handler to the end of the existing collection of handlers.
* @see org.eclipse.jetty.server.server.HandlerContainer#addHandler(org.eclipse.jetty.server.server.Handler)
* If the handler is already added, it is removed and readded
*/
public void addHandler(Handler handler)
{
setHandlers(ArrayUtil.addToArray(getHandlers(), handler, Handler.class));
while(true)
{
Handlers old = _handlers.get();
Handlers handlers = newHandlers(ArrayUtil.addToArray(old==null?null:ArrayUtil.removeFromArray(old._handlers, handler), handler, Handler.class));
if (updateHandlers(old,handlers))
break;
}
}
/* ------------------------------------------------------------ */
/* Prepend a handler.
/**
* Prepends a handler.
* This implementation adds the passed handler to the start of the existing collection of handlers.
* @see org.eclipse.jetty.server.server.HandlerContainer#addHandler(org.eclipse.jetty.server.server.Handler)
*/
public void prependHandler(Handler handler)
{
setHandlers(ArrayUtil.prependToArray(handler, getHandlers(), Handler.class));
while(true)
{
Handlers old = _handlers.get();
Handlers handlers = newHandlers(ArrayUtil.prependToArray(handler, old==null?null:old._handlers, Handler.class));
if (updateHandlers(old,handlers))
break;
}
}
/* ------------------------------------------------------------ */
public void removeHandler(Handler handler)
{
Handler[] handlers = getHandlers();
if (handlers!=null && handlers.length>0 )
setHandlers(ArrayUtil.removeFromArray(handlers, handler));
while(true)
{
Handlers old = _handlers.get();
if (old==null || old._handlers.length==0)
break;
Handlers handlers = newHandlers(ArrayUtil.removeFromArray(old._handlers, handler));
if (updateHandlers(old,handlers))
break;
}
}
/* ------------------------------------------------------------ */
@ -196,10 +235,28 @@ public class HandlerCollection extends AbstractHandlerContainer
{
if (!isStopped())
throw new IllegalStateException("!STOPPED");
Handler[] children=getChildHandlers();
Handler[] children = getChildHandlers();
setHandlers(null);
for (Handler child: children)
child.destroy();
super.destroy();
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
protected static class Handlers
{
private final Handler[] _handlers;
protected Handlers(Handler[] handlers)
{
this._handlers = handlers;
}
public Handler[] getHandlers()
{
return _handlers;
}
}
}

View File

@ -18,16 +18,6 @@
package org.eclipse.jetty.server.handler;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import javax.servlet.AsyncContext;
@ -41,9 +31,18 @@ import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ContextHandlerCollectionTest
{
@Test
@ -214,7 +213,7 @@ public class ContextHandlerCollectionTest
IsHandledHandler handler = (IsHandledHandler)context.getHandler();
context.setVirtualHosts(contextHosts);
// trigger this manually; it's supposed to be called when adding the handler
// trigger this manually
handlerCollection.mapContexts();
for(String host : requestHosts)

View File

@ -0,0 +1,111 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.eclipse.jetty.util.log.Log;
/**
* An executor than ensurers serial execution of submitted tasks.
* <p>
* Callers of this execute will never block in the executor, but they may
* be required to either execute the task they submit or tasks submitted
* by other threads whilst they are executing tasks.
* </p>
* <p>
* This class was inspired by the public domain class
* <a href="https://github.com/jroper/reactive-streams-servlet/blob/master/reactive-streams-servlet/src/main/java/org/reactivestreams/servlet/NonBlockingMutexExecutor.java">NonBlockingMutexExecutor</a>
* </p>
*/
public class SerializedExecutor implements Executor
{
private final AtomicReference<Link> _tail = new AtomicReference<>();
@Override
public void execute(Runnable task)
{
Link link = new Link(task);
Link lastButOne = _tail.getAndSet(link);
if (lastButOne==null)
run(link);
else
lastButOne._next.lazySet(link);
}
protected void onError(Runnable task, Throwable t)
{
if (task instanceof ErrorHandlingTask)
((ErrorHandlingTask)task).accept(t);
Log.getLogger(task.getClass()).warn(t);
}
private void run(Link link)
{
while(link!=null)
{
try
{
link._task.run();
}
catch (Throwable t)
{
onError(link._task, t);
}
finally
{
// Are we the current the last Link?
if (_tail.compareAndSet(link, null))
link = null;
else
{
// not the last task, so its next link will eventually be set
Link next = link._next.get();
while (next == null)
{
Thread.yield(); // Thread.onSpinWait();
next = link._next.get();
}
link = next;
}
}
}
}
private class Link
{
private final Runnable _task;
private final AtomicReference<Link> _next = new AtomicReference<>();
public Link(Runnable task)
{
_task = task;
}
}
/**
* Error handling task
* <p>If a submitted task implements this interface, it will be passed
* any exceptions thrown when running the task.</p>
*/
public interface ErrorHandlingTask extends Runnable, Consumer<Throwable>
{}
}

View File

@ -0,0 +1,92 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SerializedExecutorTest
{
@Test
public void test() throws Exception
{
int threads = 64;
int loops = 1000;
int depth = 100;
AtomicInteger ran = new AtomicInteger();
AtomicBoolean running = new AtomicBoolean();
SerializedExecutor executor = new SerializedExecutor();
CountDownLatch start = new CountDownLatch(1);
CountDownLatch stop = new CountDownLatch(threads);
Random random = new Random();
for (int t = threads; t-- > 0; )
{
new Thread(() ->
{
try
{
start.await();
for (int l = loops; l-- > 0; )
{
final AtomicInteger d = new AtomicInteger(depth);
executor.execute(new Runnable()
{
@Override
public void run()
{
ran.incrementAndGet();
if (!running.compareAndSet(false, true))
throw new IllegalStateException();
if (d.decrementAndGet() > 0)
executor.execute(this);
if (!running.compareAndSet(true, false))
throw new IllegalStateException();
}
});
Thread.sleep(random.nextInt(5));
}
}
catch (Throwable th)
{
th.printStackTrace();
}
finally
{
stop.countDown();
}
}).start();
}
start.countDown();
assertTrue(stop.await(30, TimeUnit.SECONDS));
assertThat(ran.get(), Matchers.is(threads * loops * depth));
}
}