444485 - Client resets stream, pending server data is failed, write hangs.

Fixed by ensuring that when a failure happens, either by catching an
exception or by failing a callback, we always call completed() and
abort the channel (via new method terminate()).
This commit is contained in:
Simone Bordet 2014-09-18 16:31:16 +02:00
parent b74c6d1b8d
commit bfda3620f3
2 changed files with 304 additions and 111 deletions

View File

@ -18,10 +18,19 @@
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
@ -33,6 +42,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.FuturePromise;
@ -187,4 +197,142 @@ public class StreamResetTest extends AbstractTest
Assert.assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(stream2DataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBlockingWriteAfterStreamReceivingReset() throws Exception
{
final CountDownLatch resetLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
Charset charset = StandardCharsets.UTF_8;
byte[] data = "AFTER RESET".getBytes(charset);
response.setStatus(200);
response.setContentType("text/plain;charset=" + charset.name());
response.setContentLength(data.length);
response.flushBuffer();
try
{
// Wait for the reset to happen.
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
// Wait for the reset to arrive to the server and be processed.
Thread.sleep(1000);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
try
{
// Write some content after the stream has
// been reset, it should throw an exception.
response.getOutputStream().write(data);
}
catch (IOException x)
{
dataLatch.countDown();
}
}
});
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(0, request, null, true);
client.newStream(frame, new FuturePromise<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
stream.reset(new ResetFrame(stream.getId(), ErrorCodes.CANCEL_STREAM_ERROR), Callback.Adapter.INSTANCE);
resetLatch.countDown();
}
});
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testAsyncWriteAfterStreamReceivingReset() throws Exception
{
final CountDownLatch resetLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
Charset charset = StandardCharsets.UTF_8;
final ByteBuffer data = ByteBuffer.wrap("AFTER RESET".getBytes(charset));
response.setStatus(200);
response.setContentType("text/plain;charset=" + charset.name());
response.setContentLength(data.remaining());
response.flushBuffer();
try
{
// Wait for the reset to happen.
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
// Wait for the reset to arrive to the server and be processed.
Thread.sleep(1000);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
// Write some content asynchronously after the stream has been reset.
final AsyncContext context = request.startAsync();
new Thread()
{
@Override
public void run()
{
try
{
// Wait for the request thread to exit
// doGet() so this is really asynchronous.
Thread.sleep(1000);
HttpOutput output = (HttpOutput)response.getOutputStream();
output.sendContent(data, new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
context.complete();
dataLatch.countDown();
}
});
}
catch (Throwable x)
{
x.printStackTrace();
}
}
}.start();
}
});
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(0, request, null, true);
client.newStream(frame, new FuturePromise<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
stream.reset(new ResetFrame(stream.getId(), ErrorCodes.CANCEL_STREAM_ERROR), Callback.Adapter.INSTANCE);
resetLatch.countDown();
}
});
Assert.assertTrue(dataLatch.await(555, TimeUnit.SECONDS));
}
}

View File

