440122 - Remove usages of ForkInvoker.
This commit is contained in:
parent
b6ca7b3fa5
commit
987800c419
|
@ -28,13 +28,13 @@ import org.eclipse.jetty.io.ByteBufferPool;
|
|||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.ForkInvoker;
|
||||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
public abstract class ProxyConnection extends AbstractConnection
|
||||
{
|
||||
protected static final Logger LOG = ConnectHandler.LOG;
|
||||
private final ForkInvoker<Void> invoker = new ProxyForkInvoker();
|
||||
private final IteratingCallback pipe = new ProxyIteratingCallback();
|
||||
private final ByteBufferPool bufferPool;
|
||||
private final ConcurrentMap<String, Object> context;
|
||||
private Connection connection;
|
||||
|
@ -69,52 +69,7 @@ public abstract class ProxyConnection extends AbstractConnection
|
|||
@Override
|
||||
public void onFillable()
|
||||
{
|
||||
final ByteBuffer buffer = getByteBufferPool().acquire(getInputBufferSize(), true);
|
||||
try
|
||||
{
|
||||
final int filled = read(getEndPoint(), buffer);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} filled {} bytes", this, filled);
|
||||
if (filled > 0)
|
||||
{
|
||||
write(getConnection().getEndPoint(), buffer, new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} wrote {} bytes", this, filled);
|
||||
bufferPool.release(buffer);
|
||||
invoker.invoke(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
LOG.debug(this + " failed to write " + filled + " bytes", x);
|
||||
bufferPool.release(buffer);
|
||||
connection.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
else if (filled == 0)
|
||||
{
|
||||
bufferPool.release(buffer);
|
||||
fillInterested();
|
||||
}
|
||||
else
|
||||
{
|
||||
bufferPool.release(buffer);
|
||||
connection.getEndPoint().shutdownOutput();
|
||||
}
|
||||
}
|
||||
catch (IOException x)
|
||||
{
|
||||
LOG.debug(this + " could not fill", x);
|
||||
bufferPool.release(buffer);
|
||||
close();
|
||||
connection.close();
|
||||
}
|
||||
pipe.iterate();
|
||||
}
|
||||
|
||||
protected abstract int read(EndPoint endPoint, ByteBuffer buffer) throws IOException;
|
||||
|
@ -130,29 +85,73 @@ public abstract class ProxyConnection extends AbstractConnection
|
|||
getEndPoint().getRemoteAddress().getPort());
|
||||
}
|
||||
|
||||
private class ProxyForkInvoker extends ForkInvoker<Void> implements Runnable
|
||||
private class ProxyIteratingCallback extends IteratingCallback
|
||||
{
|
||||
private ProxyForkInvoker()
|
||||
private ByteBuffer buffer;
|
||||
private int filled;
|
||||
|
||||
@Override
|
||||
protected Action process() throws Exception
|
||||
{
|
||||
super(4);
|
||||
buffer = bufferPool.acquire(getInputBufferSize(), true);
|
||||
try
|
||||
{
|
||||
int filled = this.filled = read(getEndPoint(), buffer);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} filled {} bytes", ProxyConnection.this, filled);
|
||||
if (filled > 0)
|
||||
{
|
||||
write(connection.getEndPoint(), buffer, this);
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
else if (filled == 0)
|
||||
{
|
||||
bufferPool.release(buffer);
|
||||
fillInterested();
|
||||
return Action.IDLE;
|
||||
}
|
||||
else
|
||||
{
|
||||
bufferPool.release(buffer);
|
||||
connection.getEndPoint().shutdownOutput();
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
}
|
||||
catch (IOException x)
|
||||
{
|
||||
LOG.debug(ProxyConnection.this + " could not fill", x);
|
||||
disconnect();
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fork(Void arg)
|
||||
public void succeeded()
|
||||
{
|
||||
getExecutor().execute(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
onFillable();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} wrote {} bytes", ProxyConnection.this, filled);
|
||||
bufferPool.release(buffer);
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void call(Void arg)
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
onFillable();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteFailure(Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug(ProxyConnection.this + " failed to write " + filled + " bytes", x);
|
||||
disconnect();
|
||||
}
|
||||
|
||||
private void disconnect()
|
||||
{
|
||||
bufferPool.release(buffer);
|
||||
ProxyConnection.this.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,135 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util;
|
||||
|
||||
/**
|
||||
* Utility class that splits calls to {@link #invoke(Object)} into calls to {@link #fork(Object)} or {@link #call(Object)}
|
||||
* depending on the max number of reentrant calls to {@link #invoke(Object)}.
|
||||
* <p/>
|
||||
* This class prevents {@link StackOverflowError}s in case of methods that end up invoking themselves,
|
||||
* such is common for {@link Callback#succeeded()}.
|
||||
* <p/>
|
||||
* Typical use case is:
|
||||
* <pre>
|
||||
* public void reentrantMethod(Object param)
|
||||
* {
|
||||
* if (condition || tooManyReenters)
|
||||
* fork(param)
|
||||
* else
|
||||
* call(param)
|
||||
* }
|
||||
* </pre>
|
||||
* Calculating {@code tooManyReenters} usually involves using a {@link ThreadLocal} and algebra on the
|
||||
* number of reentrant invocations, which is factored out in this class for convenience.
|
||||
* <p />
|
||||
* The same code using this class becomes:
|
||||
* <pre>
|
||||
* private final ForkInvoker invoker = ...;
|
||||
*
|
||||
* public void reentrantMethod(Object param)
|
||||
* {
|
||||
* invoker.invoke(param);
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
*/
|
||||
public abstract class ForkInvoker<T>
|
||||
{
|
||||
private static final ThreadLocal<Integer> __invocations = new ThreadLocal<Integer>()
|
||||
{
|
||||
@Override
|
||||
protected Integer initialValue()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
private final int _maxInvocations;
|
||||
|
||||
/**
|
||||
* Creates an instance with the given max number of reentrant calls to {@link #invoke(Object)}
|
||||
* <p/>
|
||||
* If {@code maxInvocations} is zero or negative, it is interpreted
|
||||
* as if the max number of reentrant calls is infinite.
|
||||
*
|
||||
* @param maxInvocations the max number of reentrant calls to {@link #invoke(Object)}
|
||||
*/
|
||||
public ForkInvoker(int maxInvocations)
|
||||
{
|
||||
_maxInvocations = maxInvocations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes either {@link #fork(Object)} or {@link #call(Object)}.
|
||||
* If {@link #condition()} returns true, {@link #fork(Object)} is invoked.
|
||||
* Otherwise, if the max number of reentrant calls is positive and the
|
||||
* actual number of reentrant invocations exceeds it, {@link #fork(Object)} is invoked.
|
||||
* Otherwise, {@link #call(Object)} is invoked.
|
||||
* @param arg TODO
|
||||
*
|
||||
* @return true if {@link #fork(Object)} has been called, false otherwise
|
||||
*/
|
||||
public boolean invoke(T arg)
|
||||
{
|
||||
boolean countInvocations = _maxInvocations > 0;
|
||||
int invocations = __invocations.get();
|
||||
if (condition() || countInvocations && invocations > _maxInvocations)
|
||||
{
|
||||
fork(arg);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (countInvocations)
|
||||
__invocations.set(invocations + 1);
|
||||
try
|
||||
{
|
||||
call(arg);
|
||||
return false;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (countInvocations)
|
||||
__invocations.set(invocations);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses should override this method returning true if they want
|
||||
* {@link #invoke(Object)} to call {@link #fork(Object)}.
|
||||
*
|
||||
* @return true if {@link #invoke(Object)} should call {@link #fork(Object)}, false otherwise
|
||||
*/
|
||||
protected boolean condition()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the forked invocation
|
||||
* @param arg TODO
|
||||
*/
|
||||
public abstract void fork(T arg);
|
||||
|
||||
/**
|
||||
* Executes the direct, non-forked, invocation
|
||||
* @param arg TODO
|
||||
*/
|
||||
public abstract void call(T arg);
|
||||
}
|
Loading…
Reference in New Issue