398467 Non Blocking IO

Working towards 3.1 by exposing more of the underlying async IO operations.
added async support to write in HttpChannel
This commit is contained in:
Greg Wilkins 2013-05-10 20:11:19 +10:00
parent affc41ddf5
commit fe582e544a
2 changed files with 77 additions and 36 deletions

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -45,7 +46,8 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.HttpChannelState.Next; import org.eclipse.jetty.server.HttpChannelState.Next;
import org.eclipse.jetty.server.handler.ErrorHandler; import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -88,7 +90,8 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
private final HttpURI _uri; private final HttpURI _uri;
private final HttpChannelState _state; private final HttpChannelState _state;
private final Request _request; private final Request _request;
private final Response _response; private final Response _response;
private final BlockingCallback _writeblock=new BlockingCallback();
private HttpVersion _version = HttpVersion.HTTP_1_1; private HttpVersion _version = HttpVersion.HTTP_1_1;
private boolean _expect = false; private boolean _expect = false;
private boolean _expect100Continue = false; private boolean _expect100Continue = false;
@ -500,6 +503,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
if (charset != null) if (charset != null)
_request.setCharacterEncodingUnchecked(charset); _request.setCharacterEncodingUnchecked(charset);
break; break;
default:
} }
} }
@ -597,55 +601,91 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
_state.completed(); _state.completed();
} }
} }
protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete, final Callback callback)
{ {
boolean committing = _committed.compareAndSet(false, true); boolean committing = _committed.compareAndSet(false, true);
if (committing) if (committing)
{ {
// We need an info to commit // We need an info to commit
if (info==null) if (info==null)
{
info = _response.newResponseInfo(); info = _response.newResponseInfo();
}
try final int status=info.getStatus();
final Callback committed = new Callback()
{ {
// Try to commit with the passed info @Override
_transport.send(info, content, complete); public void succeeded()
{
// If we are committing a 1xx response, we need to reset the commit
// status so that the "real" response can be committed again.
if (status<200 && status>=100)
_committed.set(false);
callback.succeeded();
}
// If we are committing a 1xx response, we need to reset the commit @Override
// status so that the "real" response can be committed again. public void failed(final Throwable x)
if (info.getStatus() < 200) {
_committed.set(false); if (x instanceof EofException)
} {
catch (EofException e) LOG.debug(x);
{ _response.getHttpOutput().closed();
LOG.debug(e); callback.failed(x);
// TODO is it worthwhile sending if we are at EoF? }
// "application" info failed to commit, commit with a failsafe 500 info else
_transport.send(HttpGenerator.RESPONSE_500_INFO,null,true); {
complete=true; LOG.warn(x);
throw e; _transport.send(HttpGenerator.RESPONSE_500_INFO,null,true,new Callback()
} {
catch (Exception e) @Override
{ public void succeeded()
LOG.warn(e); {
// "application" info failed to commit, commit with a failsafe 500 info _response.getHttpOutput().closed();
_transport.send(HttpGenerator.RESPONSE_500_INFO,null,true); callback.failed(x);
complete=true; }
throw e;
} @Override
finally public void failed(Throwable th)
{ {
// TODO this indicates the relationship with HttpOutput is not exactly correct LOG.ignore(th);
if (complete) _response.getHttpOutput().closed();
_response.getHttpOutput().closed(); callback.failed(x);
} }
});
}
}
};
_transport.send(info, content, complete, committed);
} }
else if (info==null) else if (info==null)
{ {
// This is a normal write // This is a normal write
_transport.send(null, content, complete); _transport.send(null, content, complete, callback);
} }
else
{
callback.failed(new IllegalStateException("committed"));
}
return committing;
}
protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
{
boolean committing=sendResponse(info,content,complete,_writeblock);
try
{
_writeblock.block();
}
catch (InterruptedException | TimeoutException e)
{
throw new IOException(e);
}
return committing; return committing;
} }

View File

@ -89,6 +89,7 @@ public class ResponseTest
@Override @Override
public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{ {
callback.succeeded();
} }
@Override @Override