HTTPCLIENT-2184: Fix classic client connection reuse
This fixes an issue in which connections were not returned to the pool when requests contained non-repeatable bodies AND responses were streamed. When both of these criteria were met, responses were returned without ResponseEntityProxy enhancements so that closing the response entity or stream no longer completed the exchange, thus leaking the connection which forever lived in the `leased` state in the connection pool.
This commit is contained in:
parent
4d0caa4f42
commit
e09c5d0691
|
@ -27,11 +27,18 @@
|
||||||
|
|
||||||
package org.apache.hc.client5.testing.sync;
|
package org.apache.hc.client5.testing.sync;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hc.client5.http.classic.methods.HttpGet;
|
import org.apache.hc.client5.http.classic.methods.HttpGet;
|
||||||
|
import org.apache.hc.client5.http.classic.methods.HttpPost;
|
||||||
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
||||||
|
import org.apache.hc.core5.http.ClassicHttpRequest;
|
||||||
|
import org.apache.hc.core5.http.ContentType;
|
||||||
import org.apache.hc.core5.http.EntityDetails;
|
import org.apache.hc.core5.http.EntityDetails;
|
||||||
import org.apache.hc.core5.http.Header;
|
import org.apache.hc.core5.http.Header;
|
||||||
import org.apache.hc.core5.http.HeaderElements;
|
import org.apache.hc.core5.http.HeaderElements;
|
||||||
|
@ -42,6 +49,7 @@ import org.apache.hc.core5.http.HttpResponse;
|
||||||
import org.apache.hc.core5.http.HttpResponseInterceptor;
|
import org.apache.hc.core5.http.HttpResponseInterceptor;
|
||||||
import org.apache.hc.core5.http.impl.HttpProcessors;
|
import org.apache.hc.core5.http.impl.HttpProcessors;
|
||||||
import org.apache.hc.core5.http.io.entity.EntityUtils;
|
import org.apache.hc.core5.http.io.entity.EntityUtils;
|
||||||
|
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
|
||||||
import org.apache.hc.core5.http.protocol.HttpContext;
|
import org.apache.hc.core5.http.protocol.HttpContext;
|
||||||
import org.apache.hc.core5.http.protocol.HttpProcessor;
|
import org.apache.hc.core5.http.protocol.HttpProcessor;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -80,6 +88,42 @@ public class TestConnectionReuse extends LocalServerTestBase {
|
||||||
Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
|
Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
|
||||||
|
this.connManager.setMaxTotal(5);
|
||||||
|
this.connManager.setDefaultMaxPerRoute(5);
|
||||||
|
|
||||||
|
final HttpHost target = start();
|
||||||
|
|
||||||
|
final WorkerThread[] workers = new WorkerThread[10];
|
||||||
|
for (int i = 0; i < workers.length; i++) {
|
||||||
|
final List<ClassicHttpRequest> requests = new ArrayList<>();
|
||||||
|
for (int j = 0; j < 10; j++) {
|
||||||
|
final HttpPost post = new HttpPost(new URI("/random/2000"));
|
||||||
|
// non-repeatable
|
||||||
|
post.setEntity(new InputStreamEntity(
|
||||||
|
new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
|
||||||
|
ContentType.APPLICATION_OCTET_STREAM));
|
||||||
|
requests.add(post);
|
||||||
|
}
|
||||||
|
workers[i] = new WorkerThread(this.httpclient, target, false, requests);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final WorkerThread worker : workers) {
|
||||||
|
worker.start();
|
||||||
|
}
|
||||||
|
for (final WorkerThread worker : workers) {
|
||||||
|
worker.join(10000);
|
||||||
|
final Exception ex = worker.getException();
|
||||||
|
if (ex != null) {
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expect some connection in the pool
|
||||||
|
Assert.assertTrue(this.connManager.getTotalStats().getAvailable() > 0);
|
||||||
|
}
|
||||||
|
|
||||||
private static class AlwaysCloseConn implements HttpResponseInterceptor {
|
private static class AlwaysCloseConn implements HttpResponseInterceptor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -203,11 +247,10 @@ public class TestConnectionReuse extends LocalServerTestBase {
|
||||||
|
|
||||||
private static class WorkerThread extends Thread {
|
private static class WorkerThread extends Thread {
|
||||||
|
|
||||||
private final URI requestURI;
|
|
||||||
private final HttpHost target;
|
private final HttpHost target;
|
||||||
private final CloseableHttpClient httpclient;
|
private final CloseableHttpClient httpclient;
|
||||||
private final int repetitions;
|
|
||||||
private final boolean forceClose;
|
private final boolean forceClose;
|
||||||
|
private final List<ClassicHttpRequest> requests;
|
||||||
|
|
||||||
private volatile Exception exception;
|
private volatile Exception exception;
|
||||||
|
|
||||||
|
@ -219,18 +262,31 @@ public class TestConnectionReuse extends LocalServerTestBase {
|
||||||
final boolean forceClose) {
|
final boolean forceClose) {
|
||||||
super();
|
super();
|
||||||
this.httpclient = httpclient;
|
this.httpclient = httpclient;
|
||||||
this.requestURI = requestURI;
|
|
||||||
this.target = target;
|
this.target = target;
|
||||||
this.repetitions = repetitions;
|
|
||||||
this.forceClose = forceClose;
|
this.forceClose = forceClose;
|
||||||
|
this.requests = new ArrayList<>(repetitions);
|
||||||
|
for (int i = 0; i < repetitions; i++) {
|
||||||
|
requests.add(new HttpGet(requestURI));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerThread(
|
||||||
|
final CloseableHttpClient httpclient,
|
||||||
|
final HttpHost target,
|
||||||
|
final boolean forceClose,
|
||||||
|
final List<ClassicHttpRequest> requests) {
|
||||||
|
super();
|
||||||
|
this.httpclient = httpclient;
|
||||||
|
this.target = target;
|
||||||
|
this.forceClose = forceClose;
|
||||||
|
this.requests = requests;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
for (int i = 0; i < this.repetitions; i++) {
|
for (final ClassicHttpRequest request : requests) {
|
||||||
final HttpGet httpget = new HttpGet(this.requestURI);
|
this.httpclient.execute(this.target, request, response -> {
|
||||||
this.httpclient.execute(this.target, httpget, response -> {
|
|
||||||
if (this.forceClose) {
|
if (this.forceClose) {
|
||||||
response.close();
|
response.close();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -201,6 +201,7 @@ public final class ProtocolExec implements ExecChainHandler {
|
||||||
|
|
||||||
if (Method.TRACE.isSame(request.getMethod())) {
|
if (Method.TRACE.isSame(request.getMethod())) {
|
||||||
// Do not perform authentication for TRACE request
|
// Do not perform authentication for TRACE request
|
||||||
|
ResponseEntityProxy.enhance(response, execRuntime);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
final HttpEntity requestEntity = request.getEntity();
|
final HttpEntity requestEntity = request.getEntity();
|
||||||
|
@ -208,6 +209,7 @@ public final class ProtocolExec implements ExecChainHandler {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("{} Cannot retry non-repeatable request", exchangeId);
|
LOG.debug("{} Cannot retry non-repeatable request", exchangeId);
|
||||||
}
|
}
|
||||||
|
ResponseEntityProxy.enhance(response, execRuntime);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
if (needAuthentication(
|
if (needAuthentication(
|
||||||
|
|
Loading…
Reference in New Issue