Added test cases and improved isReady and isFinished handling

This commit is contained in:
Greg Wilkins 2015-02-05 14:26:34 +11:00
parent f6cfe07a69
commit 7db7ef3020
5 changed files with 247 additions and 93 deletions

View File

@ -296,19 +296,6 @@ public class HttpChannelState
throw new IllegalStateException(this.getStatusString());
}
if (_asyncRead)
{
_state=State.ASYNC_IO;
_asyncRead=false;
return Action.READ_CALLBACK;
}
if (_asyncWrite)
{
_asyncWrite=false;
_state=State.ASYNC_IO;
return Action.WRITE_CALLBACK;
}
if (_async!=null)
{
@ -327,8 +314,25 @@ public class HttpChannelState
_state=State.DISPATCHED;
_async=null;
return Action.ASYNC_EXPIRED;
case EXPIRING:
case STARTED:
if (_asyncRead)
{
_state=State.ASYNC_IO;
_asyncRead=false;
return Action.READ_CALLBACK;
}
if (_asyncWrite)
{
_asyncWrite=false;
_state=State.ASYNC_IO;
return Action.WRITE_CALLBACK;
}
scheduleTimeout();
_state=State.ASYNC_WAIT;
return Action.WAIT;
case EXPIRING:
scheduleTimeout();
_state=State.ASYNC_WAIT;
return Action.WAIT;

View File

@ -54,7 +54,7 @@ public class HttpConfiguration
private boolean _sendServerVersion = true;
private boolean _sendXPoweredBy = false;
private boolean _sendDateHeader = true;
private boolean _delayDispatchUntilContent = false;
private boolean _delayDispatchUntilContent = false; // TODO change to true
/* ------------------------------------------------------------ */
/**

View File

@ -27,6 +27,7 @@ import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -112,7 +113,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
int read = read(_oneByteBuffer, 0, 1);
if (read==0)
throw new IllegalStateException("unready");
throw new IllegalStateException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@ -148,12 +149,9 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
}
/**
* Access the next content to be consumed from. Returning the next item does not consume it
* and it may be returned multiple times until it is consumed.
* <p/>
* Calls to {@link #get(Content, byte[], int, int)}
* or {@link #skip(Content, int)} are required to consume data from the content.
*
* Get the next content from the inputQ, calling {@link #produceContent()}
* if need be. EOF is processed and state changed.
*
* @return the content or null if none available.
* @throws IOException if retrieving the content fails
*/
@ -161,16 +159,21 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
Content content = pollInputQ();
Content content = pollContent();
if (content==null && !isFinished())
{
produceContent();
content = pollInputQ();
content = pollContent();
}
return content;
}
protected Content pollInputQ()
/** Poll the inputQ for Content.
* Consumed buffers and {@link PoisonPillContent}s are removed and
* EOF state updated if need be.
* @return Content or null
*/
protected Content pollContent()
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
@ -203,7 +206,55 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
return content;
}
/**
* Get the next readable from the inputQ, calling {@link #produceContent()}
* if need be. EOF is NOT processed and state is not changed.
*
* @return the content or EOF or null if none available.
* @throws IOException if retrieving the content fails
*/
protected Content nextReadable() throws IOException
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
Content content = pollReadable();
if (content==null && !isFinished())
{
produceContent();
content = pollReadable();
}
return content;
}
/** Poll the inputQ for Content or EOF.
* Consumed buffers and non EOF {@link PoisonPillContent}s are removed.
* EOF state is not updated.
* @return Content, EOF or null
*/
protected Content pollReadable()
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
// Items are removed only when they are fully consumed.
Content content = _inputQ.peekUnsafe();
// Skip consumed items at the head of the queue except EOF
while (content != null)
{
if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0)
return content;
_inputQ.pollUnsafe();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
content = _inputQ.peekUnsafe();
}
return content;
}
/**
* @param item the content
@ -214,7 +265,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
return item.remaining();
}
/**
* Copies the given content into the given byte buffer.
*
@ -230,7 +280,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
content.getContent().get(buffer, offset, l);
_contentConsumed+=l;
if (l>0 && !content.hasContent())
pollInputQ(); // hungry succeed
pollContent(); // hungry succeed
return l;
}
@ -248,7 +298,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
buffer.position(buffer.position()+l);
_contentConsumed+=l;
if (l>0 && !content.hasContent())
pollInputQ(); // hungry succeed
pollContent(); // hungry succeed
}
@ -391,34 +441,29 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
@Override
public boolean isReady()
{
synchronized (_inputQ)
try
{
if (_listener == null )
return true;
if (_unready)
return false;
if (_state instanceof EOFState)
return true;
if (_inputQ.isEmpty())
synchronized (_inputQ)
{
try
{
produceContent();
}
catch(IOException e)
{
failed(e);
}
}
if (!_inputQ.isEmpty())
return true;
if (_listener == null )
return true;
if (_unready)
return false;
if (_state instanceof EOFState)
return true;
if (nextReadable()!=null)
return true;
_unready = true;
_unready = true;
}
unready();
return false;
}
catch(IOException e)
{
LOG.ignore(e);
return true;
}
unready();
return false;
}
protected void unready()
@ -429,15 +474,28 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
public void setReadListener(ReadListener readListener)
{
readListener = Objects.requireNonNull(readListener);
synchronized (_inputQ)
boolean content;
try
{
if (_state != STREAM)
throw new IllegalStateException("state=" + _state);
_state = ASYNC;
_listener = readListener;
_unready = true;
synchronized (_inputQ)
{
if (_state != STREAM)
throw new IllegalStateException("state=" + _state);
_state = ASYNC;
_listener = readListener;
_unready = true;
content=nextContent()!=null;
}
}
onReadPossible();
catch(IOException e)
{
throw new RuntimeIOException(e);
}
if (content)
onReadPossible();
else
unready();
}
public void failed(Throwable x)
@ -488,30 +546,28 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
try
{
if (error != null)
listener.onError(error);
else if (aeof)
listener.onAllDataRead();
else
{
if (aeof)
listener.onAllDataRead();
else if (error == null)
listener.onDataAvailable();
synchronized (_inputQ)
{
if (_state==AEOF)
{
_state=EOF;
aeof=true;
}
}
if (aeof)
listener.onAllDataRead();
}
else
listener.onError(error);
}
catch (Throwable e)
{
LOG.warn(e.toString());
LOG.debug(e);
listener.onError(e);
try
{
if (aeof || error==null)
listener.onError(e);
}
catch (Throwable e2)
{
LOG.warn(e2.toString());
LOG.debug(e2);
throw new RuntimeIOException(e2);
}
}
}

View File

@ -27,22 +27,21 @@ import static org.junit.Assert.fail;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ReadListener;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class HttpInputTest
{
Queue<String> _history = new ConcurrentArrayQueue<String>()
@ -328,7 +327,8 @@ public class HttpInputTest
public void testAsyncEmpty() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
@ -349,7 +349,8 @@ public class HttpInputTest
public void testAsyncRead() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
@ -401,7 +402,8 @@ public class HttpInputTest
public void testAsyncEOF() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
@ -423,7 +425,8 @@ public class HttpInputTest
public void testAsyncReadEOF() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
@ -475,7 +478,8 @@ public class HttpInputTest
public void testAsyncError() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));

View File

@ -39,6 +39,7 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
@ -69,6 +70,7 @@ public class AsyncIOServletTest
server = new Server();
connector = new ServerConnector(server);
connector.setIdleTimeout(30000);
connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setDelayDispatchUntilContent(false);
server.addConnector(connector);
context = new ServletContextHandler(server, "/", false, false);
@ -149,11 +151,16 @@ public class AsyncIOServletTest
output.write(request.getBytes("UTF-8"));
output.flush();
SimpleHttpParser parser = new SimpleHttpParser();
SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8")));
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String line=in.readLine();
assertThat(line, containsString("500 Server Error"));
while (line.length()>0)
{
line=in.readLine();
}
line=in.readLine();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertEquals("500", response.getCode());
}
}
@ -248,10 +255,10 @@ public class AsyncIOServletTest
}
@Override
public void onError(Throwable t)
public void onError(final Throwable t)
{
errors.incrementAndGet();
throw new NullPointerException("explicitly_thrown_by_test_2");
throw new NullPointerException("explicitly_thrown_by_test_2"){{this.initCause(t);}};
}
});
}
@ -486,6 +493,7 @@ public class AsyncIOServletTest
{
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();
output.write(data);
output.flush();
@ -588,4 +596,86 @@ public class AsyncIOServletTest
assertThat(line, containsString("OK"));
}
}
@Test
public void testCompleteBeforeOnAllDataRead() throws Exception
{
String text = "XYZ";
final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1);
final AtomicBoolean allDataRead = new AtomicBoolean(false);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
response.flushBuffer();
final AsyncContext async = request.startAsync();
final ServletInputStream in = request.getInputStream();
final ServletOutputStream out = response.getOutputStream();
in.setReadListener(new ReadListener()
{
@Override
public void onError(Throwable t)
{
t.printStackTrace();
}
@Override
public void onDataAvailable() throws IOException
{
while (in.isReady())
{
int b = in.read();
if (b<0)
{
out.write("OK\n".getBytes(StandardCharsets.ISO_8859_1));
async.complete();
return;
}
}
}
@Override
public void onAllDataRead() throws IOException
{
out.write("BAD!!!\n".getBytes(StandardCharsets.ISO_8859_1));
allDataRead.set(true);
throw new IllegalStateException();
}
});
}
});
String request = "GET " + path + " HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"Content-Type: text/plain\r\n"+
"Content-Length: "+data.length+"\r\n" +
"Connection: close\r\n" +
"\r\n";
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();
Thread.sleep(100);
output.write(data);
output.flush();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String line=in.readLine();
assertThat(line, containsString("200 OK"));
while (line.length()>0)
{
line=in.readLine();
}
line=in.readLine();
assertThat(line, containsString("OK"));
Assert.assertFalse(allDataRead.get());
}
}
}