442086 - Review HttpOutput blocking writes.

Reviewed blocking writes call sites and handled exceptions in the same
way for all of them, calling HttpChannel.abort(Throwable).

Modified HttpChannel.abort() to take the failure as parameter, so that
subclasses may inspect the failure and decide what to do.
This commit is contained in:
Simone Bordet 2014-08-19 18:37:53 +02:00
parent d4f140ff65
commit 8e62a50500
9 changed files with 118 additions and 79 deletions

View File

@ -129,7 +129,7 @@ public class HttpTransportOverFCGI implements HttpTransport
}
@Override
public void abort()
public void abort(Throwable failure)
{
aborted = true;
}

View File

@ -25,7 +25,6 @@ import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher;
import javax.servlet.http.HttpServletRequest;
@ -45,7 +44,6 @@ import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.HttpChannelState.Action;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
@ -411,7 +409,7 @@ public class HttpChannel implements Runnable
}
else if (isCommitted())
{
_transport.abort();
_transport.abort(x);
if (!(x instanceof EofException))
LOG.warn("Could not send response error 500: "+x);
}
@ -551,6 +549,13 @@ public class HttpChannel implements Runnable
blocker.block();
return committing;
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
throw failure;
}
}
public boolean isCommitted()
@ -580,7 +585,6 @@ public class HttpChannel implements Runnable
return _connector.getScheduler();
}
/* ------------------------------------------------------------ */
/**
* @return true if the HttpChannel can efficiently use direct buffer (typically this means it is not over SSL or a multiplexed protocol)
*/
@ -590,12 +594,16 @@ public class HttpChannel implements Runnable
}
/**
* If a write or similar to this channel fails this method should be called. The standard implementation
* is to call {@link HttpTransport#abort()}
* If a write or similar operation to this channel fails,
* then this method should be called.
* <p />
* The standard implementation calls {@link HttpTransport#abort(Throwable)}.
*
* @param failure the failure that caused the abort.
*/
public void abort()
public void abort(Throwable failure)
{
_transport.abort();
_transport.abort(failure);
}
private class CommitCallback implements Callback

View File

@ -31,13 +31,11 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.EndPoint;
import org.omg.stub.java.rmi._Remote_Stub;
/**
* A HttpChannel customized to be transported over the HTTP/1 protocol
@ -304,9 +302,9 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
}
@Override
public void abort()
public void abort(Throwable failure)
{
super.abort();
super.abort(failure);
_httpConnection._generator.setPersistent(false);
}

View File

@ -628,7 +628,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
public void abort()
public void abort(Throwable failure)
{
// Do a direct close of the output, as this may indicate to a client that the
// response is bad either with RST or by abnormal completion of chunked response.

View File

@ -24,7 +24,10 @@ import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritePendingException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpContent;
@ -119,11 +122,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable
protected void write(ByteBuffer content, boolean complete) throws IOException
{
try (Blocker blocker=_writeblock.acquire())
try (Blocker blocker = _writeblock.acquire())
{
write(content,complete,blocker);
write(content, complete, blocker);
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
_channel.abort(failure);
throw failure;
}
}
protected void write(ByteBuffer content, boolean complete, Callback callback)
@ -140,28 +150,34 @@ public class HttpOutput extends ServletOutputStream implements Runnable
switch (state)
{
case CLOSED:
{
break loop;
}
case UNREADY:
{
if (_state.compareAndSet(state,OutputState.ERROR))
_writeListener.onError(_onError==null?new EofException("Async close"):_onError);
continue;
}
default:
{
if (_state.compareAndSet(state,OutputState.CLOSED))
{
try
{
write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
break loop;
}
catch(IOException e)
catch (IOException x)
{
LOG.debug(e);
_channel.abort();
// Ignore it, it's been already logged in write().
}
finally
{
releaseBuffer();
}
releaseBuffer();
return;
}
}
}
}
}
@ -189,10 +205,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
_channel.getResponse().closeOutput();
}
catch(IOException e)
catch(Throwable e)
{
LOG.debug(e);
_channel.abort();
_channel.abort(e);
}
releaseBuffer();
return;
@ -367,11 +383,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
write(view,complete);
}
else if (complete)
write(BufferUtil.EMPTY_BUFFER,complete);
{
write(BufferUtil.EMPTY_BUFFER,true);
}
if (complete)
closed();
}
public void write(ByteBuffer buffer) throws IOException
@ -424,7 +441,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (len>0)
write(buffer, complete);
else if (complete)
write(BufferUtil.EMPTY_BUFFER,complete);
write(BufferUtil.EMPTY_BUFFER, true);
if (complete)
closed();
@ -449,11 +466,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Check if all written or full
if (complete || BufferUtil.isFull(_aggregate))
{
try(Blocker blocker=_writeblock.acquire())
{
write(_aggregate, complete, blocker);
blocker.block();
}
write(_aggregate, complete);
if (complete)
closed();
}
@ -512,11 +525,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(ByteBuffer content) throws IOException
{
try(Blocker blocker=_writeblock.acquire())
{
write(content,true,blocker);
blocker.block();
}
write(content, true);
}
/* ------------------------------------------------------------ */
@ -528,9 +537,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
try(Blocker blocker=_writeblock.acquire())
{
new InputStreamWritingCB(in,blocker).iterate();
new InputStreamWritingCB(in, blocker).iterate();
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
_channel.abort(failure);
throw failure;
}
}
/* ------------------------------------------------------------ */
@ -542,9 +558,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
try(Blocker blocker=_writeblock.acquire())
{
new ReadableByteChannelWritingCB(in,blocker).iterate();
new ReadableByteChannelWritingCB(in, blocker).iterate();
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
_channel.abort(failure);
throw failure;
}
}
@ -557,9 +580,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
try(Blocker blocker=_writeblock.acquire())
{
sendContent(content,blocker);
sendContent(content, blocker);
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
_channel.abort(failure);
throw failure;
}
}
/* ------------------------------------------------------------ */
@ -569,7 +599,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(ByteBuffer content, final Callback callback)
{
write(content,true,new Callback()
write(content, true, new Callback()
{
@Override
public void succeeded()
@ -594,7 +624,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(InputStream in, Callback callback)
{
new InputStreamWritingCB(in,callback).iterate();
new InputStreamWritingCB(in, callback).iterate();
}
/* ------------------------------------------------------------ */
@ -605,7 +635,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(ReadableByteChannel in, Callback callback)
{
new ReadableByteChannelWritingCB(in,callback).iterate();
new ReadableByteChannelWritingCB(in, callback).iterate();
}
/* ------------------------------------------------------------ */
@ -751,10 +781,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
case CLOSED:
case ERROR:
{
_onError=null;
break loop;
}
default:
{
if (_state.compareAndSet(state, OutputState.ERROR))
{
Throwable th=_onError;
@ -763,10 +795,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
LOG.debug("onError",th);
_writeListener.onError(th);
close();
break loop;
}
}
}
continue;
}
@ -775,6 +806,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
case READY:
case CLOSED:
{
// even though a write is not possible, because a close has
// occurred, we need to call onWritePossible to tell async
// producer that the last write completed.
@ -785,11 +817,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
catch (Throwable e)
{
_onError=e;
_onError = e;
}
break;
}
default:
{
break;
}
}
}
}
@ -838,8 +873,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_channel.getState().onWritePossible();
}
}
private class AsyncFlush extends AsyncICB
{
protected volatile boolean _flushed;
@ -869,8 +903,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
}
private class AsyncWrite extends AsyncICB
{
private final ByteBuffer _buffer;
@ -959,11 +991,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (_complete)
closed();
}
}
/* ------------------------------------------------------------ */
/** An iterating callback that will take content from an
* InputStream and write it to the associated {@link HttpChannel}.
@ -1031,7 +1060,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
LOG.ignore(e);
}
}
}
/* ------------------------------------------------------------ */

