Merge branch 'master' into pathresource

This commit is contained in:
Joakim Erdfelt 2014-06-26 08:56:43 -07:00
commit 56751dfa72
19 changed files with 286 additions and 234 deletions

View File

@ -773,15 +773,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
} }
@Override @Override
public void failed(Throwable failure) public void onCompleteFailure(Throwable failure)
{ {
content.failed(failure); content.failed(failure);
super.failed(failure);
anyToFailure(failure); anyToFailure(failure);
} }
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
// Nothing to do, since we always return false from process(). // Nothing to do, since we always return false from process().
// Termination is obtained via LastContentCallback. // Termination is obtained via LastContentCallback.

View File

@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -56,6 +57,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume; import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
public class HttpClientTimeoutTest extends AbstractHttpClientServerTest public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
@ -299,6 +301,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
} }
} }
@Ignore
@Slow @Slow
@Test @Test
public void testConnectTimeoutFailsRequest() throws Exception public void testConnectTimeoutFailsRequest() throws Exception
@ -330,6 +333,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
Assert.assertNotNull(request.getAbortCause()); Assert.assertNotNull(request.getAbortCause());
} }
@Ignore
@Slow @Slow
@Test @Test
public void testConnectTimeoutIsCancelledByShorterTimeout() throws Exception public void testConnectTimeoutIsCancelledByShorterTimeout() throws Exception

View File

@ -82,7 +82,7 @@ public class Flusher
} }
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
// We never return Action.SUCCEEDED, so this method is never called. // We never return Action.SUCCEEDED, so this method is never called.
throw new IllegalStateException(); throw new IllegalStateException();
@ -98,7 +98,7 @@ public class Flusher
} }
@Override @Override
public void failed(Throwable x) public void onCompleteFailure(Throwable x)
{ {
if (active != null) if (active != null)
active.failed(x); active.failed(x);
@ -111,8 +111,6 @@ public class Flusher
break; break;
result.failed(x); result.failed(x);
} }
super.failed(x);
} }
} }

View File

