diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java index f1b6bcb0983..2ed5c972e24 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java @@ -1,7 +1,13 @@ package org.eclipse.jetty.io; +import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeoutException; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -9,48 +15,160 @@ import org.eclipse.jetty.util.log.Logger; public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint { public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class); + private static final Timer _timer = new Timer(); + private boolean _checkForIdle; + private AsyncConnection _connection; + + private final ReadInterest _readInterest = new ReadInterest() + { + @Override + protected boolean readIsPossible() throws IOException + { + if (_closed) + throw new ClosedChannelException(); + return _in==null || BufferUtil.hasContent(_in); + } + }; + + private final WriteFlusher _writeFlusher = new WriteFlusher(this) + { + @Override + protected void scheduleCompleteWrite() + { + } + }; + + public AsyncByteArrayEndPoint() + { + super(); + } + + public AsyncByteArrayEndPoint(byte[] input, int outputSize) + { + super(input,outputSize); + } + + public AsyncByteArrayEndPoint(String input, int outputSize) + { + super(input,outputSize); + } + + @Override + public void setInput(ByteBuffer in) + { + super.setInput(in); + if (in==null || BufferUtil.hasContent(in)) + _readInterest.readable(); + } + + @Override + public ByteBuffer takeOutput() + { + ByteBuffer b = super.takeOutput(); + _writeFlusher.completeWrite(); + return b; + } + + @Override + public void setOutput(ByteBuffer out) + { + super.setOutput(out); + _writeFlusher.completeWrite(); + } + @Override + public void reset() + { + _readInterest.close(); + _writeFlusher.close(); + super.reset(); + } @Override public void readable(C context, Callback callback) throws IllegalStateException { - // TODO Auto-generated method stub - + _readInterest.readable(context,callback); } @Override public void write(C context, Callback callback, ByteBuffer... buffers) throws IllegalStateException { - // TODO Auto-generated method stub - + _writeFlusher.write(context,callback,buffers); } @Override public void setCheckForIdle(boolean check) { - // TODO Auto-generated method stub + _checkForIdle=check; + + if (check) + { + final TimerTask task=new TimerTask() + { + @Override + public void run() + { + checkForIdleOrReadWriteTimeout(System.currentTimeMillis()); + if (_checkForIdle) + _timer.schedule(this,1000); + } + }; + + _timer.schedule(task,1000); + } } @Override public boolean isCheckForIdle() { - // TODO Auto-generated method stub - return false; + return _checkForIdle; } @Override public AsyncConnection getAsyncConnection() { - // TODO Auto-generated method stub - return null; + return _connection; } @Override public void setAsyncConnection(AsyncConnection connection) { - // TODO Auto-generated method stub - + _connection=connection; + } + public void checkForIdleOrReadWriteTimeout(long now) + { + synchronized (this) + { + if (_checkForIdle || _readInterest.isInterested() || _writeFlusher.isWriting()) + { + long idleTimestamp = getIdleTimestamp(); + long max_idle_time = getMaxIdleTime(); + + if (idleTimestamp != 0 && max_idle_time > 0) + { + long idleForMs = now - idleTimestamp; + + if (idleForMs > max_idle_time) + { + notIdle(); + + if (_checkForIdle) + _connection.onIdleExpired(idleForMs); + + TimeoutException timeout = new TimeoutException(); + _readInterest.failed(timeout); + _writeFlusher.failed(timeout); + } + } + } + } } - + @Override + public void onClose() + { + setCheckForIdle(false); + super.onClose(); + } + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java index 2627459468d..1b15cbdd448 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java @@ -2,6 +2,8 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; +import java.nio.channels.ReadPendingException; +import java.nio.channels.WritePendingException; import java.util.concurrent.Future; import org.eclipse.jetty.util.Callback; @@ -23,9 +25,9 @@ public interface AsyncEndPoint extends EndPoint * This method schedules a callback operations when a call to {@link #fill(ByteBuffer)} will return data or EOF. * @param context Context to return via the callback * @param callback The callback to call when an error occurs or we are readable. - * @throws IllegalStateException if another read operation is concurrent. + * @throws ReadPendingException if another read operation is concurrent. */ - void readable(C context, Callback callback) throws IllegalStateException; + void readable(C context, Callback callback) throws ReadPendingException; /* ------------------------------------------------------------ */ /** Asynchronous write operation. @@ -35,9 +37,9 @@ public interface AsyncEndPoint extends EndPoint * @param context Context to return via the callback * @param callback The callback to call when an error occurs or we are readable. * @param buffers One or more {@link ByteBuffer}s that will be flushed. - * @throws IllegalStateException if another write operation is concurrent. + * @throws WritePendingException if another write operation is concurrent. */ - void write(C context, Callback callback, ByteBuffer... buffers) throws IllegalStateException; + void write(C context, Callback callback, ByteBuffer... buffers) throws WritePendingException; /* ------------------------------------------------------------ */ /** Set if the endpoint should be checked for idleness diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index 8fd22b997c4..f3a1ab06c64 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -144,9 +144,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint */ public String takeOutputString() { - String s=BufferUtil.toString(_out,StringUtil.__UTF8_CHARSET); - BufferUtil.clear(_out); - return s; + ByteBuffer buffer=takeOutput(); + return BufferUtil.toString(buffer,StringUtil.__UTF8_CHARSET); } /* ------------------------------------------------------------ */ diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java index cf113d45e4a..10f9ecfbd09 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java @@ -1,5 +1,6 @@ package org.eclipse.jetty.io; +import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadPendingException; import java.util.concurrent.TimeoutException; @@ -7,7 +8,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.util.Callback; -public class ReadInterest + +/* ------------------------------------------------------------ */ +/** + * A Utility class to help implement {@link AsyncEndPoint#readable(Object, Callback)} + * by keeping and calling the context and callback objects. + */ +public abstract class ReadInterest { private final AtomicBoolean _interested = new AtomicBoolean(false); private volatile Callback _readCallback; @@ -25,13 +32,20 @@ public class ReadInterest throw new ReadPendingException(); _readContext=context; _readCallback=callback; - if (makeInterested()) - completed(); + try + { + if (readIsPossible()) + readable(); + } + catch(IOException e) + { + failed(e); + } } /* ------------------------------------------------------------ */ - public void completed() + public void readable() { if (_interested.compareAndSet(true,false)) { @@ -76,9 +90,6 @@ public class ReadInterest } /* ------------------------------------------------------------ */ - protected boolean makeInterested() - { - throw new IllegalStateException("Unimplemented"); - } + abstract protected boolean readIsPossible() throws IOException; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index f649a812180..3365a5dd207 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -37,8 +37,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa { public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); - private final Object _lock = this; - private final SelectorManager.SelectSet _selectSet; private final SelectorManager _manager; @@ -59,7 +57,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa private final ReadInterest _readInterest = new ReadInterest() { @Override - protected boolean makeInterested() + protected boolean readIsPossible() { _interestOps=_interestOps | SelectionKey.OP_READ; updateKey(); @@ -133,7 +131,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa boolean can_read; boolean can_write; - synchronized (_lock) + synchronized (this) { _selected = true; try @@ -153,7 +151,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa } } if (can_read) - _readInterest.completed(); + _readInterest.readable(); if (can_write) _writeFlusher.completeWrite(); } @@ -188,7 +186,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa @Override public void checkForIdleOrReadWriteTimeout(long now) { - synchronized (_lock) + synchronized (this) { if (_idlecheck || _readInterest.isInterested() || _writeFlusher.isWriting()) { @@ -208,7 +206,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa TimeoutException timeout = new TimeoutException(); _readInterest.failed(timeout); - _writeFlusher.failWrite(timeout); + _writeFlusher.failed(timeout); } } } @@ -256,7 +254,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa */ private void updateKey() { - synchronized (_lock) + synchronized (this) { if (!_selected) { @@ -290,7 +288,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa @Override public void doUpdateKey() { - synchronized (_lock) + synchronized (this) { _changing = false; if (getChannel().isOpen()) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index a25f057095d..b0a9032d976 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -7,10 +7,22 @@ import java.nio.channels.WritePendingException; import java.util.ConcurrentModificationException; import java.util.concurrent.atomic.AtomicBoolean; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -public class WriteFlusher + +/* ------------------------------------------------------------ */ +/** + * A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)} + * by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written. + * The abstract method {@link #scheduleCompleteWrite()} is called when not all content has been + * written after a call to flush and should organise for the {@link #completeWrite()} + * method to be called when a subsequent call to flush should be able to make more progress. + * + */ +abstract public class WriteFlusher { + private final static ByteBuffer[] NO_BUFFERS= new ByteBuffer[0]; private final AtomicBoolean _writing = new AtomicBoolean(false); private final EndPoint _endp; @@ -41,7 +53,7 @@ public class WriteFlusher _writeContext=context; _writeCallback=callback; scheduleCompleteWrite(); - _writing.set(true); + _writing.set(true); // Needed as memory barrier return; } } @@ -59,20 +71,47 @@ public class WriteFlusher } /* ------------------------------------------------------------ */ - protected void scheduleCompleteWrite() - { - // _interestOps = _interestOps | SelectionKey.OP_WRITE; - // updateKey(); - } + abstract protected void scheduleCompleteWrite(); + /* ------------------------------------------------------------ */ - public void completeWrite() + /* Remove empty buffers from the start of a multi buffer array + */ + private ByteBuffer[] compact(ByteBuffer[] buffers) + { + if (buffers.length<2) + return buffers; + int b=0; + while (b