Issue #2796 - Max local stream count exceeded when request fails.

Reviewed other possible places where max local stream count may
overflow.

Fixed handling of HTTP/2 stream idle timeouts.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2018-10-09 15:53:53 +02:00
parent a056695687
commit cec84cf1bf
12 changed files with 75 additions and 48 deletions

View File

@ -550,14 +550,14 @@ public abstract class HttpReceiver
// respect to concurrency between request and response. // respect to concurrency between request and response.
Result result = exchange.terminateResponse(); Result result = exchange.terminateResponse();
terminateResponse(exchange, result); terminateResponse(exchange, result);
return true;
} }
else else
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: response termination skipped, performed by helpers"); LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
return false;
} }
return true;
} }
private boolean updateResponseState(ResponseState from, ResponseState to) private boolean updateResponseState(ResponseState from, ResponseState to)

View File

@ -76,7 +76,7 @@ public class HttpRequest implements Request
private String query; private String query;
private String method = HttpMethod.GET.asString(); private String method = HttpMethod.GET.asString();
private HttpVersion version = HttpVersion.HTTP_1_1; private HttpVersion version = HttpVersion.HTTP_1_1;
private long idleTimeout; private long idleTimeout = -1;
private long timeout; private long timeout;
private long timeoutAt; private long timeoutAt;
private ContentProvider content; private ContentProvider content;
@ -99,7 +99,6 @@ public class HttpRequest implements Request
extractParams(query); extractParams(query);
followRedirects(client.isFollowRedirects()); followRedirects(client.isFollowRedirects());
idleTimeout = client.getIdleTimeout();
HttpField acceptEncodingField = client.getAcceptEncodingField(); HttpField acceptEncodingField = client.getAcceptEncodingField();
if (acceptEncodingField != null) if (acceptEncodingField != null)
headers.put(acceptEncodingField); headers.put(acceptEncodingField);

View File

@ -579,14 +579,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
// respect to concurrency between request and response. // respect to concurrency between request and response.
Result result = exchange.terminateRequest(); Result result = exchange.terminateRequest();
terminateRequest(exchange, failure, result); terminateRequest(exchange, failure, result);
return true;
} }
else else
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: request termination skipped, performed by helpers"); LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
return false;
} }
return true;
} }
private boolean updateRequestState(RequestState from, RequestState to) private boolean updateRequestState(RequestState from, RequestState to)

View File

@ -247,7 +247,9 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
// Save the old idle timeout to restore it. // Save the old idle timeout to restore it.
EndPoint endPoint = getEndPoint(); EndPoint endPoint = getEndPoint();
idleTimeout = endPoint.getIdleTimeout(); idleTimeout = endPoint.getIdleTimeout();
endPoint.setIdleTimeout(request.getIdleTimeout()); long requestIdleTimeout = request.getIdleTimeout();
if (requestIdleTimeout >= 0)
endPoint.setIdleTimeout(requestIdleTimeout);
// One channel per connection, just delegate the send. // One channel per connection, just delegate the send.
return send(channel, exchange); return send(channel, exchange);

View File

@ -166,7 +166,7 @@ public class HttpChannelOverFCGI extends HttpChannel
{ {
super(connection.getHttpDestination().getHttpClient().getScheduler()); super(connection.getHttpDestination().getHttpClient().getScheduler());
this.connection = connection; this.connection = connection;
setIdleTimeout(idleTimeout); setIdleTimeout(idleTimeout >= 0 ? idleTimeout : connection.getEndPoint().getIdleTimeout());
} }
@Override @Override

View File

