Merge pull request #3632 from eclipse/jetty-9.4.x-3605-http2_client_channel_recycle

Fixes #3605 - IdleTimeout with Jetty HTTP/2 and InputStreamResponseListener
This commit is contained in:
Simone Bordet 2019-05-20 10:38:01 +02:00 committed by GitHub
commit 6e52c8cb61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 27 additions and 15 deletions

View File

@ -51,6 +51,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger sweeps = new AtomicInteger(); private final AtomicInteger sweeps = new AtomicInteger();
private final Session session; private final Session session;
private boolean recycleHttpChannels;
public HttpConnectionOverHTTP2(HttpDestination destination, Session session) public HttpConnectionOverHTTP2(HttpDestination destination, Session session)
{ {
@ -63,6 +64,16 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
return session; return session;
} }
public boolean isRecycleHttpChannels()
{
return recycleHttpChannels;
}
public void setRecycleHttpChannels(boolean recycleHttpChannels)
{
this.recycleHttpChannels = recycleHttpChannels;
}
@Override @Override
protected SendFailure send(HttpExchange exchange) protected SendFailure send(HttpExchange exchange)
{ {
@ -99,7 +110,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
// Recycle only non-failed channels. // Recycle only non-failed channels.
if (channel.isFailed()) if (channel.isFailed())
channel.destroy(); channel.destroy();
else else if (isRecycleHttpChannels())
idleChannels.offer(channel); idleChannels.offer(channel);
} }
else else

View File

@ -213,21 +213,21 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
@Override @Override
protected Action process() protected Action process()
{ {
DataInfo dataInfo; if (dataInfo != null)
{
dataInfo.callback.succeeded();
if (dataInfo.frame.isEndStream())
return Action.SUCCEEDED;
}
synchronized (this) synchronized (this)
{ {
dataInfo = queue.poll(); dataInfo = queue.poll();
} }
if (dataInfo == null) if (dataInfo == null)
{
DataInfo prevDataInfo = this.dataInfo;
if (prevDataInfo != null && prevDataInfo.frame.isEndStream())
return Action.SUCCEEDED;
return Action.IDLE; return Action.IDLE;
}
this.dataInfo = dataInfo;
ByteBuffer buffer = dataInfo.frame.getData(); ByteBuffer buffer = dataInfo.frame.getData();
if (buffer.hasRemaining()) if (buffer.hasRemaining())
responseContent(dataInfo.exchange, buffer, this); responseContent(dataInfo.exchange, buffer, this);
@ -244,13 +244,6 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
((Retainable)callback).retain(); ((Retainable)callback).retain();
} }
@Override
public void succeeded()
{
dataInfo.callback.succeeded();
super.succeeded();
}
@Override @Override
protected void onCompleteSuccess() protected void onCompleteSuccess()
{ {
@ -263,6 +256,14 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
dataInfo.callback.failed(failure); dataInfo.callback.failed(failure);
responseFailure(failure); responseFailure(failure);
} }
@Override
public boolean reset()
{
queue.clear();
dataInfo = null;
return super.reset();
}
} }
private static class DataInfo private static class DataInfo