SOLR-10698: StreamHandler should allow connections to be closed early

This commit is contained in:
Joel Bernstein 2017-05-22 10:43:40 -04:00
parent f62248c90a
commit 02b1c8aa36
4 changed files with 41 additions and 12 deletions

View File

@ -270,7 +270,6 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
} catch(Throwable e) {
Throwable ex = e;
e.printStackTrace();
while(ex != null) {
String m = ex.getMessage();
if(m != null && m.contains("Broken pipe")) {

View File

@ -544,6 +544,7 @@ public class HttpSolrClient extends SolrClient {
// Execute the method.
HttpClientContext httpClientRequestContext = HttpClientUtil.createNewHttpClientRequestContext();
final HttpResponse response = httpClient.execute(method, httpClientRequestContext);
int httpStatus = response.getStatusLine().getStatusCode();
// Read the contents
@ -582,6 +583,7 @@ public class HttpSolrClient extends SolrClient {
// no processor specified, return raw stream
NamedList<Object> rsp = new NamedList<>();
rsp.add("stream", respBody);
rsp.add("closeableResponse", response);
// Only case where stream should not be closed
shouldClose = false;
return rsp;

View File

@ -26,6 +26,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
@ -68,6 +70,7 @@ public class SolrStream extends TupleStream {
private transient SolrClientCache cache;
private String slice;
private long checkpoint = -1;
private CloseableHttpResponse closeableHttpResponse;
/**
* @param baseUrl Base URL of the stream.
@ -188,11 +191,7 @@ public class SolrStream extends TupleStream {
* */
public void close() throws IOException {
if (tupleStreamParser != null) {
tupleStreamParser.close();
}
closeableHttpResponse.close();
if(cache == null) {
client.close();
}
@ -266,7 +265,7 @@ public class SolrStream extends TupleStream {
}
// temporary...
public static TupleStreamParser constructParser(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException {
public TupleStreamParser constructParser(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException {
String p = requestParams.get("qt");
if (p != null) {
ModifiableSolrParams modifiableSolrParams = (ModifiableSolrParams) requestParams;
@ -280,6 +279,7 @@ public class SolrStream extends TupleStream {
query.setMethod(SolrRequest.METHOD.POST);
NamedList<Object> genericResponse = server.request(query);
InputStream stream = (InputStream) genericResponse.get("stream");
this.closeableHttpResponse = (CloseableHttpResponse)genericResponse.get("closeableResponse");
if (CommonParams.JAVABIN.equals(wt)) {
return new JavabinTupleStreamParser(stream, true);
} else {
@ -287,6 +287,4 @@ public class SolrStream extends TupleStream {
return new JSONTupleStream(reader);
}
}
}

View File

@ -18,7 +18,9 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -44,10 +46,14 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class TupleStream implements Closeable, Serializable, MapWriter {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final long serialVersionUID = 1;
private UUID streamNodeId = UUID.randomUUID();
@ -78,7 +84,7 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
open();
ew.put("docs", (IteratorWriter) iw -> {
try {
for (; ; ) {
for ( ; ; ) {
Tuple tuple = read();
if (tuple != null) {
iw.add(tuple);
@ -90,8 +96,22 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
break;
}
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (Throwable e) {
close();
Throwable ex = e;
while(ex != null) {
String m = ex.getMessage();
if(m != null && m.contains("Broken pipe")) {
throw new IgnoreException();
}
ex = ex.getCause();
}
if(e instanceof IOException) {
throw e;
} else {
throw new IOException(e);
}
}
});
}
@ -178,4 +198,14 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
throw new IOException("Slices not found for " + collectionName);
}
public static class IgnoreException extends IOException {
public void printStackTrace(PrintWriter pw) {
pw.print("Early Client Disconnect");
}
public String getMessage() {
return "Early Client Disconnect";
}
}
}