SOLR-11380: SolrJ must stream docs to server instead of writing to a buffer first

This commit is contained in:
Noble Paul 2017-10-31 13:16:31 +10:30
parent 52b4625ac7
commit 706b6c9171
5 changed files with 197 additions and 151 deletions

View File

@ -118,6 +118,8 @@ Other Changes
* SOLR-11562: Restore Solr logo ASCII-art in startup log by removing unnecessary default confdir logging (janhoy) * SOLR-11562: Restore Solr logo ASCII-art in startup log by removing unnecessary default confdir logging (janhoy)
* SOLR-11380: SolrJ must stream docs to server instead of writing to a buffer first (noble)
================== 7.1.0 ================== ================== 7.1.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -16,11 +16,15 @@
*/ */
package org.apache.solr.handler; package org.apache.solr.handler;
import java.io.ByteArrayOutputStream;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.impl.BinaryRequestWriter; import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.loader.ContentStreamLoader; import org.apache.solr.handler.loader.ContentStreamLoader;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
@ -54,8 +58,11 @@ public class BinaryUpdateRequestHandlerTest extends SolrTestCaseJ4 {
SolrQueryRequest req = req(); SolrQueryRequest req = req();
ContentStreamLoader csl = handler.newLoader(req, p); ContentStreamLoader csl = handler.newLoader(req, p);
csl.load(req, rsp, brw.getContentStream(ureq), p); RequestWriter.ContentWriter cw = brw.getContentWriter(ureq);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
cw.write(baos);
ContentStreamBase.ByteArrayStream cs = new ContentStreamBase.ByteArrayStream(baos.toByteArray(), null, "application/javabin");
csl.load(req, rsp, cs, p);
AddUpdateCommand add = p.addCommands.get(0); AddUpdateCommand add = p.addCommands.get(0);
System.out.println(add.solrDoc); System.out.println(add.solrDoc);
assertEquals(false, add.overwrite); assertEquals(false, add.overwrite);

View File

@ -16,17 +16,17 @@
*/ */
package org.apache.solr.client.solrj.impl; package org.apache.solr.client.solrj.impl;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec; import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.RequestWriter; import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.util.ContentStream; import org.apache.solr.common.util.ContentStream;
import java.io.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/** /**
* A RequestWriter which writes requests in the javabin format * A RequestWriter which writes requests in the javabin format
* *
@ -36,6 +36,32 @@ import java.util.List;
*/ */
public class BinaryRequestWriter extends RequestWriter { public class BinaryRequestWriter extends RequestWriter {
/**
 * Returns a push-style writer for update requests, or defers to the parent
 * implementation for every other request type.
 *
 * <p>For an {@link UpdateRequest}, {@code null} is returned when the documents,
 * the delete-by-id map and the delete queries are all reported empty by
 * {@code isNull} and no document iterator is set. Otherwise the returned writer
 * marshals the request with {@link JavaBinUpdateRequestCodec} directly onto the
 * stream it is handed, and reports {@code application/javabin} as its content type.
 *
 * @param req the request to be sent
 * @return a writer for the request body, or {@code null} if the update has no payload
 */
@Override
public ContentWriter getContentWriter(SolrRequest req) {
  if (!(req instanceof UpdateRequest)) {
    return super.getContentWriter(req);
  }

  final UpdateRequest update = (UpdateRequest) req;

  // Same checks, same order and same short-circuiting as an all-empty test,
  // expressed as "is there anything to send?".
  final boolean hasPayload = !isNull(update.getDocuments())
      || !isNull(update.getDeleteByIdMap())
      || !isNull(update.getDeleteQuery())
      || update.getDocIterator() != null;
  if (!hasPayload) {
    return null;
  }

  return new ContentWriter() {
    @Override
    public String getContentType() {
      return "application/javabin";
    }

    @Override
    public void write(OutputStream os) throws IOException {
      // A fresh codec per write; the request is serialized straight to the target stream.
      new JavaBinUpdateRequestCodec().marshal(update, os);
    }
  };
}
@Override @Override
public Collection<ContentStream> getContentStreams(SolrRequest req) throws IOException { public Collection<ContentStream> getContentStreams(SolrRequest req) throws IOException {
if (req instanceof UpdateRequest) { if (req instanceof UpdateRequest) {
@ -46,13 +72,10 @@ public class BinaryRequestWriter extends RequestWriter {
&& (updateRequest.getDocIterator() == null) ) { && (updateRequest.getDocIterator() == null) ) {
return null; return null;
} }
List<ContentStream> l = new ArrayList<>(); throw new RuntimeException("This Should not happen");
l.add(new LazyContentStream(updateRequest));
return l;
} else { } else {
return super.getContentStreams(req); return super.getContentStreams(req);
} }
} }
@ -61,46 +84,6 @@ public class BinaryRequestWriter extends RequestWriter {
return "application/javabin"; return "application/javabin";
} }
/**
 * Marshals the given update request into an in-memory buffer and exposes that
 * buffer as a binary {@link ContentStream}.
 *
 * <p>The whole request is serialized before this method returns, so the reported
 * size is always known.
 *
 * @param request the update request to serialize
 * @return a content stream backed by the serialized bytes
 * @throws IOException if marshalling the request fails
 */
@Override
public ContentStream getContentStream(final UpdateRequest request) throws IOException {
  final BAOS baos = new BAOS();
  new JavaBinUpdateRequestCodec().marshal(request, baos);

  return new ContentStream() {
    @Override
    public String getName() {
      return null;
    }

    @Override
    public String getSourceInfo() {
      return "javabin";
    }

    @Override
    public String getContentType() {
      return "application/javabin";
    }

    @Override
    public Long getSize() // size if we know it, otherwise null
    {
      // Long.valueOf instead of the deprecated Long(long) constructor.
      return Long.valueOf(baos.size());
    }

    @Override
    public InputStream getStream() {
      // Reads from the buffer's backing array, limited to the bytes actually written.
      return new ByteArrayInputStream(baos.getbuf(), 0, baos.size());
    }

    @Override
    public Reader getReader() {
      throw new RuntimeException("No reader available . this is a binarystream");
    }
  };
}
@Override @Override
public void write(SolrRequest request, OutputStream os) throws IOException { public void write(SolrRequest request, OutputStream os) throws IOException {
if (request instanceof UpdateRequest) { if (request instanceof UpdateRequest) {

View File

@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.impl;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.net.ConnectException; import java.net.ConnectException;
@ -52,6 +53,7 @@ import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.entity.ContentType; import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity; import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.mime.FormBodyPart; import org.apache.http.entity.mime.FormBodyPart;
@ -327,7 +329,8 @@ public class HttpSolrClient extends SolrClient {
request = ((V2RequestSupport) request).getV2Request(); request = ((V2RequestSupport) request).getV2Request();
} }
SolrParams params = request.getParams(); SolrParams params = request.getParams();
Collection<ContentStream> streams = requestWriter.getContentStreams(request); RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(request);
Collection<ContentStream> streams = contentWriter == null ? requestWriter.getContentStreams(request) : null;
String path = requestWriter.getPath(request); String path = requestWriter.getPath(request);
if (path == null || !path.startsWith("/")) { if (path == null || !path.startsWith("/")) {
path = DEFAULT_PATH; path = DEFAULT_PATH;
@ -362,7 +365,7 @@ public class HttpSolrClient extends SolrClient {
} }
if (SolrRequest.METHOD.GET == request.getMethod()) { if (SolrRequest.METHOD.GET == request.getMethod()) {
if (streams != null) { if (streams != null || contentWriter != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!"); throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!");
} }
@ -389,11 +392,88 @@ public class HttpSolrClient extends SolrClient {
|| (streams != null && streams.size() > 1)) && !hasNullStreamName; || (streams != null && streams.size() > 1)) && !hasNullStreamName;
LinkedList<NameValuePair> postOrPutParams = new LinkedList<>(); LinkedList<NameValuePair> postOrPutParams = new LinkedList<>();
if (streams == null || isMultipart) {
if(contentWriter != null) {
String fullQueryUrl = url + wparams.toQueryString();
HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod() ?
new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl);
postOrPut.addHeader("Content-Type",
contentWriter.getContentType());
postOrPut.setEntity(new BasicHttpEntity(){
@Override
public boolean isStreaming() {
return true;
}
@Override
public void writeTo(OutputStream outstream) throws IOException {
contentWriter.write(outstream);
}
});
return postOrPut;
} else if (streams == null || isMultipart) {
// send server list and request list as query string params // send server list and request list as query string params
ModifiableSolrParams queryParams = calculateQueryParams(this.queryParams, wparams); ModifiableSolrParams queryParams = calculateQueryParams(this.queryParams, wparams);
queryParams.add(calculateQueryParams(request.getQueryParams(), wparams)); queryParams.add(calculateQueryParams(request.getQueryParams(), wparams));
String fullQueryUrl = url + queryParams.toQueryString(); String fullQueryUrl = url + queryParams.toQueryString();
HttpEntityEnclosingRequestBase postOrPut = fillContentStream(request, streams, wparams, isMultipart, postOrPutParams, fullQueryUrl);
return postOrPut;
}
// If it has one stream, it is the post body; put the params in the URL
else {
String fullQueryUrl = url + wparams.toQueryString();
HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod() ?
new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl);
fillSingleContentStream(streams, postOrPut);
return postOrPut;
}
}
throw new SolrServerException("Unsupported method: " + request.getMethod());
}
/**
 * Sets the first content stream of {@code streams} as the body of the given
 * POST/PUT request.
 *
 * <p>The entity is non-repeatable and takes its Content-Type header from the
 * stream. When the stream does not report a size, a length of {@code -1} is
 * passed to the entity.
 *
 * @param streams   the request's content streams; only the first one is used
 * @param postOrPut the request that receives the entity
 * @throws IOException if the stream's underlying input cannot be opened
 */
private void fillSingleContentStream(Collection<ContentStream> streams, HttpEntityEnclosingRequestBase postOrPut) throws IOException {
  // Single stream as body.
  // Using a loop just to get the first one; stays null if the collection is empty.
  ContentStream first = null;
  for (ContentStream content : streams) {
    first = content;
    break;
  }
  final ContentStream contentStream = first;

  // One code path for every stream type: the earlier LazyContentStream branch
  // and its else branch were identical.
  Long size = contentStream.getSize();
  postOrPut.setEntity(new InputStreamEntity(contentStream.getStream(), size == null ? -1 : size) {
    @Override
    public Header getContentType() {
      return new BasicHeader("Content-Type", contentStream.getContentType());
    }

    @Override
    public boolean isRepeatable() {
      return false;
    }
  });
}
private HttpEntityEnclosingRequestBase fillContentStream(SolrRequest request, Collection<ContentStream> streams, ModifiableSolrParams wparams, boolean isMultipart, LinkedList<NameValuePair> postOrPutParams, String fullQueryUrl) throws IOException {
HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod() ? HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod() ?
new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl); new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl);
@ -447,57 +527,9 @@ public class HttpSolrClient extends SolrClient {
//not using multipart //not using multipart
postOrPut.setEntity(new UrlEncodedFormEntity(postOrPutParams, StandardCharsets.UTF_8)); postOrPut.setEntity(new UrlEncodedFormEntity(postOrPutParams, StandardCharsets.UTF_8));
} }
return postOrPut; return postOrPut;
} }
// If it has one stream, it is the post body; put the params in the URL
else {
String fullQueryUrl = url + wparams.toQueryString();
HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod() ?
new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl);
// Single stream as body
// Using a loop just to get the first one
final ContentStream[] contentStream = new ContentStream[1];
for (ContentStream content : streams) {
contentStream[0] = content;
break;
}
if (contentStream[0] instanceof RequestWriter.LazyContentStream) {
Long size = contentStream[0].getSize();
postOrPut.setEntity(new InputStreamEntity(contentStream[0].getStream(), size == null ? -1 : size) {
@Override
public Header getContentType() {
return new BasicHeader("Content-Type", contentStream[0].getContentType());
}
@Override
public boolean isRepeatable() {
return false;
}
});
} else {
Long size = contentStream[0].getSize();
postOrPut.setEntity(new InputStreamEntity(contentStream[0].getStream(), size == null ? -1 : size) {
@Override
public Header getContentType() {
return new BasicHeader("Content-Type", contentStream[0].getContentType());
}
@Override
public boolean isRepeatable() {
return false;
}
});
}
return postOrPut;
}
}
throw new SolrServerException("Unsupported method: " + request.getMethod());
}
private static final List<String> errPath = Arrays.asList("metadata", "error-class");//Utils.getObjectByPath(err, false,"metadata/error-class") private static final List<String> errPath = Arrays.asList("metadata", "error-class");//Utils.getObjectByPath(err, false,"metadata/error-class")
protected NamedList<Object> executeMethod(HttpRequestBase method, final ResponseParser processor, final boolean isV2Api) throws SolrServerException { protected NamedList<Object> executeMethod(HttpRequestBase method, final ResponseParser processor, final boolean isV2Api) throws SolrServerException {

View File

@ -16,18 +16,23 @@
*/ */
package org.apache.solr.client.solrj.request; package org.apache.solr.client.solrj.request;
import org.apache.solr.client.solrj.SolrRequest; import java.io.BufferedWriter;
import org.apache.solr.client.solrj.util.ClientUtils; import java.io.IOException;
import org.apache.solr.common.util.ContentStream; import java.io.InputStream;
import org.apache.solr.common.util.ContentStreamBase; import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.*; import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ContentStreamBase;
/** /**
* A RequestWriter is used to write requests to Solr. * A RequestWriter is used to write requests to Solr.
@ -40,6 +45,23 @@ import java.nio.charset.StandardCharsets;
public class RequestWriter { public class RequestWriter {
public static final Charset UTF_8 = StandardCharsets.UTF_8; public static final Charset UTF_8 = StandardCharsets.UTF_8;
/**
 * Push-style producer of a request body: the caller supplies the destination
 * stream and the implementation writes the payload into it.
 */
public interface ContentWriter {
/**
 * Writes the request body to the given stream.
 *
 * @param os the destination stream
 * @throws IOException if writing fails
 */
void write(OutputStream os) throws IOException;
/**
 * @return the content type of the data produced by {@link #write(OutputStream)}
 */
String getContentType();
}
/**
* Use this to write the request body in push style instead of pull style. This base
* implementation always returns {@code null}. When {@code null} is returned, callers
* are expected to fall back to the pull-style path: {@code HttpSolrClient}, for
* example, then calls {@link #getContentStreams(SolrRequest)} (see also
* {@link org.apache.solr.client.solrj.request.RequestWriter#getContentStream(UpdateRequest)}).
*
* @param req the request about to be sent
* @return a writer for the request body, or {@code null} to use the pull-style path
*/
public ContentWriter getContentWriter(SolrRequest req) {
return null;
}
public Collection<ContentStream> getContentStreams(SolrRequest req) throws IOException { public Collection<ContentStream> getContentStreams(SolrRequest req) throws IOException {
if (req instanceof UpdateRequest) { if (req instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) req; UpdateRequest updateRequest = (UpdateRequest) req;