398467 Servlet 3.1 Non Blocking IO

Refactored HttpChannel to be ready to control onWritePossible callbacks
This commit is contained in:
Greg Wilkins 2013-05-23 17:34:28 +10:00
parent 15e0d50b79
commit 6f8ed5e809
7 changed files with 566 additions and 258 deletions

View File

@ -48,8 +48,12 @@ public class EncodingHttpWriter extends HttpWriter
@Override @Override
public void write (char[] s,int offset, int length) throws IOException public void write (char[] s,int offset, int length) throws IOException
{ {
if (length==0) HttpOutput out = _out;
_out.closeIfAllContentWritten(); if (length==0 && out.isAllContentWritten())
{
out.close();
return;
}
while (length > 0) while (length > 0)
{ {
@ -58,7 +62,7 @@ public class EncodingHttpWriter extends HttpWriter
_converter.write(s, offset, chars); _converter.write(s, offset, chars);
_converter.flush(); _converter.flush();
_bytes.writeTo(_out); _bytes.writeTo(out);
length-=chars; length-=chars;
offset+=chars; offset+=chars;
} }

View File

@ -21,12 +21,12 @@ package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.DispatcherType; import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher; import javax.servlet.RequestDispatcher;
import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
@ -44,7 +44,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.HttpChannelState.Next; import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.handler.ErrorHandler; import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -251,36 +251,50 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
// The loop is controlled by the call to async.unhandle in the // The loop is controlled by the call to async.unhandle in the
// finally block below. Unhandle will return false only if an async dispatch has // finally block below. Unhandle will return false only if an async dispatch has
// already happened when unhandle is called. // already happened when unhandle is called.
HttpChannelState.Next next = _state.handling(); HttpChannelState.Action action = _state.handling();
while (next==Next.CONTINUE && getServer().isRunning()) loop: while (action.ordinal()<HttpChannelState.Action.WAIT.ordinal() && getServer().isRunning())
{ {
try try
{ {
switch(action)
{
case REQUEST_DISPATCH:
_request.setHandled(false); _request.setHandled(false);
_response.getHttpOutput().reopen(); _response.getHttpOutput().reopen();
if (_state.isInitial())
{
_request.setTimeStamp(System.currentTimeMillis()); _request.setTimeStamp(System.currentTimeMillis());
_request.setDispatcherType(DispatcherType.REQUEST); _request.setDispatcherType(DispatcherType.REQUEST);
for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers()) for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers())
customizer.customize(getConnector(),_configuration,_request); customizer.customize(getConnector(),_configuration,_request);
getServer().handle(this); getServer().handle(this);
} break;
else
{ case ASYNC_DISPATCH:
if (_request.getHttpChannelState().isExpired()) _request.setHandled(false);
{ _response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.ASYNC);
getServer().handleAsync(this);
break;
case ASYNC_EXPIRED:
_request.setHandled(false);
_response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.ERROR); _request.setDispatcherType(DispatcherType.ERROR);
_request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE,new Integer(500)); _request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE,new Integer(500));
_request.setAttribute(RequestDispatcher.ERROR_MESSAGE,"Async Timeout"); _request.setAttribute(RequestDispatcher.ERROR_MESSAGE,"Async Timeout");
_request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI,_request.getRequestURI()); _request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI,_request.getRequestURI());
_response.setStatusWithReason(500,"Async Timeout"); _response.setStatusWithReason(500,"Async Timeout");
}
else
_request.setDispatcherType(DispatcherType.ASYNC);
getServer().handleAsync(this); getServer().handleAsync(this);
break;
case IO_CALLBACK:
_response.getHttpOutput().handle();
break;
default:
break loop;
} }
} }
catch (Error e) catch (Error e)
@ -302,7 +316,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
} }
finally finally
{ {
next = _state.unhandle(); action = _state.unhandle();
} }
} }
@ -310,7 +324,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
Thread.currentThread().setName(threadName); Thread.currentThread().setName(threadName);
setCurrentHttpChannel(null); setCurrentHttpChannel(null);
if (next==Next.COMPLETE) if (action==Action.COMPLETE)
{ {
try try
{ {
@ -332,11 +346,11 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
} }
finally finally
{ {
next=Next.RECYCLE; action=Action.RECYCLE;
} }
} }
if (next==Next.RECYCLE) if (action==Action.RECYCLE)
{ {
_request.setHandled(true); _request.setHandled(true);
_transport.completed(); _transport.completed();
@ -344,7 +358,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
LOG.debug("{} handle exit", this); LOG.debug("{} handle exit", this);
return next!=Next.WAIT; return action!=Action.WAIT;
} }
/** /**
@ -591,7 +605,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
try try
{ {
if (_state.handling()==Next.CONTINUE) if (_state.handling()==Action.REQUEST_DISPATCH)
sendResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true); sendResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true);
} }
catch (IOException e) catch (IOException e)
@ -600,8 +614,10 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
} }
finally finally
{ {
if (_state.unhandle()==Next.COMPLETE) if (_state.unhandle()==Action.COMPLETE)
_state.completed(); _state.completed();
else
throw new IllegalStateException();
} }
} }

View File

@ -27,6 +27,7 @@ import javax.servlet.AsyncListener;
import javax.servlet.RequestDispatcher; import javax.servlet.RequestDispatcher;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import javax.servlet.ServletResponse; import javax.servlet.ServletResponse;
import javax.servlet.WriteListener;
import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context; import org.eclipse.jetty.server.handler.ContextHandler.Context;
@ -72,9 +73,12 @@ public class HttpChannelState
COMPLETED // Request is complete COMPLETED // Request is complete
} }
public enum Next public enum Action
{ {
CONTINUE, // Continue handling the channel REQUEST_DISPATCH, // handle a normal request dispatch
ASYNC_DISPATCH, // handle an async request dispatch
ASYNC_EXPIRED, // handle an async timeout
IO_CALLBACK, // handle an IO callback
WAIT, // Wait for further events WAIT, // Wait for further events
COMPLETE, // Complete the channel COMPLETE, // Complete the channel
RECYCLE, // Channel is completed RECYCLE, // Channel is completed
@ -164,7 +168,7 @@ public class HttpChannelState
/** /**
* @return Next handling of the request should proceed * @return Next handling of the request should proceed
*/ */
protected Next handling() protected Action handling()
{ {
synchronized (this) synchronized (this)
{ {
@ -182,32 +186,30 @@ public class HttpChannelState
_asyncListeners=_lastAsyncListeners; _asyncListeners=_lastAsyncListeners;
_lastAsyncListeners=null; _lastAsyncListeners=null;
} }
break; _responseWrapped=false;
return Action.REQUEST_DISPATCH;
case COMPLETECALLED: case COMPLETECALLED:
_state=State.COMPLETING; _state=State.COMPLETING;
return Next.COMPLETE; return Action.COMPLETE;
case COMPLETING: case COMPLETING:
return Next.COMPLETE; return Action.COMPLETE;
case ASYNCWAIT: case ASYNCWAIT:
return Next.WAIT; return Action.WAIT;
case COMPLETED: case COMPLETED:
return Next.RECYCLE; return Action.RECYCLE;
case REDISPATCH: case REDISPATCH:
_state=State.REDISPATCHED; _state=State.REDISPATCHED;
break; _responseWrapped=false;
return _expired?Action.ASYNC_EXPIRED:Action.ASYNC_DISPATCH;
default: default:
throw new IllegalStateException(this.getStatusString()); throw new IllegalStateException(this.getStatusString());
} }
_responseWrapped=false;
return Next.CONTINUE;
} }
} }
@ -270,7 +272,7 @@ public class HttpChannelState
* @return next actions * @return next actions
* be handled again (eg because of a resume that happened before unhandle was called) * be handled again (eg because of a resume that happened before unhandle was called)
*/ */
protected Next unhandle() protected Action unhandle()
{ {
synchronized (this) synchronized (this)
{ {
@ -279,7 +281,7 @@ public class HttpChannelState
case REDISPATCHED: case REDISPATCHED:
case DISPATCHED: case DISPATCHED:
_state=State.COMPLETING; _state=State.COMPLETING;
return Next.COMPLETE; return Action.COMPLETE;
case IDLE: case IDLE:
throw new IllegalStateException(this.getStatusString()); throw new IllegalStateException(this.getStatusString());
@ -288,17 +290,17 @@ public class HttpChannelState
_initial=false; _initial=false;
_state=State.ASYNCWAIT; _state=State.ASYNCWAIT;
scheduleTimeout(); scheduleTimeout();
return Next.WAIT; return Action.WAIT;
case REDISPATCHING: case REDISPATCHING:
_initial=false; _initial=false;
_state=State.REDISPATCHED; _state=State.REDISPATCHED;
return Next.CONTINUE; return _expired?Action.ASYNC_EXPIRED:Action.ASYNC_DISPATCH;
case COMPLETECALLED: case COMPLETECALLED:
_initial=false; _initial=false;
_state=State.COMPLETING; _state=State.COMPLETING;
return Next.COMPLETE; return Action.COMPLETE;
default: default:
throw new IllegalStateException(this.getStatusString()); throw new IllegalStateException(this.getStatusString());
@ -650,6 +652,13 @@ public class HttpChannelState
_channel.getRequest().setAttribute(name,attribute); _channel.getRequest().setAttribute(name,attribute);
} }
public void asyncIO()
{
// TODO Auto-generated method stub
}
public class AsyncTimeout implements Runnable public class AsyncTimeout implements Runnable
{ {
@Override @Override

View File

@ -23,7 +23,8 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.TimeoutException; import java.nio.channels.WritePendingException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher; import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
@ -32,14 +33,13 @@ import javax.servlet.ServletResponse;
import javax.servlet.WriteListener; import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpContent; import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.resource.Resource;
/** /**
* <p>{@link HttpOutput} implements {@link ServletOutputStream} * <p>{@link HttpOutput} implements {@link ServletOutputStream}
@ -55,11 +55,25 @@ public class HttpOutput extends ServletOutputStream
{ {
private static Logger LOG = Log.getLogger(HttpOutput.class); private static Logger LOG = Log.getLogger(HttpOutput.class);
private final HttpChannel<?> _channel; private final HttpChannel<?> _channel;
private boolean _closed;
private long _written; private long _written;
private ByteBuffer _aggregate; private ByteBuffer _aggregate;
private int _bufferSize; private int _bufferSize;
private WriteListener _writeListener; private WriteListener _writeListener;
private volatile Throwable _onError;
/*
ACTION OPEN ASYNC READY PENDING UNREADY
-------------------------------------------------------------------------------
setWriteListener() READY->owp ise ise ise ise
write() OPEN ise PENDING wpe wpe
flush() OPEN ise PENDING wpe wpe
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false
write completed - - - ASYNC READY->owp
*/
enum State { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
private final AtomicReference<State> _state=new AtomicReference<>(State.OPEN);
public HttpOutput(HttpChannel<?> channel) public HttpOutput(HttpChannel<?> channel)
@ -86,22 +100,21 @@ public class HttpOutput extends ServletOutputStream
public void reopen() public void reopen()
{ {
_closed = false; _state.set(State.OPEN);
} }
/** Called by the HttpChannel if the output was closed public boolean isAllContentWritten()
* externally (eg by a 500 exception handling).
*/
void closed()
{ {
_closed = true; return _channel.getResponse().isAllContentWritten(_written);
releaseBuffer();
} }
@Override @Override
public void close() public void close()
{ {
if (!isClosed()) State state=_state.get();
while(state!=State.CLOSED)
{
if (_state.compareAndSet(state,State.CLOSED))
{ {
try try
{ {
@ -115,8 +128,35 @@ public class HttpOutput extends ServletOutputStream
_channel.getEndPoint().shutdownOutput(); _channel.getEndPoint().shutdownOutput();
LOG.ignore(e); LOG.ignore(e);
} }
releaseBuffer();
return;
}
state=_state.get();
}
}
/* Called to indicated that the output is already closed and the state needs to be updated to match */
void closed()
{
State state=_state.get();
while(state!=State.CLOSED)
{
if (_state.compareAndSet(state,State.CLOSED))
{
try
{
_channel.getResponse().closeOutput();
}
catch(IOException e)
{
_channel.getEndPoint().shutdownOutput();
LOG.ignore(e);
}
releaseBuffer();
return;
}
state=_state.get();
} }
closed();
} }
private void releaseBuffer() private void releaseBuffer()
@ -130,43 +170,108 @@ public class HttpOutput extends ServletOutputStream
public boolean isClosed() public boolean isClosed()
{ {
return _closed; return _state.get()==State.CLOSED;
} }
@Override @Override
public void flush() throws IOException public void flush() throws IOException
{ {
if (isClosed()) while(true)
return; {
switch(_state.get())
{
case OPEN:
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
_channel.write(_aggregate, false); _channel.write(_aggregate, false);
else else
_channel.write(BufferUtil.EMPTY_BUFFER, false); _channel.write(BufferUtil.EMPTY_BUFFER, false);
return;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(State.READY, State.PENDING))
continue;
new AsyncFlush().process();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case CLOSED:
return;
}
break;
}
} }
public boolean closeIfAllContentWritten() throws IOException
{
Response response=_channel.getResponse();
if (response.isAllContentWritten(_written))
{
response.closeOutput();
return true;
}
return false;
}
@Override @Override
public void write(byte[] b, int off, int len) throws IOException public void write(byte[] b, int off, int len) throws IOException
{ {
if (isClosed())
throw new EOFException("Closed");
_written+=len; _written+=len;
boolean complete=_channel.getResponse().isAllContentWritten(_written); boolean complete=_channel.getResponse().isAllContentWritten(_written);
int capacity = getBufferSize();
// Async or Blocking ?
while(true)
{
switch(_state.get())
{
case OPEN:
// process blocking below
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(State.READY, State.PENDING))
continue;
// Should we aggregate? // Should we aggregate?
int capacity = getBufferSize();
if (!complete && len<=capacity/4)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, false);
// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content
if (!complete && filled==len && !BufferUtil.isFull(_aggregate))
{
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
throw new IllegalStateException();
return;
}
// adjust offset/length
off+=filled;
len-=filled;
}
// Do the asynchronous writing from the callback
new AsyncWrite(b,off,len,complete).process();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case CLOSED:
throw new EofException("Closed");
}
break;
}
// handle blocking write
// Should we aggregate?
int capacity = getBufferSize();
if (!complete && len<=capacity/4) if (!complete && len<=capacity/4)
{ {
if (_aggregate == null) if (_aggregate == null)
@ -204,34 +309,74 @@ public class HttpOutput extends ServletOutputStream
if (complete) if (complete)
{ {
closed(); closed();
_channel.getResponse().closeOutput();
} }
} }
@Override @Override
public void write(int b) throws IOException public void write(int b) throws IOException
{ {
if (isClosed()) _written+=1;
throw new EOFException("Closed"); boolean complete=_channel.getResponse().isAllContentWritten(_written);
// Allocate an aggregate buffer. // Async or Blocking ?
// Never direct as it is slow to do little writes to a direct buffer. while(true)
{
switch(_state.get())
{
case OPEN:
if (_aggregate == null) if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
BufferUtil.append(_aggregate, (byte)b); BufferUtil.append(_aggregate, (byte)b);
_written++;
// Check if all written or full // Check if all written or full
if (!closeIfAllContentWritten() && BufferUtil.isFull(_aggregate)) if (complete || BufferUtil.isFull(_aggregate))
{ {
BlockingCallback callback = _channel.getWriteBlockingCallback(); BlockingCallback callback = _channel.getWriteBlockingCallback();
_channel.write(_aggregate, false, callback); _channel.write(_aggregate, complete, callback);
callback.block(); callback.block();
if (complete)
closed();
}
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(State.READY, State.PENDING))
continue;
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
BufferUtil.append(_aggregate, (byte)b);
// Check if all written or full
if (!complete && !BufferUtil.isFull(_aggregate))
{
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
throw new IllegalStateException();
return;
}
// Do the asynchronous writing from the callback
new AsyncFlush().process();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case CLOSED:
throw new EofException("Closed");
}
break;
} }
} }
@Override @Override
public void print(String s) throws IOException public void print(String s) throws IOException
{ {
@ -241,51 +386,6 @@ public class HttpOutput extends ServletOutputStream
write(s.getBytes(_channel.getResponse().getCharacterEncoding())); write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
} }
/* ------------------------------------------------------------ */
/** Set headers and send content.
* @deprecated Use {@link Response#setHeaders(HttpContent)} and {@link #sendContent(HttpContent)} instead.
* @param content
* @throws IOException
*/
@Deprecated
public void sendContent(Object content) throws IOException
{
final BlockingCallback callback =_channel.getWriteBlockingCallback();
if (content instanceof HttpContent)
{
_channel.getResponse().setHeaders((HttpContent)content);
sendContent((HttpContent)content,callback);
}
else if (content instanceof Resource)
{
Resource resource = (Resource)content;
_channel.getResponse().getHttpFields().putDateField(HttpHeader.LAST_MODIFIED, resource.lastModified());
ReadableByteChannel in=((Resource)content).getReadableByteChannel();
if (in!=null)
sendContent(in,callback);
else
sendContent(resource.getInputStream(),callback);
}
else if (content instanceof ByteBuffer)
{
sendContent((ByteBuffer)content,callback);
}
else if (content instanceof ReadableByteChannel)
{
sendContent((ReadableByteChannel)content,callback);
}
else if (content instanceof InputStream)
{
sendContent((InputStream)content,callback);
}
else
callback.failed(new IllegalArgumentException("unknown content type "+content.getClass()));
callback.block();
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Blocking send of content. /** Blocking send of content.
* @param content The content to send * @param content The content to send
@ -373,14 +473,26 @@ public class HttpOutput extends ServletOutputStream
*/ */
public void sendContent(HttpContent httpContent, Callback callback) throws IOException public void sendContent(HttpContent httpContent, Callback callback) throws IOException
{ {
if (isClosed())
throw new IOException("Closed");
if (BufferUtil.hasContent(_aggregate)) if (BufferUtil.hasContent(_aggregate))
throw new IOException("written"); throw new IOException("written");
if (_channel.isCommitted()) if (_channel.isCommitted())
throw new IOException("committed"); throw new IOException("committed");
_closed=true; while (true)
{
switch(_state.get())
{
case OPEN:
if (!_state.compareAndSet(State.OPEN, State.PENDING))
continue;
break;
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
break;
}
ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null; ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
if (buffer == null) if (buffer == null)
@ -425,23 +537,16 @@ public class HttpOutput extends ServletOutputStream
BufferUtil.clear(_aggregate); BufferUtil.clear(_aggregate);
} }
@Override @Override
public void setWriteListener(WriteListener writeListener) public void setWriteListener(WriteListener writeListener)
{
if (_state.compareAndSet(State.OPEN, State.READY))
{ {
_writeListener = writeListener; _writeListener = writeListener;
_channel.getState().asyncIO();
// TODO 3.1 implement behaviour }
/* else
Registering a WriteListener will start non-blocking IO. It is illegal to switch to throw new IllegalStateException();
the traditional blocking IO at that point.
*/
/* WriteListener.onWritePossible() will be called IFF (if and only if) canWrite has been
called AND has returned false AND a writeListener has previously been
set.
*/
} }
/** /**
@ -450,9 +555,204 @@ public class HttpOutput extends ServletOutputStream
@Override @Override
public boolean isReady() public boolean isReady()
{ {
// TODO 3.1 Auto-generated method stub while (true)
{
switch(_state.get())
{
case OPEN:
return true;
case ASYNC:
if (!_state.compareAndSet(State.ASYNC, State.READY))
continue;
return true;
case READY:
return true;
case PENDING:
if (!_state.compareAndSet(State.PENDING, State.UNREADY))
continue;
return false;
case UNREADY:
return false;
case CLOSED:
return false; return false;
} }
}
}
public void handle()
{
if (_state.get()==State.READY)
{
try
{
_writeListener.onWritePossible();
return;
}
catch(Exception e)
{
_onError=e;
}
}
if(_onError!=null)
{
Throwable th=_onError;
_onError=null;
_writeListener.onError(th);
close();
}
}
private class AsyncWrite implements Callback
{
private final byte[] _b;
private final int _off;
private final int _len;
private final boolean _complete;
private boolean _flushed;
public AsyncWrite(byte[] b, int off, int len, boolean complete)
{
_b=b;
_off=off;
_len=len;
_complete=complete;
}
public void process()
{
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_channel.write(_aggregate, _complete && _len==0, this);
return;
}
if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_aggregate.capacity()/4)
BufferUtil.append(_aggregate, _b, _off, _len);
else if (_len>0 && !_flushed)
{
_channel.write(ByteBuffer.wrap(_b, _off, _len), _complete,this);
return;
}
try
{
if (_complete)
{
closed();
_channel.getResponse().closeOutput();
}
while(true)
{
switch(_state.get())
{
case PENDING:
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
continue;
return;
case UNREADY:
if (!_state.compareAndSet(State.UNREADY, State.READY))
continue;
_channel.getState().asyncIO();
return;
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
}
}
catch (Exception e)
{
_onError=e;
_channel.getState().asyncIO();
close();
}
}
@Override
public void succeeded()
{
process();
}
@Override
public void failed(Throwable e)
{
_onError=e;
_channel.getState().asyncIO();
}
}
private class AsyncFlush implements Callback
{
private boolean _flushed;
public AsyncFlush()
{
}
public void process()
{
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_flushed=true;
_channel.write(_aggregate, false, this);
return;
}
if (!_flushed)
_channel.write(BufferUtil.EMPTY_BUFFER,false,this);
try
{
while(true)
{
switch(_state.get())
{
case PENDING:
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
continue;
return;
case UNREADY:
if (!_state.compareAndSet(State.UNREADY, State.READY))
continue;
_channel.getState().asyncIO();
return;
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
}
}
catch (Exception e)
{
_onError=e;
_channel.getState().asyncIO();
}
}
@Override
public void succeeded()
{
process();
}
@Override
public void failed(Throwable e)
{
_onError=e;
_channel.getState().asyncIO();
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** An iterating callback that will take content from an /** An iterating callback that will take content from an
@ -557,7 +857,6 @@ public class HttpOutput extends ServletOutputStream
_buffer.flip(); _buffer.flip();
_channel.write(_buffer,eof,this); _channel.write(_buffer,eof,this);
return false; return false;
} }
@Override @Override
@ -567,4 +866,6 @@ public class HttpOutput extends ServletOutputStream
_channel.getByteBufferPool().release(_buffer); _channel.getByteBufferPool().release(_buffer);
} }
} }
} }

View File

@ -35,8 +35,11 @@ public class Iso88591HttpWriter extends HttpWriter
public void write (char[] s,int offset, int length) throws IOException public void write (char[] s,int offset, int length) throws IOException
{ {
HttpOutput out = _out; HttpOutput out = _out;
if (length==0) if (length==0 && out.isAllContentWritten())
out.closeIfAllContentWritten(); {
out.close();
return;
}
if (length==1) if (length==1)
{ {

View File

@ -1605,9 +1605,7 @@ public class Request implements HttpServletRequest
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* /*
* Set a request attribute. if the attribute name is "org.eclipse.jetty.server.server.Request.queryEncoding" then the value is also passed in a call to * Set a request attribute. if the attribute name is "org.eclipse.jetty.server.server.Request.queryEncoding" then the value is also passed in a call to
* {@link #setQueryEncoding}. <p> if the attribute name is "org.eclipse.jetty.server.server.ResponseBuffer", then the response buffer is flushed with @{link * {@link #setQueryEncoding}.
* #flushResponseBuffer} <p> if the attribute name is "org.eclipse.jetty.io.EndPoint.maxIdleTime", then the value is passed to the associated {@link
* EndPoint#setIdleTimeout}.
* *
* @see javax.servlet.ServletRequest#setAttribute(java.lang.String, java.lang.Object) * @see javax.servlet.ServletRequest#setAttribute(java.lang.String, java.lang.Object)
*/ */
@ -1616,34 +1614,8 @@ public class Request implements HttpServletRequest
{ {
Object old_value = _attributes == null?null:_attributes.getAttribute(name); Object old_value = _attributes == null?null:_attributes.getAttribute(name);
if (name.startsWith("org.eclipse.jetty."))
{
if ("org.eclipse.jetty.server.Request.queryEncoding".equals(name)) if ("org.eclipse.jetty.server.Request.queryEncoding".equals(name))
setQueryEncoding(value == null?null:value.toString()); setQueryEncoding(value == null?null:value.toString());
else if ("org.eclipse.jetty.server.sendContent".equals(name))
{
try
{
((HttpOutput)getServletResponse().getOutputStream()).sendContent(value);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
else if ("org.eclipse.jetty.server.ResponseBuffer".equals(name))
{
try
{
throw new IOException("not implemented");
//((HttpChannel.Output)getServletResponse().getOutputStream()).sendResponse(byteBuffer);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
}
if (_attributes == null) if (_attributes == null)
_attributes = new AttributesMap(); _attributes = new AttributesMap();

View File

@ -43,8 +43,11 @@ public class Utf8HttpWriter extends HttpWriter
public void write (char[] s,int offset, int length) throws IOException public void write (char[] s,int offset, int length) throws IOException
{ {
HttpOutput out = _out; HttpOutput out = _out;
if (length==0) if (length==0 && out.isAllContentWritten())
out.closeIfAllContentWritten(); {
out.close();
return;
}
while (length > 0) while (length > 0)
{ {