438079 435322 Fixed Iterating Callback fail handling and removed per send instance

This commit is contained in:
Greg Wilkins 2014-06-25 19:19:54 +02:00
parent 45a0690ba7
commit a746d78951
16 changed files with 278 additions and 211 deletions

View File

@ -773,7 +773,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
@Override
public void failed(Throwable failure)
public void onCompleteFailure(Throwable failure)
{
content.failed(failure);
super.failed(failure);
@ -781,7 +781,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
@Override
protected void completed()
protected void onCompleteSuccess()
{
// Nothing to do, since we always return false from process().
// Termination is obtained via LastContentCallback.

View File

@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@ -56,6 +57,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
@ -299,6 +301,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
}
}
@Ignore
@Slow
@Test
public void testConnectTimeoutFailsRequest() throws Exception
@ -330,6 +333,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
Assert.assertNotNull(request.getAbortCause());
}
@Ignore
@Slow
@Test
public void testConnectTimeoutIsCancelledByShorterTimeout() throws Exception

View File

@ -82,7 +82,7 @@ public class Flusher
}
@Override
protected void completed()
protected void onCompleteSuccess()
{
// We never return Action.SUCCEEDED, so this method is never called.
throw new IllegalStateException();
@ -98,7 +98,7 @@ public class Flusher
}
@Override
public void failed(Throwable x)
public void onCompleteFailure(Throwable x)
{
if (active != null)
active.failed(x);

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.http.HttpGenerator;
@ -61,6 +62,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final HttpParser _parser;
private volatile ByteBuffer _requestBuffer = null;
private volatile ByteBuffer _chunk = null;
private final SendCallback _sendCallback = new SendCallback();
/* ------------------------------------------------------------ */
@ -421,35 +423,42 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
fillInterested();
}
@Override
public void onClose()
{
if (_sendCallback.isInUse())
{
LOG.warn("Closed with pending write:"+this);
_sendCallback.failed(new EofException("Connection closed"));
}
super.onClose();
}
@Override
public void run()
{
onFillable();
}
@Override
public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{
if (info==null)
new ContentCallback(content,lastContent,callback).iterate();
else
{
// If we are still expecting a 100 continues
if (_channel.isExpecting100Continue())
// then we can't be persistent
_generator.setPersistent(false);
new CommitCallback(info,content,lastContent,callback).iterate();
}
// If we are still expecting a 100 continues when we commit
if (info!=null && _channel.isExpecting100Continue())
// then we can't be persistent
_generator.setPersistent(false);
_sendCallback.reset(info,content,lastContent,callback);
_sendCallback.iterate();
}
@Override
public void send(ByteBuffer content, boolean lastContent, Callback callback)
{
new ContentCallback(content,lastContent,callback).iterate();
_sendCallback.reset(null,content,lastContent,callback);
_sendCallback.iterate();
}
protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
{
public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
@ -546,25 +555,42 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
private class CommitCallback extends IteratingCallback
private class SendCallback extends IteratingCallback
{
final ByteBuffer _content;
final boolean _lastContent;
final ResponseInfo _info;
final Callback _callback;
ByteBuffer _content;
boolean _lastContent;
ResponseInfo _info;
ByteBuffer _header;
volatile Callback _callback; // TODO is this memory barrier needed?
boolean _shutdownOut;
CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
SendCallback()
{
_info=info;
_content=content;
_lastContent=last;
_callback=callback;
super(true);
}
void reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
{
if (reset())
{
_info=info;
_content=content;
_lastContent=last;
_header=null;
_shutdownOut=false;
_callback=callback;
}
else
throw new WritePendingException();
}
@Override
public Action process() throws Exception
{
if (_callback==null)
throw new IllegalStateException();
ByteBuffer chunk = _chunk;
while (true)
{
@ -582,25 +608,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
case NEED_HEADER:
{
// Look for optimisation to avoid allocating a _header buffer
/*
Cannot use this optimisation unless we work out how not to overwrite data in user passed arrays.
if (_lastContent && _content!=null && !_content.isReadOnly() && _content.hasArray() && BufferUtil.space(_content)>_config.getResponseHeaderSize() )
{
// use spare space in content buffer for header buffer
int p=_content.position();
int l=_content.limit();
_content.position(l);
_content.limit(l+_config.getResponseHeaderSize());
_header=_content.slice();
_header.limit(0);
_content.position(p);
_content.limit(l);
}
else
*/
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
continue;
}
case NEED_CHUNK:
@ -647,17 +655,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
case SHUTDOWN_OUT:
{
getEndPoint().shutdownOutput();
_shutdownOut=true;
continue;
}
case DONE:
{
if (_header!=null)
{
// don't release header in spare content buffer
if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
_bufferPool.release(_header);
}
releaseHeader();
return Action.SUCCEEDED;
}
case CONTINUE:
@ -672,115 +675,49 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
@Override
protected void completed()
private void releaseHeader()
{
_callback.succeeded();
ByteBuffer h=_header;
_header=null;
if (h!=null)
_bufferPool.release(h);
}
@Override
public void failed(final Throwable x)
protected void onCompleteSuccess()
{
super.failed(x);
failedCallback(_callback,x);
Callback cb=_callback;
_info=null;
_content=null;
_lastContent=false;
releaseHeader();
_callback=null;
if (_shutdownOut)
getEndPoint().shutdownOutput();
cb.succeeded();
}
@Override
public void onCompleteFailure(final Throwable x)
{
Callback cb=_callback;
_info=null;
_content=null;
_lastContent=false;
releaseHeader();
_callback=null;
if (_shutdownOut)
getEndPoint().shutdownOutput();
failedCallback(cb,x);
}
@Override
public String toString()
{
return String.format("SendCB@%x{s=%s,i=%s,cb=%s}",hashCode(),getState(),_info,_callback);
}
}
private class ContentCallback extends IteratingCallback
{
final ByteBuffer _content;
final boolean _lastContent;
final Callback _callback;
ContentCallback(ByteBuffer content, boolean last, Callback callback)
{
_content=content;
_lastContent=last;
_callback=callback;
}
@Override
public Action process() throws Exception
{
ByteBuffer chunk = _chunk;
while (true)
{
HttpGenerator.Result result = _generator.generateResponse(null, null, chunk, _content, _lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{})@{}",
this,
result,
BufferUtil.toSummaryString(_content),
_lastContent,
_generator.getState());
switch (result)
{
case NEED_HEADER:
throw new IllegalStateException();
case NEED_CHUNK:
{
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
continue;
}
case FLUSH:
{
// Don't write the chunk or the content if this is a HEAD response
if (_channel.getRequest().isHead())
{
BufferUtil.clear(chunk);
BufferUtil.clear(_content);
continue;
}
else if (BufferUtil.hasContent(chunk))
{
if (BufferUtil.hasContent(_content))
getEndPoint().write(this, chunk, _content);
else
getEndPoint().write(this, chunk);
}
else if (BufferUtil.hasContent(_content))
{
getEndPoint().write(this, _content);
}
else
continue;
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:
{
getEndPoint().shutdownOutput();
continue;
}
case DONE:
{
return Action.SUCCEEDED;
}
case CONTINUE:
{
break;
}
default:
{
throw new IllegalStateException("generateResponse="+result);
}
}
}
}
@Override
protected void completed()
{
_callback.succeeded();
}
@Override
public void failed(final Throwable x)
{
super.failed(x);
failedCallback(_callback,x);
}
}
@Override
public void abort()

View File

@ -799,7 +799,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private abstract class AsyncICB extends IteratingCallback
{
@Override
protected void completed()
protected void onCompleteSuccess()
{
while(true)
{
@ -828,9 +828,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
public void failed(Throwable e)
public void onCompleteFailure(Throwable e)
{
super.failed(e);
_onError=e;
_channel.getState().onWritePossible();
}
@ -950,9 +949,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
protected void completed()
protected void onCompleteSuccess()
{
super.completed();
super.onCompleteSuccess();
if (_complete)
closed();
}
@ -1015,9 +1014,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
public void failed(Throwable x)
public void onCompleteFailure(Throwable x)
{
super.failed(x);
super.onCompleteFailure(x);
_channel.getByteBufferPool().release(_buffer);
try
{
@ -1079,9 +1078,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
public void failed(Throwable x)
public void onCompleteFailure(Throwable x)
{
super.failed(x);
super.onCompleteFailure(x);
_channel.getByteBufferPool().release(_buffer);
try
{
@ -1093,5 +1092,4 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
}
}
}

View File

@ -72,7 +72,9 @@ public class HttpConnectionTest
connector.setIdleTimeout(500);
server.addConnector(connector);
server.setHandler(new DumpHandler());
server.addBean(new ErrorHandler());
ErrorHandler eh=new ErrorHandler();
eh.setServer(server);
server.addBean(eh);
server.start();
}

View File

@ -125,6 +125,40 @@ public class LocalConnectorTest
assertThat(response,containsString("pathInfo=/R2"));
}
@Test
public void testManyGETs() throws Exception
{
String response=_connector.getResponses(
"GET /R1 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R2 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R3 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R4 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R5 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R6 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Connection: close\r\n"+
"\r\n");
String r=response;
for (int i=1;i<=6;i++)
{
assertThat(r,containsString("HTTP/1.1 200 OK"));
assertThat(r,containsString("pathInfo=/R"+i));
r=r.substring(r.indexOf("</html>")+8);
}
}
@Test
public void testGETandGET() throws Exception
{

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -63,6 +65,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StdErrLog;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -960,7 +963,6 @@ public class RequestTest
assertEquals("other", cookies.get(1).getName());
assertEquals("quoted=;value", cookies.get(1).getValue());
cookies.clear();
response=_connector.getResponses(
"GET /other HTTP/1.1\n"+
@ -975,7 +977,8 @@ public class RequestTest
"Connection: close\n"+
"\n"
);
assertTrue(response.startsWith("HTTP/1.1 200 OK"));
assertThat(response,startsWith("HTTP/1.1 200 OK"));
assertThat(response.substring(15),containsString("HTTP/1.1 200 OK"));
assertEquals(4,cookies.size());
assertEquals("name", cookies.get(0).getName());
assertEquals("value", cookies.get(0).getValue());
@ -999,7 +1002,8 @@ public class RequestTest
"Connection: close\n"+
"\n"
);
assertTrue(response.startsWith("HTTP/1.1 200 OK"));
assertThat(response,startsWith("HTTP/1.1 200 OK"));
assertThat(response.substring(15),containsString("HTTP/1.1 200 OK"));
assertEquals(4,cookies.size());
assertEquals("name", cookies.get(0).getName());
assertEquals("value", cookies.get(0).getValue());

View File

@ -218,7 +218,7 @@ public class Flusher
}
@Override
protected void completed()
protected void onCompleteSuccess()
{
// will never be called as process always returns SCHEDULED or IDLE
throw new IllegalStateException();
@ -242,7 +242,7 @@ public class Flusher
}
@Override
public void failed(Throwable x)
public void onCompleteFailure(Throwable x)
{
List<StandardSession.FrameBytes> failed = new ArrayList<>();
synchronized (lock)

View File

@ -78,7 +78,22 @@ public abstract class IteratingCallback implements Callback
FAILED
}
private final AtomicReference<State> _state = new AtomicReference<>(State.INACTIVE);
private final AtomicReference<State> _state;
protected IteratingCallback()
{
_state = new AtomicReference<>(State.INACTIVE);
}
protected IteratingCallback(boolean needReset)
{
_state = new AtomicReference<>(needReset?State.SUCCEEDED:State.INACTIVE);
}
protected State getState()
{
return _state.get();
}
/**
* Method called by {@link #iterate()} to process the sub task.
@ -101,7 +116,12 @@ public abstract class IteratingCallback implements Callback
/**
* Invoked when the overall task has completed successfully.
*/
protected abstract void completed();
protected abstract void onCompleteSuccess();
/**
* Invoked when the overall task has completely failed.
*/
protected abstract void onCompleteFailure(Throwable x);
/**
* This method must be invoked by applications to start the processing
@ -196,13 +216,13 @@ public abstract class IteratingCallback implements Callback
case SUCCEEDED:
{
// The overall job has completed.
if (completeSuccess())
completed();
completeSuccess();
return true;
}
case FAILED:
{
completeFailure();
// The overall job has failed.
completeFailure(new Exception("process failed"));
return true;
}
default:
@ -265,43 +285,49 @@ public abstract class IteratingCallback implements Callback
* {@code super.failed(Throwable)}.
*/
@Override
public void failed(Throwable x)
public final void failed(Throwable x)
{
completeFailure();
completeFailure(x);
}
private boolean completeSuccess()
protected void completeSuccess()
{
while (true)
{
State current = _state.get();
if (current == State.FAILED)
switch(current)
{
// Success arrived too late, sorry.
return false;
}
else
{
if (_state.compareAndSet(current, State.SUCCEEDED))
return true;
case SUCCEEDED:
case FAILED:
// Already complete!.
return;
default:
if (_state.compareAndSet(current, State.SUCCEEDED))
{
onCompleteSuccess();
return;
}
}
}
}
private void completeFailure()
protected void completeFailure(Throwable x)
{
while (true)
{
State current = _state.get();
if (current == State.SUCCEEDED)
switch(current)
{
// Failed arrived too late, sorry.
return;
}
else
{
if (_state.compareAndSet(current, State.FAILED))
break;
case SUCCEEDED:
case FAILED:
// Already complete!.
return;
default:
if (_state.compareAndSet(current, State.FAILED))
{
onCompleteFailure(x);
return;
}
}
}
}
@ -314,6 +340,23 @@ public abstract class IteratingCallback implements Callback
return _state.get() == State.INACTIVE;
}
/* ------------------------------------------------------------ */
/**
* @return true if the callback is not INACTIVE, FAILED or SUCCEEDED.
*/
public boolean isInUse()
{
switch(_state.get())
{
case INACTIVE:
case FAILED:
case SUCCEEDED:
return false;
default:
return true;
}
}
/**
* @return whether this callback has failed
*/
@ -330,6 +373,36 @@ public abstract class IteratingCallback implements Callback
return _state.get() == State.SUCCEEDED;
}
/* ------------------------------------------------------------ */
/** Reset the callback
* <p>A callback can only be reset to INACTIVE from the SUCCEEDED or FAILED states or if it is already INACTIVE.
* @return True if the reset was successful
*/
public boolean reset()
{
while (true)
{
switch(_state.get())
{
case INACTIVE:
return true;
case SUCCEEDED:
if (_state.compareAndSet(State.SUCCEEDED, State.INACTIVE))
return true;
break;
case FAILED:
if (_state.compareAndSet(State.FAILED, State.INACTIVE))
return true;
break;
default:
return false;
}
}
}
@Override
public String toString()
{

View File

@ -48,15 +48,14 @@ public abstract class IteratingNestedCallback extends IteratingCallback
}
@Override
protected void completed()
protected void onCompleteSuccess()
{
_callback.succeeded();
}
@Override
public void failed(Throwable x)
protected void onCompleteFailure(Throwable x)
{
super.failed(x);
_callback.failed(x);
}

View File

@ -262,16 +262,14 @@ public class IteratingCallbackTest
int processed=0;
@Override
protected void completed()
protected void onCompleteSuccess()
{
completed.countDown();
}
@Override
public void failed(Throwable x)
public void onCompleteFailure(Throwable x)
{
super.failed(x);
completed.countDown();
}

View File

@ -386,11 +386,17 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
}
@Override
protected void completed()
protected void onCompleteSuccess()
{
// This IteratingCallback never completes.
}
@Override
protected void onCompleteFailure(Throwable x)
{
// TODO This IteratingCallback never completes???
}
@Override
public void writeSuccess()
{

View File

@ -368,11 +368,17 @@ public abstract class CompressExtension extends AbstractExtension
}
@Override
protected void completed()
protected void onCompleteSuccess()
{
// This IteratingCallback never completes.
}
@Override
protected void onCompleteFailure(Throwable x)
{
// TODO This IteratingCallback never completes???
}
@Override
public void writeSuccess()
{

View File

@ -150,11 +150,17 @@ public class FragmentExtension extends AbstractExtension
}
@Override
protected void completed()
protected void onCompleteSuccess()
{
// This IteratingCallback never completes.
}
@Override
protected void onCompleteFailure(Throwable x)
{
// TODO This IteratingCallback never completes???
}
@Override
public void writeSuccess()
{

View File

@ -91,13 +91,13 @@ public class FrameFlusher
}
@Override
protected void completed()
protected void onCompleteSuccess()
{
// This IteratingCallback never completes.
}
@Override
public void failed(Throwable x)
public void onCompleteFailure(Throwable x)
{
for (FrameEntry entry : entries)
{