Fixes #6323 - HttpClient requests with redirects gets stuck/never calls onComplete()

* Reworked the total timeout handling.
* Now a CyclicTimeouts handles the exchanges in each HttpDestination,
and a CyclicTimeouts handles the exchanges in each HttpConnection
(rather than in HttpChannel).
* Now adjusting the total timeout for copied requests generated by
redirects and authentication.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
(cherry picked from commit 2e7d17400f)
This commit is contained in:
Simone Bordet 2021-06-03 15:20:42 +02:00
parent fbbe584a30
commit 21aba4a724
16 changed files with 612 additions and 53 deletions

View File

@ -19,6 +19,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -211,8 +212,25 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
path = request.getPath();
}
Request newRequest = client.copyRequest(request, requestURI);
// Disable the timeout so that only the one from the initial request applies.
newRequest.timeout(0, TimeUnit.MILLISECONDS);
// Adjust the timeout of the new request, taking into account the
// timeout of the previous request and the time already elapsed.
long timeoutAt = request.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
{
long newTimeout = timeoutAt - System.nanoTime();
if (newTimeout > 0)
{
newRequest.timeout(newTimeout, TimeUnit.NANOSECONDS);
}
else
{
TimeoutException failure = new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed");
forwardFailureComplete(request, failure, response, failure);
return;
}
}
if (path != null)
newRequest.path(path);

View File

@ -14,28 +14,26 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class HttpChannel
public abstract class HttpChannel implements CyclicTimeouts.Expirable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpChannel.class);
private final AutoLock _lock = new AutoLock();
private final HttpDestination _destination;
private final TimeoutCompleteListener _totalTimeout;
private HttpExchange _exchange;
protected HttpChannel(HttpDestination destination)
{
_destination = destination;
_totalTimeout = new TimeoutCompleteListener(destination.getHttpClient().getScheduler());
}
public void destroy()
{
_totalTimeout.destroy();
}
public HttpDestination getHttpDestination()
@ -106,6 +104,13 @@ public abstract class HttpChannel
}
}
@Override
public long getExpireNanoTime()
{
HttpExchange exchange = getHttpExchange();
return exchange != null ? exchange.getExpireNanoTime() : Long.MAX_VALUE;
}
protected abstract HttpSender getHttpSender();
protected abstract HttpReceiver getHttpReceiver();
@ -114,16 +119,8 @@ public abstract class HttpChannel
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
{
long timeoutAt = exchange.getExpireNanoTime();
if (timeoutAt != Long.MAX_VALUE)
{
exchange.getResponseListeners().add(_totalTimeout);
_totalTimeout.schedule(exchange.getRequest(), timeoutAt);
}
send(exchange);
}
}
public abstract void send(HttpExchange exchange);

View File

