mirror of https://github.com/apache/lucene.git
SOLR-2904 -- BinaryUpdateRequestHandler should be able to accept multiple update requests from a stream
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1204453 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
356fa5d137
commit
20a43ad9dc
|
@ -189,6 +189,9 @@ New Features
|
|||
* SOLR-2134 Trie* fields should support sortMissingLast=true, and deprecate Sortable* Field Types
|
||||
(Ryan McKinley, Mike McCandless, Uwe Schindler, Erick Erickson)
|
||||
|
||||
* SOLR-2904: BinaryUpdateRequestHandler should be able to accept multiple update requests from
|
||||
a stream (shalin)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -24,12 +24,14 @@ import org.apache.solr.common.SolrInputDocument;
|
|||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.params.UpdateParams;
|
||||
import org.apache.solr.common.util.ContentStream;
|
||||
import org.apache.solr.common.util.FastInputStream;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
import org.apache.solr.update.AddUpdateCommand;
|
||||
import org.apache.solr.update.DeleteUpdateCommand;
|
||||
import org.apache.solr.update.processor.UpdateRequestProcessor;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
|
@ -66,30 +68,49 @@ public class BinaryUpdateRequestHandler extends ContentStreamHandlerBase {
|
|||
private void parseAndLoadDocs(final SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream,
|
||||
final UpdateRequestProcessor processor) throws IOException {
|
||||
UpdateRequest update = null;
|
||||
update = new JavaBinUpdateRequestCodec().unmarshal(stream,
|
||||
new JavaBinUpdateRequestCodec.StreamingDocumentHandler() {
|
||||
private AddUpdateCommand addCmd = null;
|
||||
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
|
||||
private AddUpdateCommand addCmd = null;
|
||||
|
||||
public void document(SolrInputDocument document, UpdateRequest updateRequest) {
|
||||
if (addCmd == null) {
|
||||
addCmd = getAddCommand(req, updateRequest.getParams());
|
||||
}
|
||||
addCmd.solrDoc = document;
|
||||
try {
|
||||
processor.processAdd(addCmd);
|
||||
addCmd.clear();
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR adding document " + document);
|
||||
}
|
||||
}
|
||||
});
|
||||
if (update.getDeleteById() != null) {
|
||||
delete(req, update.getDeleteById(), processor, true);
|
||||
public void update(SolrInputDocument document, UpdateRequest updateRequest) {
|
||||
if (document == null) {
|
||||
// Perhaps commit from the parameters
|
||||
try {
|
||||
RequestHandlerUtils.handleCommit(req, processor, updateRequest.getParams(), false);
|
||||
RequestHandlerUtils.handleRollback(req, processor, updateRequest.getParams(), false);
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR handling commit/rollback");
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (addCmd == null) {
|
||||
addCmd = getAddCommand(req, updateRequest.getParams());
|
||||
}
|
||||
addCmd.solrDoc = document;
|
||||
try {
|
||||
processor.processAdd(addCmd);
|
||||
addCmd.clear();
|
||||
} catch (IOException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR adding document " + document);
|
||||
}
|
||||
}
|
||||
};
|
||||
FastInputStream in = FastInputStream.wrap(stream);
|
||||
for (; ; ) {
|
||||
try {
|
||||
update = new JavaBinUpdateRequestCodec().unmarshal(in, handler);
|
||||
} catch (EOFException e) {
|
||||
break; // this is expected
|
||||
} catch (Exception e) {
|
||||
log.error("Exception while processing update request", e);
|
||||
break;
|
||||
}
|
||||
if (update.getDeleteById() != null) {
|
||||
delete(req, update.getDeleteById(), processor, true);
|
||||
}
|
||||
if (update.getDeleteQuery() != null) {
|
||||
delete(req, update.getDeleteQuery(), processor, false);
|
||||
}
|
||||
}
|
||||
if (update.getDeleteQuery() != null) {
|
||||
delete(req, update.getDeleteQuery(), processor, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.solr.client.solrj.request;
|
||||
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.SolrInputField;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.FastInputStream;
|
||||
|
@ -72,16 +71,16 @@ public class JavaBinUpdateRequestCodec {
|
|||
|
||||
/**
|
||||
* Reads a NamedList from the given InputStream, converts it into a SolrInputDocument and passes it to the given
|
||||
* StreamingDocumentHandler
|
||||
* StreamingUpdateHandler
|
||||
*
|
||||
* @param is the InputStream from which to read
|
||||
* @param handler an instance of StreamingDocumentHandler to which SolrInputDocuments are streamed one by one
|
||||
* @param handler an instance of StreamingUpdateHandler to which SolrInputDocuments are streamed one by one
|
||||
*
|
||||
* @return the UpdateRequest
|
||||
*
|
||||
* @throws IOException in case of an exception while reading from the input stream or unmarshalling
|
||||
*/
|
||||
public UpdateRequest unmarshal(InputStream is, final StreamingDocumentHandler handler) throws IOException {
|
||||
public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException {
|
||||
final UpdateRequest updateRequest = new UpdateRequest();
|
||||
List<List<NamedList>> doclist;
|
||||
List<String> delById;
|
||||
|
@ -128,8 +127,17 @@ public class JavaBinUpdateRequestCodec {
|
|||
while (true) {
|
||||
Object o = readVal(fis);
|
||||
if (o == END_OBJ) break;
|
||||
SolrInputDocument sdoc = (SolrInputDocument)o;
|
||||
handler.document(sdoc, updateRequest);
|
||||
SolrInputDocument sdoc = null;
|
||||
if (o instanceof List) {
|
||||
sdoc = listToSolrInputDocument((List<NamedList>) o);
|
||||
} else if (o instanceof NamedList) {
|
||||
UpdateRequest req = new UpdateRequest();
|
||||
req.setParams(new ModifiableSolrParams(SolrParams.toSolrParams((NamedList) o)));
|
||||
handler.update(null, req);
|
||||
} else {
|
||||
sdoc = (SolrInputDocument) o;
|
||||
}
|
||||
handler.update(sdoc, updateRequest);
|
||||
}
|
||||
return Collections.EMPTY_LIST;
|
||||
}
|
||||
|
@ -144,7 +152,11 @@ public class JavaBinUpdateRequestCodec {
|
|||
if (doclist != null && !doclist.isEmpty()) {
|
||||
List<SolrInputDocument> solrInputDocs = new ArrayList<SolrInputDocument>();
|
||||
for (Object o : doclist) {
|
||||
solrInputDocs.add((SolrInputDocument)o);
|
||||
if (o instanceof List) {
|
||||
solrInputDocs.add(listToSolrInputDocument((List<NamedList>)o));
|
||||
} else {
|
||||
solrInputDocs.add((SolrInputDocument)o);
|
||||
}
|
||||
}
|
||||
updateRequest.add(solrInputDocs);
|
||||
}
|
||||
|
@ -162,6 +174,20 @@ public class JavaBinUpdateRequestCodec {
|
|||
|
||||
}
|
||||
|
||||
private SolrInputDocument listToSolrInputDocument(List<NamedList> namedList) {
|
||||
SolrInputDocument doc = new SolrInputDocument();
|
||||
for (int i = 0; i < namedList.size(); i++) {
|
||||
NamedList nl = namedList.get(i);
|
||||
if (i == 0) {
|
||||
doc.setDocumentBoost(nl.getVal(0) == null ? 1.0f : (Float) nl.getVal(0));
|
||||
} else {
|
||||
doc.addField((String) nl.getVal(0),
|
||||
nl.getVal(1),
|
||||
nl.getVal(2) == null ? 1.0f : (Float) nl.getVal(2));
|
||||
}
|
||||
}
|
||||
return doc;
|
||||
}
|
||||
|
||||
private NamedList solrParamsToNamedList(SolrParams params) {
|
||||
if (params == null) return new NamedList();
|
||||
|
@ -174,7 +200,7 @@ public class JavaBinUpdateRequestCodec {
|
|||
return nl;
|
||||
}
|
||||
|
||||
public static interface StreamingDocumentHandler {
|
||||
public void document(SolrInputDocument document, UpdateRequest req);
|
||||
public static interface StreamingUpdateHandler {
|
||||
public void update(SolrInputDocument document, UpdateRequest req);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import junit.framework.Assert;
|
|||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.SolrInputField;
|
||||
import org.apache.solr.common.util.FastInputStream;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
|
@ -83,8 +84,8 @@ public class TestUpdateRequestCodec extends LuceneTestCase {
|
|||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
codec.marshal(updateRequest, baos);
|
||||
final List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
|
||||
JavaBinUpdateRequestCodec.StreamingDocumentHandler handler = new JavaBinUpdateRequestCodec.StreamingDocumentHandler() {
|
||||
public void document(SolrInputDocument document, UpdateRequest req) {
|
||||
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
|
||||
public void update(SolrInputDocument document, UpdateRequest req) {
|
||||
Assert.assertNotNull(req.getParams());
|
||||
docs.add(document);
|
||||
}
|
||||
|
@ -131,8 +132,8 @@ public class TestUpdateRequestCodec extends LuceneTestCase {
|
|||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
codec.marshal(updateRequest, baos);
|
||||
final List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
|
||||
JavaBinUpdateRequestCodec.StreamingDocumentHandler handler = new JavaBinUpdateRequestCodec.StreamingDocumentHandler() {
|
||||
public void document(SolrInputDocument document, UpdateRequest req) {
|
||||
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
|
||||
public void update(SolrInputDocument document, UpdateRequest req) {
|
||||
Assert.assertNotNull(req.getParams());
|
||||
docs.add(document);
|
||||
}
|
||||
|
@ -154,7 +155,7 @@ public class TestUpdateRequestCodec extends LuceneTestCase {
|
|||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private void compareDocs(String m,
|
||||
SolrInputDocument expectedDoc,
|
||||
|
|
Loading…
Reference in New Issue