jetty-9 - Refactored code that was counting the number of reentrant invocations into a common utility class, ForkInvoker.

This commit is contained in:
Simone Bordet 2012-09-17 14:39:13 +02:00
parent e7db1661c6
commit 0915b2b0ab
4 changed files with 252 additions and 129 deletions

View File

@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.ForkInvoker;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
@ -53,14 +54,6 @@ import org.eclipse.jetty.util.log.Logger;
*/ */
public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
{ {
private static final ThreadLocal<Integer> _submissions = new ThreadLocal<Integer>()
{
@Override
protected Integer initialValue()
{
return 0;
}
};
protected static final Logger LOG = Log.getLogger(SelectorManager.class); protected static final Logger LOG = Log.getLogger(SelectorManager.class);
private final ManagedSelector[] _selectors; private final ManagedSelector[] _selectors;
@ -274,6 +267,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*/ */
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
{ {
private final ForkInvoker<Runnable> invoker = new ManagedSelectorInvoker();
private final Queue<Runnable> _changes = new ConcurrentLinkedQueue<>(); private final Queue<Runnable> _changes = new ConcurrentLinkedQueue<>();
private final int _id; private final int _id;
private Selector _selector; private Selector _selector;
@ -314,31 +308,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*/ */
public boolean submit(Runnable change) public boolean submit(Runnable change)
{ {
int submissions = _submissions.get(); return !invoker.invoke(change);
if (Thread.currentThread() != _thread || submissions >= 4)
{
_changes.offer(change);
LOG.debug("Queued change {}", change);
boolean wakeup = _needsWakeup;
if (wakeup)
wakeup();
return false;
}
else
{
_submissions.set(submissions + 1);
try
{
LOG.debug("Submitted change {}", change);
runChanges();
runChange(change);
return true;
}
finally
{
_submissions.set(submissions);
}
}
} }
private void runChanges() private void runChanges()
@ -592,6 +562,38 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
} }
private class ManagedSelectorInvoker extends ForkInvoker<Runnable>
{
private ManagedSelectorInvoker()
{
super(4);
}
@Override
protected boolean condition()
{
return Thread.currentThread() != _thread;
}
@Override
public void fork(Runnable change)
{
_changes.offer(change);
LOG.debug("Queued change {}", change);
boolean wakeup = _needsWakeup;
if (wakeup)
wakeup();
}
@Override
public void call(Runnable change)
{
LOG.debug("Submitted change {}", change);
runChanges();
runChange(change);
}
}
private class DumpKeys implements Runnable private class DumpKeys implements Runnable
{ {
private final CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);

View File

@ -70,6 +70,7 @@ import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.util.Atomics; import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ForkInvoker;
import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -79,15 +80,8 @@ import org.eclipse.jetty.util.thread.Scheduler;
public class StandardSession implements ISession, Parser.Listener, Callback<StandardSession.FrameBytes>, Dumpable public class StandardSession implements ISession, Parser.Listener, Callback<StandardSession.FrameBytes>, Dumpable
{ {
private static final Logger logger = Log.getLogger(Session.class); private static final Logger logger = Log.getLogger(Session.class);
private static final ThreadLocal<Integer> handlerInvocations = new ThreadLocal<Integer>()
{
@Override
protected Integer initialValue()
{
return 0;
}
};
private final ForkInvoker<Runnable> invoker = new SessionInvoker();
private final Map<String, Object> attributes = new ConcurrentHashMap<>(); private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final List<Listener> listeners = new CopyOnWriteArrayList<>(); private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
@ -688,7 +682,7 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
private void onCredential(CredentialFrame frame) private void onCredential(CredentialFrame frame)
{ {
logger.warn("{} frame not yet supported", ControlFrameType.CREDENTIAL); logger.warn("{} frame not yet supported", frame.getType());
flush(); flush();
} }
@ -1034,10 +1028,7 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
// if we call Callback.completed() only synchronously we risk // if we call Callback.completed() only synchronously we risk
// starvation (for the last frames sent) and stack overflow. // starvation (for the last frames sent) and stack overflow.
// Therefore every some invocation, we dispatch to a new thread // Therefore every some invocation, we dispatch to a new thread
Integer invocations = handlerInvocations.get(); invoker.invoke(new Runnable()
if (invocations >= 4)
{
execute(new Runnable()
{ {
@Override @Override
public void run() public void run()
@ -1048,21 +1039,6 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
} }
}); });
} }
else
{
handlerInvocations.set(invocations + 1);
try
{
if (callback != null)
notifyCallbackCompleted(callback, context);
flush();
}
finally
{
handlerInvocations.set(invocations);
}
}
}
private <C> void notifyCallbackCompleted(Callback<C> callback, C context) private <C> void notifyCallbackCompleted(Callback<C> callback, C context)
{ {
@ -1114,7 +1090,6 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
return String.format("%s@%x{v%d,queuSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(), hashCode(), version, queue.size(), getWindowSize(), streams.size()); return String.format("%s@%x{v%d,queuSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(), hashCode(), version, queue.size(), getWindowSize(), streams.size());
} }
@Override @Override
public String dump() public String dump()
{ {
@ -1128,7 +1103,25 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
AggregateLifeCycle.dump(out,indent,Collections.singletonList(controller),streams.values()); AggregateLifeCycle.dump(out,indent,Collections.singletonList(controller),streams.values());
} }
private class SessionInvoker extends ForkInvoker<Runnable>
{
private SessionInvoker()
{
super(4);
}
@Override
public void fork(Runnable task)
{
execute(task);
}
@Override
public void call(Runnable task)
{
task.run();
}
}
public interface FrameBytes extends Comparable<FrameBytes> public interface FrameBytes extends Comparable<FrameBytes>
{ {

View File

@ -20,26 +20,10 @@ package org.eclipse.jetty.util;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.util.component.Dumpable;
public abstract class ExecutorCallback<C> implements Callback<C> public abstract class ExecutorCallback<C> implements Callback<C>
{ {
private final static ThreadLocal<Integer> __calls = new ThreadLocal<Integer>() private final ForkInvoker<C> _invoker;
{
@Override
protected Integer initialValue()
{
return 0;
}
};
private final int _maxRecursion;
private final Executor _executor; private final Executor _executor;
private final Runnable _onNullContextCompleted = new Runnable()
{
@Override
public void run() { onCompleted(null); }
};
public ExecutorCallback(Executor executor) public ExecutorCallback(Executor executor)
{ {
@ -49,54 +33,21 @@ public abstract class ExecutorCallback<C> implements Callback<C>
public ExecutorCallback(Executor executor, int maxRecursion) public ExecutorCallback(Executor executor, int maxRecursion)
{ {
_executor = executor; _executor = executor;
_maxRecursion=maxRecursion; _invoker = new ExecutorCallbackInvoker(maxRecursion);
} }
@Override @Override
public final void completed(final C context) public final void completed(final C context)
{ {
// Should we execute? // Should we execute?
if (!alwaysDispatchCompletion()) if (alwaysDispatchCompletion())
{ {
// Do we have a recursion limit? _invoker.fork(context);
if (_maxRecursion<=0)
{
// No, so just call it directly
onCompleted(context);
return;
} }
else else
{ {
// Has this thread exceeded the recursion limit _invoker.invoke(context);
Integer calls=__calls.get();
if (calls<_maxRecursion)
{
// No, so increment recursion count, call, then decrement
try
{
__calls.set(calls+1);
onCompleted(context);
return;
} }
finally
{
__calls.set(calls);
}
}
}
}
// fallen through to here so execute
_executor.execute(context==null?_onNullContextCompleted:new Runnable()
{
@Override
public void run() { onCompleted(context);}
@Override
public String toString()
{
return String.format("ExectorCB$Completed@%x{%s}",hashCode(),context);
}
});
} }
protected abstract void onCompleted(C context); protected abstract void onCompleted(C context);
@ -116,7 +67,7 @@ public abstract class ExecutorCallback<C> implements Callback<C>
@Override @Override
public String toString() public String toString()
{ {
return String.format("ExectorCB$Failed@%x{%s,%s}",hashCode(),context,x); return String.format("ExecutorCallback@%x{%s,%s}", hashCode(), context, x);
} }
}; };
@ -140,4 +91,43 @@ public abstract class ExecutorCallback<C> implements Callback<C>
{ {
return String.format("%s@%x", getClass(), hashCode()); return String.format("%s@%x", getClass(), hashCode());
} }
private class ExecutorCallbackInvoker extends ForkInvoker<C> implements Runnable
{
private ExecutorCallbackInvoker(int maxInvocations)
{
super(maxInvocations);
}
@Override
public void fork(final C context)
{
_executor.execute(context == null ? this : new Runnable()
{
@Override
public void run()
{
call(context);
}
@Override
public String toString()
{
return String.format("ExecutorCallback@%x{%s}", hashCode(), context);
}
});
}
@Override
public void call(C context)
{
onCompleted(context);
}
@Override
public void run()
{
call(null);
}
}
} }

View File

@ -0,0 +1,138 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
/**
* Utility class that splits calls to {@link #invoke(T)} into calls to {@link #fork(T)} or {@link #call(T)}
* depending on the max number of reentrant calls to {@link #invoke(T)}.
* <p/>
* This class prevents {@link StackOverflowError}s in case of methods that end up invoking themselves,
* such is common for {@link Callback#completed(Object)}.
* <p/>
* Typical use case is:
* <pre>
* public void reentrantMethod(Object param)
* {
* if (condition || tooManyReenters)
* fork(param)
* else
* call(param)
* }
* </pre>
* Calculating {@code tooManyReenters} usually involves using a {@link ThreadLocal} and algebra on the
* number of reentrant invocations, which is factored out in this class for convenience.
* <p />
* The same code using this class becomes:
* <pre>
* private final ForkInvoker invoker = ...;
*
* public void reentrantMethod(Object param)
* {
* invoker.invoke(param);
* }
* </pre>
*
* @param <T> the generic type of this class
*/
public abstract class ForkInvoker<T>
{
private static final ThreadLocal<Integer> __invocations = new ThreadLocal<Integer>()
{
@Override
protected Integer initialValue()
{
return 0;
}
};
private final int _maxInvocations;
/**
* Creates an instance with the given max number of reentrant calls to {@link #invoke(T)}
* <p/>
* If {@code maxInvocations} is zero or negative, it is interpreted
* as if the max number of reentrant calls is infinite.
*
* @param maxInvocations the max number of reentrant calls to {@link #invoke(T)}
*/
public ForkInvoker(int maxInvocations)
{
_maxInvocations = maxInvocations;
}
/**
* Invokes either {@link #fork(T)} or {@link #call(T)}.
* If {@link #condition()} returns true, {@link #fork(T)} is invoked.
* Otherwise, if the max number of reentrant calls is positive and the
* actual number of reentrant invocations exceeds it, {@link #fork(T)} is invoked.
* Otherwise, {@link #call(T)} is invoked.
*
* @param context the invocation context
* @return true if {@link #fork(T)} has been called, false otherwise
*/
public boolean invoke(T context)
{
boolean countInvocations = _maxInvocations > 0;
int invocations = __invocations.get();
if (condition() || countInvocations && invocations > _maxInvocations)
{
fork(context);
return true;
}
else
{
if (countInvocations)
__invocations.set(invocations + 1);
try
{
call(context);
return false;
}
finally
{
if (countInvocations)
__invocations.set(invocations);
}
}
}
/**
* Subclasses should override this method returning true if they want
* {@link #invoke(T)} to call {@link #fork(T)}.
*
* @return true if {@link #invoke(T)} should call {@link #fork(T)}, false otherwise
*/
protected boolean condition()
{
return false;
}
/**
* Executes the forked invocation
*
* @param context the invocation context
*/
public abstract void fork(T context);
/**
* Executes the direct, non-forked, invocation
*
* @param context the invocation context
*/
public abstract void call(T context);
}