@ -17,6 +17,7 @@ import java.net.CookieStore;
import java.net.HttpCookie;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -30,9 +31,11 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,6 +45,7 @@ public abstract class HttpConnection implements IConnection, Attachable
private final AutoLock lock = new AutoLock();
private final HttpDestination destination;
private final RequestTimeouts requestTimeouts;
private Object attachment;
private int idleTimeoutGuard;
private long idleTimeoutStamp;
@ -49,6 +53,7 @@ public abstract class HttpConnection implements IConnection, Attachable
protected HttpConnection(HttpDestination destination)
{
this.destination = destination;
this.requestTimeouts = new RequestTimeouts(destination.getHttpClient().getScheduler());
this.idleTimeoutStamp = System.nanoTime();
}
@ -62,6 +67,8 @@ public abstract class HttpConnection implements IConnection, Attachable
return destination;
}
protected abstract Iterator<HttpChannel> getHttpChannels();
@Override
public void send(Request request, Response.CompleteListener listener)
{
@ -99,6 +106,7 @@ public abstract class HttpConnection implements IConnection, Attachable
SendFailure result;
if (channel.associate(exchange))
{
requestTimeouts.schedule(channel);
channel.send();
result = null;
}
@ -228,16 +236,6 @@ public abstract class HttpConnection implements IConnection, Attachable
return builder;
}
private void applyProxyAuthentication(Request request, ProxyConfiguration.Proxy proxy)
{
if (proxy != null)
{
Authentication.Result result = getHttpClient().getAuthenticationStore().findAuthenticationResult(proxy.getURI());
if (result != null)
result.apply(request);
}
}
private void applyRequestAuthentication(Request request)
{
AuthenticationStore authenticationStore = getHttpClient().getAuthenticationStore();
@ -253,6 +251,16 @@ public abstract class HttpConnection implements IConnection, Attachable
}
}
private void applyProxyAuthentication(Request request, ProxyConfiguration.Proxy proxy)
{
if (proxy != null)
{
Authentication.Result result = getHttpClient().getAuthenticationStore().findAuthenticationResult(proxy.getURI());
if (result != null)
result.apply(request);
}
}
public boolean onIdleTimeout(long idleTimeout)
{
try (AutoLock l = lock.lock())
@ -288,9 +296,40 @@ public abstract class HttpConnection implements IConnection, Attachable
return attachment;
}
protected void destroy()
{
requestTimeouts.destroy();
}
@Override
public String toString()
{
return String.format("%s@%h", getClass().getSimpleName(), this);
}
private class RequestTimeouts extends CyclicTimeouts<HttpChannel>
{
private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}
@Override
protected Iterator<HttpChannel> iterator()
{
return getHttpChannels();
}
@Override
protected boolean onExpired(HttpChannel channel)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
{
HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
}
return false;
}
}
}

View File

