diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index d57a8c539f3..d4602b37991 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -305,6 +305,9 @@ Bug Fixes * SOLR-13376: Multi-node race condition to create/remove nodeLost markers. (hoss, ab) +* SOLR-13293: ConcurrentUpdateHttp2SolrClient always log AsynchronousCloseException exception error on indexing. + (Cao Manh Dat) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index 59106849b10..5098cd1daa7 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -433,11 +433,6 @@ public class SolrCmdDistributor implements Closeable { } } catch (Exception e) { log.warn("Failed to parse response from {} during replication factor accounting", node, e); - } finally { - try { - inputStream.close(); - } catch (Exception ignore) { - } } } return Integer.MAX_VALUE; diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java index 4eddb98136e..6926c6fab94 100644 --- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java +++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java @@ -16,6 +16,7 @@ */ package org.apache.solr.update; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.net.SocketException; @@ -23,6 +24,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.xml.parsers.ParserConfigurationException; import org.apache.solr.BaseDistributedSearchTestCase; @@ -353,6 +355,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { testDeletes(false, false); testDeletes(true, true); testDeletes(true, false); + getRfFromResponseShouldNotCloseTheInputStream(); } private void testDeletes(boolean dbq, boolean withFailures) throws Exception { @@ -531,6 +534,22 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase { SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), dbqReq, true); assertFalse(req.shouldRetry(err)); } + + public void getRfFromResponseShouldNotCloseTheInputStream() { + UpdateRequest dbqReq = new UpdateRequest(); + dbqReq.deleteByQuery("*:*"); + SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), dbqReq, true); + AtomicBoolean isClosed = new AtomicBoolean(false); + ByteArrayInputStream is = new ByteArrayInputStream(new byte[100]) { + @Override + public void close() throws IOException { + isClosed.set(true); + super.close(); + } + }; + req.trackRequestResult(null, is, true); + assertFalse("Underlying stream should not be closed!", isClosed.get()); + } private void testReqShouldRetryMaxRetries() { Error err = getError(new SocketException()); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java index fb2af9423c5..7165e9b3a7e 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java @@ -279,9 +279,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } finally { try { - if (rspBody != null) { - while (rspBody.read() != -1) {} - } + consumeFully(rspBody); } catch (Exception e) { log.error("Error consuming and closing http response stream.", e); } @@ -295,6 +293,21 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { } } + private void consumeFully(InputStream is) { + if (is != null) { + try (is) { + // make sure the stream is full read + is.skip(is.available()); + while (is.read() != -1) { + } + } catch (UnsupportedOperationException e) { + // nothing to do then + } catch (IOException e) { + // quiet + } + } + } + private void notifyQueueAndRunnersIfEmptyQueue() { if (queue.size() == 0) { synchronized (queue) { @@ -512,6 +525,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient { /** * Intended to be used as an extension point for doing post processing after a request completes. + * @param respBody the body of the response, subclasses must not close this stream. */ public void onSuccess(Response resp, InputStream respBody) { // no-op by design, override to add functionality