@ -23,9 +23,9 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.DispatcherType; import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher; import javax.servlet.RequestDispatcher;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -281,8 +281,12 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable, H
_response.getHttpOutput().reopen(); _response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.REQUEST); _request.setDispatcherType(DispatcherType.REQUEST);
for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers()) List<HttpConfiguration.Customizer> customizers = _configuration.getCustomizers();
if (!customizers.isEmpty())
{
for (HttpConfiguration.Customizer customizer : customizers)
customizer.customize(getConnector(), _configuration, _request); customizer.customize(getConnector(), _configuration, _request);
}
getServer().handle(this); getServer().handle(this);
break; break;

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.http.HttpGenerator; import org.eclipse.jetty.http.HttpGenerator;
@ -61,6 +62,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final HttpParser _parser; private final HttpParser _parser;
private volatile ByteBuffer _requestBuffer = null; private volatile ByteBuffer _requestBuffer = null;
private volatile ByteBuffer _chunk = null; private volatile ByteBuffer _chunk = null;
private final SendCallback _sendCallback = new SendCallback();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -421,35 +423,42 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
fillInterested(); fillInterested();
} }
@Override
public void onClose()
{
if (_sendCallback.isInUse())
{
LOG.warn("Closed with pending write:"+this);
_sendCallback.failed(new EofException("Connection closed"));
}
super.onClose();
}
@Override @Override
public void run() public void run()
{ {
onFillable(); onFillable();
} }
@Override @Override
public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{ {
if (info==null) // If we are still expecting a 100 continues when we commit
new ContentCallback(content,lastContent,callback).iterate(); if (info!=null && _channel.isExpecting100Continue())
else
{
// If we are still expecting a 100 continues
if (_channel.isExpecting100Continue())
// then we can't be persistent // then we can't be persistent
_generator.setPersistent(false); _generator.setPersistent(false);
new CommitCallback(info,content,lastContent,callback).iterate();
} _sendCallback.reset(info,content,lastContent,callback);
_sendCallback.iterate();
} }
@Override @Override
public void send(ByteBuffer content, boolean lastContent, Callback callback) public void send(ByteBuffer content, boolean lastContent, Callback callback)
{ {
new ContentCallback(content,lastContent,callback).iterate(); _sendCallback.reset(null,content,lastContent,callback);
_sendCallback.iterate();
} }
protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer> protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
{ {
public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input) public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
@ -546,25 +555,43 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
} }
private class CommitCallback extends IteratingCallback private class SendCallback extends IteratingCallback
{ {
final ByteBuffer _content; private ResponseInfo _info;
final boolean _lastContent; private ByteBuffer _content;
final ResponseInfo _info; private boolean _lastContent;
final Callback _callback; private Callback _callback;
ByteBuffer _header; private ByteBuffer _header;
private boolean _shutdownOut;
CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback) private SendCallback()
{
super(true);
}
private void reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
{
if (reset())
{ {
_info = info; _info = info;
_content = content; _content = content;
_lastContent = last; _lastContent = last;
_callback = callback; _callback = callback;
_header = null;
_shutdownOut = false;
}
else
{
callback.failed(new WritePendingException());
}
} }
@Override @Override
public Action process() throws Exception public Action process() throws Exception
{ {
if (_callback==null)
throw new IllegalStateException();
ByteBuffer chunk = _chunk; ByteBuffer chunk = _chunk;
while (true) while (true)
{ {
@ -582,25 +609,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{ {
case NEED_HEADER: case NEED_HEADER:
{ {
// Look for optimisation to avoid allocating a _header buffer
/*
Cannot use this optimisation unless we work out how not to overwrite data in user passed arrays.
if (_lastContent && _content!=null && !_content.isReadOnly() && _content.hasArray() && BufferUtil.space(_content)>_config.getResponseHeaderSize() )
{
// use spare space in content buffer for header buffer
int p=_content.position();
int l=_content.limit();
_content.position(l);
_content.limit(l+_config.getResponseHeaderSize());
_header=_content.slice();
_header.limit(0);
_content.position(p);
_content.limit(l);
}
else
*/
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT); _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
continue; continue;
} }
case NEED_CHUNK: case NEED_CHUNK:
@ -647,17 +656,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
case SHUTDOWN_OUT: case SHUTDOWN_OUT:
{ {
getEndPoint().shutdownOutput(); _shutdownOut=true;
continue; continue;
} }
case DONE: case DONE:
{ {
if (_header!=null) releaseHeader();
{
// don't release header in spare content buffer
if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
_bufferPool.release(_header);
}
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
case CONTINUE: case CONTINUE:
@ -672,116 +676,40 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
} }
@Override private void releaseHeader()
protected void completed()
{ {
ByteBuffer h=_header;
_header=null;
if (h!=null)
_bufferPool.release(h);
}
@Override
protected void onCompleteSuccess()
{
releaseHeader();
_callback.succeeded(); _callback.succeeded();
} if (_shutdownOut)
@Override
public void failed(final Throwable x)
{
super.failed(x);
failedCallback(_callback,x);
}
}
private class ContentCallback extends IteratingCallback
{
final ByteBuffer _content;
final boolean _lastContent;
final Callback _callback;
ContentCallback(ByteBuffer content, boolean last, Callback callback)
{
_content=content;
_lastContent=last;
_callback=callback;
}
@Override
public Action process() throws Exception
{
ByteBuffer chunk = _chunk;
while (true)
{
HttpGenerator.Result result = _generator.generateResponse(null, null, chunk, _content, _lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{})@{}",
this,
result,
BufferUtil.toSummaryString(_content),
_lastContent,
_generator.getState());
switch (result)
{
case NEED_HEADER:
throw new IllegalStateException();
case NEED_CHUNK:
{
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
continue;
}
case FLUSH:
{
// Don't write the chunk or the content if this is a HEAD response
if (_channel.getRequest().isHead())
{
BufferUtil.clear(chunk);
BufferUtil.clear(_content);
continue;
}
else if (BufferUtil.hasContent(chunk))
{
if (BufferUtil.hasContent(_content))
getEndPoint().write(this, chunk, _content);
else
getEndPoint().write(this, chunk);
}
else if (BufferUtil.hasContent(_content))
{
getEndPoint().write(this, _content);
}
else
continue;
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:
{
getEndPoint().shutdownOutput(); getEndPoint().shutdownOutput();
continue;
}
case DONE:
{
return Action.SUCCEEDED;
}
case CONTINUE:
{
break;
}
default:
{
throw new IllegalStateException("generateResponse="+result);
}
}
}
} }
@Override @Override
protected void completed() public void onCompleteFailure(final Throwable x)
{ {
_callback.succeeded(); releaseHeader();
}
@Override
public void failed(final Throwable x)
{
super.failed(x);
failedCallback(_callback,x); failedCallback(_callback,x);
if (_shutdownOut)
getEndPoint().shutdownOutput();
}
@Override
public String toString()
{
return String.format("SendCB@%x{s=%s,i=%s,cb=%s}",hashCode(),getState(),_info,_callback);
} }
} }
@Override @Override
public void abort() public void abort()
{ {

View File

@ -799,7 +799,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private abstract class AsyncICB extends IteratingCallback private abstract class AsyncICB extends IteratingCallback
{ {
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
while(true) while(true)
{ {
@ -828,9 +828,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
@Override @Override
public void failed(Throwable e) public void onCompleteFailure(Throwable e)
{ {
super.failed(e);
_onError=e; _onError=e;
_channel.getState().onWritePossible(); _channel.getState().onWritePossible();
} }
@ -950,9 +949,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
super.completed(); super.onCompleteSuccess();
if (_complete) if (_complete)
closed(); closed();
} }
@ -1015,9 +1014,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
@Override @Override
public void failed(Throwable x) public void onCompleteFailure(Throwable x)
{ {
super.failed(x); super.onCompleteFailure(x);
_channel.getByteBufferPool().release(_buffer); _channel.getByteBufferPool().release(_buffer);
try try
{ {
@ -1079,9 +1078,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
@Override @Override
public void failed(Throwable x) public void onCompleteFailure(Throwable x)
{ {
super.failed(x); super.onCompleteFailure(x);
_channel.getByteBufferPool().release(_buffer); _channel.getByteBufferPool().release(_buffer);
try try
{ {
@ -1093,5 +1092,4 @@ public class HttpOutput extends ServletOutputStream implements Runnable
} }
} }
} }
} }

View File

@ -72,7 +72,9 @@ public class HttpConnectionTest
connector.setIdleTimeout(500); connector.setIdleTimeout(500);
server.addConnector(connector); server.addConnector(connector);
server.setHandler(new DumpHandler()); server.setHandler(new DumpHandler());
server.addBean(new ErrorHandler()); ErrorHandler eh=new ErrorHandler();
eh.setServer(server);
server.addBean(eh);
server.start(); server.start();
} }

View File

@ -125,6 +125,40 @@ public class LocalConnectorTest
assertThat(response,containsString("pathInfo=/R2")); assertThat(response,containsString("pathInfo=/R2"));
} }
@Test
public void testManyGETs() throws Exception
{
String response=_connector.getResponses(
"GET /R1 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R2 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R3 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R4 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R5 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"\r\n"+
"GET /R6 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Connection: close\r\n"+
"\r\n");
String r=response;
for (int i=1;i<=6;i++)
{
assertThat(r,containsString("HTTP/1.1 200 OK"));
assertThat(r,containsString("pathInfo=/R"+i));
r=r.substring(r.indexOf("</html>")+8);
}
}
@Test @Test
public void testGETandGET() throws Exception public void testGETandGET() throws Exception
{ {

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -63,6 +65,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StdErrLog; import org.eclipse.jetty.util.log.StdErrLog;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -960,7 +963,6 @@ public class RequestTest
assertEquals("other", cookies.get(1).getName()); assertEquals("other", cookies.get(1).getName());
assertEquals("quoted=;value", cookies.get(1).getValue()); assertEquals("quoted=;value", cookies.get(1).getValue());
cookies.clear(); cookies.clear();
response=_connector.getResponses( response=_connector.getResponses(
"GET /other HTTP/1.1\n"+ "GET /other HTTP/1.1\n"+
@ -975,7 +977,8 @@ public class RequestTest
"Connection: close\n"+ "Connection: close\n"+
"\n" "\n"
); );
assertTrue(response.startsWith("HTTP/1.1 200 OK")); assertThat(response,startsWith("HTTP/1.1 200 OK"));
assertThat(response.substring(15),containsString("HTTP/1.1 200 OK"));
assertEquals(4,cookies.size()); assertEquals(4,cookies.size());
assertEquals("name", cookies.get(0).getName()); assertEquals("name", cookies.get(0).getName());
assertEquals("value", cookies.get(0).getValue()); assertEquals("value", cookies.get(0).getValue());
@ -999,7 +1002,8 @@ public class RequestTest
"Connection: close\n"+ "Connection: close\n"+
"\n" "\n"
); );
assertTrue(response.startsWith("HTTP/1.1 200 OK")); assertThat(response,startsWith("HTTP/1.1 200 OK"));
assertThat(response.substring(15),containsString("HTTP/1.1 200 OK"));
assertEquals(4,cookies.size()); assertEquals(4,cookies.size());
assertEquals("name", cookies.get(0).getName()); assertEquals("name", cookies.get(0).getName());
assertEquals("value", cookies.get(0).getValue()); assertEquals("value", cookies.get(0).getValue());

View File

@ -218,7 +218,7 @@ public class Flusher
} }
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
// will never be called as process always returns SCHEDULED or IDLE // will never be called as process always returns SCHEDULED or IDLE
throw new IllegalStateException(); throw new IllegalStateException();
@ -242,7 +242,7 @@ public class Flusher
} }
@Override @Override
public void failed(Throwable x) public void onCompleteFailure(Throwable x)
{ {
List<StandardSession.FrameBytes> failed = new ArrayList<>(); List<StandardSession.FrameBytes> failed = new ArrayList<>();
synchronized (lock) synchronized (lock)
@ -261,7 +261,6 @@ public class Flusher
// Notify outside the synchronized block. // Notify outside the synchronized block.
for (StandardSession.FrameBytes frame : failed) for (StandardSession.FrameBytes frame : failed)
frame.failed(x); frame.failed(x);
super.failed(x);
} }
} }
} }