@ -18,6 +18,7 @@ import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.AttributesMap;
import org.slf4j.Logger;
@ -138,6 +139,20 @@ public class HttpConversation extends AttributesMap
this.listeners = listeners;
}
/**
* <p>Returns the total timeout for the conversation.</p>
* <p>The conversation total timeout is the total timeout
* of the first request in the conversation.</p>
*
* @return the total timeout of the conversation
* @see Request#getTimeout()
*/
public long getTimeout()
{
HttpExchange firstExchange = exchanges.peekFirst();
return firstExchange == null ? 0 : firstExchange.getRequest().getTimeout();
}
public boolean abort(Throwable cause)
{
HttpExchange exchange = exchanges.peekLast();

View File

@ -525,7 +525,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
/**
* <p>Enforces the total timeout for for exchanges that are still in the queue.</p>
* <p>The total timeout for exchanges that are not in the destination queue
* is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
* is enforced in {@link HttpConnection}.</p>
*/
private class RequestTimeouts extends CyclicTimeouts<HttpExchange>
{

View File

@ -471,9 +471,9 @@ public abstract class HttpReceiver
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(exchange, result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}, notifying {}", failure == null ? "succeeded" : "failed", result, listeners);
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);
if (ordered)

View File

@ -19,6 +19,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -106,8 +107,8 @@ public class HttpRedirector
*/
public Result redirect(Request request, Response response) throws InterruptedException, ExecutionException
{
final AtomicReference<Result> resultRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Result> resultRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Request redirect = redirect(request, response, new BufferingResponseListener()
{
@Override
@ -302,13 +303,29 @@ public class HttpRedirector
}
}
private Request sendRedirect(final HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method)
private Request sendRedirect(HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method)
{
try
{
Request redirect = client.copyRequest(httpRequest, location);
// Disable the timeout so that only the one from the initial request applies.
redirect.timeout(0, TimeUnit.MILLISECONDS);
// Adjust the timeout of the new request, taking into account the
// timeout of the previous request and the time already elapsed.
long timeoutAt = httpRequest.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
{
long newTimeout = timeoutAt - System.nanoTime();
if (newTimeout > 0)
{
redirect.timeout(newTimeout, TimeUnit.NANOSECONDS);
}
else
{
TimeoutException failure = new TimeoutException("Total timeout " + httpRequest.getConversation().getTimeout() + " ms elapsed");
fail(httpRequest, failure, response);
return null;
}
}
// Use given method
redirect.method(method);
@ -325,17 +342,27 @@ public class HttpRedirector
}
catch (Throwable x)
{
fail(httpRequest, response, x);
fail(httpRequest, x, response);
return null;
}
}
protected void fail(Request request, Response response, Throwable failure)
{
fail(request, null, response, failure);
}
protected void fail(Request request, Throwable failure, Response response)
{
fail(request, failure, response, failure);
}
private void fail(Request request, Throwable requestFailure, Response response, Throwable responseFailure)
{
HttpConversation conversation = ((HttpRequest)request).getConversation();
conversation.updateResponseListeners(null);
List<Response.ResponseListener> listeners = conversation.getResponseListeners();
notifier.notifyFailure(listeners, response, failure);
notifier.notifyComplete(listeners, new Result(request, response, failure));
notifier.notifyFailure(listeners, response, responseFailure);
notifier.notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
}
}

View File

@ -21,10 +21,15 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @deprecated Do not use it, use {@link CyclicTimeouts} instead.
*/
@Deprecated
public class TimeoutCompleteListener extends CyclicTimeout implements Response.CompleteListener
{
private static final Logger LOG = LoggerFactory.getLogger(TimeoutCompleteListener.class);

View File

@ -15,6 +15,8 @@ package org.eclipse.jetty.client.http;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -22,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpConversation;
@ -266,6 +269,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
super(destination);
}
@Override
protected Iterator<HttpChannel> getHttpChannels()
{
return Collections.<HttpChannel>singleton(channel).iterator();
}
@Override
public SendFailure send(HttpExchange exchange)
{
@ -322,6 +331,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
public void close()
{
HttpConnectionOverHTTP.this.close();
destroy();
}
@Override

View File

@ -516,6 +516,117 @@ public class HttpClientRedirectTest extends AbstractHttpClientServerTest
});
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testRedirectToDifferentHostThenRequestToFirstHostExpires(Scenario scenario) throws Exception
{
long timeout = 1000;
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
if ("/one".equals(target))
{
response.setStatus(HttpStatus.SEE_OTHER_303);
response.setHeader(HttpHeader.LOCATION.asString(), scenario.getScheme() + "://127.0.0.1:" + connector.getLocalPort() + "/two");
}
else if ("/two".equals(target))
{
try
{
// Send another request to "localhost", therefore reusing the
// connection used for the first request, it must timeout.
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/three")
.timeout(timeout, TimeUnit.MILLISECONDS)
.send(result ->
{
if (result.getFailure() instanceof TimeoutException)
latch.countDown();
});
// Wait for the request to fail as it should.
assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS));
}
catch (Throwable x)
{
throw new ServletException(x);
}
}
else if ("/three".equals(target))
{
try
{
// The third request must timeout.
Thread.sleep(2 * timeout);
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
}
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/one")
// The timeout should not expire, but must be present to trigger the test conditions.
.timeout(3 * timeout, TimeUnit.MILLISECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testManyRedirectsTotalTimeoutExpires(Scenario scenario) throws Exception
{
long timeout = 1000;
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
try
{
String serverURI = scenario.getScheme() + "://localhost:" + connector.getLocalPort();
if ("/one".equals(target))
{
Thread.sleep(timeout);
response.setStatus(HttpStatus.SEE_OTHER_303);
response.setHeader(HttpHeader.LOCATION.asString(), serverURI + "/two");
}
else if ("/two".equals(target))
{
Thread.sleep(timeout);
response.setStatus(HttpStatus.SEE_OTHER_303);
response.setHeader(HttpHeader.LOCATION.asString(), serverURI + "/three");
}
else if ("/three".equals(target))
{
Thread.sleep(2 * timeout);
}
}
catch (InterruptedException x)
{
throw new ServletException(x);
}
}
});
assertThrows(TimeoutException.class, () ->
{
client.setMaxRedirects(-1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/one")
.timeout(3 * timeout, TimeUnit.MILLISECONDS)
.send();
});
}
private void testSameMethodRedirect(final Scenario scenario, final HttpMethod method, int redirectCode) throws Exception
{
testMethodRedirect(scenario, method, method, redirectCode);
@ -564,7 +675,7 @@ public class HttpClientRedirectTest extends AbstractHttpClientServerTest
assertEquals(200, response.getStatus());
}
private class RedirectHandler extends EmptyServerHandler
private static class RedirectHandler extends EmptyServerHandler
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.fcgi.client.http;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
@ -366,6 +367,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
super(destination);
}
@Override
protected Iterator<HttpChannel> getHttpChannels()
{
return new IteratorWrapper<>(activeChannels.values().iterator());
}
@Override
public SendFailure send(HttpExchange exchange)
{
@ -384,6 +391,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
public void close()
{
HttpConnectionOverFCGI.this.close();
destroy();
}
protected void close(Throwable failure)
@ -504,4 +512,32 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
LOG.debug("Channel not found for request {}", request);
}
}
private static final class IteratorWrapper<T> implements Iterator<T>
{
private final Iterator<? extends T> iterator;
private IteratorWrapper(Iterator<? extends T> iterator)
{
this.iterator = iterator;
}
@Override
public boolean hasNext()
{
return iterator.hasNext();
}
@Override
public T next()
{
return iterator.next();
}
@Override
public void remove()
{
iterator.remove();
}
}
}

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.http2.client.http;
import java.nio.channels.AsynchronousCloseException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@ -77,6 +78,12 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
this.recycleHttpChannels = recycleHttpChannels;
}
@Override
protected Iterator<HttpChannel> getHttpChannels()
{
return activeChannels.iterator();
}
@Override
public SendFailure send(HttpExchange exchange)
{
@ -184,6 +191,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
public void close()
{
close(new AsynchronousCloseException());
destroy();
}
protected void close(Throwable failure)

View File

@ -51,14 +51,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
public CyclicTimeouts(Scheduler scheduler)
{
cyclicTimeout = new CyclicTimeout(scheduler)
{
@Override
public void onTimeoutExpired()
{
CyclicTimeouts.this.onTimeoutExpired();
}
};
cyclicTimeout = new Timeouts(scheduler);
}
/**
@ -68,6 +61,9 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
/**
* <p>Invoked during the iteration when the given entity is expired.</p>
* <p>This method may be invoked multiple times, and even concurrently,
* for the same expirable entity and therefore the expiration of the
* entity, if any, should be an idempotent action.</p>
*
* @param expirable the entity that is expired
* @return whether the entity should be removed from the iterator via {@link Iterator#remove()}
@ -77,7 +73,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
private void onTimeoutExpired()
{
if (LOG.isDebugEnabled())
LOG.debug("{} timeouts check", this);
LOG.debug("Timeouts check for {}", this);
long now = System.nanoTime();
long earliest = Long.MAX_VALUE;
@ -87,18 +83,29 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
// be seen during the iteration below.
earliestTimeout.set(earliest);
Iterator<T> iterator = iterator();
if (iterator == null)
return;
// Scan the entities to abort expired entities
// and to find the entity that expires the earliest.
Iterator<T> iterator = iterator();
while (iterator.hasNext())
{
T expirable = iterator.next();
long expiresAt = expirable.getExpireNanoTime();
if (LOG.isDebugEnabled())
LOG.debug("Entity {} expires in {} ms for {}", expirable, TimeUnit.NANOSECONDS.toMillis(expiresAt - now), this);
if (expiresAt == -1)
continue;
if (expiresAt <= now)
{
if (onExpired(expirable))
boolean remove = onExpired(expirable);
if (LOG.isDebugEnabled())
LOG.debug("Entity {} expired, remove={} for {}", expirable, remove, this);
if (remove)
iterator.remove();
continue;
}
@ -127,13 +134,19 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
// When the timeout expires, scan the entities for the next
// earliest entity that may expire, and reschedule a new timeout.
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
if (expiresAt < prevEarliest)
long expires = expiresAt;
while (expires < prevEarliest)
{
// A new entity expires earlier than previous entities, schedule it.
long delay = Math.max(0, expiresAt - System.nanoTime());
long delay = Math.max(0, expires - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
cyclicTimeout.schedule(delay, TimeUnit.NANOSECONDS);
LOG.debug("Scheduling timeout in {} ms for {}", TimeUnit.NANOSECONDS.toMillis(delay), this);
schedule(cyclicTimeout, delay, TimeUnit.NANOSECONDS);
// If we lost a race and overwrote a schedule() with an earlier time, then that earlier time
// is remembered by earliestTimeout, in which case we will loop and set it again ourselves.
prevEarliest = expires;
expires = earliestTimeout.get();
}
}
@ -143,6 +156,11 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
cyclicTimeout.destroy();
}
boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit)
{
return cyclicTimeout.schedule(delay, unit);
}
/**
* <p>An entity that may expire.</p>
*/
@ -159,4 +177,18 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme
*/
public long getExpireNanoTime();
}
private class Timeouts extends CyclicTimeout
{
private Timeouts(Scheduler scheduler)
{
super(scheduler);
}
@Override
public void onTimeoutExpired()
{
CyclicTimeouts.this.onTimeoutExpired();
}
}
}

