diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 459245de6ff..0d685b47682 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -335,6 +335,9 @@ Bug Fixes * SOLR-8494: SimplePostTool and therefore the bin/post script cannot upload files larger than 2.1GB. (shalin) +* SOLR-8451: We should not call method.abort in HttpSolrClient or HttpSolrCall#remoteQuery and + HttpSolrCall#remoteQuery should not close streams. (Mark Miller) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java index 25286ec1792..fb92376ed9e 100644 --- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java +++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java @@ -519,7 +519,6 @@ public class HttpSolrCall { private void remoteQuery(String coreUrl, HttpServletResponse resp) throws IOException { HttpRequestBase method = null; HttpEntity httpEntity = null; - boolean success = false; try { String urlstr = coreUrl + queryParams.toQueryString(); @@ -578,24 +577,17 @@ public class HttpSolrCall { InputStream is = httpEntity.getContent(); OutputStream os = resp.getOutputStream(); - try { - IOUtils.copyLarge(is, os); - os.flush(); - } finally { - IOUtils.closeQuietly(os); // TODO: I thought we weren't supposed to explicitly close servlet streams - IOUtils.closeQuietly(is); - } + + IOUtils.copyLarge(is, os); + os.flush(); } - success = true; + } catch (IOException e) { sendError(new SolrException( SolrException.ErrorCode.SERVER_ERROR, "Error trying to proxy request for url: " + coreUrl, e)); } finally { EntityUtils.consumeQuietly(httpEntity); - if (method != null && !success) { - method.abort(); - } } } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 5a50690bb05..9c691f4f1be 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -659,6 +659,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { @Override public void processAdd(AddUpdateCommand cmd) throws IOException { + + assert TestInjection.injectFailUpdateRequests(); + updateCommand = cmd; if (zkEnabled) { @@ -1122,6 +1125,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { @Override public void processDelete(DeleteUpdateCommand cmd) throws IOException { + + assert TestInjection.injectFailUpdateRequests(); + updateCommand = cmd; if (!cmd.isDeleteById()) { @@ -1584,6 +1590,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { @Override public void processCommit(CommitUpdateCommand cmd) throws IOException { + + assert TestInjection.injectFailUpdateRequests(); + updateCommand = cmd; List nodes = null; boolean singleLeader = false; diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java index 5416234b43c..9861a1065a5 100644 --- a/solr/core/src/java/org/apache/solr/util/TestInjection.java +++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java @@ -64,12 +64,15 @@ public class TestInjection { public static String failReplicaRequests = null; + public static String failUpdateRequests = null; + private static Set timers = Collections.synchronizedSet(new HashSet()); public static void reset() { nonGracefullClose = null; failReplicaRequests = null; + failUpdateRequests = null; for (Timer timer : timers) { timer.cancel(); @@ -127,6 +130,19 @@ public class TestInjection { return true; } + public static boolean injectFailUpdateRequests() { + if (failUpdateRequests != null) { + Pair pair = parseValue(failUpdateRequests); + boolean enabled = pair.getKey(); + int chanceIn100 = pair.getValue(); + if (enabled && RANDOM.nextInt(100) >= (100 - chanceIn100)) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Random test update fail"); + } + } + + return true; + } + private static Pair parseValue(String raw) { Matcher m = ENABLED_PERCENT.matcher(raw); if (!m.matches()) throw new RuntimeException("No match, probably bad syntax: " + raw); diff --git a/solr/core/src/test-files/log4j.properties b/solr/core/src/test-files/log4j.properties index 86446e93953..51d9dbf7a10 100644 --- a/solr/core/src/test-files/log4j.properties +++ b/solr/core/src/test-files/log4j.properties @@ -27,4 +27,10 @@ log4j.logger.org.apache.solr.hadoop=INFO #log4j.logger.org.apache.solr.handler.IndexFetcher=DEBUG #log4j.logger.org.apache.solr.common.cloud.ClusterStateUtil=DEBUG -#log4j.logger.org.apache.solr.cloud.OverseerAutoReplicaFailoverThread=DEBUG \ No newline at end of file +#log4j.logger.org.apache.solr.cloud.OverseerAutoReplicaFailoverThread=DEBUG + +#log4j.logger.org.apache.http.impl.conn.PoolingClientConnectionManager=DEBUG +#log4j.logger.org.apache.http.impl.conn.BasicClientConnectionManager=DEBUG +#log4j.logger.org.apache.http=DEBUG +#log4j.logger.org.apache.solr.client.solrj.impl.SolrHttpRequestRetryHandler=DEBUG +#log4j.logger.org.eclipse.jetty.server=DEBUG \ No newline at end of file diff --git a/solr/core/src/test/org/apache/solr/client/solrj/ConnectionReuseTest.java b/solr/core/src/test/org/apache/solr/client/solrj/ConnectionReuseTest.java new file mode 100644 index 00000000000..8dbf99c1dda --- /dev/null +++ b/solr/core/src/test/org/apache/solr/client/solrj/ConnectionReuseTest.java @@ -0,0 +1,190 @@ +package org.apache.solr.client.solrj; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.net.URL; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.http.HttpConnectionMetrics; +import org.apache.http.HttpException; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpVersion; +import org.apache.http.client.HttpClient; +import org.apache.http.conn.ClientConnectionRequest; +import org.apache.http.conn.ConnectionPoolTimeoutException; +import org.apache.http.conn.ManagedClientConnection; +import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.impl.conn.PoolingClientConnectionManager; +import org.apache.http.message.BasicHttpRequest; +import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.HttpProtocolParams; +import org.apache.http.protocol.BasicHttpContext; +import org.apache.solr.SolrTestCaseJ4.SuppressSSL; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.cloud.AbstractFullDistribZkTestBase; +import org.apache.solr.update.AddUpdateCommand; +import org.apache.solr.util.TestInjection; +import org.junit.BeforeClass; +import org.junit.Test; + +@SuppressSSL +public class ConnectionReuseTest extends AbstractFullDistribZkTestBase { + + private AtomicInteger id = new AtomicInteger(); + + @BeforeClass + public static void beforeConnectionReuseTest() { + if (true) TestInjection.failUpdateRequests = "true:100"; + } + + public ConnectionReuseTest() { + fixShardCount(1); + sliceCount = 1; + stress = 0; + } + + public static String getSchemaFile() { + return "schema.xml"; + } + + public static String getSolrConfigFile() { + // use this because it has /update and is minimal + return "solrconfig-tlog.xml"; + } + + @Test + public void test() throws Exception { + URL url = new URL(((HttpSolrClient) clients.get(0)).getBaseURL()); + + SolrClient client; + HttpClient httpClient = HttpClientUtil.createClient(null); + int rndClient = 0;//random().nextInt(3); + if (rndClient == 0) { + client = new ConcurrentUpdateSolrClient(url.toString(), httpClient, 6, 1); // currently only testing with 1 thread + } else if (rndClient == 1) { + client = new HttpSolrClient(url.toString(), httpClient); + } else if (rndClient == 2) { + client = new CloudSolrClient(zkServer.getZkAddress(), random().nextBoolean(), httpClient); + ((CloudSolrClient) client).setParallelUpdates(random().nextBoolean()); + ((CloudSolrClient) client).setDefaultCollection(DEFAULT_COLLECTION); + ((CloudSolrClient) client).getLbClient().setConnectionTimeout(30000); + ((CloudSolrClient) client).getLbClient().setSoTimeout(60000); + } else { + throw new RuntimeException("impossible"); + } + + PoolingClientConnectionManager cm = (PoolingClientConnectionManager) httpClient.getConnectionManager(); + + HttpHost target = new HttpHost(url.getHost(), url.getPort(), isSSLMode() ? "https" : "http"); + HttpRoute route = new HttpRoute(target); + + ClientConnectionRequest mConn = getClientConnectionRequest(httpClient, route); + + ManagedClientConnection conn1 = getConn(mConn); + headerRequest(target, route, conn1); + conn1.releaseConnection(); + cm.releaseConnection(conn1, -1, TimeUnit.MILLISECONDS); + + int queueBreaks = 0; + int cnt1 = atLeast(3); + int cnt2 = atLeast(30); + for (int j = 0; j < cnt1; j++) { + for (int i = 0; i < cnt2; i++) { + boolean done = false; + AddUpdateCommand c = new AddUpdateCommand(null); + c.solrDoc = sdoc("id", id.incrementAndGet()); + try { + client.add(c.solrDoc); + } catch (Exception e) { + e.printStackTrace(); + } + if (!done && i > 0 && i < cnt2 - 1 && client instanceof ConcurrentUpdateSolrClient && random().nextInt(10) > 8) { + queueBreaks++; + done = true; + Thread.sleep(350); // wait past streaming client poll time of 250ms + } + } + if (client instanceof ConcurrentUpdateSolrClient) { + ((ConcurrentUpdateSolrClient) client).blockUntilFinished(); + } + } + + route = new HttpRoute(new HttpHost(url.getHost(), url.getPort(), isSSLMode() ? "https" : "http")); + + mConn = cm.requestConnection(route, null); + + ManagedClientConnection conn2 = getConn(mConn); + + HttpConnectionMetrics metrics = conn2.getMetrics(); + headerRequest(target, route, conn2); + conn2.releaseConnection(); + cm.releaseConnection(conn2, -1, TimeUnit.MILLISECONDS); + + + assertNotNull("No connection metrics found - is the connection getting aborted? server closing the connection? " + client.getClass().getSimpleName(), metrics); + + // we try and make sure the connection we get has handled all of the requests in this test + if (client instanceof ConcurrentUpdateSolrClient) { + // we can't fully control queue polling breaking up requests - allow a bit of leeway + int exp = cnt1 + queueBreaks + 2; + assertTrue( + "We expected all communication via streaming client to use one connection! expected=" + exp + " got=" + + metrics.getRequestCount(), + Math.max(exp, metrics.getRequestCount()) - Math.min(exp, metrics.getRequestCount()) < 3); + } else { + assertTrue("We expected all communication to use one connection! " + client.getClass().getSimpleName(), + cnt1 * cnt2 + 2 <= metrics.getRequestCount()); + } + + client.close(); + } + + public ManagedClientConnection getConn(ClientConnectionRequest mConn) + throws InterruptedException, ConnectionPoolTimeoutException { + ManagedClientConnection conn = mConn.getConnection(30, TimeUnit.SECONDS); + conn.setIdleDuration(-1, TimeUnit.MILLISECONDS); + conn.markReusable(); + return conn; + } + + public void headerRequest(HttpHost target, HttpRoute route, ManagedClientConnection conn) + throws IOException, HttpException { + HttpRequest req = new BasicHttpRequest("OPTIONS", "*", HttpVersion.HTTP_1_1); + + req.addHeader("Host", target.getHostName()); + BasicHttpParams p = new BasicHttpParams(); + HttpProtocolParams.setVersion(p, HttpVersion.HTTP_1_1); + if (!conn.isOpen()) conn.open(route, new BasicHttpContext(null), p); + conn.sendRequestHeader(req); + conn.flush(); + conn.receiveResponseHeader(); + } + + public ClientConnectionRequest getClientConnectionRequest(HttpClient httpClient, HttpRoute route) { + ClientConnectionRequest mConn = ((PoolingClientConnectionManager) httpClient.getConnectionManager()).requestConnection(route, null); + return mConn; + } + +} + diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java index 1aa5a706351..1fe75461d5a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java @@ -24,6 +24,7 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ContentProducer; import org.apache.http.entity.EntityTemplate; +import org.apache.http.util.EntityUtils; import org.apache.solr.client.solrj.ResponseParser; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; @@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.lang.invoke.MethodHandles; import java.nio.charset.StandardCharsets; @@ -56,11 +58,8 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; /** * ConcurrentUpdateSolrClient buffers all added documents and writes @@ -196,6 +195,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient { HttpPost method = null; HttpResponse response = null; + InputStream rspBody = null; try { final UpdateRequest updateRequest = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS); @@ -278,6 +278,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient { method.addHeader("Content-Type", contentType); response = client.getHttpClient().execute(method); + rspBody = response.getEntity().getContent(); int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != HttpStatus.SC_OK) { StringBuilder msg = new StringBuilder(); @@ -295,7 +296,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient { encoding = param.getValue(); } } - NamedList resp = client.parser.processResponse(response.getEntity().getContent(), encoding); + NamedList resp = client.parser.processResponse(rspBody, encoding); NamedList error = (NamedList) resp.get("error"); if (error != null) { solrExc.setMetadata((NamedList) error.get("metadata")); @@ -312,10 +313,10 @@ public class ConcurrentUpdateSolrClient extends SolrClient { } finally { try { if (response != null) { - response.getEntity().getContent().close(); + EntityUtils.consume(response.getEntity()); } - } catch (Exception ex) { - log.warn("", ex); + } catch (Exception e) { + log.error("Error consuming and closing http response stream.", e); } } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java index 2d1319253b7..b529fa965f8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java @@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.impl; import org.apache.commons.io.IOUtils; import org.apache.http.Header; +import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.NameValuePair; @@ -77,8 +78,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import static java.nio.charset.StandardCharsets.UTF_8; - /** * A SolrClient implementation that talks directly to a Solr server via HTTP * @@ -472,16 +471,17 @@ public class HttpSolrClient extends SolrClient { protected NamedList executeMethod(HttpRequestBase method, final ResponseParser processor) throws SolrServerException { method.addHeader("User-Agent", AGENT); + HttpEntity entity = null; InputStream respBody = null; boolean shouldClose = true; - boolean success = false; try { // Execute the method. final HttpResponse response = httpClient.execute(method); int httpStatus = response.getStatusLine().getStatusCode(); // Read the contents - respBody = response.getEntity().getContent(); + entity = response.getEntity(); + respBody = entity.getContent(); Header ctHeader = response.getLastHeader("content-type"); String contentType; if (ctHeader != null) { @@ -517,7 +517,6 @@ public class HttpSolrClient extends SolrClient { rsp.add("stream", respBody); // Only case where stream should not be closed shouldClose = false; - success = true; return rsp; } @@ -576,7 +575,6 @@ public class HttpSolrClient extends SolrClient { if (metadata != null) rss.setMetadata(metadata); throw rss; } - success = true; return rsp; } catch (ConnectException e) { throw new SolrServerException("Server refused connection at: " @@ -589,15 +587,11 @@ public class HttpSolrClient extends SolrClient { throw new SolrServerException( "IOException occured when talking to server at: " + getBaseURL(), e); } finally { - if (respBody != null && shouldClose) { + if (shouldClose) { try { - respBody.close(); + EntityUtils.consume(entity); } catch (IOException e) { - log.error("", e); - } finally { - if (!success) { - method.abort(); - } + log.error("Error consuming and closing http response stream.", e); } } }