View File

@ -71,14 +71,25 @@ public abstract class IteratingCallback implements Callback
/** /**
* Indicates that {@link #process()} has completed the overall job. * Indicates that {@link #process()} has completed the overall job.
*/ */
SUCCEEDED, SUCCEEDED
/**
* Indicates that {@link #process()} has failed the overall job.
*/
FAILED
} }
private final AtomicReference<State> _state = new AtomicReference<>(State.INACTIVE); private final AtomicReference<State> _state;
protected IteratingCallback()
{
_state = new AtomicReference<>(State.INACTIVE);
}
protected IteratingCallback(boolean needReset)
{
_state = new AtomicReference<>(needReset?State.SUCCEEDED:State.INACTIVE);
}
protected State getState()
{
return _state.get();
}
/** /**
* Method called by {@link #iterate()} to process the sub task. * Method called by {@link #iterate()} to process the sub task.
@ -91,7 +102,6 @@ public abstract class IteratingCallback implements Callback
* <li>{@link Action#SCHEDULED} when the sub task asynchronous execution * <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
* has been started</li> * has been started</li>
* <li>{@link Action#SUCCEEDED} when the overall job is completed</li> * <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
* <li>{@link Action#FAILED} when the overall job cannot be completed</li>
* </ul> * </ul>
* *
* @throws Exception if the sub task processing throws * @throws Exception if the sub task processing throws
@ -101,7 +111,12 @@ public abstract class IteratingCallback implements Callback
/** /**
* Invoked when the overall task has completed successfully. * Invoked when the overall task has completed successfully.
*/ */
protected abstract void completed(); protected abstract void onCompleteSuccess();
/**
* Invoked when the overall task has completely failed.
*/
protected abstract void onCompleteFailure(Throwable x);
/** /**
* This method must be invoked by applications to start the processing * This method must be invoked by applications to start the processing
@ -196,13 +211,7 @@ public abstract class IteratingCallback implements Callback
case SUCCEEDED: case SUCCEEDED:
{ {
// The overall job has completed. // The overall job has completed.
if (completeSuccess()) completeSuccess();
completed();
return true;
}
case FAILED:
{
completeFailure();
return true; return true;
} }
default: default:
@ -265,43 +274,49 @@ public abstract class IteratingCallback implements Callback
* {@code super.failed(Throwable)}. * {@code super.failed(Throwable)}.
*/ */
@Override @Override
public void failed(Throwable x) public final void failed(Throwable x)
{ {
completeFailure(); completeFailure(x);
} }
private boolean completeSuccess() protected void completeSuccess()
{ {
while (true) while (true)
{ {
State current = _state.get(); State current = _state.get();
if (current == State.FAILED) switch(current)
{
// Success arrived too late, sorry.
return false;
}
else
{ {
case SUCCEEDED:
case FAILED:
// Already complete!.
return;
default:
if (_state.compareAndSet(current, State.SUCCEEDED)) if (_state.compareAndSet(current, State.SUCCEEDED))
return true;
}
}
}
private void completeFailure()
{ {
while (true) onCompleteSuccess();
{
State current = _state.get();
if (current == State.SUCCEEDED)
{
// Failed arrived too late, sorry.
return; return;
} }
else }
}
}
protected void completeFailure(Throwable x)
{ {
while (true)
{
State current = _state.get();
switch(current)
{
case SUCCEEDED:
case FAILED:
// Already complete!.
return;
default:
if (_state.compareAndSet(current, State.FAILED)) if (_state.compareAndSet(current, State.FAILED))
break; {
onCompleteFailure(x);
return;
}
} }
} }
} }
@ -314,6 +329,23 @@ public abstract class IteratingCallback implements Callback
return _state.get() == State.INACTIVE; return _state.get() == State.INACTIVE;
} }
/* ------------------------------------------------------------ */
/**
* @return true if the callback is not INACTIVE, FAILED or SUCCEEDED.
*/
public boolean isInUse()
{
switch(_state.get())
{
case INACTIVE:
case FAILED:
case SUCCEEDED:
return false;
default:
return true;
}
}
/** /**
* @return whether this callback has failed * @return whether this callback has failed
*/ */
@ -330,6 +362,36 @@ public abstract class IteratingCallback implements Callback
return _state.get() == State.SUCCEEDED; return _state.get() == State.SUCCEEDED;
} }
/* ------------------------------------------------------------ */
/** Reset the callback
* <p>A callback can only be reset to INACTIVE from the SUCCEEDED or FAILED states or if it is already INACTIVE.
* @return True if the reset was successful
*/
public boolean reset()
{
while (true)
{
switch(_state.get())
{
case INACTIVE:
return true;
case SUCCEEDED:
if (_state.compareAndSet(State.SUCCEEDED, State.INACTIVE))
return true;
break;
case FAILED:
if (_state.compareAndSet(State.FAILED, State.INACTIVE))
return true;
break;
default:
return false;
}
}
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -48,15 +48,14 @@ public abstract class IteratingNestedCallback extends IteratingCallback
} }
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
_callback.succeeded(); _callback.succeeded();
} }
@Override @Override
public void failed(Throwable x) protected void onCompleteFailure(Throwable x)
{ {
super.failed(x);
_callback.failed(x); _callback.failed(x);
} }

