From 10dd14d34eb2117bb2fb0f91f3c55658dee44224 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 14 Jan 2014 22:10:28 +0100 Subject: [PATCH] Reorganization of the code in preview of a larger refactoring. --- .../org/eclipse/jetty/server/HttpInput.java | 136 ++++++++---------- .../eclipse/jetty/server/QueuedHttpInput.java | 63 ++++---- 2 files changed, 87 insertions(+), 112 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 04c151e5f6e..7506e6010f9 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.server; import java.io.IOException; - import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; @@ -30,37 +29,19 @@ import org.eclipse.jetty.util.log.Logger; /** *

{@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.

- *

{@link HttpInput} holds a queue of items passed to it by calls to {@link #content(T)}.

- *

{@link HttpInput} stores the items directly; if the items contain byte buffers, it does not copy them - * but simply holds references to the item, thus the caller must organize for those buffers to valid while - * held by this class.

- *

To assist the caller, subclasses may override methods {@link #onContentQueued(T)}, - * {@link #onContentConsumed(T)} and {@link #onAllContentConsumed()} that can be implemented so that the - * caller will know when buffers are queued and consumed.

- */ -/** - * @author gregw - * - * @param - */ -/** - * @author gregw - * - * @param */ public abstract class HttpInput extends ServletInputStream implements Runnable { private final static Logger LOG = Log.getLogger(HttpInput.class); private final byte[] _oneByteBuffer = new byte[1]; - private HttpChannelState _channelState; - private Throwable _onError; - private ReadListener _listener; - private boolean _notReady; - - protected State _state = BLOCKING; - private State _eof=null; private final Object _lock; + private HttpChannelState _channelState; + private ReadListener _listener; + private Throwable _onError; + private boolean _notReady; + private State _state = BLOCKING; + private State _eof; private long _contentRead; protected HttpInput() @@ -70,7 +51,15 @@ public abstract class HttpInput extends ServletInputStream implements Runnabl protected HttpInput(Object lock) { - _lock=lock==null?this:lock; + _lock = lock == null ? this : lock; + } + + public void init(HttpChannelState state) + { + synchronized (lock()) + { + _channelState = state; + } } public final Object lock() @@ -89,43 +78,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnabl } } - /** - * Access the next content to be consumed from. Returning the next item does not consume it - * and it may be returned multiple times until it is consumed. Calls to {@link #get(Object, byte[], int, int)} - * or {@link #consume(Object, int)} are required to consume data from the content. - * @return Content or null if none available. - * @throws IOException - */ - protected abstract T nextContent() throws IOException; - - /** - * A convenience method to call nextContent and to check the return value, which if null then the - * a check is made for EOF and the state changed accordingly. - * @see #nextContent() - * @return Content or null if none available. - * @throws IOException - */ - protected T getNextContent() throws IOException - { - T content=nextContent(); - - if (content==null && _eof!=null) - { - LOG.debug("{} eof {}",this,_eof); - _state=_eof; - _eof=null; - } - - return content; - } - - @Override - public int read() throws IOException - { - int read = read(_oneByteBuffer, 0, 1); - return read < 0 ? -1 : 0xff & _oneByteBuffer[0]; - } - @Override public int available() { @@ -143,6 +95,13 @@ public abstract class HttpInput extends ServletInputStream implements Runnabl } } + @Override + public int read() throws IOException + { + int read = read(_oneByteBuffer, 0, 1); + return read < 0 ? -1 : 0xff & _oneByteBuffer[0]; + } + @Override public int read(byte[] b, int off, int len) throws IOException { @@ -171,6 +130,36 @@ public abstract class HttpInput extends ServletInputStream implements Runnabl return l; } + /** + * A convenience method to call nextContent and to check the return value, which if null then the + * a check is made for EOF and the state changed accordingly. + * @see #nextContent() + * @return Content or null if none available. + * @throws IOException + */ + protected T getNextContent() throws IOException + { + T content=nextContent(); + + if (content==null && _eof!=null) + { + LOG.debug("{} eof {}",this,_eof); + _state=_eof; + _eof=null; + } + + return content; + } + + /** + * Access the next content to be consumed from. Returning the next item does not consume it + * and it may be returned multiple times until it is consumed. Calls to {@link #get(Object, byte[], int, int)} + * or {@link #consume(Object, int)} are required to consume data from the content. + * @return Content or null if none available. + * @throws IOException + */ + protected abstract T nextContent() throws IOException; + protected abstract int remaining(T item); protected abstract int get(T item, byte[] buffer, int offset, int length); @@ -178,10 +167,15 @@ public abstract class HttpInput extends ServletInputStream implements Runnabl protected abstract void consume(T item, int length); protected abstract void blockForContent() throws IOException; + + /** Add some content to the input stream + * @param item + */ + public abstract void content(T item); protected boolean onAsyncRead() { - if (_listener==null) + if (_listener == null) return false; _channelState.onReadPossible(); return true; @@ -194,11 +188,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnabl return _contentRead; } } - - /** Add some content to the input stream - * @param item - */ - public abstract void content(T item); /** This method should be called to signal to the HttpInput @@ -442,13 +431,4 @@ public abstract class HttpInput extends ServletInputStream implements Runnabl return "EOF"; } }; - - public void init(HttpChannelState state) - { - synchronized (lock()) - { - _channelState=state; - } - } - } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java index c5320ef95f5..cadae5fe189 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java @@ -20,7 +20,6 @@ package org.eclipse.jetty.server; import java.io.IOException; import java.io.InterruptedIOException; - import javax.servlet.ServletInputStream; import org.eclipse.jetty.util.ArrayQueue; @@ -44,8 +43,34 @@ public abstract class QueuedHttpInput extends HttpInput private final ArrayQueue _inputQ = new ArrayQueue<>(lock()); public QueuedHttpInput() - {} + { + } + /** Add some content to the input stream + * @param item + */ + public void content(T item) + { + // The buffer is not copied here. This relies on the caller not recycling the buffer + // until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are + // the signals to the caller that the buffers can be recycled. + + synchronized (lock()) + { + boolean empty=_inputQ.isEmpty(); + + _inputQ.add(item); + + if (empty) + { + if (!onAsyncRead()) + lock().notify(); + } + + LOG.debug("{} queued {}", this, item); + } + } + public void recycle() { synchronized (lock()) @@ -84,13 +109,11 @@ public abstract class QueuedHttpInput extends HttpInput return item; } - protected abstract void onContentConsumed(T item); - protected void blockForContent() throws IOException { synchronized (lock()) { - while (_inputQ.isEmpty() && !_state.isEOF()) + while (_inputQ.isEmpty() && !isFinished()) { try { @@ -105,41 +128,14 @@ public abstract class QueuedHttpInput extends HttpInput } } + protected abstract void onContentConsumed(T item); - /* ------------------------------------------------------------ */ /** Called by this HttpInput to signal all available content has been consumed */ protected void onAllContentConsumed() { } - /* ------------------------------------------------------------ */ - /** Add some content to the input stream - * @param item - */ - public void content(T item) - { - // The buffer is not copied here. This relies on the caller not recycling the buffer - // until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are - // the signals to the caller that the buffers can be recycled. - - synchronized (lock()) - { - boolean empty=_inputQ.isEmpty(); - - _inputQ.add(item); - - if (empty) - { - if (!onAsyncRead()) - lock().notify(); - } - - LOG.debug("{} queued {}", this, item); - } - } - - public void earlyEOF() { synchronized (lock()) @@ -157,5 +153,4 @@ public abstract class QueuedHttpInput extends HttpInput lock().notify(); } } - }