Reorganization of the code in preview of a larger refactoring.

This commit is contained in:
Simone Bordet 2014-01-14 22:10:28 +01:00
parent 1286363dbf
commit 10dd14d34e
2 changed files with 87 additions and 112 deletions

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import javax.servlet.ReadListener; import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream; import javax.servlet.ServletInputStream;
@ -30,37 +29,19 @@ import org.eclipse.jetty.util.log.Logger;
/** /**
* <p>{@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p> * <p>{@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
* <p>{@link HttpInput} holds a queue of items passed to it by calls to {@link #content(T)}.</p>
* <p>{@link HttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
* but simply holds references to the item, thus the caller must organize for those buffers to valid while
* held by this class.</p>
* <p>To assist the caller, subclasses may override methods {@link #onContentQueued(T)},
* {@link #onContentConsumed(T)} and {@link #onAllContentConsumed()} that can be implemented so that the
* caller will know when buffers are queued and consumed.</p>
*/
/**
* @author gregw
*
* @param <T>
*/
/**
* @author gregw
*
* @param <T>
*/ */
public abstract class HttpInput<T> extends ServletInputStream implements Runnable public abstract class HttpInput<T> extends ServletInputStream implements Runnable
{ {
private final static Logger LOG = Log.getLogger(HttpInput.class); private final static Logger LOG = Log.getLogger(HttpInput.class);
private final byte[] _oneByteBuffer = new byte[1]; private final byte[] _oneByteBuffer = new byte[1];
private HttpChannelState _channelState;
private Throwable _onError;
private ReadListener _listener;
private boolean _notReady;
protected State _state = BLOCKING;
private State _eof=null;
private final Object _lock; private final Object _lock;
private HttpChannelState _channelState;
private ReadListener _listener;
private Throwable _onError;
private boolean _notReady;
private State _state = BLOCKING;
private State _eof;
private long _contentRead; private long _contentRead;
protected HttpInput() protected HttpInput()
@ -70,7 +51,15 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
protected HttpInput(Object lock) protected HttpInput(Object lock)
{ {
_lock=lock==null?this:lock; _lock = lock == null ? this : lock;
}
public void init(HttpChannelState state)
{
synchronized (lock())
{
_channelState = state;
}
} }
public final Object lock() public final Object lock()
@ -89,43 +78,6 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
} }
} }
/**
* Access the next content to be consumed from. Returning the next item does not consume it
* and it may be returned multiple times until it is consumed. Calls to {@link #get(Object, byte[], int, int)}
* or {@link #consume(Object, int)} are required to consume data from the content.
* @return Content or null if none available.
* @throws IOException
*/
protected abstract T nextContent() throws IOException;
/**
* A convenience method to call nextContent and to check the return value, which if null then the
* a check is made for EOF and the state changed accordingly.
* @see #nextContent()
* @return Content or null if none available.
* @throws IOException
*/
protected T getNextContent() throws IOException
{
T content=nextContent();
if (content==null && _eof!=null)
{
LOG.debug("{} eof {}",this,_eof);
_state=_eof;
_eof=null;
}
return content;
}
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
return read < 0 ? -1 : 0xff & _oneByteBuffer[0];
}
@Override @Override
public int available() public int available()
{ {
@ -143,6 +95,13 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
} }
} }
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
return read < 0 ? -1 : 0xff & _oneByteBuffer[0];
}
@Override @Override
public int read(byte[] b, int off, int len) throws IOException public int read(byte[] b, int off, int len) throws IOException
{ {
@ -171,6 +130,36 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
return l; return l;
} }
/**
* A convenience method to call nextContent and to check the return value, which if null then the
* a check is made for EOF and the state changed accordingly.
* @see #nextContent()
* @return Content or null if none available.
* @throws IOException
*/
protected T getNextContent() throws IOException
{
T content=nextContent();
if (content==null && _eof!=null)
{
LOG.debug("{} eof {}",this,_eof);
_state=_eof;
_eof=null;
}
return content;
}
/**
* Access the next content to be consumed from. Returning the next item does not consume it
* and it may be returned multiple times until it is consumed. Calls to {@link #get(Object, byte[], int, int)}
* or {@link #consume(Object, int)} are required to consume data from the content.
* @return Content or null if none available.
* @throws IOException
*/
protected abstract T nextContent() throws IOException;
protected abstract int remaining(T item); protected abstract int remaining(T item);
protected abstract int get(T item, byte[] buffer, int offset, int length); protected abstract int get(T item, byte[] buffer, int offset, int length);
@ -179,9 +168,14 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
protected abstract void blockForContent() throws IOException; protected abstract void blockForContent() throws IOException;
/** Add some content to the input stream
* @param item
*/
public abstract void content(T item);
protected boolean onAsyncRead() protected boolean onAsyncRead()
{ {
if (_listener==null) if (_listener == null)
return false; return false;
_channelState.onReadPossible(); _channelState.onReadPossible();
return true; return true;
@ -195,11 +189,6 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
} }
} }
/** Add some content to the input stream
* @param item
*/
public abstract void content(T item);
/** This method should be called to signal to the HttpInput /** This method should be called to signal to the HttpInput
* that an EOF has arrived before all the expected content. * that an EOF has arrived before all the expected content.
@ -442,13 +431,4 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
return "EOF"; return "EOF";
} }
}; };
public void init(HttpChannelState state)
{
synchronized (lock())
{
_channelState=state;
}
}
} }

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import javax.servlet.ServletInputStream; import javax.servlet.ServletInputStream;
import org.eclipse.jetty.util.ArrayQueue; import org.eclipse.jetty.util.ArrayQueue;
@ -44,7 +43,33 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock()); private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock());
public QueuedHttpInput() public QueuedHttpInput()
{} {
}
/** Add some content to the input stream
* @param item
*/
public void content(T item)
{
// The buffer is not copied here. This relies on the caller not recycling the buffer
// until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are
// the signals to the caller that the buffers can be recycled.
synchronized (lock())
{
boolean empty=_inputQ.isEmpty();
_inputQ.add(item);
if (empty)
{
if (!onAsyncRead())
lock().notify();
}
LOG.debug("{} queued {}", this, item);
}
}
public void recycle() public void recycle()
{ {
@ -84,13 +109,11 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
return item; return item;
} }
protected abstract void onContentConsumed(T item);
protected void blockForContent() throws IOException protected void blockForContent() throws IOException
{ {
synchronized (lock()) synchronized (lock())
{ {
while (_inputQ.isEmpty() && !_state.isEOF()) while (_inputQ.isEmpty() && !isFinished())
{ {
try try
{ {
@ -105,41 +128,14 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
} }
} }
protected abstract void onContentConsumed(T item);
/* ------------------------------------------------------------ */
/** Called by this HttpInput to signal all available content has been consumed /** Called by this HttpInput to signal all available content has been consumed
*/ */
protected void onAllContentConsumed() protected void onAllContentConsumed()
{ {
} }
/* ------------------------------------------------------------ */
/** Add some content to the input stream
* @param item
*/
public void content(T item)
{
// The buffer is not copied here. This relies on the caller not recycling the buffer
// until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are
// the signals to the caller that the buffers can be recycled.
synchronized (lock())
{
boolean empty=_inputQ.isEmpty();
_inputQ.add(item);
if (empty)
{
if (!onAsyncRead())
lock().notify();
}
LOG.debug("{} queued {}", this, item);
}
}
public void earlyEOF() public void earlyEOF()
{ {
synchronized (lock()) synchronized (lock())
@ -157,5 +153,4 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
lock().notify(); lock().notify();
} }
} }
} }