View File

@ -0,0 +1,261 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CyclicTimeoutsTest
{
private Scheduler scheduler;
private CyclicTimeouts<ConstantExpirable> timeouts;
@BeforeEach
public void prepare()
{
scheduler = new ScheduledExecutorScheduler();
LifeCycle.start(scheduler);
}
@AfterEach
public void dispose()
{
if (timeouts != null)
timeouts.destroy();
LifeCycle.stop(scheduler);
}
@Test
public void testNoExpirationForNonExpiringEntity() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
timeouts = new CyclicTimeouts<>(scheduler)
{
@Override
protected Iterator<ConstantExpirable> iterator()
{
latch.countDown();
return null;
}
@Override
protected boolean onExpired(ConstantExpirable expirable)
{
return false;
}
};
// Schedule an entity that does not expire.
timeouts.schedule(ConstantExpirable.noExpire());
Assertions.assertFalse(latch.await(1, TimeUnit.SECONDS));
}
@Test
public void testScheduleZero() throws Exception
{
ConstantExpirable entity = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
CountDownLatch iteratorLatch = new CountDownLatch(1);
CountDownLatch expiredLatch = new CountDownLatch(1);
timeouts = new CyclicTimeouts<>(scheduler)
{
@Override
protected Iterator<ConstantExpirable> iterator()
{
iteratorLatch.countDown();
return Collections.emptyIterator();
}
@Override
protected boolean onExpired(ConstantExpirable expirable)
{
expiredLatch.countDown();
return false;
}
};
timeouts.schedule(entity);
Assertions.assertTrue(iteratorLatch.await(1, TimeUnit.SECONDS));
Assertions.assertFalse(expiredLatch.await(1, TimeUnit.SECONDS));
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testIterateAndExpire(boolean remove) throws Exception
{
ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
ConstantExpirable one = ConstantExpirable.ofDelay(1, TimeUnit.SECONDS);
Collection<ConstantExpirable> collection = new ArrayList<>();
collection.add(one);
AtomicInteger iterations = new AtomicInteger();
CountDownLatch expiredLatch = new CountDownLatch(1);
timeouts = new CyclicTimeouts<>(scheduler)
{
@Override
protected Iterator<ConstantExpirable> iterator()
{
iterations.incrementAndGet();
return collection.iterator();
}
@Override
protected boolean onExpired(ConstantExpirable expirable)
{
assertSame(one, expirable);
expiredLatch.countDown();
return remove;
}
};
// Triggers immediate call to iterator(), which
// returns an entity that expires in 1 second.
timeouts.schedule(zero);
// After 1 second there is a second call to
// iterator(), which returns the now expired
// entity, which is passed to onExpired().
assertTrue(expiredLatch.await(2, TimeUnit.SECONDS));
// Wait for the collection to be processed
// with the return value of onExpired().
Thread.sleep(1000);
// Verify the processing of the return value of onExpired().
assertEquals(remove ? 0 : 1, collection.size());
// Wait to see if iterator() is called again (it should not).
Thread.sleep(1000);
assertEquals(2, iterations.get());
}
@Test
public void testScheduleOvertake() throws Exception
{
ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
long delayMs = 2000;
ConstantExpirable two = ConstantExpirable.ofDelay(delayMs, TimeUnit.MILLISECONDS);
ConstantExpirable overtake = ConstantExpirable.ofDelay(delayMs / 2, TimeUnit.MILLISECONDS);
Collection<ConstantExpirable> collection = new ArrayList<>();
collection.add(two);
CountDownLatch expiredLatch = new CountDownLatch(2);
List<ConstantExpirable> expired = new ArrayList<>();
timeouts = new CyclicTimeouts<>(scheduler)
{
private final AtomicBoolean overtakeScheduled = new AtomicBoolean();
@Override
protected Iterator<ConstantExpirable> iterator()
{
return collection.iterator();
}
@Override
protected boolean onExpired(ConstantExpirable expirable)
{
expired.add(expirable);
expiredLatch.countDown();
return true;
}
@Override
boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit)
{
if (delay <= 0)
return super.schedule(cyclicTimeout, delay, unit);
// Simulate that an entity with a shorter timeout
// overtakes the entity that is currently being scheduled.
// Only schedule the overtake once.
if (overtakeScheduled.compareAndSet(false, true))
{
collection.add(overtake);
schedule(overtake);
}
return super.schedule(cyclicTimeout, delay, unit);
}
};
// Trigger the initial call to iterator().
timeouts.schedule(zero);
// Make sure that the overtake entity expires properly.
assertTrue(expiredLatch.await(2 * delayMs, TimeUnit.MILLISECONDS));
// Make sure all entities expired properly.
assertSame(overtake, expired.get(0));
assertSame(two, expired.get(1));
}
private static class ConstantExpirable implements CyclicTimeouts.Expirable
{
private static ConstantExpirable noExpire()
{
return new ConstantExpirable();
}
private static ConstantExpirable ofDelay(long delay, TimeUnit unit)
{
return new ConstantExpirable(delay, unit);
}
private final long expireNanos;
private final String asString;
private ConstantExpirable()
{
this.expireNanos = Long.MAX_VALUE;
this.asString = "noexp";
}
public ConstantExpirable(long delay, TimeUnit unit)
{
this.expireNanos = System.nanoTime() + unit.toNanos(delay);
this.asString = String.valueOf(unit.toMillis(delay));
}
@Override
public long getExpireNanoTime()
{
return expireNanos;
}
@Override
public String toString()
{
return String.format("%s@%x[%sms]", getClass().getSimpleName(), hashCode(), asString);
}
}
}

View File

@ -33,7 +33,7 @@ class AsyncContentProducer implements ContentProducer
private static final Throwable UNCONSUMED_CONTENT_EXCEPTION = new IOException("Unconsumed content")
{
@Override
public synchronized Throwable fillInStackTrace()
public Throwable fillInStackTrace()
{
return this;
}

View File

@ -681,7 +681,7 @@ public class HttpChannelState
}
finally
{
synchronized (this)
try (AutoLock l = lock())
{
_onTimeoutThread = null;
}