View File

@ -262,16 +262,14 @@ public class IteratingCallbackTest
int processed=0; int processed=0;
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
completed.countDown(); completed.countDown();
} }
@Override @Override
public void failed(Throwable x) public void onCompleteFailure(Throwable x)
{ {
super.failed(x);
completed.countDown(); completed.countDown();
} }

View File

@ -33,6 +33,7 @@ import java.net.URI;
import java.net.URL; import java.net.URL;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.file.FileSystemException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.InvalidPathException; import java.nio.file.InvalidPathException;
import java.nio.file.Path; import java.nio.file.Path;
@ -86,7 +87,7 @@ public abstract class AbstractFSResourceTest
} }
else if (OS.IS_WINDOWS) else if (OS.IS_WINDOWS)
{ {
newResource(new URI("file:/dev/null")); newResource(new URI("file://some\"text\""));
} }
} }
@ -339,7 +340,7 @@ public abstract class AbstractFSResourceTest
Files.createFile(foo); Files.createFile(foo);
Files.createSymbolicLink(bar,foo); Files.createSymbolicLink(bar,foo);
} }
catch (UnsupportedOperationException e) catch (UnsupportedOperationException | FileSystemException e)
{ {
// if unable to create symlink, no point testing the rest // if unable to create symlink, no point testing the rest
// this is the path that Microsoft Windows takes. // this is the path that Microsoft Windows takes.

View File

@ -386,11 +386,19 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
} }
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
// This IteratingCallback never completes. // This IteratingCallback never completes.
} }
@Override
protected void onCompleteFailure(Throwable x)
{
// This IteratingCallback never fails.
// The callback are those provided by WriteCallback (implemented
// below) and even in case of writeFailed() we call succeeded().
}
@Override @Override
public void writeSuccess() public void writeSuccess()
{ {

View File

@ -368,11 +368,20 @@ public abstract class CompressExtension extends AbstractExtension
} }
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
// This IteratingCallback never completes. // This IteratingCallback never completes.
} }
@Override
protected void onCompleteFailure(Throwable x)
{
// Fail all the frames in the queue.
FrameEntry entry;
while ((entry = entries.poll()) != null)
notifyCallbackFailure(entry.callback, x);
}
@Override @Override
public void writeSuccess() public void writeSuccess()
{ {
@ -388,10 +397,6 @@ public abstract class CompressExtension extends AbstractExtension
// If something went wrong, very likely the compression context // If something went wrong, very likely the compression context
// will be invalid, so we need to fail this IteratingCallback. // will be invalid, so we need to fail this IteratingCallback.
failed(x); failed(x);
// Now no more frames can be queued, fail those in the queue.
FrameEntry entry;
while ((entry = entries.poll()) != null)
notifyCallbackFailure(entry.callback, x);
} }
} }
} }