@ -341,7 +341,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
case SettingsFrame.MAX_CONCURRENT_STREAMS: case SettingsFrame.MAX_CONCURRENT_STREAMS:
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Updating max local concurrent streams to {} for {}", maxLocalStreams, this); LOG.debug("Updating max local concurrent streams to {} for {}", value, this);
maxLocalStreams = value; maxLocalStreams = value;
break; break;
} }
@ -561,7 +561,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
IStream stream = createLocalStream(streamId); IStream stream = createLocalStream(streamId);
stream.setListener(listener); stream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream)); ControlEntry entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream));
queued = flusher.append(entry); queued = flusher.append(entry);
} }
// Iterate outside the synchronized block. // Iterate outside the synchronized block.
@ -605,7 +605,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
IStream pushStream = createLocalStream(streamId); IStream pushStream = createLocalStream(streamId);
pushStream.setListener(listener); pushStream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream)); ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream));
queued = flusher.append(entry); queued = flusher.append(entry);
} }
// Iterate outside the synchronized block. // Iterate outside the synchronized block.
@ -779,6 +779,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
else else
{ {
localStreamCount.decrementAndGet();
throw new IllegalStateException("Duplicate stream " + streamId); throw new IllegalStateException("Duplicate stream " + streamId);
} }
} }
@ -815,6 +816,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
else else
{ {
remoteStreamCount.addAndGetHi(-1);
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream"); onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream");
return null; return null;
} }
@ -1461,21 +1463,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
private static class PromiseCallback<C> implements Callback private static class StreamPromiseCallback implements Callback
{ {
private final Promise<C> promise; private final Promise<Stream> promise;
private final C value; private final IStream stream;
private PromiseCallback(Promise<C> promise, C value) private StreamPromiseCallback(Promise<Stream> promise, IStream stream)
{ {
this.promise = promise; this.promise = promise;
this.value = value; this.stream = stream;
} }
@Override @Override
public void succeeded() public void succeeded()
{ {
promise.succeeded(value); promise.succeeded(stream);
} }
@Override @Override

View File

@ -139,6 +139,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{ {
if (writing.compareAndSet(null, callback)) if (writing.compareAndSet(null, callback))
return true; return true;
close();
callback.failed(new WritePendingException()); callback.failed(new WritePendingException());
return false; return false;
} }
@ -275,8 +276,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private void onHeaders(HeadersFrame frame, Callback callback) private void onHeaders(HeadersFrame frame, Callback callback)
{ {
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
MetaData metaData = frame.getMetaData(); MetaData metaData = frame.getMetaData();
if (metaData.isRequest() || metaData.isResponse()) if (metaData.isRequest() || metaData.isResponse())
{ {
@ -286,6 +285,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString()); length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString());
dataLength = length >= 0 ? length : Long.MIN_VALUE; dataLength = length >= 0 ? length : Long.MIN_VALUE;
} }
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
callback.succeeded(); callback.succeeded();
} }

View File

@ -163,6 +163,13 @@ public interface Stream
*/ */
public void onData(Stream stream, DataFrame frame, Callback callback); public void onData(Stream stream, DataFrame frame, Callback callback);
/**
* <p>Callback method invoked when a RST_STREAM frame has been received for this stream.</p>
*
* @param stream the stream
* @param frame the RST_FRAME received
* @param callback the callback to complete when the reset has been handled
*/
public default void onReset(Stream stream, ResetFrame frame, Callback callback) public default void onReset(Stream stream, ResetFrame frame, Callback callback)
{ {
try try
@ -214,6 +221,14 @@ public interface Stream
return true; return true;
} }
/**
* <p>Callback method invoked when the stream failed.</p>
*
* @param stream the stream
* @param error the error code
* @param reason the error reason, or null
* @param callback the callback to complete when the failure has been handled
*/
public default void onFailure(Stream stream, int error, String reason, Callback callback) public default void onFailure(Stream stream, int error, String reason, Callback callback)
{ {
callback.succeeded(); callback.succeeded();

View File

@ -171,8 +171,10 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
@Override @Override
public boolean onIdleTimeout(Stream stream, Throwable x) public boolean onIdleTimeout(Stream stream, Throwable x)
{ {
responseFailure(x); HttpExchange exchange = getHttpExchange();
return true; if (exchange == null)
return false;
return !exchange.abort(x);
} }
@Override @Override

View File

@ -67,7 +67,9 @@ public class HttpSenderOverHTTP2 extends HttpSender
{ {
channel.setStream(stream); channel.setStream(stream);
((IStream)stream).setAttachment(channel); ((IStream)stream).setAttachment(channel);
stream.setIdleTimeout(request.getIdleTimeout()); long idleTimeout = request.getIdleTimeout();
if (idleTimeout >= 0)
stream.setIdleTimeout(idleTimeout);
if (content.hasContent() && !expects100Continue(request)) if (content.hasContent() && !expects100Continue(request))
{ {

View File

@ -18,6 +18,22 @@
package org.eclipse.jetty.http2.client.http; package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.AbstractConnectionPool; import org.eclipse.jetty.client.AbstractConnectionPool;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpDestination;
@ -43,21 +59,6 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -370,7 +371,7 @@ public class MaxConcurrentStreamsTest extends AbstractTest
} }
@Test @Test
public void testTwoConcurrentStreamsFirstTimesOut() throws Exception public void testTwoStreamsFirstTimesOut() throws Exception
{ {
long timeout = 1000; long timeout = 1000;
start(1, new EmptyServerHandler() start(1, new EmptyServerHandler()

View File

@ -18,15 +18,6 @@
package org.eclipse.jetty.http.client; package org.eclipse.jetty.http.client;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -65,6 +56,15 @@ import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
public class HttpClientContinueTest extends AbstractTest<TransportScenario> public class HttpClientContinueTest extends AbstractTest<TransportScenario>
{ {
@Override @Override
@ -344,13 +344,14 @@ public class HttpClientContinueTest extends AbstractTest<TransportScenario>
} }
}); });
scenario.client.setIdleTimeout(idleTimeout); scenario.client.setIdleTimeout(2 * idleTimeout);
byte[] content = new byte[1024]; byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI()) scenario.client.newRequest(scenario.newURI())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()) .header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content)) .content(new BytesContentProvider(content))
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
.send(new BufferingResponseListener() .send(new BufferingResponseListener()
{ {
@Override @Override