SOLR-12843: Implement a MultiContentWriter in SolrJ to post multiple files/payload at once

This commit is contained in:
Noble Paul 2018-10-09 17:44:40 +11:00
parent dbed8bafe6
commit b4d9b25f44
5 changed files with 224 additions and 2 deletions

View File

@ -138,6 +138,8 @@ New Features
* SOLR-12815: Implement maxOps limit for IndexSizeTrigger. (ab)
* SOLR-12843: Implement a MultiContentWriter in SolrJ to post multiple files/payload at once (noble)
Other Changes
----------------------

View File

@ -142,7 +142,7 @@ public class UpdateRequestHandler extends ContentStreamHandlerBase implements Pe
registry.put("application/xml", new XMLLoader().init(p) );
registry.put("application/json", new JsonLoader().init(p) );
registry.put("application/csv", new CSVLoader().init(p) );
registry.put("application/javabin", new JavabinLoader().init(p) );
registry.put("application/javabin", new JavabinLoader(instance).init(p) );
registry.put("text/csv", registry.get("application/csv") );
registry.put("text/xml", registry.get("application/xml") );
registry.put("text/json", registry.get("application/json"));

View File

@ -19,6 +19,8 @@ package org.apache.solr.handler.loader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@ -31,7 +33,11 @@ import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.DataInputInputStream;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.RequestHandlerUtils;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@ -46,6 +52,16 @@ import org.apache.solr.update.processor.UpdateRequestProcessor;
* @see org.apache.solr.common.util.JavaBinCodec
*/
public class JavabinLoader extends ContentStreamLoader {
final ContentStreamLoader contentStreamLoader;
public JavabinLoader() {
this.contentStreamLoader = this;
}
public JavabinLoader(ContentStreamLoader contentStreamLoader) {
super();
this.contentStreamLoader = contentStreamLoader;
}
@Override
public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws Exception {
@ -62,6 +78,10 @@ public class JavabinLoader extends ContentStreamLoader {
private void parseAndLoadDocs(final SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream,
final UpdateRequestProcessor processor) throws IOException {
if (req.getParams().getBool("multistream", false)) {
handleMultiStream(req, rsp, stream, processor);
return;
}
UpdateRequest update = null;
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
private AddUpdateCommand addCmd = null;
@ -116,6 +136,44 @@ public class JavabinLoader extends ContentStreamLoader {
}
}
private void handleMultiStream(SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream, UpdateRequestProcessor processor)
throws IOException {
FastInputStream in = FastInputStream.wrap(stream);
SolrParams old = req.getParams();
new JavaBinCodec() {
SolrParams params;
AddUpdateCommand addCmd = null;
@Override
public List<Object> readIterator(DataInputInputStream fis) throws IOException {
while (true) {
Object o = readVal(fis);
if (o == END_OBJ) break;
if (o instanceof NamedList) {
params = ((NamedList) o).toSolrParams();
} else {
try {
if (o instanceof byte[]) {
if (params != null) req.setParams(params);
byte[] buf = (byte[]) o;
contentStreamLoader.load(req, rsp, new ContentStreamBase.ByteArrayStream(buf, null), processor);
} else {
throw new RuntimeException("unsupported type ");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
params = null;
req.setParams(old);
}
}
}
return Collections.emptyList();
}
}.unmarshal(in);
}
private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
AddUpdateCommand addCmd = new AddUpdateCommand(req);
addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.request;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.Iterator;
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Pair;
import static org.apache.solr.common.params.UpdateParams.ASSUME_CONTENT_TYPE;
public class MultiContentWriterRequest extends AbstractUpdateRequest {
private final Iterator<Pair<NamedList, Object>> payload;
/**
*
* @param m HTTP method
* @param path path to which to post to
* @param payload add the per doc params, The Object could be a ByteBuffer or byte[]
*/
public MultiContentWriterRequest(METHOD m, String path, Iterator<Pair<NamedList, Object>> payload) {
super(m, path);
params = new ModifiableSolrParams();
params.add("multistream", "true");
this.payload = payload;
}
@Override
public RequestWriter.ContentWriter getContentWriter(String expectedType) {
return new RequestWriter.ContentWriter() {
@Override
public void write(OutputStream os) throws IOException {
new JavaBinCodec().marshal((IteratorWriter) iw -> {
while (payload.hasNext()) {
Pair<NamedList, Object> next = payload.next();
if (next.second() instanceof ByteBuffer || next.second() instanceof byte[]) {
NamedList params = next.first();
if(params.get(ASSUME_CONTENT_TYPE) == null){
String detectedType = detect(next.second());
if(detectedType==null){
throw new RuntimeException("Unknown content type");
}
params.add(ASSUME_CONTENT_TYPE, detectedType);
}
iw.add(params);
iw.add(next.second());
} else {
throw new RuntimeException("payload value must be byte[] or ByteBuffer");
}
}
}, os);
}
@Override
public String getContentType() {
return "application/javabin";
}
};
}
public static String detect(Object o) throws IOException {
Reader rdr = null;
byte[] bytes = null;
if (o instanceof byte[]) bytes = (byte[]) o;
else if (o instanceof ByteBuffer) bytes = ((ByteBuffer) o).array();
rdr = new InputStreamReader(new ByteArrayInputStream(bytes));
String detectedContentType = null;
for (;;) {
int ch = rdr.read();
if (Character.isWhitespace(ch)) {
continue;
}
int nextChar = -1;
// first non-whitespace chars
if (ch == '#' // single line comment
|| (ch == '/' && ((nextChar = rdr.read()) == '/' || nextChar == '*')) // single line or multi-line comment
|| (ch == '{' || ch == '[') // start of JSON object
)
{
detectedContentType = "application/json";
} else if (ch == '<') {
detectedContentType = "text/xml";
}
break;
}
return detectedContentType;
}
public static ByteBuffer readByteBuffer(InputStream is) throws IOException {
BinaryRequestWriter.BAOS baos = new BinaryRequestWriter.BAOS();
org.apache.commons.io.IOUtils.copy(is, baos);
return ByteBuffer.wrap(baos.getbuf(), 0, baos.size());
}
}

View File

@ -17,8 +17,11 @@
package org.apache.solr.client.solrj;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -43,6 +46,7 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest.ACTION;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.request.LukeRequest;
import org.apache.solr.client.solrj.request.MultiContentWriterRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.StreamingUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
@ -64,11 +68,14 @@ import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.FacetParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.junit.Test;
import org.noggit.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.UpdateParams.ASSUME_CONTENT_TYPE;
import static org.junit.internal.matchers.StringContains.containsString;
/**
@ -671,7 +678,39 @@ abstract public class SolrExampleTests extends SolrExampleTestsBase
Assert.assertEquals( 10, rsp.getResults().getNumFound() );
}
@Test
@Test
public void testMultiContentWriterRequest() throws Exception {
SolrClient client = getSolrClient();
client.deleteByQuery("*:*");// delete everything!
client.commit();
QueryResponse rsp = client.query(new SolrQuery("*:*"));
Assert.assertEquals(0, rsp.getResults().getNumFound());
List<Pair<NamedList, Object>> docs = new ArrayList<>();
NamedList params = new NamedList();
docs.add(new Pair(params, getFileContent(params, "solrj/docs1.xml")));
params = new NamedList();
params.add(ASSUME_CONTENT_TYPE, "application/csv");
docs.add(new Pair(params, getFileContent(params, "solrj/books.csv")));
MultiContentWriterRequest up = new MultiContentWriterRequest(SolrRequest.METHOD.POST, "/update", docs.iterator());
up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
NamedList<Object> result = client.request(up);
System.out.println(result.jsonStr());
rsp = client.query(new SolrQuery("*:*"));
Assert.assertEquals(12, rsp.getResults().getNumFound());
}
private ByteBuffer getFileContent(NamedList nl, String name) throws IOException {
try (InputStream is = new FileInputStream(getFile(name))) {
return MultiContentWriterRequest.readByteBuffer(is);
}
}
@Test
public void testMultiContentStreamRequest() throws Exception {
SolrClient client = getSolrClient();
client.deleteByQuery("*:*");// delete everything!