Removed TimedResponseListener, since the timeout mechanism is now

provided by Request.timeout(...) for both blocking and asynchronous send().
This commit is contained in:
Simone Bordet 2013-01-17 13:48:04 +01:00
parent cda27ae4a1
commit 44e64aa309
10 changed files with 125 additions and 238 deletions

View File

@ -481,10 +481,6 @@ public class HttpClient extends ContainerLifeCycle
if (!Arrays.asList("http", "https").contains(scheme))
throw new IllegalArgumentException("Invalid protocol " + scheme);
for (Response.ResponseListener listener : listeners)
if (listener instanceof Schedulable)
((Schedulable)listener).schedule(scheduler);
HttpDestination destination = provideDestination(scheme, request.getHost(), request.getPort());
destination.send(request, listeners);
}

View File

@ -22,7 +22,7 @@ import java.io.UnsupportedEncodingException;
import java.net.HttpCookie;
import java.net.URLEncoder;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
@ -108,7 +108,16 @@ public class HttpConnection extends AbstractConnection implements Connection
@Override
public void send(Request request, Response.CompleteListener listener)
{
send(request, Collections.<Response.ResponseListener>singletonList(listener));
ArrayList<Response.ResponseListener> listeners = new ArrayList<>(2);
if (request.getTimeout() > 0)
{
TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(request);
timeoutListener.schedule(client.getScheduler());
listeners.add(timeoutListener);
}
if (listener != null)
listeners.add(listener);
send(request, listeners);
}
public void send(Request request, List<Response.ResponseListener> listeners)
@ -125,10 +134,6 @@ public class HttpConnection extends AbstractConnection implements Connection
setExchange(exchange);
conversation.getExchanges().offer(exchange);
for (Response.ResponseListener listener : listeners)
if (listener instanceof Schedulable)
((Schedulable)listener).schedule(client.getScheduler());
sender.send(exchange);
}

View File

@ -36,7 +36,6 @@ import org.eclipse.jetty.client.api.ProxyConfiguration;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.TimedResponseListener;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
@ -509,8 +508,9 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
.scheme(HttpScheme.HTTP.asString())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST.asString(), target);
connection.send(connect, new TimedResponseListener(client.getConnectTimeout(), TimeUnit.MILLISECONDS, connect, new Response.CompleteListener()
.header(HttpHeader.HOST.asString(), target)
.timeout(client.getConnectTimeout(), TimeUnit.MILLISECONDS);
connection.send(connect, new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
@ -534,7 +534,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
}
}
}
}));
});
}
}
}

View File

@ -178,14 +178,7 @@ public class HttpExchange
// Request and response completed
LOG.debug("{} complete", this);
if (isLast())
{
HttpExchange first = conversation.getExchanges().peekFirst();
List<Response.ResponseListener> listeners = first.getResponseListeners();
for (Response.ResponseListener listener : listeners)
if (listener instanceof Schedulable)
((Schedulable)listener).cancel();
conversation.complete();
}
}
result = new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
}

View File