View File

@ -150,11 +150,19 @@ public class FragmentExtension extends AbstractExtension
} }
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
// This IteratingCallback never completes. // This IteratingCallback never completes.
} }
@Override
protected void onCompleteFailure(Throwable x)
{
// This IteratingCallback never fails.
// The callback are those provided by WriteCallback (implemented
// below) and even in case of writeFailed() we call succeeded().
}
@Override @Override
public void writeSuccess() public void writeSuccess()
{ {

View File

@ -91,13 +91,13 @@ public class FrameFlusher
} }
@Override @Override
protected void completed() protected void onCompleteSuccess()
{ {
// This IteratingCallback never completes. // This IteratingCallback never completes.
} }
@Override @Override
public void failed(Throwable x) public void onCompleteFailure(Throwable x)
{ {
for (FrameEntry entry : entries) for (FrameEntry entry : entries)
{ {
@ -105,7 +105,6 @@ public class FrameFlusher
entry.release(); entry.release();
} }
entries.clear(); entries.clear();
super.failed(x);
failure = x; failure = x;
onFailure(x); onFailure(x);
} }

View File

@ -258,9 +258,11 @@
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration> <configuration>
<argLine>-showversion -Xmx1g -Xms1g -XX:+PrintGCDetails</argLine> <argLine>-showversion -Xmx1g -Xms1g -XX:+PrintGCDetails</argLine>
<failIfNoTests>false</failIfNoTests> <failIfNoTests>false</failIfNoTests>
<runMode>random</runMode>
<systemProperties> <systemProperties>
<!-- <!--
<property> <property>