#392733 - Implement a total timeout for asynchronous sends.

Reworked the implementation.
Instead of adding another method for asynchronous sends with
timeout parameters, we now use a TimedResponseListener utility
class, that holds the timeout information.
This commit is contained in:
Simone Bordet 2012-10-26 11:46:01 +02:00
parent 61ba84bda5
commit 58e8ff8fbf
8 changed files with 298 additions and 73 deletions

View File

@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.client.api.AuthenticationStore;
@ -308,7 +307,7 @@ public class HttpClient extends ContainerLifeCycle
return new ArrayList<Destination>(destinations.values());
}
protected void send(final Request request, long timeout, TimeUnit unit, Response.Listener listener)
protected void send(final Request request, Response.Listener listener)
{
String scheme = request.scheme().toLowerCase();
if (!Arrays.asList("http", "https").contains(scheme))
@ -318,17 +317,8 @@ public class HttpClient extends ContainerLifeCycle
if (port < 0)
port = "https".equals(scheme) ? 443 : 80;
if (timeout > 0)
{
scheduler.schedule(new Runnable()
{
@Override
public void run()
{
request.abort("Total timeout elapsed");
}
}, timeout, unit);
}
if (listener instanceof ResponseListener.Timed)
((ResponseListener.Timed)listener).schedule(scheduler);
HttpDestination destination = provideDestination(scheme, request.host(), port);
destination.send(request, listener);

View File

@ -116,6 +116,9 @@ public class HttpConnection extends AbstractConnection implements Connection
setExchange(exchange);
conversation.exchanges().offer(exchange);
if (listener instanceof ResponseListener.Timed)
((ResponseListener.Timed)listener).schedule(client.getScheduler());
sender.send(exchange);
}

View File

@ -160,7 +160,13 @@ public class HttpExchange
// Request and response completed
LOG.debug("{} complete", this);
if (conversation.last() == this)
{
HttpExchange first = conversation.exchanges().peekFirst();
Response.Listener listener = first.listener();
if (listener instanceof ResponseListener.Timed)
((ResponseListener.Timed)listener).cancel();
conversation.complete();
}
}
result = new Result(request(), requestFailure(), response(), responseFailure());
}

View File

@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.api.ContentProvider;
@ -323,13 +322,7 @@ public class HttpRequest implements Request
@Override
public void send(final Response.Listener listener)
{
send(0, TimeUnit.SECONDS, listener);
}
@Override
public void send(long timeout, TimeUnit unit, Response.Listener listener)
{
client.send(this, timeout, unit, listener);
client.send(this, listener);
}
@Override

View File

@ -0,0 +1,32 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.thread.Scheduler;
public interface ResponseListener extends Response.Listener
{
public interface Timed extends Response.Listener
{
public boolean schedule(Scheduler scheduler);
public boolean cancel();
}
}

View File

