425703 - Review [Queued]HttpInput.
Fixed synchronization. Fixed notification of callbacks outside sync blocks. Added isEOF() method to allow correct implementation of blockForContent(). Remove unused callback onAllContentConsumed().
This commit is contained in:
parent
e3e402c751
commit
93013b36dd
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import javax.servlet.ReadListener;
|
||||
import javax.servlet.ServletInputStream;
|
||||
|
||||
|
@ -28,7 +29,12 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>{@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
|
||||
* {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
|
||||
* <p/>
|
||||
* Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
|
||||
* maintains two states: the content state that tells whether there is content to consume and the EOF
|
||||
* state that tells whether an EOF has arrived.
|
||||
* Only once the content has been consumed the content state is moved to the EOF state.
|
||||
*/
|
||||
public abstract class HttpInput<T> extends ServletInputStream implements Runnable
|
||||
{
|
||||
|
@ -40,8 +46,8 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
private ReadListener _listener;
|
||||
private Throwable _onError;
|
||||
private boolean _notReady;
|
||||
private State _state = BLOCKING;
|
||||
private State _eof;
|
||||
private State _contentState = STREAM;
|
||||
private State _eofState;
|
||||
private long _contentRead;
|
||||
|
||||
protected HttpInput()
|
||||
|
@ -71,10 +77,12 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
{
|
||||
synchronized (lock())
|
||||
{
|
||||
_state = BLOCKING;
|
||||
_eof=null;
|
||||
_onError=null;
|
||||
_contentRead=0;
|
||||
_listener = null;
|
||||
_onError = null;
|
||||
_notReady = false;
|
||||
_contentState = STREAM;
|
||||
_eofState = null;
|
||||
_contentRead = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -86,7 +94,7 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
synchronized (lock())
|
||||
{
|
||||
T item = getNextContent();
|
||||
return item==null?0:remaining(item);
|
||||
return item == null ? 0 : remaining(item);
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
|
@ -99,84 +107,111 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
public int read() throws IOException
|
||||
{
|
||||
int read = read(_oneByteBuffer, 0, 1);
|
||||
return read < 0 ? -1 : 0xff & _oneByteBuffer[0];
|
||||
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException
|
||||
{
|
||||
T item = null;
|
||||
int l;
|
||||
synchronized (lock())
|
||||
{
|
||||
// System.err.printf("read s=%s q=%d e=%s%n",_state,_inputQ.size(),_eof);
|
||||
|
||||
// Get the current head of the input Q
|
||||
item = getNextContent();
|
||||
|
||||
// If we have no item
|
||||
T item = getNextContent();
|
||||
if (item == null)
|
||||
{
|
||||
_state.waitForContent(this);
|
||||
item=getNextContent();
|
||||
if (item==null)
|
||||
return _state.noContent();
|
||||
_contentState.waitForContent(this);
|
||||
item = getNextContent();
|
||||
if (item == null)
|
||||
return _contentState.noContent();
|
||||
}
|
||||
|
||||
l=get(item, b, off, len);
|
||||
_contentRead+=l;
|
||||
|
||||
int l = get(item, b, off, len);
|
||||
_contentRead += l;
|
||||
return l;
|
||||
}
|
||||
return l;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A convenience method to call nextContent and to check the return value, which if null then the
|
||||
* a check is made for EOF and the state changed accordingly.
|
||||
* @see #nextContent()
|
||||
*
|
||||
* @return Content or null if none available.
|
||||
* @throws IOException
|
||||
* @see #nextContent()
|
||||
*/
|
||||
protected T getNextContent() throws IOException
|
||||
{
|
||||
T content=nextContent();
|
||||
|
||||
if (content==null && _eof!=null)
|
||||
T content = nextContent();
|
||||
if (content == null)
|
||||
{
|
||||
LOG.debug("{} eof {}",this,_eof);
|
||||
_state=_eof;
|
||||
_eof=null;
|
||||
synchronized (lock())
|
||||
{
|
||||
if (_eofState != null)
|
||||
{
|
||||
LOG.debug("{} eof {}", this, _eofState);
|
||||
_contentState = _eofState;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return content;
|
||||
}
|
||||
|
||||
/**
|
||||
* Access the next content to be consumed from. Returning the next item does not consume it
|
||||
* and it may be returned multiple times until it is consumed. Calls to {@link #get(Object, byte[], int, int)}
|
||||
* and it may be returned multiple times until it is consumed.
|
||||
* <p/>
|
||||
* Calls to {@link #get(Object, byte[], int, int)}
|
||||
* or {@link #consume(Object, int)} are required to consume data from the content.
|
||||
* @return Content or null if none available.
|
||||
* @throws IOException
|
||||
*
|
||||
* @return the content or null if none available.
|
||||
* @throws IOException if retrieving the content fails
|
||||
*/
|
||||
protected abstract T nextContent() throws IOException;
|
||||
|
||||
/**
|
||||
* @param item the content
|
||||
* @return how many bytes remain in the given content
|
||||
*/
|
||||
protected abstract int remaining(T item);
|
||||
|
||||
/**
|
||||
* Copies the given content into the given byte buffer.
|
||||
*
|
||||
* @param item the content to copy from
|
||||
* @param buffer the buffer to copy into
|
||||
* @param offset the buffer offset to start copying from
|
||||
* @param length the space available in the buffer
|
||||
* @return the number of bytes actually copied
|
||||
*/
|
||||
protected abstract int get(T item, byte[] buffer, int offset, int length);
|
||||
|
||||
/**
|
||||
* Consumes the given content.
|
||||
*
|
||||
* @param item the content to consume
|
||||
* @param length the number of bytes to consume
|
||||
*/
|
||||
protected abstract void consume(T item, int length);
|
||||
|
||||
/**
|
||||
* Blocks until some content or some end-of-file event arrives.
|
||||
*
|
||||
* @throws IOException if the wait is interrupted
|
||||
*/
|
||||
protected abstract void blockForContent() throws IOException;
|
||||
|
||||
/** Add some content to the input stream
|
||||
* @param item
|
||||
|
||||
/**
|
||||
* Adds some content to this input stream.
|
||||
*
|
||||
* @param item the content to add
|
||||
*/
|
||||
public abstract void content(T item);
|
||||
|
||||
protected boolean onAsyncRead()
|
||||
{
|
||||
if (_listener == null)
|
||||
return false;
|
||||
synchronized (lock())
|
||||
{
|
||||
if (_listener == null)
|
||||
return false;
|
||||
}
|
||||
_channelState.onReadPossible();
|
||||
return true;
|
||||
}
|
||||
|
@ -189,9 +224,10 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/** This method should be called to signal to the HttpInput
|
||||
* that an EOF has arrived before all the expected content.
|
||||
/**
|
||||
* This method should be called to signal that an EOF has been
|
||||
* detected before all the expected content arrived.
|
||||
* <p/>
|
||||
* Typically this will result in an EOFException being thrown
|
||||
* from a subsequent read rather than a -1 return.
|
||||
*/
|
||||
|
@ -199,28 +235,34 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
{
|
||||
synchronized (lock())
|
||||
{
|
||||
if (_eof==null || !_eof.isEOF())
|
||||
if (!isEOF())
|
||||
{
|
||||
LOG.debug("{} early EOF", this);
|
||||
_eof=EARLY_EOF;
|
||||
if (_listener!=null)
|
||||
_channelState.onReadPossible();
|
||||
_eofState = EARLY_EOF;
|
||||
if (_listener == null)
|
||||
return;
|
||||
}
|
||||
}
|
||||
_channelState.onReadPossible();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method should be called to signal that all the expected
|
||||
* content arrived.
|
||||
*/
|
||||
public void messageComplete()
|
||||
{
|
||||
synchronized (lock())
|
||||
{
|
||||
if (_eof==null || !_eof.isEOF())
|
||||
if (!isEOF())
|
||||
{
|
||||
LOG.debug("{} EOF", this);
|
||||
_eof=EOF;
|
||||
if (_listener!=null)
|
||||
_channelState.onReadPossible();
|
||||
_eofState = EOF;
|
||||
if (_listener == null)
|
||||
return;
|
||||
}
|
||||
}
|
||||
_channelState.onReadPossible();
|
||||
}
|
||||
|
||||
public void consumeAll()
|
||||
|
@ -232,10 +274,10 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
while (!isFinished())
|
||||
{
|
||||
T item = getNextContent();
|
||||
if (item==null)
|
||||
_state.waitForContent(this);
|
||||
if (item == null)
|
||||
_contentState.waitForContent(this);
|
||||
else
|
||||
consume(item,remaining(item));
|
||||
consume(item, remaining(item));
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
|
@ -245,35 +287,46 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether an EOF has been detected, even though there may be content to consume.
|
||||
*/
|
||||
public boolean isEOF()
|
||||
{
|
||||
synchronized (lock())
|
||||
{
|
||||
return _eofState != null && _eofState.isEOF();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFinished()
|
||||
{
|
||||
synchronized (lock())
|
||||
{
|
||||
return _state.isEOF();
|
||||
return _contentState.isEOF();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReady()
|
||||
{
|
||||
boolean finished;
|
||||
synchronized (lock())
|
||||
{
|
||||
if (_listener==null)
|
||||
if (_listener == null)
|
||||
return true;
|
||||
int available = available();
|
||||
if (available>0)
|
||||
if (available() > 0)
|
||||
return true;
|
||||
if (!_notReady)
|
||||
{
|
||||
_notReady=true;
|
||||
if (_state.isEOF())
|
||||
_channelState.onReadPossible();
|
||||
else
|
||||
unready();
|
||||
}
|
||||
return false;
|
||||
if (_notReady)
|
||||
return false;
|
||||
_notReady = true;
|
||||
finished = isFinished();
|
||||
}
|
||||
if (finished)
|
||||
_channelState.onReadPossible();
|
||||
else
|
||||
unready();
|
||||
return false;
|
||||
}
|
||||
|
||||
protected void unready()
|
||||
|
@ -283,80 +336,79 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
@Override
|
||||
public void setReadListener(ReadListener readListener)
|
||||
{
|
||||
if (readListener==null)
|
||||
throw new NullPointerException("readListener==null");
|
||||
readListener = Objects.requireNonNull(readListener);
|
||||
synchronized (lock())
|
||||
{
|
||||
if (_state!=BLOCKING)
|
||||
throw new IllegalStateException("state="+_state);
|
||||
_state=ASYNC;
|
||||
_listener=readListener;
|
||||
_notReady=true;
|
||||
|
||||
_channelState.onReadPossible();
|
||||
if (_contentState != STREAM)
|
||||
throw new IllegalStateException("state=" + _contentState);
|
||||
_contentState = ASYNC;
|
||||
_listener = readListener;
|
||||
_notReady = true;
|
||||
}
|
||||
_channelState.onReadPossible();
|
||||
}
|
||||
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
synchronized (lock())
|
||||
{
|
||||
if (_onError==null)
|
||||
if (_onError == null)
|
||||
LOG.warn(x);
|
||||
else
|
||||
_onError=x;
|
||||
_onError = x;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
final boolean available;
|
||||
final Throwable error;
|
||||
final ReadListener listener;
|
||||
boolean available = false;
|
||||
final boolean eof;
|
||||
final Throwable x;
|
||||
|
||||
synchronized (lock())
|
||||
{
|
||||
if (!_notReady || _listener==null)
|
||||
if (!_notReady || _listener == null)
|
||||
return;
|
||||
|
||||
x=_onError;
|
||||
T item;
|
||||
error = _onError;
|
||||
listener = _listener;
|
||||
|
||||
try
|
||||
{
|
||||
item = getNextContent();
|
||||
T item = getNextContent();
|
||||
available = item != null && remaining(item) > 0;
|
||||
}
|
||||
catch(Exception e)
|
||||
catch (Exception e)
|
||||
{
|
||||
item=null;
|
||||
failed(e);
|
||||
}
|
||||
available= item!=null && remaining(item)>0;
|
||||
|
||||
eof = !available && _state.isEOF();
|
||||
_notReady=!available&&!eof;
|
||||
eof = !available && isFinished();
|
||||
_notReady = !available && !eof;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (x!=null)
|
||||
_listener.onError(x);
|
||||
if (error != null)
|
||||
listener.onError(error);
|
||||
else if (available)
|
||||
_listener.onDataAvailable();
|
||||
listener.onDataAvailable();
|
||||
else if (eof)
|
||||
_listener.onAllDataRead();
|
||||
listener.onAllDataRead();
|
||||
else
|
||||
unready();
|
||||
}
|
||||
catch(Throwable e)
|
||||
catch (Throwable e)
|
||||
{
|
||||
LOG.warn(e.toString());
|
||||
LOG.debug(e);
|
||||
_listener.onError(e);
|
||||
listener.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected static class State
|
||||
protected static abstract class State
|
||||
{
|
||||
public void waitForContent(HttpInput<?> in) throws IOException
|
||||
{
|
||||
|
@ -373,26 +425,28 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
}
|
||||
}
|
||||
|
||||
protected static final State BLOCKING= new State()
|
||||
protected static final State STREAM = new State()
|
||||
{
|
||||
@Override
|
||||
public void waitForContent(HttpInput<?> in) throws IOException
|
||||
public void waitForContent(HttpInput<?> input) throws IOException
|
||||
{
|
||||
in.blockForContent();
|
||||
input.blockForContent();
|
||||
}
|
||||
|
||||
public String toString()
|
||||
{
|
||||
return "OPEN";
|
||||
return "STREAM";
|
||||
}
|
||||
};
|
||||
|
||||
protected static final State ASYNC= new State()
|
||||
protected static final State ASYNC = new State()
|
||||
{
|
||||
@Override
|
||||
public int noContent() throws IOException
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
@ -400,25 +454,27 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
|
|||
}
|
||||
};
|
||||
|
||||
protected static final State EARLY_EOF= new State()
|
||||
protected static final State EARLY_EOF = new State()
|
||||
{
|
||||
@Override
|
||||
public int noContent() throws IOException
|
||||
{
|
||||
throw new EofException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEOF()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
public String toString()
|
||||
{
|
||||
return "EARLY_EOF";
|
||||
}
|
||||
};
|
||||
|
||||
protected static final State EOF= new State()
|
||||
protected static final State EOF = new State()
|
||||
{
|
||||
@Override
|
||||
public boolean isEOF()
|
||||
|
|
|
@ -20,70 +20,59 @@ package org.eclipse.jetty.server;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import javax.servlet.ServletInputStream;
|
||||
|
||||
import org.eclipse.jetty.util.ArrayQueue;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>{@link QueuedHttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
|
||||
* <p>{@link QueuedHttpInput} holds a queue of items passed to it by calls to {@link #content(Object)}.</p>
|
||||
* <p>{@link QueuedHttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
|
||||
* {@link QueuedHttpInput} holds a queue of items passed to it by calls to {@link #content(Object)}.
|
||||
* <p/>
|
||||
* {@link QueuedHttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
|
||||
* but simply holds references to the item, thus the caller must organize for those buffers to valid while
|
||||
* held by this class.</p>
|
||||
* <p>To assist the caller, subclasses may override methods {@link #onAsyncRead()},
|
||||
* {@link #onContentConsumed(Object)} and {@link #onAllContentConsumed()} that can be implemented so that the
|
||||
* caller will know when buffers are queued and consumed.</p>
|
||||
* held by this class.
|
||||
* <p/>
|
||||
* To assist the caller, subclasses may override methods {@link #onAsyncRead()}, {@link #onContentConsumed(Object)}
|
||||
* that can be implemented so that the caller will know when buffers are queued and consumed.
|
||||
*/
|
||||
public abstract class QueuedHttpInput<T> extends HttpInput<T>
|
||||
{
|
||||
private final static Logger LOG = Log.getLogger(QueuedHttpInput.class);
|
||||
|
||||
private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock());
|
||||
|
||||
|
||||
public QueuedHttpInput()
|
||||
{
|
||||
}
|
||||
|
||||
/** Add some content to the input stream
|
||||
* @param item
|
||||
*/
|
||||
public void content(T item)
|
||||
{
|
||||
// The buffer is not copied here. This relies on the caller not recycling the buffer
|
||||
// until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are
|
||||
// until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are
|
||||
// the signals to the caller that the buffers can be recycled.
|
||||
|
||||
|
||||
synchronized (lock())
|
||||
{
|
||||
boolean empty=_inputQ.isEmpty();
|
||||
|
||||
boolean wasEmpty = _inputQ.isEmpty();
|
||||
_inputQ.add(item);
|
||||
|
||||
if (empty)
|
||||
LOG.debug("{} queued {}", this, item);
|
||||
if (wasEmpty)
|
||||
{
|
||||
if (!onAsyncRead())
|
||||
lock().notify();
|
||||
}
|
||||
|
||||
LOG.debug("{} queued {}", this, item);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void recycle()
|
||||
{
|
||||
synchronized (lock())
|
||||
{
|
||||
T item = _inputQ.peekUnsafe();
|
||||
T item = _inputQ.pollUnsafe();
|
||||
while (item != null)
|
||||
{
|
||||
_inputQ.pollUnsafe();
|
||||
onContentConsumed(item);
|
||||
|
||||
item = _inputQ.peekUnsafe();
|
||||
if (item == null)
|
||||
onAllContentConsumed();
|
||||
item = _inputQ.pollUnsafe();
|
||||
}
|
||||
super.recycle();
|
||||
}
|
||||
|
@ -92,28 +81,27 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
|
|||
@Override
|
||||
protected T nextContent()
|
||||
{
|
||||
T item = _inputQ.peekUnsafe();
|
||||
|
||||
// Skip empty items at the head of the queue
|
||||
while (item != null && remaining(item) == 0)
|
||||
synchronized (lock())
|
||||
{
|
||||
_inputQ.pollUnsafe();
|
||||
onContentConsumed(item);
|
||||
LOG.debug("{} consumed {}", this, item);
|
||||
item = _inputQ.peekUnsafe();
|
||||
|
||||
// If that was the last item then notify
|
||||
if (item==null)
|
||||
onAllContentConsumed();
|
||||
// Items are removed only when they are fully consumed.
|
||||
T item = _inputQ.peekUnsafe();
|
||||
// Skip consumed items at the head of the queue.
|
||||
while (item != null && remaining(item) == 0)
|
||||
{
|
||||
_inputQ.pollUnsafe();
|
||||
onContentConsumed(item);
|
||||
LOG.debug("{} consumed {}", this, item);
|
||||
item = _inputQ.peekUnsafe();
|
||||
}
|
||||
return item;
|
||||
}
|
||||
return item;
|
||||
}
|
||||
|
||||
|
||||
protected void blockForContent() throws IOException
|
||||
{
|
||||
synchronized (lock())
|
||||
{
|
||||
while (_inputQ.isEmpty() && !isFinished())
|
||||
while (_inputQ.isEmpty() && !isFinished() && !isEOF())
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -128,13 +116,12 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
|
|||
}
|
||||
}
|
||||
|
||||
protected abstract void onContentConsumed(T item);
|
||||
|
||||
/** Called by this HttpInput to signal all available content has been consumed
|
||||
/**
|
||||
* Callback that signals that the given content has been consumed.
|
||||
*
|
||||
* @param item the consumed content
|
||||
*/
|
||||
protected void onAllContentConsumed()
|
||||
{
|
||||
}
|
||||
protected abstract void onContentConsumed(T item);
|
||||
|
||||
public void earlyEOF()
|
||||
{
|
||||
|
@ -144,7 +131,7 @@ public abstract class QueuedHttpInput<T> extends HttpInput<T>
|
|||
lock().notify();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void messageComplete()
|
||||
{
|
||||
synchronized (lock())
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.server;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class QueuedHttpInputTest
|
||||
{
|
||||
@Test
|
||||
public void testNoContentMessageComplete() throws Exception
|
||||
{
|
||||
ByteBufferQueuedHttpInput input = new ByteBufferQueuedHttpInput();
|
||||
input.messageComplete();
|
||||
|
||||
input.getNextContent();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue