Added HttpInput prepend content

Allows content to be reread
This commit is contained in:
Greg Wilkins 2016-02-05 14:54:39 +01:00
parent 90efbe62c8
commit 7b5d12b338
2 changed files with 90 additions and 1 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
@ -51,7 +52,7 @@ public class HttpInput extends ServletInputStream implements Runnable
private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
private final byte[] _oneByteBuffer = new byte[1];
private final Queue<Content> _inputQ = new ArrayDeque<>();
private final Deque<Content> _inputQ = new ArrayDeque<>();
private final HttpChannelState _channelState;
private ReadListener _listener;
private State _state = STREAM;
@ -369,6 +370,33 @@ public class HttpInput extends ServletInputStream implements Runnable
}
}
/**
* Adds some content to the start of this input stream.
* <p>Typically used to push back content that has
* been read, perhaps mutated. The bytes prepended are
* deducted for the contentConsumed total</p>
* @param item the content to add
* @return true if content channel woken for read
*/
public boolean prependContent(Content item)
{
boolean woken=false;
synchronized (_inputQ)
{
_inputQ.push(item);
_contentConsumed-=item.remaining();
if (LOG.isDebugEnabled())
LOG.debug("{} prependContent {}", this, item);
if (_listener==null)
_inputQ.notify();
else
woken=_channelState.onReadPossible();
}
return woken;
}
/**
* Adds some content to this input stream.
*

View File

@ -221,6 +221,67 @@ public class HttpInputTest
assertThat(_history.poll(),nullValue());
}
@Test
public void testReRead() throws Exception
{
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH");
assertThat(_in.available(),equalTo(2));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.getContentConsumed(),equalTo(0L));
assertThat(_in.read(),equalTo((int)'A'));
assertThat(_in.getContentConsumed(),equalTo(1L));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_in.getContentConsumed(),equalTo(2L));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'C'));
assertThat(_in.read(),equalTo((int)'D'));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'E'));
_in.prependContent(new HttpInput.Content(BufferUtil.toBuffer("abcde")));
assertThat(_in.available(),equalTo(5));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.getContentConsumed(),equalTo(0L));
assertThat(_in.read(),equalTo((int)'a'));
assertThat(_in.getContentConsumed(),equalTo(1L));
assertThat(_in.read(),equalTo((int)'b'));
assertThat(_in.getContentConsumed(),equalTo(2L));
assertThat(_in.read(),equalTo((int)'c'));
assertThat(_in.read(),equalTo((int)'d'));
assertThat(_in.read(),equalTo((int)'e'));
assertThat(_in.read(),equalTo((int)'F'));
assertThat(_history.poll(),equalTo("produceContent 2"));
assertThat(_history.poll(),equalTo("Content succeeded EF"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'G'));
assertThat(_in.read(),equalTo((int)'H'));
assertThat(_history.poll(),equalTo("Content succeeded GH"));
assertThat(_history.poll(),nullValue());
assertThat(_in.getContentConsumed(),equalTo(8L));
assertThat(_history.poll(),nullValue());
}
@Test
public void testBlockingRead() throws Exception
{