392237 moved HttpTransport towards asynchronous solution

This commit is contained in:
Greg Wilkins 2012-10-25 23:28:32 +11:00
parent bf7a0ace22
commit c51cabccc1
15 changed files with 103 additions and 101 deletions

View File

@ -611,7 +611,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
{
if (isCommitted())
{
_transport.send(content, complete);
_transport.send(null, content, complete);
}
else
{

View File

@ -619,7 +619,7 @@ public class HttpChannelState implements AsyncContext
protected void scheduleTimeout()
{
Scheduler scheduler = _channel.getScheduler();
if (scheduler!=null)
if (scheduler!=null && _timeoutMs>0)
_event._timeout=scheduler.schedule(new AsyncTimeout(),_timeoutMs,TimeUnit.MILLISECONDS);
}

View File

@ -21,9 +21,7 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpGenerator;
@ -32,6 +30,7 @@ import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -39,7 +38,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -322,8 +321,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
{
// TODO This is always blocking! One of the important use-cases is to be able to write large static content without a thread
// If we are still expecting a 100 continues
if (_channel.isExpecting100Continue())
// then we can't be persistent
@ -415,9 +412,17 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
public void send(ByteBuffer content, boolean lastContent) throws IOException
public <C> void send(ResponseInfo info, ByteBuffer content, boolean lastContent, C context, Callback<C> callback)
{
send(null, content, lastContent);
try
{
send(info,content,lastContent);
callback.completed(context);
}
catch (IOException e)
{
callback.failed(context,e);
}
}
private void blockingWrite(ByteBuffer... bytes) throws IOException

View File

@ -22,12 +22,13 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.util.Callback;
public interface HttpTransport
{
void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException;
void send(ByteBuffer content, boolean lastContent) throws IOException;
<C> void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, C context, Callback<C> callback);
void completed();
}

View File

@ -42,6 +42,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.io.AbstractEndPoint;
@ -51,6 +52,7 @@ import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.session.HashSessionIdManager;
import org.eclipse.jetty.server.session.HashSessionManager;
import org.eclipse.jetty.server.session.HashedSession;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.TimerScheduler;
import org.hamcrest.Matchers;
@ -86,7 +88,7 @@ public class ResponseTest
}
@Override
public void send(ByteBuffer content, boolean lastContent) throws IOException
public <C> void send(ResponseInfo info, ByteBuffer content, boolean lastContent, C context, Callback<C> callback)
{
}
@ -94,6 +96,7 @@ public class ResponseTest
public void completed()
{
}
}, input);
}

View File

