improved HttpInputTest

This commit is contained in:
Greg Wilkins 2014-12-21 12:42:27 +01:00
parent ebaf84b97e
commit 24ffe377ce
2 changed files with 221 additions and 27 deletions

View File

@ -51,7 +51,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
private final byte[] _oneByteBuffer = new byte[1];
private final ArrayQueue<Content> _inputQ = new ArrayQueue<>();
private ReadListener _listener;
private boolean _notReady;
private boolean _unready;
private State _state = STREAM;
private long _contentConsumed;
@ -77,7 +77,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
item = _inputQ.pollUnsafe();
}
_listener = null;
_notReady = false;
_unready = false;
_state = STREAM;
_contentConsumed = 0;
}
@ -209,7 +209,9 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
int l = Math.min(content.remaining(), length);
content.getContent().get(buffer, offset, l);
_contentConsumed+=length;
_contentConsumed+=l;
if (l>0 && !content.hasContent())
pollInputQ(); // hungry succeed
return l;
}
@ -222,9 +224,13 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*/
protected void skip(Content content, int length)
{
int l = Math.min(content.remaining(), length);
ByteBuffer buffer = content.getContent();
buffer.position(buffer.position()+length);
_contentConsumed+=length;
buffer.position(buffer.position()+l);
_contentConsumed+=l;
if (l>0 && !content.hasContent())
pollInputQ(); // hungry succeed
}
/**
@ -274,12 +280,15 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
protected boolean onAsyncRead()
{
boolean needReadCB=false;
synchronized (_inputQ)
{
if (_listener == null)
return false;
needReadCB=_unready;
}
onReadPossible();
if (needReadCB)
onReadPossible();
return true;
}
@ -416,15 +425,15 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
boolean finished;
synchronized (_inputQ)
{
if (available() > 0)
return true;
if (_state.isEOF())
return true;
if (_listener == null )
return true;
if (available() > 0)
return true;
if (_notReady)
if (_unready)
return false;
_notReady = true;
_unready = true;
finished = isFinished();
}
if (finished)
@ -448,7 +457,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
throw new IllegalStateException("state=" + _state);
_state = ASYNC;
_listener = readListener;
_notReady = true;
_unready = true;
}
onReadPossible();
}
@ -474,7 +483,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
synchronized (_inputQ)
{
if (!_notReady || _listener == null)
if (!_unready || _listener == null)
return;
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
@ -491,7 +500,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
}
eof = !available && isFinished();
_notReady = !available && !eof;
_unready = !available && !eof;
}
try

View File

@ -20,11 +20,14 @@ package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
@ -35,6 +38,7 @@ import javax.servlet.ReadListener;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -74,6 +78,29 @@ public class HttpInputTest
}
};
public class TContent extends HttpInput.Content
{
private final String _content;
public TContent(String content)
{
super(BufferUtil.toBuffer(content));
_content=content;
}
@Override
public void succeeded()
{
_history.add("Content succeeded "+_content);
super.succeeded();
}
@Override
public void failed(Throwable x)
{
_history.add("Content failed "+_content);
super.failed(x);
}
}
@Before
public void before()
@ -92,7 +119,12 @@ public class HttpInputTest
_history.add("produceContent "+_content.size());
for (String s=_content.poll();s!=null;s=_content.poll())
_in.content(new Content(BufferUtil.toBuffer(s)));
{
if ("_EOF_".equals(s))
_in.eof();
else
_in.content(new TContent(s));
}
}
@Override
@ -110,11 +142,21 @@ public class HttpInputTest
};
}
@After
public void after()
{
assertThat(_history.poll(),nullValue());
}
@Test
public void testEmpty() throws Exception
{
assertThat(_in.available(),equalTo(0));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
}
@ -122,26 +164,43 @@ public class HttpInputTest
@Test
public void testRead() throws Exception
{
_in.content(new HttpInput.Content(BufferUtil.toBuffer("AB")));
_in.content(new HttpInput.Content(BufferUtil.toBuffer("CD")));
_in.content(new TContent("AB"));
_in.content(new TContent("CD"));
_content.offer("EF");
_content.offer("GH");
assertThat(_in.available(),equalTo(2));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.getContentConsumed(),equalTo(0L));
assertThat(_in.read(),equalTo((int)'A'));
assertThat(_in.getContentConsumed(),equalTo(1L));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'C'));
assertThat(_in.read(),equalTo((int)'D'));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'E'));
assertThat(_in.read(),equalTo((int)'F'));
assertThat(_history.poll(),equalTo("produceContent 2"));
assertThat(_history.poll(),equalTo("Content succeeded EF"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'G'));
assertThat(_in.read(),equalTo((int)'H'));
assertThat(_history.poll(),equalTo("Content succeeded GH"));
assertThat(_history.poll(),nullValue());
assertThat(_in.getContentConsumed(),equalTo(8L));
assertThat(_history.poll(),equalTo("produceContent 2"));
assertThat(_history.poll(),nullValue());
}
@ -155,7 +214,7 @@ public class HttpInputTest
try
{
Thread.sleep(500);
_in.content(new HttpInput.Content(BufferUtil.toBuffer("AB")));
_in.content(new TContent("AB"));
}
catch(Throwable th)
{
@ -171,13 +230,16 @@ public class HttpInputTest
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue());
}
@Test
public void testReadEOF() throws Exception
{
_in.content(new HttpInput.Content(BufferUtil.toBuffer("AB")));
_in.content(new HttpInput.Content(BufferUtil.toBuffer("CD")));
_in.content(new TContent("AB"));
_in.content(new TContent("CD"));
_in.eof();
assertThat(_in.isFinished(),equalTo(false));
@ -186,10 +248,16 @@ public class HttpInputTest
assertThat(_in.read(),equalTo((int)'A'));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'C'));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.read(),equalTo((int)'D'));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true));
@ -199,8 +267,8 @@ public class HttpInputTest
@Test
public void testReadEarlyEOF() throws Exception
{
_in.content(new HttpInput.Content(BufferUtil.toBuffer("AB")));
_in.content(new HttpInput.Content(BufferUtil.toBuffer("CD")));
_in.content(new TContent("AB"));
_in.content(new TContent("CD"));
_in.earlyEOF();
assertThat(_in.isFinished(),equalTo(false));
@ -209,10 +277,10 @@ public class HttpInputTest
assertThat(_in.read(),equalTo((int)'A'));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_in.read(),equalTo((int)'C'));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.read(),equalTo((int)'D'));
assertThat(_in.isFinished(),equalTo(false));
try
{
@ -223,7 +291,9 @@ public class HttpInputTest
{
assertThat(_in.isFinished(),equalTo(true));
}
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue());
}
@ -280,21 +350,47 @@ public class HttpInputTest
public void testAsyncRead() throws Exception
{
_in.setReadListener(_listener);
_in.run();
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.content(new HttpInput.Content(BufferUtil.toBuffer("AB")));
_in.content(new TContent("AB"));
_content.add("CD");
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.read(),equalTo((int)'A'));
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),equalTo("produceContent 1"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'C'));
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.read(),equalTo((int)'D'));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(false));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
}
@ -322,6 +418,53 @@ public class HttpInputTest
assertThat(_history.poll(),nullValue());
}
@Test
public void testAsyncReadEOF() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.content(new TContent("AB"));
_content.add("_EOF_");
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.read(),equalTo((int)'A'));
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),equalTo("produceContent 1"));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),nullValue());
}
@Test
public void testAsyncError() throws Exception
{
@ -364,4 +507,46 @@ public class HttpInputTest
_in.recycle();
testReadEOF();
}
@Test
public void testConsumeAll() throws Exception
{
_in.content(new TContent("AB"));
_in.content(new TContent("CD"));
_content.offer("EF");
_content.offer("GH");
assertThat(_in.read(),equalTo((int)'A'));
assertFalse(_in.consumeAll());
assertThat(_in.getContentConsumed(),equalTo(8L));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),equalTo("produceContent 2"));
assertThat(_history.poll(),equalTo("Content succeeded EF"));
assertThat(_history.poll(),equalTo("Content succeeded GH"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
}
@Test
public void testConsumeAllEOF() throws Exception
{
_in.content(new TContent("AB"));
_in.content(new TContent("CD"));
_content.offer("EF");
_content.offer("GH");
_content.offer("_EOF_");
assertThat(_in.read(),equalTo((int)'A'));
assertTrue(_in.consumeAll());
assertThat(_in.getContentConsumed(),equalTo(8L));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),equalTo("produceContent 3"));
assertThat(_history.poll(),equalTo("Content succeeded EF"));
assertThat(_history.poll(),equalTo("Content succeeded GH"));
assertThat(_history.poll(),nullValue());
}
}