@ -18,13 +18,14 @@
package org.eclipse.jetty.server;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritePendingException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
@ -54,6 +55,7 @@ import org.eclipse.jetty.util.log.Logger;
public class HttpOutput extends ServletOutputStream implements Runnable
{
private static Logger LOG = Log.getLogger(HttpOutput.class);
private final HttpChannel _channel;
private final SharedBlockingCallback _writeblock=new SharedBlockingCallback()
{
@ -69,19 +71,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private int _commitSize;
private WriteListener _writeListener;
private volatile Throwable _onError;
/*
ACTION OPEN ASYNC READY PENDING UNREADY CLOSED
-----------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------
setWriteListener() READY->owp ise ise ise ise ise
write() OPEN ise PENDING wpe wpe eof
flush() OPEN ise PENDING wpe wpe eof
close() CLOSED CLOSED CLOSED CLOSED wpe CLOSED
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
write completed - - - ASYNC READY->owp -
*/
enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
private enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
public HttpOutput(HttpChannel channel)
@ -127,7 +127,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return _writeblock.acquire();
}
protected void write(ByteBuffer content, boolean complete) throws IOException
private void write(ByteBuffer content, boolean complete) throws IOException
{
try (Blocker blocker = _writeblock.acquire())
{
@ -138,88 +138,107 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
_channel.abort(failure);
terminate(failure);
throw failure;
}
}
protected void write(ByteBuffer content, boolean complete, Callback callback)
{
_channel.write(content,complete,callback);
_channel.write(content, complete, callback);
}
private void terminate(Throwable failure)
{
closed();
_channel.abort(failure);
}
@Override
public void close()
{
loop: while(true)
while(true)
{
OutputState state=_state.get();
switch (state)
{
case CLOSED:
{
break loop;
return;
}
case UNREADY:
{
if (_state.compareAndSet(state,OutputState.ERROR))
_writeListener.onError(_onError==null?new EofException("Async close"):_onError);
continue;
break;
}
default:
{
if (_state.compareAndSet(state,OutputState.CLOSED))
if (!_state.compareAndSet(state,OutputState.CLOSED))
break;
try
{
try
{
write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
break loop;
}
catch (IOException x)
{
// Ignore it, it's been already logged in write().
}
finally
{
releaseBuffer();
}
write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
}
catch (IOException x)
{
// Ignore it, it's been already logged in write().
}
finally
{
releaseBuffer();
}
// Return even if an exception is thrown by write().
return;
}
}
}
}
/* Called to indicated that the output is already closed (write with last==true performed) and the state needs to be updated to match */
/**
* Called to indicate that the last write has been performed.
* It updates the state and performs cleanup operations.
*/
void closed()
{
loop: while(true)
while(true)
{
OutputState state=_state.get();
switch (state)
{
case CLOSED:
break loop;
{
return;
}
case UNREADY:
{
if (_state.compareAndSet(state,OutputState.ERROR))
_writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
continue;
break;
}
default:
if (_state.compareAndSet(state,OutputState.CLOSED))
{
if (!_state.compareAndSet(state, OutputState.CLOSED))
break;
try
{
try
{
_channel.getResponse().closeOutput();
}
catch(Throwable e)
{
LOG.debug(e);
_channel.abort(e);
}
releaseBuffer();
return;
_channel.getResponse().closeOutput();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
terminate(x);
}
finally
{
releaseBuffer();
}
// Return even if an exception is thrown by closeOutput().
return;
}
}
}
}
@ -267,12 +286,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case CLOSED:
return;
default:
throw new IllegalStateException();
}
break;
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException
{
@ -330,11 +350,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
break;
}
// handle blocking write
// Should we aggregate?
@ -432,6 +454,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
break;
}
@ -511,6 +536,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
break;
}
@ -525,20 +553,22 @@ public class HttpOutput extends ServletOutputStream implements Runnable
write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
}
/* ------------------------------------------------------------ */
/** Blocking send of content.
* @param content The content to send.
* @throws IOException
/**
* Blocking send of whole content.
*
* @param content The whole content to send
* @throws IOException if the send fails
*/
public void sendContent(ByteBuffer content) throws IOException
{
write(content, true);
}
/* ------------------------------------------------------------ */
/** Blocking send of content.
* @param in The content to send
* @throws IOException
/**
* Blocking send of stream content.
*
* @param in The stream content to send
* @throws IOException if the send fails
*/
public void sendContent(InputStream in) throws IOException
{
@ -551,15 +581,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
_channel.abort(failure);
terminate(failure);
throw failure;
}
}
/* ------------------------------------------------------------ */
/** Blocking send of content.
* @param in The content to send
* @throws IOException
/**
* Blocking send of channel content.
*
* @param in The channel content to send
* @throws IOException if the send fails
*/
public void sendContent(ReadableByteChannel in) throws IOException
{
@ -572,16 +603,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
_channel.abort(failure);
terminate(failure);
throw failure;
}
}
/* ------------------------------------------------------------ */
/** Blocking send of content.
* @param content The content to send
* @throws IOException
/**
* Blocking send of HTTP content.
*
* @param content The HTTP content to send
* @throws IOException if the send fails
*/
public void sendContent(HttpContent content) throws IOException
{
@ -594,14 +625,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
_channel.abort(failure);
terminate(failure);
throw failure;
}
}
/* ------------------------------------------------------------ */
/** Asynchronous send of content.
* @param content The content to send
/**
* Asynchronous send of whole content.
* @param content The whole content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(ByteBuffer content, final Callback callback)
@ -618,15 +649,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override
public void failed(Throwable x)
{
terminate(x);
callback.failed(x);
}
});
}
/* ------------------------------------------------------------ */
/** Asynchronous send of content.
* @param in The content to send as a stream. The stream will be closed
* after reading all content.
/**
* Asynchronous send of stream content.
* The stream will be closed after reading all content.
*
* @param in The stream content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(InputStream in, Callback callback)
@ -634,10 +667,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable
new InputStreamWritingCB(in, callback).iterate();
}
/* ------------------------------------------------------------ */
/** Asynchronous send of content.
* @param in The content to send as a channel. The channel will be closed
* after reading all content.
/**
* Asynchronous send of channel content.
* The channel will be closed after reading all content.
*
* @param in The channel content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(ReadableByteChannel in, Callback callback)
@ -645,9 +679,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
new ReadableByteChannelWritingCB(in, callback).iterate();
}
/* ------------------------------------------------------------ */
/** Asynchronous send of content.
* @param httpContent The content to send
/**
* Asynchronous send of HTTP content.
*
* @param httpContent The HTTP content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(HttpContent httpContent, Callback callback)
@ -671,6 +706,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
continue;
break;
case ERROR:
callback.failed(new EofException(_onError));
return;
@ -678,12 +714,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case CLOSED:
callback.failed(new EofException("Closed"));
return;
default:
throw new IllegalStateException();
}
break;
}
ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
if (buffer == null)
buffer = httpContent.getIndirectBuffer();
@ -709,21 +747,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
InputStream in = httpContent.getInputStream();
if ( in!=null )
if (in!=null)
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent({}=={},{},direct={})",httpContent,in,callback,_channel.useDirectBuffers());
sendContent(in,callback);
return;
}
throw new IllegalArgumentException("unknown content for "+httpContent);
}
catch(Throwable th)
{
terminate(th);
callback.failed(th);
return;
}
callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
}
public int getBufferSize()
@ -770,16 +808,20 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
case OPEN:
return true;
case ASYNC:
if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
continue;
return true;
case READY:
return true;
case PENDING:
if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
continue;
return false;
case UNREADY:
return false;
@ -788,6 +830,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case CLOSED:
return true;
default:
throw new IllegalStateException();
}
}
}
@ -829,10 +874,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
switch(_state.get())
{
case CLOSED:
// even though a write is not possible, because a close has
// Even though a write is not possible, because a close has
// occurred, we need to call onWritePossible to tell async
// producer that the last write completed.
// so fall through
// So fall through
case READY:
try
{
@ -850,13 +895,25 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
}
}
private void close(Closeable resource)
{
try
{
resource.close();
}
catch (Throwable x)
{
LOG.ignore(x);
}
}
@Override
public String toString()
{
return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
}
private abstract class AsyncICB extends IteratingCallback
{
@Override
@ -1021,8 +1078,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
}
/* ------------------------------------------------------------ */
/** An iterating callback that will take content from an
/**
* An iterating callback that will take content from an
* InputStream and write it to the associated {@link HttpChannel}.
* A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
* This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
@ -1077,16 +1134,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override
public void onCompleteFailure(Throwable x)
{
super.onCompleteFailure(x);
terminate(x);
_channel.getByteBufferPool().release(_buffer);
try
{
_in.close();
}
catch (IOException e)
{
LOG.ignore(e);
}
HttpOutput.this.close(_in);
super.onCompleteFailure(x);
}
}
@ -1140,16 +1191,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override
public void onCompleteFailure(Throwable x)
{
super.onCompleteFailure(x);
terminate(x);
_channel.getByteBufferPool().release(_buffer);
try
{
_in.close();
}
catch (IOException e)
{
LOG.ignore(e);
}
HttpOutput.this.close(_in);
super.onCompleteFailure(x);
}
}
}