SOLR-13293: ConcurrentUpdateHttp2SolrClient always log AsynchronousCloseException exception error on indexing

This commit is contained in:
Cao Manh Dat 2019-10-09 20:56:50 +01:00
parent c0b00716bd
commit 1cda424503
4 changed files with 39 additions and 8 deletions

View File

@ -305,6 +305,9 @@ Bug Fixes
* SOLR-13376: Multi-node race condition to create/remove nodeLost markers. (hoss, ab)
* SOLR-13293: ConcurrentUpdateHttp2SolrClient always log AsynchronousCloseException exception error on indexing.
(Cao Manh Dat)
Other Changes
----------------------

View File

@ -433,11 +433,6 @@ public class SolrCmdDistributor implements Closeable {
}
} catch (Exception e) {
log.warn("Failed to parse response from {} during replication factor accounting", node, e);
} finally {
try {
inputStream.close();
} catch (Exception ignore) {
}
}
}
return Integer.MAX_VALUE;

View File

@ -16,6 +16,7 @@
*/
package org.apache.solr.update;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.SocketException;
@ -23,6 +24,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.solr.BaseDistributedSearchTestCase;
@ -353,6 +355,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
testDeletes(false, false);
testDeletes(true, true);
testDeletes(true, false);
getRfFromResponseShouldNotCloseTheInputStream();
}
private void testDeletes(boolean dbq, boolean withFailures) throws Exception {
@ -531,6 +534,22 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), dbqReq, true);
assertFalse(req.shouldRetry(err));
}
public void getRfFromResponseShouldNotCloseTheInputStream() {
UpdateRequest dbqReq = new UpdateRequest();
dbqReq.deleteByQuery("*:*");
SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), dbqReq, true);
AtomicBoolean isClosed = new AtomicBoolean(false);
ByteArrayInputStream is = new ByteArrayInputStream(new byte[100]) {
@Override
public void close() throws IOException {
isClosed.set(true);
super.close();
}
};
req.trackRequestResult(null, is, true);
assertFalse("Underlying stream should not be closed!", isClosed.get());
}
private void testReqShouldRetryMaxRetries() {
Error err = getError(new SocketException());

View File

@ -279,9 +279,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
} finally {
try {
if (rspBody != null) {
while (rspBody.read() != -1) {}
}
consumeFully(rspBody);
} catch (Exception e) {
log.error("Error consuming and closing http response stream.", e);
}
@ -295,6 +293,21 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
}
}
private void consumeFully(InputStream is) {
if (is != null) {
try (is) {
// make sure the stream is full read
is.skip(is.available());
while (is.read() != -1) {
}
} catch (UnsupportedOperationException e) {
// nothing to do then
} catch (IOException e) {
// quiet
}
}
}
private void notifyQueueAndRunnersIfEmptyQueue() {
if (queue.size() == 0) {
synchronized (queue) {
@ -512,6 +525,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
/**
* Intended to be used as an extension point for doing post processing after a request completes.
* @param respBody the body of the response, subclasses must not close this stream.
*/
public void onSuccess(Response resp, InputStream respBody) {
// no-op by design, override to add functionality