414891 - Errors thrown by ReadListener and WriteListener not handled correctly.

Now correctly catching Throwable to handle all exception cases.
This commit is contained in:
Simone Bordet 2013-08-12 18:52:14 +02:00
parent 1e25778d57
commit 0cc46e8beb
3 changed files with 342 additions and 89 deletions

View File

@ -19,16 +19,11 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import javax.servlet.ServletInputStream;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -55,12 +50,12 @@ import org.eclipse.jetty.util.log.Logger;
public abstract class HttpInput<T> extends ServletInputStream implements Runnable
{
private final static Logger LOG = Log.getLogger(HttpInput.class);
private HttpChannelState _channelState;
private Throwable _onError;
private ReadListener _listener;
private boolean _notReady;
protected State _state = BLOCKING;
private State _eof=null;
private final Object _lock;
@ -69,12 +64,12 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
{
this(null);
}
protected HttpInput(Object lock)
{
_lock=lock==null?this:lock;
}
public final Object lock()
{
return _lock;
@ -91,16 +86,16 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
}
/**
* Access the next content to be consumed from. Returning the next item does not consume it
* Access the next content to be consumed from. Returning the next item does not consume it
* and it may be returned multiple times until it is consumed. Calls to {@link #get(Object, byte[], int, int)}
* or {@link #consume(Object, int)} are required to consume data from the content.
* @return Content or null if none available.
* @throws IOException
*/
protected abstract T nextContent() throws IOException;
/**
* A convenience method to call nextContent and to check the return value, which if null then the
* A convenience method to call nextContent and to check the return value, which if null then the
* a check is made for EOF and the state changed accordingly.
* @see #nextContent()
* @return Content or null if none available.
@ -109,17 +104,17 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
protected T getNextContent() throws IOException
{
T content=nextContent();
if (content==null && _eof!=null)
if (content==null && _eof!=null)
{
LOG.debug("{} eof {}",this,_eof);
_state=_eof;
_eof=null;
}
return content;
}
@Override
public int read() throws IOException
{
@ -155,7 +150,7 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
// Get the current head of the input Q
item = getNextContent();
// If we have no item
if (item == null)
{
@ -171,11 +166,11 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
protected abstract int remaining(T item);
protected abstract int get(T item, byte[] buffer, int offset, int length);
protected abstract void consume(T item, int length);
protected abstract void blockForContent() throws IOException;
protected boolean onAsyncRead()
{
if (_listener==null)
@ -189,7 +184,7 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
* @param item
*/
public abstract void content(T item);
/* ------------------------------------------------------------ */
/** This method should be called to signal to the HttpInput
@ -283,7 +278,7 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
protected void unready()
{
}
@Override
public void setReadListener(ReadListener readListener)
{
@ -308,17 +303,17 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
if (_onError==null)
LOG.warn(x);
else
_onError=x;
_onError=x;
}
}
@Override
public void run()
{
final boolean available;
final boolean eof;
final Throwable x;
synchronized (lock())
{
if (!_notReady || _listener==null)
@ -352,26 +347,26 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
else
unready();
}
catch(Exception e)
catch(Throwable e)
{
LOG.warn(e.toString());
LOG.debug(e);
_listener.onError(e);
}
}
}
protected static class State
{
public void waitForContent(HttpInput<?> in) throws IOException
{
}
public int noContent() throws IOException
{
return -1;
}
public boolean isEOF()
{
return false;
@ -390,7 +385,7 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
return "OPEN";
}
};
protected static final State ASYNC= new State()
{
@Override
@ -398,16 +393,16 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
{
return 0;
}
@Override
@Override
public String toString()
{
return "ASYNC";
}
};
protected static final State EARLY_EOF= new State()
{
@Override
@Override
public int noContent() throws IOException
{
throw new EofException();

View File

@ -32,7 +32,6 @@ import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -60,12 +59,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private int _bufferSize;
private WriteListener _writeListener;
private volatile Throwable _onError;
/*
ACTION OPEN ASYNC READY PENDING UNREADY
/*
ACTION OPEN ASYNC READY PENDING UNREADY
-------------------------------------------------------------------------------
setWriteListener() READY->owp ise ise ise ise
write() OPEN ise PENDING wpe wpe
setWriteListener() READY->owp ise ise ise ise
write() OPEN ise PENDING wpe wpe
flush() OPEN ise PENDING wpe wpe
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false
write completed - - - ASYNC READY->owp
@ -73,8 +72,8 @@ write completed - - - ASYNC READY->owp
*/
enum State { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
private final AtomicReference<State> _state=new AtomicReference<>(State.OPEN);
public HttpOutput(HttpChannel<?> channel)
{
@ -107,7 +106,7 @@ write completed - - - ASYNC READY->owp
{
return _channel.getResponse().isAllContentWritten(_written);
}
@Override
public void close()
{
@ -134,7 +133,7 @@ write completed - - - ASYNC READY->owp
state=_state.get();
}
}
/* Called to indicated that the output is already closed and the state needs to be updated to match */
void closed()
{
@ -145,7 +144,7 @@ write completed - - - ASYNC READY->owp
{
try
{
_channel.getResponse().closeOutput();
_channel.getResponse().closeOutput();
}
catch(IOException e)
{
@ -186,20 +185,20 @@ write completed - - - ASYNC READY->owp
else
_channel.write(BufferUtil.EMPTY_BUFFER, false);
return;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(State.READY, State.PENDING))
continue;
new AsyncFlush().process();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case CLOSED:
return;
}
@ -207,10 +206,10 @@ write completed - - - ASYNC READY->owp
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException
{
{
_written+=len;
boolean complete=_channel.getResponse().isAllContentWritten(_written);
@ -222,7 +221,7 @@ write completed - - - ASYNC READY->owp
case OPEN:
// process blocking below
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
@ -267,9 +266,9 @@ write completed - - - ASYNC READY->owp
break;
}
// handle blocking write
// Should we aggregate?
int capacity = getBufferSize();
if (!complete && len<=capacity/4)
@ -316,7 +315,7 @@ write completed - - - ASYNC READY->owp
}
public void write(ByteBuffer buffer) throws IOException
{
{
_written+=buffer.remaining();
boolean complete=_channel.getResponse().isAllContentWritten(_written);
@ -328,7 +327,7 @@ write completed - - - ASYNC READY->owp
case OPEN:
// process blocking below
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
@ -350,7 +349,7 @@ write completed - - - ASYNC READY->owp
break;
}
// handle blocking write
int len=BufferUtil.length(buffer);
@ -383,7 +382,7 @@ write completed - - - ASYNC READY->owp
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
BufferUtil.append(_aggregate, (byte)b);
// Check if all written or full
if (complete || BufferUtil.isFull(_aggregate))
{
@ -394,7 +393,7 @@ write completed - - - ASYNC READY->owp
closed();
}
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
@ -510,13 +509,13 @@ write completed - - - ASYNC READY->owp
public void failed(Throwable x)
{
callback.failed(x);
}
}
});
}
/* ------------------------------------------------------------ */
/** Asynchronous send of content.
* @param in The content to send as a stream. The stream will be closed
* @param in The content to send as a stream. The stream will be closed
* after reading all content.
* @param callback The callback to use to notify success or failure
*/
@ -527,7 +526,7 @@ write completed - - - ASYNC READY->owp
/* ------------------------------------------------------------ */
/** Asynchronous send of content.
* @param in The content to send as a channel. The channel will be closed
* @param in The content to send as a channel. The channel will be closed
* after reading all content.
* @param callback The callback to use to notify success or failure
*/
@ -611,7 +610,7 @@ write completed - - - ASYNC READY->owp
{
if (!_channel.getState().isAsync())
throw new IllegalStateException("!ASYNC");
if (_state.compareAndSet(State.OPEN, State.READY))
{
_writeListener = writeListener;
@ -621,7 +620,7 @@ write completed - - - ASYNC READY->owp
throw new IllegalStateException();
}
/**
/**
* @see javax.servlet.ServletOutputStream#isReady()
*/
@Override
@ -667,7 +666,7 @@ write completed - - - ASYNC READY->owp
{
_writeListener.onWritePossible();
}
catch (IOException e)
catch (Throwable e)
{
_writeListener.onError(e);
close();
@ -681,14 +680,14 @@ write completed - - - ASYNC READY->owp
private final boolean _complete;
private final int _len;
public AsyncWrite(byte[] b, int off, int len, boolean complete)
public AsyncWrite(byte[] b, int off, int len, boolean complete)
{
_buffer=ByteBuffer.wrap(b, off, len);
_complete=complete;
_len=len;
}
public AsyncWrite(ByteBuffer buffer, boolean complete)
public AsyncWrite(ByteBuffer buffer, boolean complete)
{
_buffer=buffer;
_complete=complete;
@ -696,7 +695,7 @@ write completed - - - ASYNC READY->owp
}
@Override
protected boolean process()
protected boolean process()
{
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
@ -732,9 +731,9 @@ write completed - - - ASYNC READY->owp
private class AsyncFlush extends IteratingCallback
{
protected boolean _flushed;
protected boolean _flushed;
public AsyncFlush()
public AsyncFlush()
{
}
@ -754,7 +753,7 @@ write completed - - - ASYNC READY->owp
_channel.write(BufferUtil.EMPTY_BUFFER,false,this);
return false;
}
return true;
}
@ -772,13 +771,13 @@ write completed - - - ASYNC READY->owp
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
continue;
break;
case UNREADY:
if (!_state.compareAndSet(State.UNREADY, State.READY))
continue;
_channel.getState().onWritePossible();
break;
case CLOSED:
_onError=new EofException("Closed");
break;
@ -796,7 +795,7 @@ write completed - - - ASYNC READY->owp
_channel.getState().onWritePossible();
}
}
@Override
public void failed(Throwable e)
{
@ -809,12 +808,12 @@ write completed - - - ASYNC READY->owp
/* ------------------------------------------------------------ */
/** An iterating callback that will take content from an
/** An iterating callback that will take content from an
* InputStream and write it to the associated {@link HttpChannel}.
* A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
* A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
* This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
* be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called.
* be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called.
*/
private class InputStreamWritingCB extends IteratingNestedCallback
{
@ -822,7 +821,7 @@ write completed - - - ASYNC READY->owp
final ByteBuffer _buffer;
public InputStreamWritingCB(InputStream in, Callback callback)
{
{
super(callback);
_in=in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
@ -833,7 +832,7 @@ write completed - - - ASYNC READY->owp
{
boolean eof=false;
int len=_in.read(_buffer.array(),0,_buffer.capacity());
if (len<0)
{
eof=true;
@ -849,12 +848,12 @@ write completed - - - ASYNC READY->owp
else
len+=len2;
}
// write what we have
_buffer.position(0);
_buffer.limit(len);
_channel.write(_buffer,eof,this);
// Handle EOF
if (eof)
{
@ -884,13 +883,13 @@ write completed - - - ASYNC READY->owp
}
/* ------------------------------------------------------------ */
/** An iterating callback that will take content from a
/** An iterating callback that will take content from a
* ReadableByteChannel and write it to the {@link HttpChannel}.
* A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
* {@link HttpChannel#useDirectBuffers()} is true.
* This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
* be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called.
* be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called.
*/
private class ReadableByteChannelWritingCB extends IteratingNestedCallback
{
@ -898,7 +897,7 @@ write completed - - - ASYNC READY->owp
final ByteBuffer _buffer;
public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
{
{
super(callback);
_in=in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
@ -910,7 +909,7 @@ write completed - - - ASYNC READY->owp
_buffer.clear();
boolean eof=false;
int len=_in.read(_buffer);
if (len<0)
{
eof=true;
@ -926,11 +925,11 @@ write completed - - - ASYNC READY->owp
else
len+=len2;
}
// write what we have
_buffer.flip();
_channel.write(_buffer,eof,this);
// Handle EOF
if (eof)
{

View File

@ -0,0 +1,259 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.servlet;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpParser;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class AsyncIOServletTest
{
private Server server;
private ServerConnector connector;
private ServletContextHandler context;
private String path = "/path";
public void startServer(HttpServlet servlet) throws Exception
{
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
context = new ServletContextHandler(server, "", false, false);
ServletHolder holder = new ServletHolder(servlet);
holder.setAsyncSupported(true);
context.addServlet(holder, path);
server.start();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
public void testAsyncReadThrowsException() throws Exception
{
testAsyncReadThrows(new NullPointerException("explicitly_thrown_by_test"));
}
@Test
public void testAsyncReadThrowsError() throws Exception
{
testAsyncReadThrows(new Error("explicitly_thrown_by_test"));
}
private void testAsyncReadThrows(final Throwable throwable) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
final AsyncContext asyncContext = request.startAsync(request, response);
request.getInputStream().setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
if (throwable instanceof RuntimeException)
throw (RuntimeException)throwable;
if (throwable instanceof Error)
throw (Error)throwable;
throw new IOException(throwable);
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable t)
{
Assert.assertSame(throwable, t);
latch.countDown();
response.setStatus(500);
asyncContext.complete();
}
});
}
});
String data = "0123456789";
String request = "GET " + path + " HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"Content-Length: " + data.length() + "\r\n" +
"\r\n" +
data;
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();
SimpleHttpParser parser = new SimpleHttpParser();
SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8")));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertEquals("500", response.getCode());
}
}
@Test
public void testOnErrorThrows() throws Exception
{
final AtomicInteger errors = new AtomicInteger();
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
final AsyncContext asyncContext = request.startAsync(request, response);
request.getInputStream().setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
throw new NullPointerException("explicitly_thrown_by_test_1");
}
@Override
public void onAllDataRead() throws IOException
{
}
@Override
public void onError(Throwable t)
{
errors.incrementAndGet();
throw new NullPointerException("explicitly_thrown_by_test_2");
}
});
}
});
String data = "0123456789";
String request = "GET " + path + " HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"Content-Length: " + data.length() + "\r\n" +
"\r\n" +
data;
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();
SimpleHttpParser parser = new SimpleHttpParser();
SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8")));
Assert.assertEquals("500", response.getCode());
Assert.assertEquals(1, errors.get());
}
}
@Test
public void testAsyncWriteThrowsException() throws Exception
{
testAsyncWriteThrows(new NullPointerException("explicitly_thrown_by_test"));
}
@Test
public void testAsyncWriteThrowsError() throws Exception
{
testAsyncWriteThrows(new Error("explicitly_thrown_by_test"));
}
private void testAsyncWriteThrows(final Throwable throwable) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
final AsyncContext asyncContext = request.startAsync(request, response);
response.getOutputStream().setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
if (throwable instanceof RuntimeException)
throw (RuntimeException)throwable;
if (throwable instanceof Error)
throw (Error)throwable;
throw new IOException(throwable);
}
@Override
public void onError(Throwable t)
{
Assert.assertSame(throwable, t);
latch.countDown();
response.setStatus(500);
asyncContext.complete();
}
});
}
});
String request = "GET " + path + " HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"\r\n";
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();
SimpleHttpParser parser = new SimpleHttpParser();
SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8")));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertEquals("500", response.getCode());
}
}
}