Fixes #955 - Response listeners not invoked when using Connection.send().

This commit is contained in:
Simone Bordet 2016-09-23 17:04:11 +02:00
parent 4edc50b59f
commit f3751da475
3 changed files with 36 additions and 7 deletions

View File

@ -68,21 +68,23 @@ public abstract class HttpConnection implements Connection
@Override @Override
public void send(Request request, Response.CompleteListener listener) public void send(Request request, Response.CompleteListener listener)
{ {
ArrayList<Response.ResponseListener> listeners = new ArrayList<>(2); HttpRequest httpRequest = (HttpRequest)request;
if (request.getTimeout() > 0)
ArrayList<Response.ResponseListener> listeners = new ArrayList<>(httpRequest.getResponseListeners());
if (httpRequest.getTimeout() > 0)
{ {
TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(request); TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(httpRequest);
timeoutListener.schedule(getHttpClient().getScheduler()); timeoutListener.schedule(getHttpClient().getScheduler());
listeners.add(timeoutListener); listeners.add(timeoutListener);
} }
if (listener != null) if (listener != null)
listeners.add(listener); listeners.add(listener);
HttpExchange exchange = new HttpExchange(getHttpDestination(), (HttpRequest)request, listeners); HttpExchange exchange = new HttpExchange(getHttpDestination(), httpRequest, listeners);
SendFailure result = send(exchange); SendFailure result = send(exchange);
if (result != null) if (result != null)
request.abort(result.failure); httpRequest.abort(result.failure);
} }
protected abstract SendFailure send(HttpExchange exchange); protected abstract SendFailure send(HttpExchange exchange);

View File

@ -693,6 +693,11 @@ public class HttpRequest implements Request
client.send(request, responseListeners); client.send(request, responseListeners);
} }
protected List<Response.ResponseListener> getResponseListeners()
{
return responseListeners;
}
@Override @Override
public boolean abort(Throwable cause) public boolean abort(Throwable cause)
{ {

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
@ -27,7 +28,7 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP; import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.FutureResponseListener; import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert; import org.junit.Assert;
@ -65,7 +66,6 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
} }
} }
@Slow
@Test @Test
public void testExplicitConnectionIsClosedOnRemoteClose() throws Exception public void testExplicitConnectionIsClosedOnRemoteClose() throws Exception
{ {
@ -98,4 +98,26 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
Assert.assertTrue(connectionPool.getActiveConnections().isEmpty()); Assert.assertTrue(connectionPool.getActiveConnections().isEmpty());
Assert.assertTrue(connectionPool.getIdleConnections().isEmpty()); Assert.assertTrue(connectionPool.getIdleConnections().isEmpty());
} }
@Test
public void testExplicitConnectionResponseListeners() throws Exception
{
start(new EmptyServerHandler());
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
FuturePromise<Connection> futureConnection = new FuturePromise<>();
destination.newConnection(futureConnection);
Connection connection = futureConnection.get(5, TimeUnit.SECONDS);
CountDownLatch responseLatch = new CountDownLatch(1);
Request request = client.newRequest(destination.getHost(), destination.getPort())
.scheme(scheme)
.onResponseSuccess(response -> responseLatch.countDown());
FutureResponseListener listener = new FutureResponseListener(request);
connection.send(request, listener);
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
} }