View File

@ -32,10 +32,18 @@ public interface HttpTransport
void completed();
/* ------------------------------------------------------------ */
/** Abort transport.
* This is called when an error response needs to be sent, but the response is already committed.
* Abort to should terminate the transport in a way that can indicate abnormal response to the client.
/**
* Aborts this transport.
* <p />
* This method should terminate the transport in a way that
* can indicate an abnormal response to the client, for example
* by abruptly close the connection.
* <p />
* This method is called when an error response needs to be sent,
* but the response is already committed, or when a write failure
* is detected.
*
* @param failure the failure that caused the abort.
*/
void abort();
void abort(Throwable failure);
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.server;
import static org.eclipse.jetty.util.QuotedStringTokenizer.isQuoted;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.channels.IllegalSelectorException;
@ -30,7 +28,6 @@ import java.util.Enumeration;
import java.util.Iterator;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.Cookie;
@ -300,7 +297,9 @@ public class Response implements HttpServletResponse
boolean quote_path = has_path && isQuoteNeededForCookie(path);
// Upgrade the version if we have a comment or we need to quote value/path/domain or if they were already quoted
if (version==0 && ( comment!=null || quote_name || quote_value || quote_domain || quote_path || isQuoted(name) || isQuoted(value) || isQuoted(path) || isQuoted(domain)))
if (version==0 && ( comment!=null || quote_name || quote_value || quote_domain || quote_path ||
QuotedStringTokenizer.isQuoted(name) || QuotedStringTokenizer.isQuoted(value) ||
QuotedStringTokenizer.isQuoted(path) || QuotedStringTokenizer.isQuoted(domain)))
version=1;
// Append version
@ -557,7 +556,7 @@ public class Response implements HttpServletResponse
switch(code)
{
case -1:
_channel.abort();
_channel.abort(new IOException());
return;
case 102:
sendProcessing();

View File

@ -18,13 +18,6 @@
package org.eclipse.jetty.server;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
@ -36,7 +29,6 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Locale;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.Cookie;
@ -65,6 +57,13 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ResponseTest
{
private Server _server;
@ -102,9 +101,8 @@ public class ResponseTest
}
@Override
public void abort()
public void abort(Throwable failure)
{
}
}, input);

View File

@ -419,7 +419,7 @@ public class HttpTransportOverSPDY implements HttpTransport
}
@Override
public void abort()
public void abort(Throwable failure)
{
// TODO close the stream in a way to indicate an incomplete response?
}