Introduced Request.CommitListener to separate the pre-commit request event from the commit request event.

This commit is contained in:
Simone Bordet 2012-12-13 20:52:51 +01:00
parent bf3c5c7922
commit 2c583ccda4
5 changed files with 144 additions and 26 deletions

View File

@ -279,6 +279,13 @@ public class HttpRequest implements Request
return this;
}
@Override
public Request onRequestCommit(CommitListener listener)
{
this.requestListeners.add(listener);
return this;
}
@Override
public Request onRequestSuccess(SuccessListener listener)
{

View File

@ -64,7 +64,7 @@ public class HttpSender
public void send(HttpExchange exchange)
{
if (!updateState(State.IDLE, State.SEND))
if (!updateState(State.IDLE, State.BEGIN))
throw new IllegalStateException();
// Arrange the listeners, so that if there is a request failure the proper listeners are notified
@ -170,13 +170,35 @@ public class HttpSender
}
case FLUSH:
{
switch (state.get())
out: while (true)
{
case SEND:
case COMMIT:
break;
default:
return;
State current = state.get();
switch (current)
{
case BEGIN:
{
if (!updateState(current, State.SEND))
continue;
requestNotifier.notifyHeaders(request);
break out;
}
case SEND:
case COMMIT:
{
// State update is performed after the write in commit()
break out;
}
case FAILURE:
{
// Failed concurrently, avoid the write since
// the connection is probably already closed
return;
}
default:
{
throw new IllegalStateException();
}
}
}
StatefulExecutorCallback callback = new StatefulExecutorCallback(client.getExecutor())
@ -327,12 +349,16 @@ public class HttpSender
if (!updateState(current, State.COMMIT))
continue;
LOG.debug("Committed {}", request);
requestNotifier.notifyHeaders(request);
requestNotifier.notifyCommit(request);
return true;
case COMMIT:
return updateState(current, State.COMMIT);
default:
if (!updateState(current, State.COMMIT))
continue;
return true;
case FAILURE:
return false;
default:
throw new IllegalStateException();
}
}
}
@ -400,7 +426,7 @@ public class HttpSender
LOG.debug("Failed {} {}", request, failure);
Result result = completion.getReference();
boolean notCommitted = current == State.IDLE || current == State.SEND;
boolean notCommitted = current == State.IDLE || current == State.BEGIN || current == State.SEND;
if (result == null && notCommitted && request.getAbortCause() == null)
{
result = exchange.responseComplete(failure).getReference();
@ -422,7 +448,7 @@ public class HttpSender
public boolean abort(HttpExchange exchange, Throwable cause)
{
State current = state.get();
boolean abortable = current == State.IDLE || current == State.SEND ||
boolean abortable = current == State.IDLE || current == State.BEGIN || current == State.SEND ||
current == State.COMMIT && contentIterator.hasNext();
return abortable && fail(cause);
}
@ -445,7 +471,7 @@ public class HttpSender
private enum State
{
IDLE, SEND, COMMIT, FAILURE
IDLE, BEGIN, SEND, COMMIT, FAILURE
}
private static abstract class StatefulExecutorCallback implements Callback, Runnable

View File

@ -96,6 +96,27 @@ public class RequestNotifier
}
}
public void notifyCommit(Request request)
{
for (Request.CommitListener listener : request.getRequestListeners(Request.CommitListener.class))
notifyCommit(listener, request);
for (Request.Listener listener : client.getRequestListeners())
notifyCommit(listener, request);
}
private void notifyCommit(Request.CommitListener listener, Request request)
{
try
{
if (listener != null)
listener.onCommit(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifySuccess(Request request)
{
for (Request.SuccessListener listener : request.getRequestListeners(Request.SuccessListener.class))

View File

@ -243,6 +243,12 @@ public interface Request
*/
Request onRequestHeaders(HeadersListener listener);
/**
* @param listener a listener for request commit event
* @return this request object
*/
Request onRequestCommit(CommitListener listener);
/**
* @param listener a listener for request success event
* @return this request object
@ -367,9 +373,23 @@ public interface Request
}
/**
* Listener for the request committed event.
* Listener for the request headers event.
*/
public interface HeadersListener extends RequestListener
{
/**
* Callback method invoked when the request headers (and perhaps small content) are ready to be sent.
* The request has been converted into bytes, but not yet sent to the server, and further modifications
* to the request may have no effect.
* @param request the request that is about to be committed
*/
public void onHeaders(Request request);
}
/**
* Listener for the request committed event.
*/
public interface CommitListener extends RequestListener
{
/**
* Callback method invoked when the request headers (and perhaps small content) have been sent.
@ -377,7 +397,7 @@ public interface Request
* request may have no effect.
* @param request the request that has been committed
*/
public void onHeaders(Request request);
public void onCommit(Request request);
}
/**
@ -409,7 +429,7 @@ public interface Request
/**
* Listener for all request events.
*/
public interface Listener extends QueuedListener, BeginListener, HeadersListener, SuccessListener, FailureListener
public interface Listener extends QueuedListener, BeginListener, HeadersListener, CommitListener, SuccessListener, FailureListener
{
/**
* An empty implementation of {@link Listener}
@ -431,6 +451,11 @@ public interface Request
{
}
@Override
public void onCommit(Request request)
{
}
@Override
public void onSuccess(Request request)
{

View File

@ -94,7 +94,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
final Throwable cause = new Exception();
final CountDownLatch aborted = new CountDownLatch(1);
final CountDownLatch headers = new CountDownLatch(1);
final CountDownLatch committed = new CountDownLatch(1);
try
{
client.newRequest("localhost", connector.getLocalPort())
@ -109,9 +109,9 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
@Override
public void onHeaders(Request request)
public void onCommit(Request request)
{
headers.countDown();
committed.countDown();
}
})
.send().get(5, TimeUnit.SECONDS);
@ -121,12 +121,51 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
{
Assert.assertSame(cause, x.getCause());
Assert.assertTrue(aborted.await(5, TimeUnit.SECONDS));
Assert.assertFalse(headers.await(1, TimeUnit.SECONDS));
Assert.assertFalse(committed.await(1, TimeUnit.SECONDS));
}
}
@Slow
@Test
public void testAbortOnHeaders() throws Exception
{
start(new EmptyServerHandler());
final Throwable cause = new Exception();
final CountDownLatch aborted = new CountDownLatch(1);
final CountDownLatch committed = new CountDownLatch(1);
try
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.listener(new Request.Listener.Empty()
{
@Override
public void onHeaders(Request request)
{
if (request.abort(cause))
aborted.countDown();
}
@Override
public void onCommit(Request request)
{
committed.countDown();
}
})
.send().get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException x)
{
Assert.assertSame(cause, x.getCause());
Assert.assertTrue(aborted.await(5, TimeUnit.SECONDS));
Assert.assertFalse(committed.await(1, TimeUnit.SECONDS));
}
}
@Test
public void testAbortOnHeaders() throws Exception
public void testAbortOnCommit() throws Exception
{
start(new EmptyServerHandler());
@ -140,10 +179,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
{
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestHeaders(new Request.HeadersListener()
.onRequestCommit(new Request.CommitListener()
{
@Override
public void onHeaders(Request request)
public void onCommit(Request request)
{
if (request.abort(cause))
aborted.countDown();
@ -161,7 +200,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
@Test
public void testAbortOnHeadersWithContent() throws Exception
public void testAbortOnCommitWithContent() throws Exception
{
final AtomicReference<IOException> failure = new AtomicReference<>();
start(new AbstractHandler()
@ -188,10 +227,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestHeaders(new Request.HeadersListener()
.onRequestCommit(new Request.CommitListener()
{
@Override
public void onHeaders(Request request)
public void onCommit(Request request)
{
request.abort(cause);
}