SOLR-8451: We should not call method.abort in HttpSolrClient or HttpSolrCall#remoteQuery and HttpSolrCall#remoteQuery should not close streams.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1723615 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2016-01-07 19:37:49 +00:00
parent fa640e6b69
commit 53c72a9ebd
8 changed files with 244 additions and 33 deletions

View File

@ -335,6 +335,9 @@ Bug Fixes
* SOLR-8494: SimplePostTool and therefore the bin/post script cannot upload files larger than 2.1GB. (shalin)
* SOLR-8451: We should not call method.abort in HttpSolrClient or HttpSolrCall#remoteQuery and
HttpSolrCall#remoteQuery should not close streams. (Mark Miller)
Other Changes
----------------------

View File

@ -519,7 +519,6 @@ public class HttpSolrCall {
private void remoteQuery(String coreUrl, HttpServletResponse resp) throws IOException {
HttpRequestBase method = null;
HttpEntity httpEntity = null;
boolean success = false;
try {
String urlstr = coreUrl + queryParams.toQueryString();
@ -578,24 +577,17 @@ public class HttpSolrCall {
InputStream is = httpEntity.getContent();
OutputStream os = resp.getOutputStream();
try {
IOUtils.copyLarge(is, os);
os.flush();
} finally {
IOUtils.closeQuietly(os); // TODO: I thought we weren't supposed to explicitly close servlet streams
IOUtils.closeQuietly(is);
}
IOUtils.copyLarge(is, os);
os.flush();
}
success = true;
} catch (IOException e) {
sendError(new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Error trying to proxy request for url: " + coreUrl, e));
} finally {
EntityUtils.consumeQuietly(httpEntity);
if (method != null && !success) {
method.abort();
}
}
}

View File

@ -659,6 +659,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
assert TestInjection.injectFailUpdateRequests();
updateCommand = cmd;
if (zkEnabled) {
@ -1122,6 +1125,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
assert TestInjection.injectFailUpdateRequests();
updateCommand = cmd;
if (!cmd.isDeleteById()) {
@ -1584,6 +1590,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
assert TestInjection.injectFailUpdateRequests();
updateCommand = cmd;
List<Node> nodes = null;
boolean singleLeader = false;

View File

@ -64,12 +64,15 @@ public class TestInjection {
public static String failReplicaRequests = null;
public static String failUpdateRequests = null;
private static Set<Timer> timers = Collections.synchronizedSet(new HashSet<Timer>());
public static void reset() {
nonGracefullClose = null;
failReplicaRequests = null;
failUpdateRequests = null;
for (Timer timer : timers) {
timer.cancel();
@ -127,6 +130,19 @@ public class TestInjection {
return true;
}
public static boolean injectFailUpdateRequests() {
if (failUpdateRequests != null) {
Pair<Boolean,Integer> pair = parseValue(failUpdateRequests);
boolean enabled = pair.getKey();
int chanceIn100 = pair.getValue();
if (enabled && RANDOM.nextInt(100) >= (100 - chanceIn100)) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Random test update fail");
}
}
return true;
}
private static Pair<Boolean,Integer> parseValue(String raw) {
Matcher m = ENABLED_PERCENT.matcher(raw);
if (!m.matches()) throw new RuntimeException("No match, probably bad syntax: " + raw);

View File

@ -27,4 +27,10 @@ log4j.logger.org.apache.solr.hadoop=INFO
#log4j.logger.org.apache.solr.handler.IndexFetcher=DEBUG
#log4j.logger.org.apache.solr.common.cloud.ClusterStateUtil=DEBUG
#log4j.logger.org.apache.solr.cloud.OverseerAutoReplicaFailoverThread=DEBUG
#log4j.logger.org.apache.solr.cloud.OverseerAutoReplicaFailoverThread=DEBUG
#log4j.logger.org.apache.http.impl.conn.PoolingClientConnectionManager=DEBUG
#log4j.logger.org.apache.http.impl.conn.BasicClientConnectionManager=DEBUG
#log4j.logger.org.apache.http=DEBUG
#log4j.logger.org.apache.solr.client.solrj.impl.SolrHttpRequestRetryHandler=DEBUG
#log4j.logger.org.eclipse.jetty.server=DEBUG

View File

@ -0,0 +1,190 @@
package org.apache.solr.client.solrj;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.net.URL;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.HttpConnectionMetrics;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpVersion;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ClientConnectionRequest;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.ManagedClientConnection;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpProtocolParams;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.util.TestInjection;
import org.junit.BeforeClass;
import org.junit.Test;
@SuppressSSL
public class ConnectionReuseTest extends AbstractFullDistribZkTestBase {
private AtomicInteger id = new AtomicInteger();
@BeforeClass
public static void beforeConnectionReuseTest() {
if (true) TestInjection.failUpdateRequests = "true:100";
}
public ConnectionReuseTest() {
fixShardCount(1);
sliceCount = 1;
stress = 0;
}
public static String getSchemaFile() {
return "schema.xml";
}
public static String getSolrConfigFile() {
// use this because it has /update and is minimal
return "solrconfig-tlog.xml";
}
@Test
public void test() throws Exception {
URL url = new URL(((HttpSolrClient) clients.get(0)).getBaseURL());
SolrClient client;
HttpClient httpClient = HttpClientUtil.createClient(null);
int rndClient = 0;//random().nextInt(3);
if (rndClient == 0) {
client = new ConcurrentUpdateSolrClient(url.toString(), httpClient, 6, 1); // currently only testing with 1 thread
} else if (rndClient == 1) {
client = new HttpSolrClient(url.toString(), httpClient);
} else if (rndClient == 2) {
client = new CloudSolrClient(zkServer.getZkAddress(), random().nextBoolean(), httpClient);
((CloudSolrClient) client).setParallelUpdates(random().nextBoolean());
((CloudSolrClient) client).setDefaultCollection(DEFAULT_COLLECTION);
((CloudSolrClient) client).getLbClient().setConnectionTimeout(30000);
((CloudSolrClient) client).getLbClient().setSoTimeout(60000);
} else {
throw new RuntimeException("impossible");
}
PoolingClientConnectionManager cm = (PoolingClientConnectionManager) httpClient.getConnectionManager();
HttpHost target = new HttpHost(url.getHost(), url.getPort(), isSSLMode() ? "https" : "http");
HttpRoute route = new HttpRoute(target);
ClientConnectionRequest mConn = getClientConnectionRequest(httpClient, route);
ManagedClientConnection conn1 = getConn(mConn);
headerRequest(target, route, conn1);
conn1.releaseConnection();
cm.releaseConnection(conn1, -1, TimeUnit.MILLISECONDS);
int queueBreaks = 0;
int cnt1 = atLeast(3);
int cnt2 = atLeast(30);
for (int j = 0; j < cnt1; j++) {
for (int i = 0; i < cnt2; i++) {
boolean done = false;
AddUpdateCommand c = new AddUpdateCommand(null);
c.solrDoc = sdoc("id", id.incrementAndGet());
try {
client.add(c.solrDoc);
} catch (Exception e) {
e.printStackTrace();
}
if (!done && i > 0 && i < cnt2 - 1 && client instanceof ConcurrentUpdateSolrClient && random().nextInt(10) > 8) {
queueBreaks++;
done = true;
Thread.sleep(350); // wait past streaming client poll time of 250ms
}
}
if (client instanceof ConcurrentUpdateSolrClient) {
((ConcurrentUpdateSolrClient) client).blockUntilFinished();
}
}
route = new HttpRoute(new HttpHost(url.getHost(), url.getPort(), isSSLMode() ? "https" : "http"));
mConn = cm.requestConnection(route, null);
ManagedClientConnection conn2 = getConn(mConn);
HttpConnectionMetrics metrics = conn2.getMetrics();
headerRequest(target, route, conn2);
conn2.releaseConnection();
cm.releaseConnection(conn2, -1, TimeUnit.MILLISECONDS);
assertNotNull("No connection metrics found - is the connection getting aborted? server closing the connection? " + client.getClass().getSimpleName(), metrics);
// we try and make sure the connection we get has handled all of the requests in this test
if (client instanceof ConcurrentUpdateSolrClient) {
// we can't fully control queue polling breaking up requests - allow a bit of leeway
int exp = cnt1 + queueBreaks + 2;
assertTrue(
"We expected all communication via streaming client to use one connection! expected=" + exp + " got="
+ metrics.getRequestCount(),
Math.max(exp, metrics.getRequestCount()) - Math.min(exp, metrics.getRequestCount()) < 3);
} else {
assertTrue("We expected all communication to use one connection! " + client.getClass().getSimpleName(),
cnt1 * cnt2 + 2 <= metrics.getRequestCount());
}
client.close();
}
public ManagedClientConnection getConn(ClientConnectionRequest mConn)
throws InterruptedException, ConnectionPoolTimeoutException {
ManagedClientConnection conn = mConn.getConnection(30, TimeUnit.SECONDS);
conn.setIdleDuration(-1, TimeUnit.MILLISECONDS);
conn.markReusable();
return conn;
}
public void headerRequest(HttpHost target, HttpRoute route, ManagedClientConnection conn)
throws IOException, HttpException {
HttpRequest req = new BasicHttpRequest("OPTIONS", "*", HttpVersion.HTTP_1_1);
req.addHeader("Host", target.getHostName());
BasicHttpParams p = new BasicHttpParams();
HttpProtocolParams.setVersion(p, HttpVersion.HTTP_1_1);
if (!conn.isOpen()) conn.open(route, new BasicHttpContext(null), p);
conn.sendRequestHeader(req);
conn.flush();
conn.receiveResponseHeader();
}
public ClientConnectionRequest getClientConnectionRequest(HttpClient httpClient, HttpRoute route) {
ClientConnectionRequest mConn = ((PoolingClientConnectionManager) httpClient.getConnectionManager()).requestConnection(route, null);
return mConn;
}
}

View File

@ -24,6 +24,7 @@ import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentProducer;
import org.apache.http.entity.EntityTemplate;
import org.apache.http.util.EntityUtils;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
@ -56,11 +58,8 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* ConcurrentUpdateSolrClient buffers all added documents and writes
@ -196,6 +195,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
HttpPost method = null;
HttpResponse response = null;
InputStream rspBody = null;
try {
final UpdateRequest updateRequest =
queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
@ -278,6 +278,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
method.addHeader("Content-Type", contentType);
response = client.getHttpClient().execute(method);
rspBody = response.getEntity().getContent();
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
@ -295,7 +296,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
encoding = param.getValue();
}
}
NamedList<Object> resp = client.parser.processResponse(response.getEntity().getContent(), encoding);
NamedList<Object> resp = client.parser.processResponse(rspBody, encoding);
NamedList<Object> error = (NamedList<Object>) resp.get("error");
if (error != null) {
solrExc.setMetadata((NamedList<String>) error.get("metadata"));
@ -312,10 +313,10 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
} finally {
try {
if (response != null) {
response.getEntity().getContent().close();
EntityUtils.consume(response.getEntity());
}
} catch (Exception ex) {
log.warn("", ex);
} catch (Exception e) {
log.error("Error consuming and closing http response stream.", e);
}
}
}

View File

@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.impl;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
@ -77,8 +78,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* A SolrClient implementation that talks directly to a Solr server via HTTP
*
@ -472,16 +471,17 @@ public class HttpSolrClient extends SolrClient {
protected NamedList<Object> executeMethod(HttpRequestBase method, final ResponseParser processor) throws SolrServerException {
method.addHeader("User-Agent", AGENT);
HttpEntity entity = null;
InputStream respBody = null;
boolean shouldClose = true;
boolean success = false;
try {
// Execute the method.
final HttpResponse response = httpClient.execute(method);
int httpStatus = response.getStatusLine().getStatusCode();
// Read the contents
respBody = response.getEntity().getContent();
entity = response.getEntity();
respBody = entity.getContent();
Header ctHeader = response.getLastHeader("content-type");
String contentType;
if (ctHeader != null) {
@ -517,7 +517,6 @@ public class HttpSolrClient extends SolrClient {
rsp.add("stream", respBody);
// Only case where stream should not be closed
shouldClose = false;
success = true;
return rsp;
}
@ -576,7 +575,6 @@ public class HttpSolrClient extends SolrClient {
if (metadata != null) rss.setMetadata(metadata);
throw rss;
}
success = true;
return rsp;
} catch (ConnectException e) {
throw new SolrServerException("Server refused connection at: "
@ -589,15 +587,11 @@ public class HttpSolrClient extends SolrClient {
throw new SolrServerException(
"IOException occured when talking to server at: " + getBaseURL(), e);
} finally {
if (respBody != null && shouldClose) {
if (shouldClose) {
try {
respBody.close();
EntityUtils.consume(entity);
} catch (IOException e) {
log.error("", e);
} finally {
if (!success) {
method.abort();
}
log.error("Error consuming and closing http response stream.", e);
}
}
}