Issue #113 channel persistance and completed getters

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2018-11-15 14:36:40 +01:00
parent 7ca6577ac6
commit 7dd3cfffe6
2 changed files with 78 additions and 39 deletions

View File

@ -34,8 +34,6 @@ import java.util.function.Supplier;
import javax.servlet.DispatcherType; import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher; import javax.servlet.RequestDispatcher;
import javax.servlet.ServletException;
import javax.servlet.UnavailableException;
import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
@ -48,9 +46,9 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.QuietException; import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.server.HttpChannelState.Action; import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ErrorHandler; import org.eclipse.jetty.server.handler.ErrorHandler;
@ -74,6 +72,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{ {
private static final Logger LOG = Log.getLogger(HttpChannel.class); private static final Logger LOG = Log.getLogger(HttpChannel.class);
private final AtomicBoolean _committed = new AtomicBoolean(); private final AtomicBoolean _committed = new AtomicBoolean();
private final AtomicBoolean _responseCompleted = new AtomicBoolean();
private final AtomicLong _requests = new AtomicLong(); private final AtomicLong _requests = new AtomicLong();
private final Connector _connector; private final Connector _connector;
private final Executor _executor; private final Executor _executor;
@ -246,6 +245,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
return _response; return _response;
} }
public Connection getConnection()
{
return _endPoint.getConnection();
}
public EndPoint getEndPoint() public EndPoint getEndPoint()
{ {
return _endPoint; return _endPoint;
@ -277,6 +281,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
public void recycle() public void recycle()
{ {
_committed.set(false); _committed.set(false);
_responseCompleted.set(false);
_request.recycle(); _request.recycle();
_response.recycle(); _response.recycle();
_committedMetaData=null; _committedMetaData=null;
@ -662,11 +667,13 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
public String toString() public String toString()
{ {
long timeStamp = _request.getTimeStamp(); long timeStamp = _request.getTimeStamp();
return String.format("%s@%x{r=%s,c=%b,a=%s,uri=%s,age=%d}", return String.format("%s@%x{r=%s,c=%b,c=%b/%b,a=%s,uri=%s,age=%d}",
getClass().getSimpleName(), getClass().getSimpleName(),
hashCode(), hashCode(),
_requests, _requests,
_committed.get(), _committed.get(),
isRequestCompleted(),
isResponseCompleted(),
_state.getState(), _state.getState(),
_request.getHttpURI(), _request.getHttpURI(),
timeStamp == 0 ? 0 : System.currentTimeMillis() - timeStamp); timeStamp == 0 ? 0 : System.currentTimeMillis() - timeStamp);
@ -828,7 +835,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
// wrap callback to process 100 responses // wrap callback to process 100 responses
final int status=info.getStatus(); final int status=info.getStatus();
final Callback committed = (status<200&&status>=100)?new Commit100Callback(callback):new CommitCallback(callback, content, complete); final Callback committed = (status<200&&status>=100)?new Send100Callback(callback):new SendCallback(callback, content, true, complete);
notifyResponseBegin(_request); notifyResponseBegin(_request);
@ -838,7 +845,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
else if (info==null) else if (info==null)
{ {
// This is a normal write // This is a normal write
_transport.send(null,_request.isHead(), content, complete, new ContentCallback(callback, content, complete)); _transport.send(null,_request.isHead(), content, complete, new SendCallback(callback, content, false, complete));
} }
else else
{ {
@ -878,6 +885,27 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
return _committed.get(); return _committed.get();
} }
/**
* @return True if the request lifecycle is completed
*/
public boolean isRequestCompleted()
{
return _state.isCompleted();
}
/**
* @return True if the response is completely written.
*/
public boolean isResponseCompleted()
{
return _responseCompleted.get();
}
public boolean isPersistent()
{
return _endPoint.isOpen();
}
/** /**
* <p>Non-Blocking write, committing the response if needed.</p> * <p>Non-Blocking write, committing the response if needed.</p>
* Called as last link in HttpOutput.Filter chain * Called as last link in HttpOutput.Filter chain
@ -1222,17 +1250,19 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
} }
} }
private class CommitCallback extends Callback.Nested private class SendCallback extends Callback.Nested
{ {
private final ByteBuffer _content; private final ByteBuffer _content;
private final int _length; private final int _length;
private final boolean _commit;
private final boolean _complete; private final boolean _complete;
private CommitCallback(Callback callback, ByteBuffer content, boolean complete) private SendCallback(Callback callback, ByteBuffer content, boolean commit, boolean complete)
{ {
super(callback); super(callback);
_content = content == null ? BufferUtil.EMPTY_BUFFER : content.slice(); _content = content == null ? BufferUtil.EMPTY_BUFFER : content.slice();
_length = _content.remaining(); _length = _content.remaining();
_commit = commit;
_complete = complete; _complete = complete;
} }
@ -1241,12 +1271,16 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{ {
_written += _length; _written += _length;
super.succeeded(); super.succeeded();
if (_commit)
notifyResponseCommit(_request); notifyResponseCommit(_request);
if (_content.hasRemaining()) if (_length>0)
notifyResponseContent(_request, _content); notifyResponseContent(_request, _content);
if (_complete) if (_complete)
{
_responseCompleted.set(true);
notifyResponseEnd(_request); notifyResponseEnd(_request);
} }
}
@Override @Override
public void failed(final Throwable x) public void failed(final Throwable x)
@ -1281,11 +1315,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
} }
} }
private class Commit100Callback extends CommitCallback private class Send100Callback extends SendCallback
{ {
private Commit100Callback(Callback callback) private Send100Callback(Callback callback)
{ {
super(callback, null, false); super(callback, null, false, false);
} }
@Override @Override
@ -1297,30 +1331,4 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
super.failed(new IllegalStateException()); super.failed(new IllegalStateException());
} }
} }
private class ContentCallback extends Callback.Nested
{
private final ByteBuffer _content;
private final int _length;
private final boolean _complete;
private ContentCallback(Callback callback, ByteBuffer content, boolean complete)
{
super(callback);
_content = content == null ? BufferUtil.EMPTY_BUFFER : content.slice();
_length = _content.remaining();
_complete = complete;
}
@Override
public void succeeded()
{
_written += _length;
super.succeeded();
if (_content.hasRemaining())
notifyResponseContent(_request, _content);
if (_complete)
notifyResponseEnd(_request);
}
}
} }

View File

@ -329,9 +329,12 @@ public class RequestLogTest
data.add(new Object[] { new ResponseSendErrorHandler(), "/sendError", "\"GET /sendError HTTP/1.0\" 599" }); data.add(new Object[] { new ResponseSendErrorHandler(), "/sendError", "\"GET /sendError HTTP/1.0\" 599" });
data.add(new Object[] { new ServletExceptionHandler(), "/sex", "\"GET /sex HTTP/1.0\" 500" }); data.add(new Object[] { new ServletExceptionHandler(), "/sex", "\"GET /sex HTTP/1.0\" 500" });
data.add(new Object[] { new IOExceptionHandler(), "/ioex", "\"GET /ioex HTTP/1.0\" 500" }); data.add(new Object[] { new IOExceptionHandler(), "/ioex", "\"GET /ioex HTTP/1.0\" 500" });
data.add(new Object[] { new IOExceptionPartialHandler(), "/ioex", "\"GET /ioex HTTP/1.0\" 200" });
data.add(new Object[] { new RuntimeExceptionHandler(), "/rtex", "\"GET /rtex HTTP/1.0\" 500" }); data.add(new Object[] { new RuntimeExceptionHandler(), "/rtex", "\"GET /rtex HTTP/1.0\" 500" });
data.add(new Object[] { new BadMessageHandler(), "/bad", "\"GET /bad HTTP/1.0\" 499" }); data.add(new Object[] { new BadMessageHandler(), "/bad", "\"GET /bad HTTP/1.0\" 499" });
data.add(new Object[] { new AbortHandler(), "/bad", "\"GET /bad HTTP/1.0\" 488" }); data.add(new Object[] { new AbortHandler(), "/bad", "\"GET /bad HTTP/1.0\" 488" });
data.add(new Object[] { new AbortPartialHandler(), "/bad", "\"GET /bad HTTP/1.0\" 200" });
return data.stream().map(Arguments::of); return data.stream().map(Arguments::of);
} }
@ -623,6 +626,20 @@ public class RequestLogTest
} }
} }
private static class IOExceptionPartialHandler extends AbstractTestHandler
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentType("text/plain");
response.setContentLength(100);
response.getOutputStream().println("You were expecting maybe a ");
response.flushBuffer();
throw new IOException("expected");
}
}
private static class RuntimeExceptionHandler extends AbstractTestHandler private static class RuntimeExceptionHandler extends AbstractTestHandler
{ {
@Override @Override
@ -652,6 +669,20 @@ public class RequestLogTest
} }
} }
private static class AbortPartialHandler extends AbstractTestHandler
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentType("text/plain");
response.setContentLength(100);
response.getOutputStream().println("You were expecting maybe a ");
response.flushBuffer();
baseRequest.getHttpChannel().abort(new Throwable("bomb"));
}
}
public static class OKErrorHandler extends ErrorHandler public static class OKErrorHandler extends ErrorHandler
{ {
@Override @Override