@ -426,7 +426,7 @@ public class HttpRequest implements Request
public ContentResponse send() throws InterruptedException, TimeoutException, ExecutionException
{
FutureResponseListener listener = new FutureResponseListener(this);
send(listener);
send(this, listener);
long timeout = getTimeout();
if (timeout <= 0)
@ -447,10 +447,21 @@ public class HttpRequest implements Request
@Override
public void send(Response.CompleteListener listener)
{
if (getTimeout() > 0)
{
TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(this);
timeoutListener.schedule(client.getScheduler());
responseListeners.add(timeoutListener);
}
send(this, listener);
}
private void send(Request request, Response.CompleteListener listener)
{
if (listener != null)
responseListeners.add(listener);
client.send(this, responseListeners);
client.send(request, responseListeners);
}
@Override

View File

@ -1,28 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.util.thread.Scheduler;
public interface Schedulable
{
public boolean schedule(Scheduler scheduler);
public boolean cancel();
}

View File

@ -0,0 +1,70 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public class TimeoutCompleteListener implements Response.CompleteListener, Runnable
{
private static final Logger LOG = Log.getLogger(TimeoutCompleteListener.class);
private final AtomicReference<Scheduler.Task> task = new AtomicReference<>();
private final Request request;
public TimeoutCompleteListener(Request request)
{
this.request = request;
}
@Override
public void onComplete(Result result)
{
Scheduler.Task task = this.task.getAndSet(null);
if (task != null)
{
boolean cancelled = task.cancel();
LOG.debug("Cancelled (successfully: {}) timeout task {}", cancelled, task);
}
}
public boolean schedule(Scheduler scheduler)
{
long timeout = request.getTimeout();
Scheduler.Task task = scheduler.schedule(this, timeout, TimeUnit.MILLISECONDS);
if (this.task.getAndSet(task) != null)
throw new IllegalStateException();
LOG.debug("Scheduled timeout task {} in {} ms", task, timeout);
return true;
}
@Override
public void run()
{
request.abort(new TimeoutException("Total timeout elapsed"));
}
}

View File

@ -1,169 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.util;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.Schedulable;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* Implementation of {@link Response.Listener} that allows to specify a timeout for asynchronous
* operations.
* <p />
* {@link TimedResponseListener} may be used to decorate a delegate {@link Response.CompleteListener}
* provided by the application. Events are forwarded by {@link TimedResponseListener} to the delegate
* listener.
* Alternatively, {@link TimedResponseListener} may be subclassed to override callbacks that are
* interesting to the application, typically {@link #onComplete(Result)}.
* <p />
* If the timeout specified at the constructor elapses, the request is {@link Request#abort(Throwable) aborted}
* with a {@link TimeoutException}.
* <p />
* Typical usage is:
* <pre>
* Request request = httpClient.newRequest(...)...;
* TimedResponseListener listener = new TimedResponseListener(5, TimeUnit.SECONDS, request, new Response.CompleteListener()
* {
* public void onComplete(Result result)
* {
* // Invoked when request/response completes or when timeout elapses
*
* // Your logic here
* }
* });
* request.send(listener); // Asynchronous send
* </pre>
*/
public class TimedResponseListener implements Response.Listener, Schedulable, Runnable
{
private static final Logger LOG = Log.getLogger(TimedResponseListener.class);
private final AtomicReference<Scheduler.Task> task = new AtomicReference<>();
private final long timeout;
private final TimeUnit unit;
private final Request request;
private final Response.CompleteListener delegate;
public TimedResponseListener(long timeout, TimeUnit unit, Request request)
{
this(timeout, unit, request, new Empty());
}
public TimedResponseListener(long timeout, TimeUnit unit, Request request, Response.CompleteListener delegate)
{
this.timeout = timeout;
this.unit = unit;
this.request = request;
this.delegate = delegate;
}
@Override
public void onBegin(Response response)
{
if (delegate instanceof Response.BeginListener)
((Response.BeginListener)delegate).onBegin(response);
}
@Override
public boolean onHeader(Response response, HttpField field)
{
if (delegate instanceof Response.HeaderListener)
return ((Response.HeaderListener)delegate).onHeader(response, field);
return true;
}
@Override
public void onHeaders(Response response)
{
if (delegate instanceof Response.HeadersListener)
((Response.HeadersListener)delegate).onHeaders(response);
}
@Override
public void onContent(Response response, ByteBuffer content)
{
if (delegate instanceof Response.ContentListener)
((Response.ContentListener)delegate).onContent(response, content);
}
@Override
public void onSuccess(Response response)
{
if (delegate instanceof Response.SuccessListener)
((Response.SuccessListener)delegate).onSuccess(response);
}
@Override
public void onFailure(Response response, Throwable failure)
{
if (delegate instanceof Response.FailureListener)
((Response.FailureListener)delegate).onFailure(response, failure);
}
@Override
public void onComplete(Result result)
{
delegate.onComplete(result);
}
public boolean schedule(Scheduler scheduler)
{
Scheduler.Task task = this.task.get();
if (task != null)
return false;
task = scheduler.schedule(this, timeout, unit);
if (this.task.compareAndSet(null, task))
{
LOG.debug("Scheduled timeout task {} in {} ms", task, unit.toMillis(timeout));
return true;
}
else
{
task.cancel();
return false;
}
}
@Override
public void run()
{
request.abort(new TimeoutException("Total timeout elapsed"));
}
public boolean cancel()
{
Scheduler.Task task = this.task.get();
if (task == null)
return false;
boolean result = task.cancel();
LOG.debug("Cancelled timeout task {}", task);
return result;
}
}

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -35,7 +34,6 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.client.util.TimedResponseListener;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.IO;
@ -71,8 +69,10 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
start(new TimeoutHandler(2 * timeout));
final CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
request.send(new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request)
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(timeout, TimeUnit.MILLISECONDS);
request.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
@ -96,8 +96,10 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
// The first request has a long timeout
final CountDownLatch firstLatch = new CountDownLatch(1);
Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
request.send(new TimedResponseListener(4 * timeout, TimeUnit.MILLISECONDS, request)
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(4 * timeout, TimeUnit.MILLISECONDS);
request.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
@ -109,8 +111,10 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
// Second request has a short timeout and should fail in the queue
final CountDownLatch secondLatch = new CountDownLatch(1);
request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
request.send(new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request)
request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(timeout, TimeUnit.MILLISECONDS);
request.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
@ -137,8 +141,9 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
final byte[] content = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(new InputStreamContentProvider(new ByteArrayInputStream(content)));
request.send(new TimedResponseListener(2 * timeout, TimeUnit.MILLISECONDS, request, new BufferingResponseListener()
.content(new InputStreamContentProvider(new ByteArrayInputStream(content)))
.timeout(2 * timeout, TimeUnit.MILLISECONDS);
request.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
@ -147,7 +152,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
Assert.assertArrayEquals(content, getContent());
latch.countDown();
}
}));
});
Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS));
@ -167,8 +172,10 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
connection.send(request, new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request)
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(timeout, TimeUnit.MILLISECONDS);
connection.send(request, new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
@ -193,8 +200,10 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest(destination.getHost(), destination.getPort()).scheme(scheme);
connection.send(request, new TimedResponseListener(2 * timeout, TimeUnit.MILLISECONDS, request)
Request request = client.newRequest(destination.getHost(), destination.getPort())
.scheme(scheme)
.timeout(2 * timeout, TimeUnit.MILLISECONDS);
connection.send(request, new Response.CompleteListener()
{
@Override
public void onComplete(Result result)

View File

@ -43,7 +43,6 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.client.util.TimedResponseListener;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
@ -221,7 +220,7 @@ public class ProxyServlet extends HttpServlet
* <tr>
* <td>timeout</td>
* <td>60000</td>
* <td>The total timeout in milliseconds, see {@link TimedResponseListener}</td>
* <td>The total timeout in milliseconds, see {@link Request#timeout(long, TimeUnit)}</td>
* </tr>
* <tr>
* <td>requestBufferSize</td>
@ -463,7 +462,8 @@ public class ProxyServlet extends HttpServlet
proxyRequest.getHeaders().toString().trim());
}
proxyRequest.send(new TimedResponseListener(getTimeout(), TimeUnit.MILLISECONDS, proxyRequest, new ProxyResponseListener(request, response)));
proxyRequest.timeout(getTimeout(), TimeUnit.MILLISECONDS);
proxyRequest.send(new ProxyResponseListener(request, response));
}
protected void onResponseHeaders(HttpServletRequest request, HttpServletResponse response, Response proxyResponse)