From bfda3620f3a7500f7999eccaf2bb384d3605a7b6 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 18 Sep 2014 16:31:16 +0200 Subject: [PATCH] 444485 - Client resets stream, pending server data is failed, write hangs. Fixed by ensuring that when a failure happens, either by catching an exception or by failing a callback, we always call completed() and abort the channel (via new method terminate()). --- .../jetty/http2/client/StreamResetTest.java | 148 ++++++++++ .../org/eclipse/jetty/server/HttpOutput.java | 267 ++++++++++-------- 2 files changed, 304 insertions(+), 111 deletions(-) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index c9893d14812..a348f74230b 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -18,10 +18,19 @@ package org.eclipse.jetty.http2.client; +import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.AsyncContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpVersion; @@ -33,6 +42,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.server.HttpOutput; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FuturePromise; @@ -187,4 +197,142 @@ public class StreamResetTest extends AbstractTest Assert.assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(stream2DataLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testBlockingWriteAfterStreamReceivingReset() throws Exception + { + final CountDownLatch resetLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + startServer(new HttpServlet() + { + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + Charset charset = StandardCharsets.UTF_8; + byte[] data = "AFTER RESET".getBytes(charset); + + response.setStatus(200); + response.setContentType("text/plain;charset=" + charset.name()); + response.setContentLength(data.length); + response.flushBuffer(); + + try + { + // Wait for the reset to happen. + Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + // Wait for the reset to arrive to the server and be processed. + Thread.sleep(1000); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + + try + { + // Write some content after the stream has + // been reset, it should throw an exception. + response.getOutputStream().write(data); + } + catch (IOException x) + { + dataLatch.countDown(); + } + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(0, request, null, true); + client.newStream(frame, new FuturePromise(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + stream.reset(new ResetFrame(stream.getId(), ErrorCodes.CANCEL_STREAM_ERROR), Callback.Adapter.INSTANCE); + resetLatch.countDown(); + } + }); + + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testAsyncWriteAfterStreamReceivingReset() throws Exception + { + final CountDownLatch resetLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + startServer(new HttpServlet() + { + @Override + protected void doGet(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException + { + Charset charset = StandardCharsets.UTF_8; + final ByteBuffer data = ByteBuffer.wrap("AFTER RESET".getBytes(charset)); + + response.setStatus(200); + response.setContentType("text/plain;charset=" + charset.name()); + response.setContentLength(data.remaining()); + response.flushBuffer(); + + try + { + // Wait for the reset to happen. + Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + // Wait for the reset to arrive to the server and be processed. + Thread.sleep(1000); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + + // Write some content asynchronously after the stream has been reset. + final AsyncContext context = request.startAsync(); + new Thread() + { + @Override + public void run() + { + try + { + // Wait for the request thread to exit + // doGet() so this is really asynchronous. + Thread.sleep(1000); + + HttpOutput output = (HttpOutput)response.getOutputStream(); + output.sendContent(data, new Callback.Adapter() + { + @Override + public void failed(Throwable x) + { + context.complete(); + dataLatch.countDown(); + } + }); + } + catch (Throwable x) + { + x.printStackTrace(); + } + } + }.start(); + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(0, request, null, true); + client.newStream(frame, new FuturePromise(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + stream.reset(new ResetFrame(stream.getId(), ErrorCodes.CANCEL_STREAM_ERROR), Callback.Adapter.INSTANCE); + resetLatch.countDown(); + } + }); + + Assert.assertTrue(dataLatch.await(555, TimeUnit.SECONDS)); + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index b6797b54c59..405b2db1c3c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -18,13 +18,14 @@ package org.eclipse.jetty.server; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritePendingException; import java.util.concurrent.atomic.AtomicReference; - +import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; @@ -54,6 +55,7 @@ import org.eclipse.jetty.util.log.Logger; public class HttpOutput extends ServletOutputStream implements Runnable { private static Logger LOG = Log.getLogger(HttpOutput.class); + private final HttpChannel _channel; private final SharedBlockingCallback _writeblock=new SharedBlockingCallback() { @@ -69,19 +71,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable private int _commitSize; private WriteListener _writeListener; private volatile Throwable _onError; - /* ACTION OPEN ASYNC READY PENDING UNREADY CLOSED - ----------------------------------------------------------------------------------------------------- + ------------------------------------------------------------------------------------------- setWriteListener() READY->owp ise ise ise ise ise write() OPEN ise PENDING wpe wpe eof flush() OPEN ise PENDING wpe wpe eof close() CLOSED CLOSED CLOSED CLOSED wpe CLOSED isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true write completed - - - ASYNC READY->owp - - */ - enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED } + private enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED } private final AtomicReference _state=new AtomicReference<>(OutputState.OPEN); public HttpOutput(HttpChannel channel) @@ -127,7 +127,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable return _writeblock.acquire(); } - protected void write(ByteBuffer content, boolean complete) throws IOException + private void write(ByteBuffer content, boolean complete) throws IOException { try (Blocker blocker = _writeblock.acquire()) { @@ -138,88 +138,107 @@ public class HttpOutput extends ServletOutputStream implements Runnable { if (LOG.isDebugEnabled()) LOG.debug(failure); - _channel.abort(failure); + terminate(failure); throw failure; } } - + protected void write(ByteBuffer content, boolean complete, Callback callback) { - _channel.write(content,complete,callback); + _channel.write(content, complete, callback); } - + + private void terminate(Throwable failure) + { + closed(); + _channel.abort(failure); + } + @Override public void close() { - loop: while(true) + while(true) { OutputState state=_state.get(); switch (state) { case CLOSED: { - break loop; + return; } case UNREADY: { if (_state.compareAndSet(state,OutputState.ERROR)) _writeListener.onError(_onError==null?new EofException("Async close"):_onError); - continue; + break; } default: { - if (_state.compareAndSet(state,OutputState.CLOSED)) + if (!_state.compareAndSet(state,OutputState.CLOSED)) + break; + + try { - try - { - write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); - break loop; - } - catch (IOException x) - { - // Ignore it, it's been already logged in write(). - } - finally - { - releaseBuffer(); - } + write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); } + catch (IOException x) + { + // Ignore it, it's been already logged in write(). + } + finally + { + releaseBuffer(); + } + // Return even if an exception is thrown by write(). + return; } } } } - /* Called to indicated that the output is already closed (write with last==true performed) and the state needs to be updated to match */ + /** + * Called to indicate that the last write has been performed. + * It updates the state and performs cleanup operations. + */ void closed() { - loop: while(true) + while(true) { OutputState state=_state.get(); switch (state) { case CLOSED: - break loop; - + { + return; + } case UNREADY: + { if (_state.compareAndSet(state,OutputState.ERROR)) _writeListener.onError(_onError==null?new EofException("Async closed"):_onError); - continue; - + break; + } default: - if (_state.compareAndSet(state,OutputState.CLOSED)) + { + if (!_state.compareAndSet(state, OutputState.CLOSED)) + break; + + try { - try - { - _channel.getResponse().closeOutput(); - } - catch(Throwable e) - { - LOG.debug(e); - _channel.abort(e); - } - releaseBuffer(); - return; + _channel.getResponse().closeOutput(); } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug(x); + terminate(x); + } + finally + { + releaseBuffer(); + } + // Return even if an exception is thrown by closeOutput(). + return; + } } } } @@ -267,12 +286,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable case CLOSED: return; + + default: + throw new IllegalStateException(); } - break; } } - @Override public void write(byte[] b, int off, int len) throws IOException { @@ -330,11 +350,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable case CLOSED: throw new EofException("Closed"); + + default: + throw new IllegalStateException(); } break; } - // handle blocking write // Should we aggregate? @@ -432,6 +454,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable case CLOSED: throw new EofException("Closed"); + + default: + throw new IllegalStateException(); } break; } @@ -511,6 +536,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable case CLOSED: throw new EofException("Closed"); + + default: + throw new IllegalStateException(); } break; } @@ -525,20 +553,22 @@ public class HttpOutput extends ServletOutputStream implements Runnable write(s.getBytes(_channel.getResponse().getCharacterEncoding())); } - /* ------------------------------------------------------------ */ - /** Blocking send of content. - * @param content The content to send. - * @throws IOException + /** + * Blocking send of whole content. + * + * @param content The whole content to send + * @throws IOException if the send fails */ public void sendContent(ByteBuffer content) throws IOException { write(content, true); } - /* ------------------------------------------------------------ */ - /** Blocking send of content. - * @param in The content to send - * @throws IOException + /** + * Blocking send of stream content. + * + * @param in The stream content to send + * @throws IOException if the send fails */ public void sendContent(InputStream in) throws IOException { @@ -551,15 +581,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable { if (LOG.isDebugEnabled()) LOG.debug(failure); - _channel.abort(failure); + terminate(failure); throw failure; } } - /* ------------------------------------------------------------ */ - /** Blocking send of content. - * @param in The content to send - * @throws IOException + /** + * Blocking send of channel content. + * + * @param in The channel content to send + * @throws IOException if the send fails */ public void sendContent(ReadableByteChannel in) throws IOException { @@ -572,16 +603,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable { if (LOG.isDebugEnabled()) LOG.debug(failure); - _channel.abort(failure); + terminate(failure); throw failure; } } - - /* ------------------------------------------------------------ */ - /** Blocking send of content. - * @param content The content to send - * @throws IOException + /** + * Blocking send of HTTP content. + * + * @param content The HTTP content to send + * @throws IOException if the send fails */ public void sendContent(HttpContent content) throws IOException { @@ -594,14 +625,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable { if (LOG.isDebugEnabled()) LOG.debug(failure); - _channel.abort(failure); + terminate(failure); throw failure; } } - /* ------------------------------------------------------------ */ - /** Asynchronous send of content. - * @param content The content to send + /** + * Asynchronous send of whole content. + * @param content The whole content to send * @param callback The callback to use to notify success or failure */ public void sendContent(ByteBuffer content, final Callback callback) @@ -618,15 +649,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void failed(Throwable x) { + terminate(x); callback.failed(x); } }); } - /* ------------------------------------------------------------ */ - /** Asynchronous send of content. - * @param in The content to send as a stream. The stream will be closed - * after reading all content. + /** + * Asynchronous send of stream content. + * The stream will be closed after reading all content. + * + * @param in The stream content to send * @param callback The callback to use to notify success or failure */ public void sendContent(InputStream in, Callback callback) @@ -634,10 +667,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable new InputStreamWritingCB(in, callback).iterate(); } - /* ------------------------------------------------------------ */ - /** Asynchronous send of content. - * @param in The content to send as a channel. The channel will be closed - * after reading all content. + /** + * Asynchronous send of channel content. + * The channel will be closed after reading all content. + * + * @param in The channel content to send * @param callback The callback to use to notify success or failure */ public void sendContent(ReadableByteChannel in, Callback callback) @@ -645,9 +679,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable new ReadableByteChannelWritingCB(in, callback).iterate(); } - /* ------------------------------------------------------------ */ - /** Asynchronous send of content. - * @param httpContent The content to send + /** + * Asynchronous send of HTTP content. + * + * @param httpContent The HTTP content to send * @param callback The callback to use to notify success or failure */ public void sendContent(HttpContent httpContent, Callback callback) @@ -671,6 +706,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING)) continue; break; + case ERROR: callback.failed(new EofException(_onError)); return; @@ -678,12 +714,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable case CLOSED: callback.failed(new EofException("Closed")); return; + default: throw new IllegalStateException(); } break; } - ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null; + + ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null; if (buffer == null) buffer = httpContent.getIndirectBuffer(); @@ -709,21 +747,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable } InputStream in = httpContent.getInputStream(); - if ( in!=null ) + if (in!=null) { if (LOG.isDebugEnabled()) LOG.debug("sendContent({}=={},{},direct={})",httpContent,in,callback,_channel.useDirectBuffers()); sendContent(in,callback); return; } + + throw new IllegalArgumentException("unknown content for "+httpContent); } catch(Throwable th) { + terminate(th); callback.failed(th); - return; } - - callback.failed(new IllegalArgumentException("unknown content for "+httpContent)); } public int getBufferSize() @@ -770,16 +808,20 @@ public class HttpOutput extends ServletOutputStream implements Runnable { case OPEN: return true; + case ASYNC: if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY)) continue; return true; + case READY: return true; + case PENDING: if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY)) continue; return false; + case UNREADY: return false; @@ -788,6 +830,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable case CLOSED: return true; + + default: + throw new IllegalStateException(); } } } @@ -829,10 +874,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable switch(_state.get()) { case CLOSED: - // even though a write is not possible, because a close has + // Even though a write is not possible, because a close has // occurred, we need to call onWritePossible to tell async // producer that the last write completed. - // so fall through + // So fall through case READY: try { @@ -850,13 +895,25 @@ public class HttpOutput extends ServletOutputStream implements Runnable } } } - + + private void close(Closeable resource) + { + try + { + resource.close(); + } + catch (Throwable x) + { + LOG.ignore(x); + } + } + @Override public String toString() { return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get()); } - + private abstract class AsyncICB extends IteratingCallback { @Override @@ -1021,8 +1078,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable } } - /* ------------------------------------------------------------ */ - /** An iterating callback that will take content from an + /** + * An iterating callback that will take content from an * InputStream and write it to the associated {@link HttpChannel}. * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used. * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to @@ -1077,16 +1134,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void onCompleteFailure(Throwable x) { - super.onCompleteFailure(x); + terminate(x); _channel.getByteBufferPool().release(_buffer); - try - { - _in.close(); - } - catch (IOException e) - { - LOG.ignore(e); - } + HttpOutput.this.close(_in); + super.onCompleteFailure(x); } } @@ -1140,16 +1191,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void onCompleteFailure(Throwable x) { - super.onCompleteFailure(x); + terminate(x); _channel.getByteBufferPool().release(_buffer); - try - { - _in.close(); - } - catch (IOException e) - { - LOG.ignore(e); - } + HttpOutput.this.close(_in); + super.onCompleteFailure(x); } } }