From 6f8ed5e809c9be94a4e5773373e4ade9a2f04dcf Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 23 May 2013 17:34:28 +1000 Subject: [PATCH] 398467 Servlet 3.1 Non Blocking IO Refactored HttpChannel to be ready to control onWritePossible callbacks --- .../jetty/server/EncodingHttpWriter.java | 10 +- .../org/eclipse/jetty/server/HttpChannel.java | 74 +- .../jetty/server/HttpChannelState.java | 51 +- .../org/eclipse/jetty/server/HttpOutput.java | 641 +++++++++++++----- .../jetty/server/Iso88591HttpWriter.java | 7 +- .../org/eclipse/jetty/server/Request.java | 34 +- .../eclipse/jetty/server/Utf8HttpWriter.java | 7 +- 7 files changed, 566 insertions(+), 258 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java index 6abd28b06e3..db42f8c0c31 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java @@ -48,8 +48,12 @@ public class EncodingHttpWriter extends HttpWriter @Override public void write (char[] s,int offset, int length) throws IOException { - if (length==0) - _out.closeIfAllContentWritten(); + HttpOutput out = _out; + if (length==0 && out.isAllContentWritten()) + { + out.close(); + return; + } while (length > 0) { @@ -58,7 +62,7 @@ public class EncodingHttpWriter extends HttpWriter _converter.write(s, offset, chars); _converter.flush(); - _bytes.writeTo(_out); + _bytes.writeTo(out); length-=chars; offset+=chars; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 6b97b11b707..f4f38ec0995 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -21,12 +21,12 @@ package org.eclipse.jetty.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.DispatcherType; import javax.servlet.RequestDispatcher; +import javax.servlet.WriteListener; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; @@ -44,7 +44,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; -import org.eclipse.jetty.server.HttpChannelState.Next; +import org.eclipse.jetty.server.HttpChannelState.Action; import org.eclipse.jetty.server.handler.ErrorHandler; import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.Callback; @@ -251,36 +251,50 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable // The loop is controlled by the call to async.unhandle in the // finally block below. Unhandle will return false only if an async dispatch has // already happened when unhandle is called. - HttpChannelState.Next next = _state.handling(); - while (next==Next.CONTINUE && getServer().isRunning()) + HttpChannelState.Action action = _state.handling(); + loop: while (action.ordinal() implements HttpParser.RequestHandler, Runnable } finally { - next = _state.unhandle(); + action = _state.unhandle(); } } @@ -310,7 +324,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable Thread.currentThread().setName(threadName); setCurrentHttpChannel(null); - if (next==Next.COMPLETE) + if (action==Action.COMPLETE) { try { @@ -332,11 +346,11 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable } finally { - next=Next.RECYCLE; + action=Action.RECYCLE; } } - if (next==Next.RECYCLE) + if (action==Action.RECYCLE) { _request.setHandled(true); _transport.completed(); @@ -344,7 +358,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable LOG.debug("{} handle exit", this); - return next!=Next.WAIT; + return action!=Action.WAIT; } /** @@ -591,7 +605,7 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable try { - if (_state.handling()==Next.CONTINUE) + if (_state.handling()==Action.REQUEST_DISPATCH) sendResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true); } catch (IOException e) @@ -600,8 +614,10 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable } finally { - if (_state.unhandle()==Next.COMPLETE) + if (_state.unhandle()==Action.COMPLETE) _state.completed(); + else + throw new IllegalStateException(); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 0d542d542f4..350472ae9d0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -27,6 +27,7 @@ import javax.servlet.AsyncListener; import javax.servlet.RequestDispatcher; import javax.servlet.ServletContext; import javax.servlet.ServletResponse; +import javax.servlet.WriteListener; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; @@ -72,12 +73,15 @@ public class HttpChannelState COMPLETED // Request is complete } - public enum Next + public enum Action { - CONTINUE, // Continue handling the channel - WAIT, // Wait for further events - COMPLETE, // Complete the channel - RECYCLE, // Channel is completed + REQUEST_DISPATCH, // handle a normal request dispatch + ASYNC_DISPATCH, // handle an async request dispatch + ASYNC_EXPIRED, // handle an async timeout + IO_CALLBACK, // handle an IO callback + WAIT, // Wait for further events + COMPLETE, // Complete the channel + RECYCLE, // Channel is completed } private final HttpChannel _channel; @@ -164,7 +168,7 @@ public class HttpChannelState /** * @return Next handling of the request should proceed */ - protected Next handling() + protected Action handling() { synchronized (this) { @@ -182,32 +186,30 @@ public class HttpChannelState _asyncListeners=_lastAsyncListeners; _lastAsyncListeners=null; } - break; + _responseWrapped=false; + return Action.REQUEST_DISPATCH; case COMPLETECALLED: _state=State.COMPLETING; - return Next.COMPLETE; + return Action.COMPLETE; case COMPLETING: - return Next.COMPLETE; + return Action.COMPLETE; case ASYNCWAIT: - return Next.WAIT; + return Action.WAIT; case COMPLETED: - return Next.RECYCLE; + return Action.RECYCLE; case REDISPATCH: _state=State.REDISPATCHED; - break; + _responseWrapped=false; + return _expired?Action.ASYNC_EXPIRED:Action.ASYNC_DISPATCH; default: throw new IllegalStateException(this.getStatusString()); } - - _responseWrapped=false; - return Next.CONTINUE; - } } @@ -270,7 +272,7 @@ public class HttpChannelState * @return next actions * be handled again (eg because of a resume that happened before unhandle was called) */ - protected Next unhandle() + protected Action unhandle() { synchronized (this) { @@ -279,7 +281,7 @@ public class HttpChannelState case REDISPATCHED: case DISPATCHED: _state=State.COMPLETING; - return Next.COMPLETE; + return Action.COMPLETE; case IDLE: throw new IllegalStateException(this.getStatusString()); @@ -288,17 +290,17 @@ public class HttpChannelState _initial=false; _state=State.ASYNCWAIT; scheduleTimeout(); - return Next.WAIT; + return Action.WAIT; case REDISPATCHING: _initial=false; _state=State.REDISPATCHED; - return Next.CONTINUE; + return _expired?Action.ASYNC_EXPIRED:Action.ASYNC_DISPATCH; case COMPLETECALLED: _initial=false; _state=State.COMPLETING; - return Next.COMPLETE; + return Action.COMPLETE; default: throw new IllegalStateException(this.getStatusString()); @@ -650,6 +652,13 @@ public class HttpChannelState _channel.getRequest().setAttribute(name,attribute); } + + public void asyncIO() + { + // TODO Auto-generated method stub + + } + public class AsyncTimeout implements Runnable { @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 461d7cd56aa..73d3e4cd1d1 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -23,7 +23,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; -import java.util.concurrent.TimeoutException; +import java.nio.channels.WritePendingException; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; @@ -32,14 +33,13 @@ import javax.servlet.ServletResponse; import javax.servlet.WriteListener; import org.eclipse.jetty.http.HttpContent; -import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.resource.Resource; /** *

{@link HttpOutput} implements {@link ServletOutputStream} @@ -55,11 +55,25 @@ public class HttpOutput extends ServletOutputStream { private static Logger LOG = Log.getLogger(HttpOutput.class); private final HttpChannel _channel; - private boolean _closed; private long _written; private ByteBuffer _aggregate; private int _bufferSize; private WriteListener _writeListener; + private volatile Throwable _onError; + + /* +ACTION OPEN ASYNC READY PENDING UNREADY +------------------------------------------------------------------------------- +setWriteListener() READY->owp ise ise ise ise +write() OPEN ise PENDING wpe wpe +flush() OPEN ise PENDING wpe wpe +isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false +write completed - - - ASYNC READY->owp + + */ + enum State { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED } + private final AtomicReference _state=new AtomicReference<>(State.OPEN); + public HttpOutput(HttpChannel channel) @@ -86,37 +100,63 @@ public class HttpOutput extends ServletOutputStream public void reopen() { - _closed = false; + _state.set(State.OPEN); } - /** Called by the HttpChannel if the output was closed - * externally (eg by a 500 exception handling). - */ - void closed() + public boolean isAllContentWritten() { - _closed = true; - releaseBuffer(); + return _channel.getResponse().isAllContentWritten(_written); } - + @Override public void close() { - if (!isClosed()) + State state=_state.get(); + while(state!=State.CLOSED) { - try + if (_state.compareAndSet(state,State.CLOSED)) { - if (BufferUtil.hasContent(_aggregate)) - _channel.write(_aggregate, !_channel.getResponse().isIncluding()); - else - _channel.write(BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); - } - catch(IOException e) - { - _channel.getEndPoint().shutdownOutput(); - LOG.ignore(e); + try + { + if (BufferUtil.hasContent(_aggregate)) + _channel.write(_aggregate, !_channel.getResponse().isIncluding()); + else + _channel.write(BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); + } + catch(IOException e) + { + _channel.getEndPoint().shutdownOutput(); + LOG.ignore(e); + } + releaseBuffer(); + return; } + state=_state.get(); + } + } + + /* Called to indicated that the output is already closed and the state needs to be updated to match */ + void closed() + { + State state=_state.get(); + while(state!=State.CLOSED) + { + if (_state.compareAndSet(state,State.CLOSED)) + { + try + { + _channel.getResponse().closeOutput(); + } + catch(IOException e) + { + _channel.getEndPoint().shutdownOutput(); + LOG.ignore(e); + } + releaseBuffer(); + return; + } + state=_state.get(); } - closed(); } private void releaseBuffer() @@ -130,108 +170,213 @@ public class HttpOutput extends ServletOutputStream public boolean isClosed() { - return _closed; + return _state.get()==State.CLOSED; } @Override public void flush() throws IOException { - if (isClosed()) - return; - - if (BufferUtil.hasContent(_aggregate)) - _channel.write(_aggregate, false); - else - _channel.write(BufferUtil.EMPTY_BUFFER, false); - } - - public boolean closeIfAllContentWritten() throws IOException - { - Response response=_channel.getResponse(); - if (response.isAllContentWritten(_written)) + while(true) { - response.closeOutput(); - return true; + switch(_state.get()) + { + case OPEN: + if (BufferUtil.hasContent(_aggregate)) + _channel.write(_aggregate, false); + else + _channel.write(BufferUtil.EMPTY_BUFFER, false); + return; + + case ASYNC: + throw new IllegalStateException("isReady() not called"); + + case READY: + if (!_state.compareAndSet(State.READY, State.PENDING)) + continue; + new AsyncFlush().process(); + return; + + case PENDING: + case UNREADY: + throw new WritePendingException(); + + case CLOSED: + return; + } + break; } - return false; } + @Override public void write(byte[] b, int off, int len) throws IOException { - if (isClosed()) - throw new EOFException("Closed"); + _written+=len; + boolean complete=_channel.getResponse().isAllContentWritten(_written); - _written+=len; - boolean complete=_channel.getResponse().isAllContentWritten(_written); - int capacity = getBufferSize(); + // Async or Blocking ? + while(true) + { + switch(_state.get()) + { + case OPEN: + // process blocking below + break; + + case ASYNC: + throw new IllegalStateException("isReady() not called"); - // Should we aggregate? - if (!complete && len<=capacity/4) - { - if (_aggregate == null) - _aggregate = _channel.getByteBufferPool().acquire(capacity, false); + case READY: + if (!_state.compareAndSet(State.READY, State.PENDING)) + continue; - // YES - fill the aggregate with content from the buffer - int filled = BufferUtil.fill(_aggregate, b, off, len); + // Should we aggregate? + int capacity = getBufferSize(); + if (!complete && len<=capacity/4) + { + if (_aggregate == null) + _aggregate = _channel.getByteBufferPool().acquire(capacity, false); - // return if we are not complete, not full and filled all the content - if (!complete && filled==len && !BufferUtil.isFull(_aggregate)) - return; + // YES - fill the aggregate with content from the buffer + int filled = BufferUtil.fill(_aggregate, b, off, len); - // adjust offset/length - off+=filled; - len-=filled; - } + // return if we are not complete, not full and filled all the content + if (!complete && filled==len && !BufferUtil.isFull(_aggregate)) + { + if (!_state.compareAndSet(State.PENDING, State.ASYNC)) + throw new IllegalStateException(); + return; + } - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) - { - _channel.write(_aggregate, complete && len==0); + // adjust offset/length + off+=filled; + len-=filled; + } - // should we fill aggregate again from the buffer? - if (len>0 && !complete && len<=_aggregate.capacity()/4) - { - BufferUtil.append(_aggregate, b, off, len); - return; - } - } + // Do the asynchronous writing from the callback + new AsyncWrite(b,off,len,complete).process(); + return; - // write any remaining content in the buffer directly - if (len>0) - _channel.write(ByteBuffer.wrap(b, off, len), complete); + case PENDING: + case UNREADY: + throw new WritePendingException(); + + case CLOSED: + throw new EofException("Closed"); + } + break; + } + + + // handle blocking write + + // Should we aggregate? + int capacity = getBufferSize(); + if (!complete && len<=capacity/4) + { + if (_aggregate == null) + _aggregate = _channel.getByteBufferPool().acquire(capacity, false); + + // YES - fill the aggregate with content from the buffer + int filled = BufferUtil.fill(_aggregate, b, off, len); + + // return if we are not complete, not full and filled all the content + if (!complete && filled==len && !BufferUtil.isFull(_aggregate)) + return; + + // adjust offset/length + off+=filled; + len-=filled; + } + + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + { + _channel.write(_aggregate, complete && len==0); + + // should we fill aggregate again from the buffer? + if (len>0 && !complete && len<=_aggregate.capacity()/4) + { + BufferUtil.append(_aggregate, b, off, len); + return; + } + } + + // write any remaining content in the buffer directly + if (len>0) + _channel.write(ByteBuffer.wrap(b, off, len), complete); + + if (complete) + { + closed(); + } - if (complete) - { - closed(); - _channel.getResponse().closeOutput(); - } } - + @Override public void write(int b) throws IOException { - if (isClosed()) - throw new EOFException("Closed"); + _written+=1; + boolean complete=_channel.getResponse().isAllContentWritten(_written); - // Allocate an aggregate buffer. - // Never direct as it is slow to do little writes to a direct buffer. - if (_aggregate == null) - _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); - - BufferUtil.append(_aggregate, (byte)b); - _written++; - - // Check if all written or full - if (!closeIfAllContentWritten() && BufferUtil.isFull(_aggregate)) + // Async or Blocking ? + while(true) { - BlockingCallback callback = _channel.getWriteBlockingCallback(); - _channel.write(_aggregate, false, callback); - callback.block(); + switch(_state.get()) + { + case OPEN: + if (_aggregate == null) + _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); + BufferUtil.append(_aggregate, (byte)b); + + // Check if all written or full + if (complete || BufferUtil.isFull(_aggregate)) + { + BlockingCallback callback = _channel.getWriteBlockingCallback(); + _channel.write(_aggregate, complete, callback); + callback.block(); + if (complete) + closed(); + } + break; + + case ASYNC: + throw new IllegalStateException("isReady() not called"); + + case READY: + if (!_state.compareAndSet(State.READY, State.PENDING)) + continue; + + if (_aggregate == null) + _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); + BufferUtil.append(_aggregate, (byte)b); + + // Check if all written or full + if (!complete && !BufferUtil.isFull(_aggregate)) + { + if (!_state.compareAndSet(State.PENDING, State.ASYNC)) + throw new IllegalStateException(); + return; + } + + // Do the asynchronous writing from the callback + new AsyncFlush().process(); + return; + + case PENDING: + case UNREADY: + throw new WritePendingException(); + + case CLOSED: + throw new EofException("Closed"); + } + break; } } + + @Override public void print(String s) throws IOException { @@ -241,51 +386,6 @@ public class HttpOutput extends ServletOutputStream write(s.getBytes(_channel.getResponse().getCharacterEncoding())); } - /* ------------------------------------------------------------ */ - /** Set headers and send content. - * @deprecated Use {@link Response#setHeaders(HttpContent)} and {@link #sendContent(HttpContent)} instead. - * @param content - * @throws IOException - */ - @Deprecated - public void sendContent(Object content) throws IOException - { - final BlockingCallback callback =_channel.getWriteBlockingCallback(); - - if (content instanceof HttpContent) - { - _channel.getResponse().setHeaders((HttpContent)content); - sendContent((HttpContent)content,callback); - } - else if (content instanceof Resource) - { - Resource resource = (Resource)content; - _channel.getResponse().getHttpFields().putDateField(HttpHeader.LAST_MODIFIED, resource.lastModified()); - - ReadableByteChannel in=((Resource)content).getReadableByteChannel(); - if (in!=null) - sendContent(in,callback); - else - sendContent(resource.getInputStream(),callback); - } - else if (content instanceof ByteBuffer) - { - sendContent((ByteBuffer)content,callback); - } - else if (content instanceof ReadableByteChannel) - { - sendContent((ReadableByteChannel)content,callback); - } - else if (content instanceof InputStream) - { - sendContent((InputStream)content,callback); - } - else - callback.failed(new IllegalArgumentException("unknown content type "+content.getClass())); - - callback.block(); - } - /* ------------------------------------------------------------ */ /** Blocking send of content. * @param content The content to send @@ -321,7 +421,7 @@ public class HttpOutput extends ServletOutputStream new ReadableByteChannelWritingCB(in,callback).iterate(); callback.block(); } - + /* ------------------------------------------------------------ */ /** Blocking send of content. @@ -334,7 +434,7 @@ public class HttpOutput extends ServletOutputStream sendContent(content,callback); callback.block(); } - + /* ------------------------------------------------------------ */ /** Asynchronous send of content. @@ -373,32 +473,44 @@ public class HttpOutput extends ServletOutputStream */ public void sendContent(HttpContent httpContent, Callback callback) throws IOException { - if (isClosed()) - throw new IOException("Closed"); if (BufferUtil.hasContent(_aggregate)) throw new IOException("written"); if (_channel.isCommitted()) throw new IOException("committed"); - - _closed=true; + + while (true) + { + switch(_state.get()) + { + case OPEN: + if (!_state.compareAndSet(State.OPEN, State.PENDING)) + continue; + break; + case CLOSED: + throw new EofException("Closed"); + default: + throw new IllegalStateException(); + } + break; + } ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null; if (buffer == null) buffer = httpContent.getIndirectBuffer(); - + if (buffer!=null) { sendContent(buffer,callback); return; } - + ReadableByteChannel rbc=httpContent.getReadableByteChannel(); if (rbc!=null) { sendContent(rbc,callback); return; } - + InputStream in = httpContent.getInputStream(); if ( in!=null ) { @@ -425,23 +537,16 @@ public class HttpOutput extends ServletOutputStream BufferUtil.clear(_aggregate); } - - @Override public void setWriteListener(WriteListener writeListener) { - _writeListener = writeListener; - - // TODO 3.1 implement behaviour - /* - Registering a WriteListener will start non-blocking IO. It is illegal to switch to - the traditional blocking IO at that point. - */ - - /* WriteListener.onWritePossible() will be called IFF (if and only if) canWrite has been - called AND has returned false AND a writeListener has previously been - set. - */ + if (_state.compareAndSet(State.OPEN, State.READY)) + { + _writeListener = writeListener; + _channel.getState().asyncIO(); + } + else + throw new IllegalStateException(); } /** @@ -450,10 +555,205 @@ public class HttpOutput extends ServletOutputStream @Override public boolean isReady() { - // TODO 3.1 Auto-generated method stub - return false; + while (true) + { + switch(_state.get()) + { + case OPEN: + return true; + case ASYNC: + if (!_state.compareAndSet(State.ASYNC, State.READY)) + continue; + return true; + case READY: + return true; + case PENDING: + if (!_state.compareAndSet(State.PENDING, State.UNREADY)) + continue; + return false; + case UNREADY: + return false; + case CLOSED: + return false; + } + } } - + + public void handle() + { + if (_state.get()==State.READY) + { + try + { + _writeListener.onWritePossible(); + return; + } + catch(Exception e) + { + _onError=e; + } + } + if(_onError!=null) + { + + Throwable th=_onError; + _onError=null; + _writeListener.onError(th); + close(); + } + } + + private class AsyncWrite implements Callback + { + private final byte[] _b; + private final int _off; + private final int _len; + private final boolean _complete; + private boolean _flushed; + + public AsyncWrite(byte[] b, int off, int len, boolean complete) + { + _b=b; + _off=off; + _len=len; + _complete=complete; + } + + public void process() + { + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + { + _channel.write(_aggregate, _complete && _len==0, this); + return; + } + + if (!_complete && _len0 && !_flushed) + { + _channel.write(ByteBuffer.wrap(_b, _off, _len), _complete,this); + return; + } + + try + { + if (_complete) + { + closed(); + _channel.getResponse().closeOutput(); + } + + while(true) + { + switch(_state.get()) + { + case PENDING: + if (!_state.compareAndSet(State.PENDING, State.ASYNC)) + continue; + return; + case UNREADY: + if (!_state.compareAndSet(State.UNREADY, State.READY)) + continue; + _channel.getState().asyncIO(); + return; + case CLOSED: + throw new EofException("Closed"); + + default: + throw new IllegalStateException(); + } + } + } + catch (Exception e) + { + _onError=e; + _channel.getState().asyncIO(); + close(); + } + } + @Override + public void succeeded() + { + process(); + } + + @Override + public void failed(Throwable e) + { + _onError=e; + _channel.getState().asyncIO(); + } + } + + private class AsyncFlush implements Callback + { + private boolean _flushed; + + public AsyncFlush() + { + } + + public void process() + { + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + { + _flushed=true; + _channel.write(_aggregate, false, this); + return; + } + + if (!_flushed) + _channel.write(BufferUtil.EMPTY_BUFFER,false,this); + + + try + { + while(true) + { + switch(_state.get()) + { + case PENDING: + if (!_state.compareAndSet(State.PENDING, State.ASYNC)) + continue; + return; + case UNREADY: + if (!_state.compareAndSet(State.UNREADY, State.READY)) + continue; + _channel.getState().asyncIO(); + return; + case CLOSED: + throw new EofException("Closed"); + + default: + throw new IllegalStateException(); + } + } + } + catch (Exception e) + { + _onError=e; + _channel.getState().asyncIO(); + } + } + + @Override + public void succeeded() + { + process(); + } + + @Override + public void failed(Throwable e) + { + _onError=e; + _channel.getState().asyncIO(); + } + + } + + /* ------------------------------------------------------------ */ /** An iterating callback that will take content from an * InputStream and write it to the associated {@link HttpChannel}. @@ -466,7 +766,7 @@ public class HttpOutput extends ServletOutputStream { final InputStream _in; final ByteBuffer _buffer; - + public InputStreamWritingCB(InputStream in, Callback callback) { super(callback); @@ -507,7 +807,7 @@ public class HttpOutput extends ServletOutputStream super.failed(x); _channel.getByteBufferPool().release(_buffer); } - + } /* ------------------------------------------------------------ */ @@ -523,7 +823,7 @@ public class HttpOutput extends ServletOutputStream { final ReadableByteChannel _in; final ByteBuffer _buffer; - + public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) { super(callback); @@ -557,7 +857,6 @@ public class HttpOutput extends ServletOutputStream _buffer.flip(); _channel.write(_buffer,eof,this); return false; - } @Override @@ -567,4 +866,6 @@ public class HttpOutput extends ServletOutputStream _channel.getByteBufferPool().release(_buffer); } } + + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java index 27802cfb176..e13bbc3e0fd 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java @@ -35,8 +35,11 @@ public class Iso88591HttpWriter extends HttpWriter public void write (char[] s,int offset, int length) throws IOException { HttpOutput out = _out; - if (length==0) - out.closeIfAllContentWritten(); + if (length==0 && out.isAllContentWritten()) + { + out.close(); + return; + } if (length==1) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index c19a8508cd6..20609c0eae2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -1605,9 +1605,7 @@ public class Request implements HttpServletRequest /* ------------------------------------------------------------ */ /* * Set a request attribute. if the attribute name is "org.eclipse.jetty.server.server.Request.queryEncoding" then the value is also passed in a call to - * {@link #setQueryEncoding}.

if the attribute name is "org.eclipse.jetty.server.server.ResponseBuffer", then the response buffer is flushed with @{link - * #flushResponseBuffer}

if the attribute name is "org.eclipse.jetty.io.EndPoint.maxIdleTime", then the value is passed to the associated {@link - * EndPoint#setIdleTimeout}. + * {@link #setQueryEncoding}. * * @see javax.servlet.ServletRequest#setAttribute(java.lang.String, java.lang.Object) */ @@ -1616,34 +1614,8 @@ public class Request implements HttpServletRequest { Object old_value = _attributes == null?null:_attributes.getAttribute(name); - if (name.startsWith("org.eclipse.jetty.")) - { - if ("org.eclipse.jetty.server.Request.queryEncoding".equals(name)) - setQueryEncoding(value == null?null:value.toString()); - else if ("org.eclipse.jetty.server.sendContent".equals(name)) - { - try - { - ((HttpOutput)getServletResponse().getOutputStream()).sendContent(value); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - else if ("org.eclipse.jetty.server.ResponseBuffer".equals(name)) - { - try - { - throw new IOException("not implemented"); - //((HttpChannel.Output)getServletResponse().getOutputStream()).sendResponse(byteBuffer); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - } + if ("org.eclipse.jetty.server.Request.queryEncoding".equals(name)) + setQueryEncoding(value == null?null:value.toString()); if (_attributes == null) _attributes = new AttributesMap(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java index 059ad1947f2..fcee714fc89 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java @@ -43,8 +43,11 @@ public class Utf8HttpWriter extends HttpWriter public void write (char[] s,int offset, int length) throws IOException { HttpOutput out = _out; - if (length==0) - out.closeIfAllContentWritten(); + if (length==0 && out.isAllContentWritten()) + { + out.close(); + return; + } while (length > 0) {