From 0915b2b0abbc47ced392fa30413148d360d1333a Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 17 Sep 2012 14:39:13 +0200 Subject: [PATCH] jetty-9 - Refactored code that was counting the number of reentrant invocations into a common utility class, ForkInvoker. --- .../org/eclipse/jetty/io/SelectorManager.java | 68 ++++----- .../eclipse/jetty/spdy/StandardSession.java | 57 ++++---- .../eclipse/jetty/util/ExecutorCallback.java | 118 +++++++-------- .../org/eclipse/jetty/util/ForkInvoker.java | 138 ++++++++++++++++++ 4 files changed, 252 insertions(+), 129 deletions(-) create mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/ForkInvoker.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 5dfc230d217..c0995459c68 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.util.ForkInvoker; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.component.AbstractLifeCycle; @@ -53,14 +54,6 @@ import org.eclipse.jetty.util.log.Logger; */ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable { - private static final ThreadLocal _submissions = new ThreadLocal() - { - @Override - protected Integer initialValue() - { - return 0; - } - }; protected static final Logger LOG = Log.getLogger(SelectorManager.class); private final ManagedSelector[] _selectors; @@ -274,6 +267,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa */ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable { + private final ForkInvoker invoker = new ManagedSelectorInvoker(); private final Queue _changes = new ConcurrentLinkedQueue<>(); private final int _id; private Selector _selector; @@ -314,31 +308,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa */ public boolean submit(Runnable change) { - int submissions = _submissions.get(); - if (Thread.currentThread() != _thread || submissions >= 4) - { - _changes.offer(change); - LOG.debug("Queued change {}", change); - boolean wakeup = _needsWakeup; - if (wakeup) - wakeup(); - return false; - } - else - { - _submissions.set(submissions + 1); - try - { - LOG.debug("Submitted change {}", change); - runChanges(); - runChange(change); - return true; - } - finally - { - _submissions.set(submissions); - } - } + return !invoker.invoke(change); } private void runChanges() @@ -592,6 +562,38 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); } + private class ManagedSelectorInvoker extends ForkInvoker + { + private ManagedSelectorInvoker() + { + super(4); + } + + @Override + protected boolean condition() + { + return Thread.currentThread() != _thread; + } + + @Override + public void fork(Runnable change) + { + _changes.offer(change); + LOG.debug("Queued change {}", change); + boolean wakeup = _needsWakeup; + if (wakeup) + wakeup(); + } + + @Override + public void call(Runnable change) + { + LOG.debug("Submitted change {}", change); + runChanges(); + runChange(change); + } + } + private class DumpKeys implements Runnable { private final CountDownLatch latch = new CountDownLatch(1); diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index b9f75e6bc0b..cb3a076215f 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -70,6 +70,7 @@ import org.eclipse.jetty.spdy.parser.Parser; import org.eclipse.jetty.util.Atomics; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ForkInvoker; import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; @@ -79,15 +80,8 @@ import org.eclipse.jetty.util.thread.Scheduler; public class StandardSession implements ISession, Parser.Listener, Callback, Dumpable { private static final Logger logger = Log.getLogger(Session.class); - private static final ThreadLocal handlerInvocations = new ThreadLocal() - { - @Override - protected Integer initialValue() - { - return 0; - } - }; + private final ForkInvoker invoker = new SessionInvoker(); private final Map attributes = new ConcurrentHashMap<>(); private final List listeners = new CopyOnWriteArrayList<>(); private final ConcurrentMap streams = new ConcurrentHashMap<>(); @@ -688,7 +682,7 @@ public class StandardSession implements ISession, Parser.Listener, Callback= 4) + invoker.invoke(new Runnable() { - execute(new Runnable() - { - @Override - public void run() - { - if (callback != null) - notifyCallbackCompleted(callback, context); - flush(); - } - }); - } - else - { - handlerInvocations.set(invocations + 1); - try + @Override + public void run() { if (callback != null) notifyCallbackCompleted(callback, context); flush(); } - finally - { - handlerInvocations.set(invocations); - } - } + }); } private void notifyCallbackCompleted(Callback callback, C context) @@ -1114,7 +1090,6 @@ public class StandardSession implements ISession, Parser.Listener, Callback + { + private SessionInvoker() + { + super(4); + } + @Override + public void fork(Runnable task) + { + execute(task); + } + + @Override + public void call(Runnable task) + { + task.run(); + } + } public interface FrameBytes extends Comparable { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/ExecutorCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/ExecutorCallback.java index 5ac547eac55..be748c0c5f0 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/ExecutorCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/ExecutorCallback.java @@ -20,83 +20,34 @@ package org.eclipse.jetty.util; import java.util.concurrent.Executor; -import org.eclipse.jetty.util.component.Dumpable; - public abstract class ExecutorCallback implements Callback { - private final static ThreadLocal __calls = new ThreadLocal() - { - @Override - protected Integer initialValue() - { - return 0; - } - }; - - private final int _maxRecursion; + private final ForkInvoker _invoker; private final Executor _executor; - private final Runnable _onNullContextCompleted = new Runnable() - { - @Override - public void run() { onCompleted(null); } - }; public ExecutorCallback(Executor executor) { - this(executor,4); + this(executor, 4); } - public ExecutorCallback(Executor executor,int maxRecursion) + public ExecutorCallback(Executor executor, int maxRecursion) { - _executor=executor; - _maxRecursion=maxRecursion; + _executor = executor; + _invoker = new ExecutorCallbackInvoker(maxRecursion); } @Override public final void completed(final C context) { // Should we execute? - if (!alwaysDispatchCompletion()) + if (alwaysDispatchCompletion()) { - // Do we have a recursion limit? - if (_maxRecursion<=0) - { - // No, so just call it directly - onCompleted(context); - return; - } - else - { - // Has this thread exceeded the recursion limit - Integer calls=__calls.get(); - if (calls<_maxRecursion) - { - // No, so increment recursion count, call, then decrement - try - { - __calls.set(calls+1); - onCompleted(context); - return; - } - finally - { - __calls.set(calls); - } - } - } + _invoker.fork(context); } - - // fallen through to here so execute - _executor.execute(context==null?_onNullContextCompleted:new Runnable() + else { - @Override - public void run() { onCompleted(context);} - @Override - public String toString() - { - return String.format("ExectorCB$Completed@%x{%s}",hashCode(),context); - } - }); + _invoker.invoke(context); + } } protected abstract void onCompleted(C context); @@ -105,22 +56,22 @@ public abstract class ExecutorCallback implements Callback public final void failed(final C context, final Throwable x) { // Always execute failure - Runnable runnable=new Runnable() + Runnable runnable = new Runnable() { @Override public void run() { - onFailed(context,x); + onFailed(context, x); } @Override public String toString() { - return String.format("ExectorCB$Failed@%x{%s,%s}",hashCode(),context,x); + return String.format("ExecutorCallback@%x{%s,%s}", hashCode(), context, x); } }; - if (_executor==null) + if (_executor == null) new Thread(runnable).start(); else _executor.execute(runnable); @@ -132,7 +83,7 @@ public abstract class ExecutorCallback implements Callback protected boolean alwaysDispatchCompletion() { - return _executor!=null; + return _executor != null; } @Override @@ -140,4 +91,43 @@ public abstract class ExecutorCallback implements Callback { return String.format("%s@%x", getClass(), hashCode()); } + + private class ExecutorCallbackInvoker extends ForkInvoker implements Runnable + { + private ExecutorCallbackInvoker(int maxInvocations) + { + super(maxInvocations); + } + + @Override + public void fork(final C context) + { + _executor.execute(context == null ? this : new Runnable() + { + @Override + public void run() + { + call(context); + } + + @Override + public String toString() + { + return String.format("ExecutorCallback@%x{%s}", hashCode(), context); + } + }); + } + + @Override + public void call(C context) + { + onCompleted(context); + } + + @Override + public void run() + { + call(null); + } + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/ForkInvoker.java b/jetty-util/src/main/java/org/eclipse/jetty/util/ForkInvoker.java new file mode 100644 index 00000000000..e20efe1bcfe --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/ForkInvoker.java @@ -0,0 +1,138 @@ +// +// ======================================================================== +// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +/** + * Utility class that splits calls to {@link #invoke(T)} into calls to {@link #fork(T)} or {@link #call(T)} + * depending on the max number of reentrant calls to {@link #invoke(T)}. + *

+ * This class prevents {@link StackOverflowError}s in case of methods that end up invoking themselves, + * such is common for {@link Callback#completed(Object)}. + *

+ * Typical use case is: + *

+ * public void reentrantMethod(Object param)
+ * {
+ *     if (condition || tooManyReenters)
+ *         fork(param)
+ *     else
+ *         call(param)
+ * }
+ * 
+ * Calculating {@code tooManyReenters} usually involves using a {@link ThreadLocal} and algebra on the + * number of reentrant invocations, which is factored out in this class for convenience. + *

+ * The same code using this class becomes: + *

+ * private final ForkInvoker invoker = ...;
+ *
+ * public void reentrantMethod(Object param)
+ * {
+ *     invoker.invoke(param);
+ * }
+ * 
+ * + * @param the generic type of this class + */ +public abstract class ForkInvoker +{ + private static final ThreadLocal __invocations = new ThreadLocal() + { + @Override + protected Integer initialValue() + { + return 0; + } + }; + private final int _maxInvocations; + + /** + * Creates an instance with the given max number of reentrant calls to {@link #invoke(T)} + *

+ * If {@code maxInvocations} is zero or negative, it is interpreted + * as if the max number of reentrant calls is infinite. + * + * @param maxInvocations the max number of reentrant calls to {@link #invoke(T)} + */ + public ForkInvoker(int maxInvocations) + { + _maxInvocations = maxInvocations; + } + + /** + * Invokes either {@link #fork(T)} or {@link #call(T)}. + * If {@link #condition()} returns true, {@link #fork(T)} is invoked. + * Otherwise, if the max number of reentrant calls is positive and the + * actual number of reentrant invocations exceeds it, {@link #fork(T)} is invoked. + * Otherwise, {@link #call(T)} is invoked. + * + * @param context the invocation context + * @return true if {@link #fork(T)} has been called, false otherwise + */ + public boolean invoke(T context) + { + boolean countInvocations = _maxInvocations > 0; + int invocations = __invocations.get(); + if (condition() || countInvocations && invocations > _maxInvocations) + { + fork(context); + return true; + } + else + { + if (countInvocations) + __invocations.set(invocations + 1); + try + { + call(context); + return false; + } + finally + { + if (countInvocations) + __invocations.set(invocations); + } + } + } + + /** + * Subclasses should override this method returning true if they want + * {@link #invoke(T)} to call {@link #fork(T)}. + * + * @return true if {@link #invoke(T)} should call {@link #fork(T)}, false otherwise + */ + protected boolean condition() + { + return false; + } + + /** + * Executes the forked invocation + * + * @param context the invocation context + */ + public abstract void fork(T context); + + /** + * Executes the direct, non-forked, invocation + * + * @param context the invocation context + */ + public abstract void call(T context); +}