@ -23,7 +23,6 @@ import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
@ -242,25 +241,9 @@ public interface Request
* as they happen, or when the application needs to efficiently manage the response content.
*
* @param listener the listener that receives response events
* @see #send(long, TimeUnit, Response.Listener)
*/
void send(Response.Listener listener);
/**
* Sends this request and asynchronously notifies the given listener for response events.
* <p />
* This method should be used when the application needs to be notified of the various response events
* as they happen, or when the application needs to efficiently manage the response content.
* <p />
* This method waits for the given timeout before aborting the HTTP conversation. A {@code timeout}
* value of zero means to wait indefinitely to the conversation to complete.
*
* @param timeout the total timeout in the given {@code unit}
* @param unit the timeout unit
* @param listener the listener that receives response events
*/
void send(long timeout, TimeUnit unit, Response.Listener listener);
/**
* Attempts to abort the send of this request.
*

View File

@ -0,0 +1,126 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.util;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.ResponseListener;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public class TimedResponseListener implements ResponseListener.Timed, Runnable
{
private static final Logger LOG = Log.getLogger(TimedResponseListener.class);
private final AtomicReference<Scheduler.Task> task = new AtomicReference<>();
private final long timeout;
private final TimeUnit unit;
private final Request request;
private final Response.Listener delegate;
public TimedResponseListener(long timeout, TimeUnit unit, Request request)
{
this(timeout, unit, request, new Empty());
}
public TimedResponseListener(long timeout, TimeUnit unit, Request request, Response.Listener delegate)
{
this.timeout = timeout;
this.unit = unit;
this.request = request;
this.delegate = delegate;
}
@Override
public void onBegin(Response response)
{
delegate.onBegin(response);
}
@Override
public void onHeaders(Response response)
{
delegate.onHeaders(response);
}
@Override
public void onContent(Response response, ByteBuffer content)
{
delegate.onContent(response, content);
}
@Override
public void onSuccess(Response response)
{
delegate.onSuccess(response);
}
@Override
public void onFailure(Response response, Throwable failure)
{
delegate.onFailure(response, failure);
}
@Override
public void onComplete(Result result)
{
delegate.onComplete(result);
}
public boolean schedule(Scheduler scheduler)
{
Scheduler.Task task = this.task.get();
if (task != null)
return false;
task = scheduler.schedule(this, timeout, unit);
if (this.task.compareAndSet(null, task))
{
LOG.debug("Scheduled timeout task {} in {} ms", task, unit.toMillis(timeout));
return true;
}
else
{
task.cancel();
return false;
}
}
@Override
public void run()
{
request.abort("Total timeout elapsed");
}
public boolean cancel()
{
Scheduler.Task task = this.task.get();
if (task == null)
return false;
boolean result = task.cancel();
LOG.debug("Cancelled timeout task {}", task);
return result;
}
}

View File

@ -26,11 +26,17 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.TimedResponseListener;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
@ -62,17 +68,16 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
start(new TimeoutHandler(2 * timeout));
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
latch.countDown();
}
});
Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
request.send(new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request)
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
latch.countDown();
}
});
Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS));
}
@ -88,31 +93,29 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
// The first request has a long timeout
final CountDownLatch firstLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(4 * timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
{
Assert.assertFalse(result.isFailed());
firstLatch.countDown();
}
});
Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
request.send(new TimedResponseListener(4 * timeout, TimeUnit.MILLISECONDS, request)
{
@Override
public void onComplete(Result result)
{
Assert.assertFalse(result.isFailed());
firstLatch.countDown();
}
});
// Second request has a short timeout and should fail in the queue
final CountDownLatch secondLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
secondLatch.countDown();
}
});
request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
request.send(new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request)
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
secondLatch.countDown();
}
});
Assert.assertTrue(secondLatch.await(2 * timeout, TimeUnit.MILLISECONDS));
// The second request must fail before the first request has completed
@ -120,6 +123,94 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
Assert.assertTrue(firstLatch.await(5 * timeout, TimeUnit.MILLISECONDS));
}
@Slow
@Test
public void testTimeoutIsCancelledOnSuccess() throws Exception
{
long timeout = 1000;
start(new TimeoutHandler(timeout));
final CountDownLatch latch = new CountDownLatch(1);
final byte[] content = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(new BytesContentProvider(content));
request.send(new TimedResponseListener(2 * timeout, TimeUnit.MILLISECONDS, request, new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertFalse(result.isFailed());
Assert.assertArrayEquals(content, getContent());
latch.countDown();
}
}));
Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS));
TimeUnit.MILLISECONDS.sleep(2 * timeout);
Assert.assertFalse(request.aborted());
}
@Slow
@Test
public void testTimeoutOnListenerWithExplicitConnection() throws Exception
{
long timeout = 1000;
start(new TimeoutHandler(2 * timeout));
final CountDownLatch latch = new CountDownLatch(1);
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
connection.send(request, new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request)
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
latch.countDown();
}
});
Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS));
}
}
@Slow
@Test
public void testTimeoutIsCancelledOnSuccessWithExplicitConnection() throws Exception
{
long timeout = 1000;
start(new TimeoutHandler(timeout));
final CountDownLatch latch = new CountDownLatch(1);
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest(destination.host(), destination.port()).scheme(scheme);
connection.send(request, new TimedResponseListener(2 * timeout, TimeUnit.MILLISECONDS, request)
{
@Override
public void onComplete(Result result)
{
Response response = result.getResponse();
Assert.assertEquals(200, response.status());
Assert.assertFalse(result.isFailed());
latch.countDown();
}
});
Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS));
TimeUnit.MILLISECONDS.sleep(2 * timeout);
Assert.assertFalse(request.aborted());
}
}
private class TimeoutHandler extends AbstractHandler
{
private final long timeout;
@ -130,12 +221,13 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
try
{
TimeUnit.MILLISECONDS.sleep(timeout);
IO.copy(request.getInputStream(), response.getOutputStream());
}
catch (InterruptedException x)
{