mirror of
https://github.com/jetty/jetty.project.git
synced 2025-03-03 12:29:31 +00:00
Merge pull request #2965 from eclipse/jetty-9.4.x-2796-http2_max_concurrent_streams
Issue #2796 - Max local stream count exceeded when request fails.
This commit is contained in:
commit
11abe53df7
@ -550,14 +550,14 @@ public abstract class HttpReceiver
|
||||
// respect to concurrency between request and response.
|
||||
Result result = exchange.terminateResponse();
|
||||
terminateResponse(exchange, result);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean updateResponseState(ResponseState from, ResponseState to)
|
||||
|
@ -76,7 +76,7 @@ public class HttpRequest implements Request
|
||||
private String query;
|
||||
private String method = HttpMethod.GET.asString();
|
||||
private HttpVersion version = HttpVersion.HTTP_1_1;
|
||||
private long idleTimeout;
|
||||
private long idleTimeout = -1;
|
||||
private long timeout;
|
||||
private long timeoutAt;
|
||||
private ContentProvider content;
|
||||
@ -99,7 +99,6 @@ public class HttpRequest implements Request
|
||||
extractParams(query);
|
||||
|
||||
followRedirects(client.isFollowRedirects());
|
||||
idleTimeout = client.getIdleTimeout();
|
||||
HttpField acceptEncodingField = client.getAcceptEncodingField();
|
||||
if (acceptEncodingField != null)
|
||||
headers.put(acceptEncodingField);
|
||||
|
@ -579,14 +579,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
|
||||
// respect to concurrency between request and response.
|
||||
Result result = exchange.terminateRequest();
|
||||
terminateRequest(exchange, failure, result);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean updateRequestState(RequestState from, RequestState to)
|
||||
|
@ -247,7 +247,9 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
|
||||
// Save the old idle timeout to restore it.
|
||||
EndPoint endPoint = getEndPoint();
|
||||
idleTimeout = endPoint.getIdleTimeout();
|
||||
endPoint.setIdleTimeout(request.getIdleTimeout());
|
||||
long requestIdleTimeout = request.getIdleTimeout();
|
||||
if (requestIdleTimeout >= 0)
|
||||
endPoint.setIdleTimeout(requestIdleTimeout);
|
||||
|
||||
// One channel per connection, just delegate the send.
|
||||
return send(channel, exchange);
|
||||
|
@ -166,7 +166,7 @@ public class HttpChannelOverFCGI extends HttpChannel
|
||||
{
|
||||
super(connection.getHttpDestination().getHttpClient().getScheduler());
|
||||
this.connection = connection;
|
||||
setIdleTimeout(idleTimeout);
|
||||
setIdleTimeout(idleTimeout >= 0 ? idleTimeout : connection.getEndPoint().getIdleTimeout());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -562,7 +562,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
||||
IStream stream = createLocalStream(streamId);
|
||||
stream.setListener(listener);
|
||||
|
||||
ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
|
||||
ControlEntry entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream));
|
||||
queued = flusher.append(entry);
|
||||
}
|
||||
// Iterate outside the synchronized block.
|
||||
@ -606,7 +606,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
||||
IStream pushStream = createLocalStream(streamId);
|
||||
pushStream.setListener(listener);
|
||||
|
||||
ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
|
||||
ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream));
|
||||
queued = flusher.append(entry);
|
||||
}
|
||||
// Iterate outside the synchronized block.
|
||||
@ -764,7 +764,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
||||
int localCount = localStreamCount.get();
|
||||
int maxCount = getMaxLocalStreams();
|
||||
if (maxCount >= 0 && localCount >= maxCount)
|
||||
throw new IllegalStateException("Max local stream count " + maxCount + " exceeded");
|
||||
// TODO: remove the dump() in the exception message.
|
||||
throw new IllegalStateException("Max local stream count " + maxCount + " exceeded" + System.lineSeparator() + dump());
|
||||
if (localStreamCount.compareAndSet(localCount, localCount + 1))
|
||||
break;
|
||||
}
|
||||
@ -780,6 +781,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
||||
}
|
||||
else
|
||||
{
|
||||
localStreamCount.decrementAndGet();
|
||||
throw new IllegalStateException("Duplicate stream " + streamId);
|
||||
}
|
||||
}
|
||||
@ -816,6 +818,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
||||
}
|
||||
else
|
||||
{
|
||||
remoteStreamCount.addAndGetHi(-1);
|
||||
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream");
|
||||
return null;
|
||||
}
|
||||
@ -1461,21 +1464,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
||||
}
|
||||
}
|
||||
|
||||
private static class PromiseCallback<C> implements Callback
|
||||
private static class StreamPromiseCallback implements Callback
|
||||
{
|
||||
private final Promise<C> promise;
|
||||
private final C value;
|
||||
private final Promise<Stream> promise;
|
||||
private final IStream stream;
|
||||
|
||||
private PromiseCallback(Promise<C> promise, C value)
|
||||
private StreamPromiseCallback(Promise<Stream> promise, IStream stream)
|
||||
{
|
||||
this.promise = promise;
|
||||
this.value = value;
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
promise.succeeded(value);
|
||||
promise.succeeded(stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -138,6 +138,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||
{
|
||||
if (writing.compareAndSet(null, callback))
|
||||
return true;
|
||||
close();
|
||||
callback.failed(new WritePendingException());
|
||||
return false;
|
||||
}
|
||||
@ -275,8 +276,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||
|
||||
private void onHeaders(HeadersFrame frame, Callback callback)
|
||||
{
|
||||
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
|
||||
session.removeStream(this);
|
||||
MetaData metaData = frame.getMetaData();
|
||||
if (metaData.isRequest() || metaData.isResponse())
|
||||
{
|
||||
@ -286,6 +285,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||
length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString());
|
||||
dataLength = length >= 0 ? length : Long.MIN_VALUE;
|
||||
}
|
||||
|
||||
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
|
||||
session.removeStream(this);
|
||||
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@ -507,6 +510,13 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose()
|
||||
{
|
||||
super.onClose();
|
||||
notifyClosed(this);
|
||||
}
|
||||
|
||||
private void updateStreamCount(int deltaStream, int deltaClosing)
|
||||
{
|
||||
((HTTP2Session)session).updateStreamCount(isLocal(), deltaStream, deltaClosing);
|
||||
@ -612,6 +622,21 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyClosed(Stream stream)
|
||||
{
|
||||
Listener listener = this.listener;
|
||||
if (listener == null)
|
||||
return;
|
||||
try
|
||||
{
|
||||
listener.onClosed(stream);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("Failure while notifying listener " + listener, x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String dump()
|
||||
{
|
||||
|
@ -163,6 +163,13 @@ public interface Stream
|
||||
*/
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback);
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked when a RST_STREAM frame has been received for this stream.</p>
|
||||
*
|
||||
* @param stream the stream
|
||||
* @param frame the RST_FRAME received
|
||||
* @param callback the callback to complete when the reset has been handled
|
||||
*/
|
||||
public default void onReset(Stream stream, ResetFrame frame, Callback callback)
|
||||
{
|
||||
try
|
||||
@ -214,11 +221,28 @@ public interface Stream
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked when the stream failed.</p>
|
||||
*
|
||||
* @param stream the stream
|
||||
* @param error the error code
|
||||
* @param reason the error reason, or null
|
||||
* @param callback the callback to complete when the failure has been handled
|
||||
*/
|
||||
public default void onFailure(Stream stream, int error, String reason, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked after the stream has been closed.</p>
|
||||
*
|
||||
* @param stream the stream
|
||||
*/
|
||||
public default void onClosed(Stream stream)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Empty implementation of {@link Listener}</p>
|
||||
*/
|
||||
|
@ -25,6 +25,7 @@ import org.eclipse.jetty.client.HttpReceiver;
|
||||
import org.eclipse.jetty.client.HttpSender;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.IStream;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
@ -101,6 +102,11 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
||||
connection.release(this);
|
||||
}
|
||||
|
||||
void onStreamClosed(IStream stream)
|
||||
{
|
||||
connection.onStreamClosed(stream, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exchangeTerminated(HttpExchange exchange, Result result)
|
||||
{
|
||||
|
@ -35,12 +35,17 @@ import org.eclipse.jetty.client.HttpRequest;
|
||||
import org.eclipse.jetty.client.SendFailure;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.IStream;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
|
||||
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpConnection.class);
|
||||
|
||||
private final Set<HttpChannel> activeChannels = ConcurrentHashMap.newKeySet();
|
||||
private final Queue<HttpChannelOverHTTP2> idleChannels = new ConcurrentLinkedQueue<>();
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
@ -87,16 +92,15 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
|
||||
|
||||
protected void release(HttpChannelOverHTTP2 channel)
|
||||
{
|
||||
// Only non-push channels are released.
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Released {}", channel);
|
||||
if (activeChannels.remove(channel))
|
||||
{
|
||||
channel.setStream(null);
|
||||
// Recycle only non-failed channels.
|
||||
if (channel.isFailed())
|
||||
channel.destroy();
|
||||
else
|
||||
idleChannels.offer(channel);
|
||||
getHttpDestination().release(this);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -104,6 +108,16 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
|
||||
}
|
||||
}
|
||||
|
||||
void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} closed for {}", stream, channel);
|
||||
channel.setStream(null);
|
||||
// Only non-push channels are released.
|
||||
if (stream.isLocal())
|
||||
getHttpDestination().release(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleTimeout(long idleTimeout)
|
||||
{
|
||||
|
@ -38,6 +38,7 @@ import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.IStream;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
@ -171,8 +172,10 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
|
||||
@Override
|
||||
public boolean onIdleTimeout(Stream stream, Throwable x)
|
||||
{
|
||||
responseFailure(x);
|
||||
return true;
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange == null)
|
||||
return false;
|
||||
return !exchange.abort(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -182,6 +185,12 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosed(Stream stream)
|
||||
{
|
||||
getHttpChannel().onStreamClosed((IStream)stream);
|
||||
}
|
||||
|
||||
private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
|
||||
{
|
||||
contentNotifier.offer(new DataInfo(exchange, frame, callback));
|
||||
|
@ -67,7 +67,9 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
||||
{
|
||||
channel.setStream(stream);
|
||||
((IStream)stream).setAttachment(channel);
|
||||
stream.setIdleTimeout(request.getIdleTimeout());
|
||||
long idleTimeout = request.getIdleTimeout();
|
||||
if (idleTimeout >= 0)
|
||||
stream.setIdleTimeout(idleTimeout);
|
||||
|
||||
if (content.hasContent() && !expects100Continue(request))
|
||||
{
|
||||
|
@ -18,6 +18,22 @@
|
||||
|
||||
package org.eclipse.jetty.http2.client.http;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.AbstractConnectionPool;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
@ -43,21 +59,6 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@ -370,7 +371,7 @@ public class MaxConcurrentStreamsTest extends AbstractTest
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoConcurrentStreamsFirstTimesOut() throws Exception
|
||||
public void testTwoStreamsFirstTimesOut() throws Exception
|
||||
{
|
||||
long timeout = 1000;
|
||||
start(1, new EmptyServerHandler()
|
||||
|
@ -18,15 +18,6 @@
|
||||
|
||||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import static org.eclipse.jetty.http.client.Transport.FCGI;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
@ -65,6 +56,15 @@ import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||
|
||||
import static org.eclipse.jetty.http.client.Transport.FCGI;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
||||
|
||||
public class HttpClientContinueTest extends AbstractTest<TransportScenario>
|
||||
{
|
||||
@Override
|
||||
@ -344,13 +344,14 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
|
||||
}
|
||||
});
|
||||
|
||||
scenario.client.setIdleTimeout(idleTimeout);
|
||||
scenario.client.setIdleTimeout(2 * idleTimeout);
|
||||
|
||||
byte[] content = new byte[1024];
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
|
||||
.content(new BytesContentProvider(content))
|
||||
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
|
@ -18,11 +18,8 @@
|
||||
|
||||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -30,11 +27,11 @@ import java.util.Locale;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
@ -68,6 +65,9 @@ import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTransportScenario>
|
||||
{
|
||||
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
|
||||
@ -186,7 +186,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
|
||||
// Choose a random method
|
||||
HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST;
|
||||
|
||||
boolean ssl = scenario.isTransportSecure();
|
||||
boolean ssl = scenario.transport.isTlsBased();
|
||||
|
||||
// Choose randomly whether to close the connection on the client or on the server
|
||||
boolean clientClose = false;
|
||||
@ -196,13 +196,17 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
|
||||
if (!ssl && random.nextInt(100) < 5)
|
||||
serverClose = true;
|
||||
|
||||
long clientTimeout = 0;
|
||||
// if (!ssl && random.nextInt(100) < 5)
|
||||
// clientTimeout = random.nextInt(500) + 500;
|
||||
|
||||
int maxContentLength = 64 * 1024;
|
||||
int contentLength = random.nextInt(maxContentLength) + 1;
|
||||
|
||||
test(scenario.getScheme(), host, method.asString(), clientClose, serverClose, contentLength, true, latch, failures);
|
||||
test(scenario.getScheme(), host, method.asString(), clientClose, serverClose, clientTimeout, contentLength, true, latch, failures);
|
||||
}
|
||||
|
||||
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures)
|
||||
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, long clientTimeout, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures)
|
||||
{
|
||||
long requestId = requestCount.incrementAndGet();
|
||||
Request request = scenario.client.newRequest(host, scenario.getNetworkConnectorLocalPortInt().orElse(0))
|
||||
@ -215,6 +219,12 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
|
||||
else if (serverClose)
|
||||
request.header("X-Close", "true");
|
||||
|
||||
if (clientTimeout > 0)
|
||||
{
|
||||
request.header("X-Timeout", String.valueOf(clientTimeout));
|
||||
request.idleTimeout(clientTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
switch (method)
|
||||
{
|
||||
case "GET":
|
||||
@ -254,12 +264,18 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
|
||||
{
|
||||
if (result.isFailed())
|
||||
{
|
||||
result.getFailure().printStackTrace();
|
||||
failures.add("Result failed " + result);
|
||||
Throwable failure = result.getFailure();
|
||||
if (!(clientTimeout > 0 && failure instanceof TimeoutException))
|
||||
{
|
||||
failure.printStackTrace();
|
||||
failures.add("Result failed " + result);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (checkContentLength && contentLength.get() != 0)
|
||||
failures.add("Content length mismatch " + contentLength);
|
||||
}
|
||||
|
||||
if (checkContentLength && contentLength.get() != 0)
|
||||
failures.add("Content length mismatch " + contentLength);
|
||||
|
||||
requestLatch.countDown();
|
||||
latch.countDown();
|
||||
@ -288,8 +304,14 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
|
||||
private class LoadHandler extends AbstractHandler
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
|
||||
String timeout = request.getHeader("X-Timeout");
|
||||
if (timeout != null)
|
||||
sleep(2 * Integer.parseInt(timeout));
|
||||
|
||||
String method = request.getMethod().toUpperCase(Locale.ENGLISH);
|
||||
switch (method)
|
||||
{
|
||||
@ -313,8 +335,18 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
|
||||
|
||||
if (Boolean.parseBoolean(request.getHeader("X-Close")))
|
||||
response.setHeader("Connection", "close");
|
||||
}
|
||||
|
||||
baseRequest.setHandled(true);
|
||||
private void sleep(long time) throws InterruptedIOException
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(time);
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -329,8 +361,9 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connector newServerConnector( Server server) throws Exception {
|
||||
if (transport == UNIX_SOCKET)
|
||||
public Connector newServerConnector( Server server)
|
||||
{
|
||||
if (transport == Transport.UNIX_SOCKET)
|
||||
{
|
||||
UnixSocketConnector
|
||||
unixSocketConnector = new UnixSocketConnector( server, provideServerConnectionFactory( transport ));
|
||||
|
Loading…
x
Reference in New Issue
Block a user