jetty-9 moved FutureCallback to jetty-util

This commit is contained in:
Greg Wilkins 2012-05-09 18:27:09 +02:00
parent 2b0cbac192
commit 979dae0021
8 changed files with 119 additions and 27 deletions

View File

@ -16,6 +16,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;

View File

@ -30,11 +30,9 @@ import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.DispatchingIOFuture;
import org.eclipse.jetty.io.DoneIOFuture;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IOFuture;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout.Task;
@ -55,10 +53,9 @@ public class HttpConnection extends AbstractAsyncConnection
private final HttpGenerator _generator;
private final HttpChannel _channel;
private final ByteBufferPool _bufferPool;
private IOFuture _writeFuture;
FutureCallback<Void> _writeFuture;
ByteBuffer _requestBuffer=null;
ByteBuffer _responseHeader=null;
ByteBuffer _chunk=null;
@ -307,7 +304,7 @@ public class HttpConnection extends AbstractAsyncConnection
throw new IllegalStateException("!chunk when content length known");
case FLUSH:
write(_responseHeader,_chunk,_responseBuffer).block();
write(_responseHeader,_chunk,_responseBuffer).get();
break;
case FLUSH_CONTENT:
@ -331,7 +328,7 @@ public class HttpConnection extends AbstractAsyncConnection
catch(ExecutionException e)
{
LOG.debug(e);
DispatchingIOFuture.rethrow(e);
FutureCallback.rethrow(e);
}
finally
{
@ -357,7 +354,7 @@ public class HttpConnection extends AbstractAsyncConnection
{
// block if the last write is not complete
if (_writeFuture!=null && !_writeFuture.isDone())
_writeFuture.block();
_writeFuture.get();
if (LOG.isDebugEnabled())
LOG.debug("{}: generate({},{},{},{},{})@{}",
@ -396,13 +393,13 @@ public class HttpConnection extends AbstractAsyncConnection
case FLUSH:
if (hasContent)
write(_responseHeader,_chunk,_responseBuffer).block();
write(_responseHeader,_chunk,_responseBuffer).get();
else
_writeFuture=write(_responseHeader,_chunk,_responseBuffer);
break;
case FLUSH_CONTENT:
write(_responseHeader,_chunk,content).block();
write(_responseHeader,_chunk,content).get();
break;
case SHUTDOWN_OUT:
@ -434,21 +431,24 @@ public class HttpConnection extends AbstractAsyncConnection
return (int)(preparedAfter-preparedBefore);
}
private IOFuture write(ByteBuffer b0,ByteBuffer b1,ByteBuffer b2)
private FutureCallback<Void> write(ByteBuffer b0,ByteBuffer b1,ByteBuffer b2)
{
FutureCallback<Void> fcb=new FutureCallback<>();
if (BufferUtil.hasContent(b0))
{
if (BufferUtil.hasContent(b1))
{
if (BufferUtil.hasContent(b2))
return getEndPoint().write(b0,b1,b2);
return getEndPoint().write(b0,b1);
getEndPoint().write(null,fcb,b0,b1,b2);
else
getEndPoint().write(null,fcb,b0,b1);
}
else
{
if (BufferUtil.hasContent(b2))
return getEndPoint().write(b0,b2);
return getEndPoint().write(b0);
getEndPoint().write(null,fcb,b0,b2);
else
getEndPoint().write(null,fcb,b0);
}
}
else
@ -456,16 +456,19 @@ public class HttpConnection extends AbstractAsyncConnection
if (BufferUtil.hasContent(b1))
{
if (BufferUtil.hasContent(b2))
return getEndPoint().write(b1,b2);
return getEndPoint().write(b1);
getEndPoint().write(null,fcb,b1,b2);
else
getEndPoint().write(null,fcb,b1);
}
else
{
if (BufferUtil.hasContent(b2))
return getEndPoint().write(b2);
return DoneIOFuture.COMPLETE;
getEndPoint().write(null,fcb,b2);
else
fcb.completed(null);
}
}
return fcb;
}
/* ------------------------------------------------------------ */
@ -637,7 +640,9 @@ public class HttpConnection extends AbstractAsyncConnection
try
{
// Wait until we can read
getEndPoint().readable().block();
FutureCallback<Void> block=new FutureCallback<>();
getEndPoint().readable(null,block);
block.get();
// We will need a buffer to read into
if (_requestBuffer==null)
@ -654,7 +659,7 @@ public class HttpConnection extends AbstractAsyncConnection
catch (ExecutionException e)
{
LOG.debug(e);
DispatchingIOFuture.rethrow(e);
FutureCallback.rethrow(e);
}
finally
{

View File

@ -0,0 +1,60 @@
package org.eclipse.jetty.util;
import java.util.concurrent.Executor;
public class ExecutorCallback<C> implements Callback<C>
{
private final Executor _executor;
private final Runnable _onNullContextCompleted = new Runnable()
{
@Override
public void run() { onCompleted(null); }
};
public ExecutorCallback(Executor executor)
{
_executor=executor;
}
@Override
public void completed(final C context)
{
if (execute())
{
_executor.execute(context==null?
_onNullContextCompleted:
new Runnable()
{
@Override
public void run() { onCompleted(context);}
});
}
else
onCompleted(context);
}
protected void onCompleted(C context)
{
}
@Override
public void failed(final C context, final Throwable x)
{
_executor.execute(new Runnable()
{
@Override
public void run() { onFailed(context,x);}
});
}
protected void onFailed(C context, Throwable x)
{
}
protected boolean execute()
{
return _executor!=null;
}
}

View File

@ -1,5 +1,6 @@
package org.eclipse.jetty.io;
package org.eclipse.jetty.util;
import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -8,7 +9,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.Callback;
public class FutureCallback<C> implements Future<C>,Callback<C>
{
@ -98,4 +98,15 @@ public class FutureCallback<C> implements Future<C>,Callback<C>
throw new ExecutionException(_cause);
}
public static void rethrow(ExecutionException e) throws IOException
{
Throwable cause=e.getCause();
if (cause instanceof IOException)
throw (IOException)cause;
if (cause instanceof Error)
throw (Error)cause;
if (cause instanceof RuntimeException)
throw (RuntimeException)cause;
throw new RuntimeException(cause);
}
}

View File

@ -113,6 +113,14 @@ public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool,
this(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue));
}
/* ------------------------------------------------------------ */
@Override
public void execute(Runnable job)
{
_executor.execute(job);
}
/* ------------------------------------------------------------ */
public boolean dispatch(Runnable job)
{

View File

@ -36,7 +36,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
{
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);

View File

@ -13,6 +13,8 @@
package org.eclipse.jetty.util.thread;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.component.LifeCycle;
/* ------------------------------------------------------------ */
@ -20,9 +22,13 @@ import org.eclipse.jetty.util.component.LifeCycle;
*
*
*/
public interface ThreadPool
public interface ThreadPool extends Executor
{
/* ------------------------------------------------------------ */
/**
* @deprecated use {@link Executor#execute(Runnable)}
*/
@Deprecated
public abstract boolean dispatch(Runnable job);
/* ------------------------------------------------------------ */

View File

@ -1,4 +1,4 @@
package org.eclipse.jetty.io;
package org.eclipse.jetty.util;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
@ -8,6 +8,7 @@ import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;