@ -359,12 +359,12 @@ public class StandardStream implements IStream
public Future<Void> data(DataInfo dataInfo)
{
FutureCallback<Void> fcb = new FutureCallback<>();
data(dataInfo,0,TimeUnit.MILLISECONDS,fcb);
data(dataInfo,0,TimeUnit.MILLISECONDS,null,fcb);
return fcb;
}
@Override
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Callback<Void> callback)
public <C> void data(DataInfo dataInfo, long timeout, TimeUnit unit, C context, Callback<C> callback)
{
if (!canSend())
{
@ -379,15 +379,15 @@ public class StandardStream implements IStream
// Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size.
session.data(this, dataInfo, timeout, unit, callback, null);
session.data(this, dataInfo, timeout, unit, callback, context);
}
@Override
public Future<Void> headers(HeadersInfo headersInfo)
{
Promise<Void> result = new Promise<>();
headers(headersInfo,0,TimeUnit.MILLISECONDS,result);
return result;
FutureCallback<Void> fcb = new FutureCallback<>();
headers(headersInfo,0,TimeUnit.MILLISECONDS,fcb);
return fcb;
}
@Override

View File

@ -158,10 +158,11 @@ public interface Stream
* @param dataInfo the metadata to send
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param context the context passed to the callback
* @param callback the completion callback that gets notified of data sent
* @see #data(DataInfo)
*/
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Callback<Void> callback);
public <C> void data(DataInfo dataInfo, long timeout, TimeUnit unit, C context, Callback<C> callback);
/**
* <p>Sends asynchronously a HEADER frame on this stream.</p>

View File

@ -125,7 +125,7 @@ public class AsyncTimeoutTest
Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
final CountDownLatch failedLatch = new CountDownLatch(1);
stream.data(new StringDataInfo("data", true), timeout, unit, new Callback.Empty<Void>()
stream.data(new StringDataInfo("data", true), timeout, unit, null,new Callback.Empty<Void>()
{
@Override
public void failed(Void context, Throwable x)

View File

@ -430,9 +430,9 @@ public class StandardSessionTest
};
// first data frame should fail on controller.write()
stream.data(new StringDataInfo("data", false), 5, TimeUnit.SECONDS, callback);
stream.data(new StringDataInfo("data", false), 5, TimeUnit.SECONDS, null,callback);
// second data frame should fail without controller.writer() as the connection is expected to be broken after first controller.write() call failed.
stream.data(new StringDataInfo("data", false), 5, TimeUnit.SECONDS, callback);
stream.data(new StringDataInfo("data", false), 5, TimeUnit.SECONDS, null,callback);
verify(controller, times(1)).write(any(ByteBuffer.class), any(Callback.class), any(FrameBytes.class));
assertThat("Callback.failed has been called twice", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));

View File

@ -21,9 +21,8 @@ package org.eclipse.jetty.spdy.server.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
@ -31,7 +30,6 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannelConfig;
import org.eclipse.jetty.server.HttpTransport;
@ -40,10 +38,10 @@ import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -57,6 +55,7 @@ public class HttpTransportOverSPDY implements HttpTransport
private final PushStrategy pushStrategy;
private final Stream stream;
private final Fields requestHeaders;
private final BlockingCallback streamBlocker = new BlockingCallback();
public HttpTransportOverSPDY(Connector connector, HttpChannelConfig configuration, EndPoint endPoint, PushStrategy pushStrategy, Stream stream, Fields requestHeaders)
{
@ -69,7 +68,11 @@ public class HttpTransportOverSPDY implements HttpTransport
}
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
public <C> void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, C context, Callback<C> callback)
{
boolean hasContent = !BufferUtil.isEmpty(content);
if (info!=null)
{
short version = stream.getSession().getVersion();
Fields headers = new Fields();
@ -101,40 +104,28 @@ public class HttpTransportOverSPDY implements HttpTransport
}
}
boolean hasContent = !BufferUtil.isEmpty(content);
boolean close = !hasContent && lastContent;
reply(stream, new ReplyInfo(headers, close));
}
if ((hasContent || lastContent ) && !stream.isClosed() )
stream.data(new ByteBufferDataInfo(content, lastContent),endPoint.getIdleTimeout(),TimeUnit.MILLISECONDS,context,callback);
else
callback.completed(context);
if (hasContent)
sendToStream(content, lastContent);
}
@Override
public void send(ByteBuffer content, boolean lastContent) throws IOException
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
{
// Guard against a last 0 bytes write
// TODO work out if we can avoid double calls for lastContent==true
if (stream.isClosed() && BufferUtil.isEmpty(content) && lastContent)
return;
sendToStream(content, lastContent);
}
private void sendToStream(ByteBuffer content, boolean lastContent) throws IOException
{
FutureCallback<Void> future = new FutureCallback<>();
stream.data(new ByteBufferDataInfo(content, lastContent),endPoint.getIdleTimeout(),TimeUnit.MILLISECONDS,future);
send(info,content,lastContent,streamBlocker.getPhase(),streamBlocker);
try
{
future.get();
streamBlocker.block();
}
catch (ExecutionException e)
catch (IOException e)
{
LOG.debug(e);
Throwable cause=e.getCause();
throw new EofException(cause);
throw e;
}
catch (Exception e)
{
@ -142,6 +133,7 @@ public class HttpTransportOverSPDY implements HttpTransport
}
}
@Override
public void completed()
{

View File

@ -286,23 +286,23 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
}
@Override
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Callback<Void> handler)
public <C> void data(DataInfo dataInfo, long timeout, TimeUnit unit, C context, Callback<C> handler)
{
try
{
// Data buffer must be copied, as the ByteBuffer is pooled
ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
send(byteBuffer, dataInfo.isClose());
send(null, byteBuffer, dataInfo.isClose());
if (dataInfo.isClose())
completed();
handler.completed(null);
handler.completed(context);
}
catch (IOException x)
{
handler.failed(null, x);
handler.failed(context, x);
}
}
}
@ -322,10 +322,10 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
}
@Override
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Callback<Void> handler)
public <C> void data(DataInfo dataInfo, long timeout, TimeUnit unit, C context, Callback<C> handler)
{
// Ignore pushed data
handler.completed(null);
handler.completed(context);
}
}
}

View File

@ -277,7 +277,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
private void data(final Stream stream, final DataInfo dataInfo)
{
clientStream.data(dataInfo, getTimeout(), TimeUnit.MILLISECONDS, new Callback<Void>()
clientStream.data(dataInfo, getTimeout(), TimeUnit.MILLISECONDS, null,new Callback<Void>()
{
@Override
public void completed(Void context)
@ -393,7 +393,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
{
logger.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
serverStream.data(dataInfoHandler.dataInfo, getTimeout(), TimeUnit.MILLISECONDS, dataInfoHandler);
serverStream.data(dataInfoHandler.dataInfo, getTimeout(), TimeUnit.MILLISECONDS, null,dataInfoHandler);
}
private class DataInfoHandler implements Callback<Void>

View File

@ -134,7 +134,7 @@ public class ResetStreamTest extends AbstractTest
});
Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
stream.data(new StringDataInfo("data",true),5,TimeUnit.SECONDS,new Callback.Empty<Void>()
stream.data(new StringDataInfo("data",true),5,TimeUnit.SECONDS,null,new Callback.Empty<Void>()
{
@Override
public void completed(Void context)
@ -182,9 +182,9 @@ public class ResetStreamTest extends AbstractTest
Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
assertThat("syn is received by server", synLatch.await(5,TimeUnit.SECONDS),is(true));
stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,null);
stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,null,null);
assertThat("stream is reset",rstLatch.await(5,TimeUnit.SECONDS),is(true));
stream.data(new StringDataInfo("2nd dataframe",false),5L,TimeUnit.SECONDS,new Callback.Empty<Void>()
stream.data(new StringDataInfo("2nd dataframe",false),5L,TimeUnit.SECONDS,null,new Callback.Empty<Void>()
{
@Override
public void failed(Void context, Throwable x)

View File

@ -177,7 +177,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
@Override
public void completed(Stream stream)
{
stream.data(new StringDataInfo("data_" + stream.getId(), true), 0, TimeUnit.SECONDS, null);
stream.data(new StringDataInfo("data_" + stream.getId(), true), 0, TimeUnit.SECONDS, null,null);
}
});
}

View File

@ -199,7 +199,7 @@ public class SynReplyTest extends AbstractTest
Assert.assertTrue(stream.isHalfClosed());
stream.reply(new ReplyInfo(false));
stream.data(new StringDataInfo(data1, false), 5, TimeUnit.SECONDS, new Callback.Empty<Void>()
stream.data(new StringDataInfo(data1, false), 5, TimeUnit.SECONDS, null,new Callback.Empty<Void>()
{
@Override
public void completed(Void context)