Fixed release of connections when the exchange is terminated.
This commit is contained in:
parent
05f4367790
commit
df60fd5c2d
|
@ -27,11 +27,15 @@ import org.eclipse.jetty.client.api.Result;
|
||||||
import org.eclipse.jetty.fcgi.generator.Flusher;
|
import org.eclipse.jetty.fcgi.generator.Flusher;
|
||||||
import org.eclipse.jetty.fcgi.generator.Generator;
|
import org.eclipse.jetty.fcgi.generator.Generator;
|
||||||
import org.eclipse.jetty.http.HttpField;
|
import org.eclipse.jetty.http.HttpField;
|
||||||
|
import org.eclipse.jetty.http.HttpFields;
|
||||||
|
import org.eclipse.jetty.http.HttpHeader;
|
||||||
|
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||||
import org.eclipse.jetty.http.HttpVersion;
|
import org.eclipse.jetty.http.HttpVersion;
|
||||||
import org.eclipse.jetty.io.IdleTimeout;
|
import org.eclipse.jetty.io.IdleTimeout;
|
||||||
|
|
||||||
public class HttpChannelOverFCGI extends HttpChannel
|
public class HttpChannelOverFCGI extends HttpChannel
|
||||||
{
|
{
|
||||||
|
private final HttpConnectionOverFCGI connection;
|
||||||
private final Flusher flusher;
|
private final Flusher flusher;
|
||||||
private final int request;
|
private final int request;
|
||||||
private final HttpSenderOverFCGI sender;
|
private final HttpSenderOverFCGI sender;
|
||||||
|
@ -42,6 +46,7 @@ public class HttpChannelOverFCGI extends HttpChannel
|
||||||
public HttpChannelOverFCGI(final HttpConnectionOverFCGI connection, Flusher flusher, int request, long idleTimeout)
|
public HttpChannelOverFCGI(final HttpConnectionOverFCGI connection, Flusher flusher, int request, long idleTimeout)
|
||||||
{
|
{
|
||||||
super(connection.getHttpDestination());
|
super(connection.getHttpDestination());
|
||||||
|
this.connection = connection;
|
||||||
this.flusher = flusher;
|
this.flusher = flusher;
|
||||||
this.request = request;
|
this.request = request;
|
||||||
this.sender = new HttpSenderOverFCGI(this);
|
this.sender = new HttpSenderOverFCGI(this);
|
||||||
|
@ -121,6 +126,13 @@ public class HttpChannelOverFCGI extends HttpChannel
|
||||||
{
|
{
|
||||||
super.exchangeTerminated(result);
|
super.exchangeTerminated(result);
|
||||||
idle.onClose();
|
idle.onClose();
|
||||||
|
boolean close = result.isFailed();
|
||||||
|
HttpFields responseHeaders = result.getResponse().getHeaders();
|
||||||
|
close |= responseHeaders.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
|
||||||
|
if (close)
|
||||||
|
connection.close();
|
||||||
|
else
|
||||||
|
connection.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void flush(Generator.Result... results)
|
protected void flush(Generator.Result... results)
|
||||||
|
|
|
@ -24,11 +24,13 @@ import java.util.LinkedList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.eclipse.jetty.client.HttpClient;
|
import org.eclipse.jetty.client.HttpClient;
|
||||||
import org.eclipse.jetty.client.HttpConnection;
|
import org.eclipse.jetty.client.HttpConnection;
|
||||||
import org.eclipse.jetty.client.HttpDestination;
|
import org.eclipse.jetty.client.HttpDestination;
|
||||||
import org.eclipse.jetty.client.HttpExchange;
|
import org.eclipse.jetty.client.HttpExchange;
|
||||||
|
import org.eclipse.jetty.client.PoolingHttpDestination;
|
||||||
import org.eclipse.jetty.client.api.Connection;
|
import org.eclipse.jetty.client.api.Connection;
|
||||||
import org.eclipse.jetty.client.api.Request;
|
import org.eclipse.jetty.client.api.Request;
|
||||||
import org.eclipse.jetty.client.api.Response;
|
import org.eclipse.jetty.client.api.Response;
|
||||||
|
@ -49,6 +51,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
||||||
|
|
||||||
private final LinkedList<Integer> requests = new LinkedList<>();
|
private final LinkedList<Integer> requests = new LinkedList<>();
|
||||||
private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
|
private final Map<Integer, HttpChannelOverFCGI> channels = new ConcurrentHashMap<>();
|
||||||
|
private final AtomicBoolean closed = new AtomicBoolean();
|
||||||
private final Flusher flusher;
|
private final Flusher flusher;
|
||||||
private final HttpDestination destination;
|
private final HttpDestination destination;
|
||||||
private final Delegate delegate;
|
private final Delegate delegate;
|
||||||
|
@ -153,14 +156,28 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void release()
|
||||||
|
{
|
||||||
|
if (destination instanceof PoolingHttpDestination)
|
||||||
|
{
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
PoolingHttpDestination<HttpConnectionOverFCGI> fcgiDestination =
|
||||||
|
(PoolingHttpDestination<HttpConnectionOverFCGI>)destination;
|
||||||
|
fcgiDestination.release(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close()
|
public void close()
|
||||||
{
|
{
|
||||||
getHttpDestination().close(this);
|
if (closed.compareAndSet(false, true))
|
||||||
getEndPoint().shutdownOutput();
|
{
|
||||||
LOG.debug("{} oshut", this);
|
getHttpDestination().close(this);
|
||||||
getEndPoint().close();
|
getEndPoint().shutdownOutput();
|
||||||
LOG.debug("{} closed", this);
|
LOG.debug("{} oshut", this);
|
||||||
|
getEndPoint().close();
|
||||||
|
LOG.debug("{} closed", this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int acquireRequest()
|
private int acquireRequest()
|
||||||
|
|
Loading…
Reference in New Issue