diff --git a/solr/core/src/java/org/apache/solr/handler/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/ExportWriter.java index bd43d5fc31a..46ec3a4a394 100644 --- a/solr/core/src/java/org/apache/solr/handler/ExportWriter.java +++ b/solr/core/src/java/org/apache/solr/handler/ExportWriter.java @@ -270,7 +270,6 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable { } } catch(Throwable e) { Throwable ex = e; - e.printStackTrace(); while(ex != null) { String m = ex.getMessage(); if(m != null && m.contains("Broken pipe")) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java index 1692aa99eff..dea1711dd83 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java @@ -544,6 +544,7 @@ public class HttpSolrClient extends SolrClient { // Execute the method. HttpClientContext httpClientRequestContext = HttpClientUtil.createNewHttpClientRequestContext(); final HttpResponse response = httpClient.execute(method, httpClientRequestContext); + int httpStatus = response.getStatusLine().getStatusCode(); // Read the contents @@ -582,6 +583,7 @@ public class HttpSolrClient extends SolrClient { // no processor specified, return raw stream NamedList rsp = new NamedList<>(); rsp.add("stream", respBody); + rsp.add("closeableResponse", response); // Only case where stream should not be closed shouldClose = false; return rsp; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java index f132815ba89..2bb2e1c1881 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java @@ -26,6 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.impl.client.CloseableHttpClient; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; @@ -68,6 +70,7 @@ public class SolrStream extends TupleStream { private transient SolrClientCache cache; private String slice; private long checkpoint = -1; + private CloseableHttpResponse closeableHttpResponse; /** * @param baseUrl Base URL of the stream. @@ -188,11 +191,7 @@ public class SolrStream extends TupleStream { * */ public void close() throws IOException { - - if (tupleStreamParser != null) { - tupleStreamParser.close(); - } - + closeableHttpResponse.close(); if(cache == null) { client.close(); } @@ -266,7 +265,7 @@ public class SolrStream extends TupleStream { } // temporary... - public static TupleStreamParser constructParser(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException { + public TupleStreamParser constructParser(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException { String p = requestParams.get("qt"); if (p != null) { ModifiableSolrParams modifiableSolrParams = (ModifiableSolrParams) requestParams; @@ -280,6 +279,7 @@ public class SolrStream extends TupleStream { query.setMethod(SolrRequest.METHOD.POST); NamedList genericResponse = server.request(query); InputStream stream = (InputStream) genericResponse.get("stream"); + this.closeableHttpResponse = (CloseableHttpResponse)genericResponse.get("closeableResponse"); if (CommonParams.JAVABIN.equals(wt)) { return new JavabinTupleStreamParser(stream, true); } else { @@ -287,6 +287,4 @@ public class SolrStream extends TupleStream { return new JSONTupleStream(reader); } } - - } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java index ceea6af2e14..0542bd6be92 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java @@ -18,7 +18,9 @@ package org.apache.solr.client.solrj.io.stream; import java.io.Closeable; import java.io.IOException; +import java.io.PrintWriter; import java.io.Serializable; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -44,10 +46,14 @@ import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.StrUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class TupleStream implements Closeable, Serializable, MapWriter { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final long serialVersionUID = 1; private UUID streamNodeId = UUID.randomUUID(); @@ -78,7 +84,7 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter open(); ew.put("docs", (IteratorWriter) iw -> { try { - for (; ; ) { + for ( ; ; ) { Tuple tuple = read(); if (tuple != null) { iw.add(tuple); @@ -90,8 +96,22 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter break; } } - } catch (IOException e) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } catch (Throwable e) { + close(); + Throwable ex = e; + while(ex != null) { + String m = ex.getMessage(); + if(m != null && m.contains("Broken pipe")) { + throw new IgnoreException(); + } + ex = ex.getCause(); + } + + if(e instanceof IOException) { + throw e; + } else { + throw new IOException(e); + } } }); } @@ -178,4 +198,14 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter throw new IOException("Slices not found for " + collectionName); } + + public static class IgnoreException extends IOException { + public void printStackTrace(PrintWriter pw) { + pw.print("Early Client Disconnect"); + } + + public String getMessage() { + return "Early Client Disconnect"; + } + } } \ No newline at end of file