SOLR-865 -- Adding support for document updates in binary format and corresponding support in Solrj client

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@734796 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2009-01-15 20:20:59 +00:00
parent 91303ff7df
commit 6eb1b688e3
10 changed files with 702 additions and 3 deletions

View File

@ -138,6 +138,9 @@ New Features
the error handling is not immediate as it is with the standard SolrServer. the error handling is not immediate as it is with the standard SolrServer.
(ryan) (ryan)
30. SOLR-865: Adding support for document updates in binary format and corresponding support in Solrj client.
(Noble Paul via shalin)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -676,6 +676,9 @@
--> -->
<requestHandler name="/update" class="solr.XmlUpdateRequestHandler" /> <requestHandler name="/update" class="solr.XmlUpdateRequestHandler" />
<requestHandler name="/update/javabin" class="solr.BinaryUpdateRequestHandler" />
<!-- <!--
Analysis request handler. Since Solr 1.3. Use to returnhow a document is analyzed. Useful Analysis request handler. Since Solr 1.3. Use to returnhow a document is analyzed. Useful
for debugging and as a token server for other types of applications for debugging and as a token server for other types of applications

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream;
import static org.apache.solr.handler.XmlUpdateRequestHandler.COMMIT_WITHIN;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
/**
* Update handler which uses the JavaBin format
*
* @version $Id$
* @see org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec
* @see org.apache.solr.common.util.JavaBinCodec
* @since solr 1.4
*/
public class BinaryUpdateRequestHandler extends ContentStreamHandlerBase {
protected ContentStreamLoader newLoader(SolrQueryRequest req, final UpdateRequestProcessor processor) {
return new ContentStreamLoader() {
public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream) throws Exception {
parseAndLoadDocs(req, rsp, stream.getStream(), processor);
}
};
}
private void parseAndLoadDocs(SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream,
final UpdateRequestProcessor processor) throws IOException {
UpdateRequest update = null;
update = new JavaBinUpdateRequestCodec().unmarshal(stream,
new JavaBinUpdateRequestCodec.StreamingDocumentHandler() {
private AddUpdateCommand addCmd = null;
public void document(SolrInputDocument document, UpdateRequest updateRequest) {
if (addCmd == null) {
addCmd = getAddCommand(updateRequest.getParams());
}
addCmd.solrDoc = document;
try {
processor.processAdd(addCmd);
addCmd.clear();
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR adding document " + document);
}
}
});
if (update.getDeleteById() != null) {
delete(update.getDeleteById(), processor, true);
}
if (update.getDeleteQuery() != null) {
delete(update.getDeleteQuery(), processor, false);
}
}
private AddUpdateCommand getAddCommand(SolrParams params) {
AddUpdateCommand addCmd = new AddUpdateCommand();
boolean overwrite = true; // the default
Boolean overwritePending = null;
Boolean overwriteCommitted = null;
overwrite = params.getBool(UpdateParams.OVERWRITE, overwrite);
addCmd.commitWithin = params.getInt(COMMIT_WITHIN, -1);
// check if these flags are set
if (overwritePending != null && overwriteCommitted != null) {
if (overwritePending != overwriteCommitted) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"can't have different values for 'overwritePending' and 'overwriteCommitted'");
}
overwrite = overwritePending;
}
addCmd.overwriteCommitted = overwrite;
addCmd.overwritePending = overwrite;
addCmd.allowDups = !overwrite;
return addCmd;
}
private void delete(List<String> l, UpdateRequestProcessor processor, boolean isId) throws IOException {
for (String s : l) {
DeleteUpdateCommand delcmd = new DeleteUpdateCommand();
if (isId) {
System.out.println("delete by id : " + s);
delcmd.id = s;
} else {
System.out.println("delete by query = " + s);
delcmd.query = s;
}
delcmd.fromCommitted = true;
delcmd.fromPending = true;
processor.processDelete(delcmd);
}
}
public String getDescription() {
return "Add/Update multiple documents with javabin format";
}
public String getSourceId() {
return "$Id$";
}
public String getSource() {
return "$URL$";
}
public String getVersion() {
return "$Revision$";
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.util.ContentStream;
import java.io.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A RequestWriter which writes requests in the javabin format
*
* @version $Id$
* @see org.apache.solr.client.solrj.request.RequestWriter
* @since solr 1.4
*/
public class BinaryRequestWriter extends RequestWriter {
public Collection<ContentStream> getContentStreams(SolrRequest req) throws IOException {
if (req instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) req;
if (isNull(updateRequest.getDocuments()) &&
isNull(updateRequest.getDeleteById()) &&
isNull(updateRequest.getDeleteQuery())) {
return null;
}
final BAOS baos = new BAOS();
new JavaBinUpdateRequestCodec().marshal(updateRequest, baos);
List<ContentStream> l = new ArrayList<ContentStream>(1);
l.add(new ContentStream() {
public String getName() {
return null;
}
public String getSourceInfo() {
return "javabin";
}
public String getContentType() {
return "application/octet-stream";
}
public Long getSize() // size if we know it, otherwise null
{
return new Long(baos.size());
}
public InputStream getStream() throws IOException {
return new ByteArrayInputStream(baos.getbuf(), 0, baos.size());
}
public Reader getReader() throws IOException {
throw new RuntimeException("No reader available . this is a binarystream");
}
});
return l;
} else {
return super.getContentStreams(req);
}
}
private boolean isNull(List l) {
return l == null || l.isEmpty();
}
/*
* A hack to get access to the protected internal buffer and avoid an additional copy
*/
class BAOS extends ByteArrayOutputStream {
byte[] getbuf() {
return super.buf;
}
}
public String getPath(SolrRequest req) {
if (req instanceof UpdateRequest) {
return "/update/javabin";
} else {
return req.getPath();
}
}
}

View File

@ -50,6 +50,7 @@ import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
@ -97,6 +98,12 @@ public class CommonsHttpSolrServer extends SolrServer
* @see org.apache.solr.client.solrj.impl.BinaryResponseParser * @see org.apache.solr.client.solrj.impl.BinaryResponseParser
*/ */
protected ResponseParser _parser; protected ResponseParser _parser;
/**
* The RequestWriter used to write all requests to Solr
* @see org.apache.solr.client.solrj.request.RequestWriter
*/
protected RequestWriter requestWriter = new RequestWriter();
private final HttpClient _httpClient; private final HttpClient _httpClient;
@ -241,8 +248,8 @@ public class CommonsHttpSolrServer extends SolrServer
public NamedList<Object> request(final SolrRequest request, ResponseParser processor) throws SolrServerException, IOException { public NamedList<Object> request(final SolrRequest request, ResponseParser processor) throws SolrServerException, IOException {
HttpMethod method = null; HttpMethod method = null;
SolrParams params = request.getParams(); SolrParams params = request.getParams();
Collection<ContentStream> streams = request.getContentStreams(); Collection<ContentStream> streams = requestWriter.getContentStreams(request);
String path = request.getPath(); String path = requestWriter.getPath(request);
if( path == null || !path.startsWith( "/" ) ) { if( path == null || !path.startsWith( "/" ) ) {
path = "/select"; path = "/select";
} }
@ -565,4 +572,8 @@ public class CommonsHttpSolrServer extends SolrServer
} }
_maxRetries = maxRetries; _maxRetries = maxRetries;
} }
public void setRequestWriter(RequestWriter requestWriter) {
this.requestWriter = requestWriter;
}
} }

View File

@ -0,0 +1,204 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.request;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
/**
* Provides methods for marshalling an UpdateRequest to a NamedList which can be serialized in the javabin format and
* vice versa.
*
* @version $Id$
* @see org.apache.solr.common.util.JavaBinCodec
* @since solr 1.4
*/
public class JavaBinUpdateRequestCodec {
/**
* Converts an UpdateRequest to a NamedList which can be serialized to the given OutputStream in the javabin format
*
* @param updateRequest the UpdateRequest to be written out
* @param os the OutputStream to which the request is to be written
*
* @throws IOException in case of an exception during marshalling or writing to the stream
*/
public void marshal(UpdateRequest updateRequest, OutputStream os) throws IOException {
JavaBinCodec codec = new JavaBinCodec();
NamedList nl = new NamedList();
NamedList params = solrParamsToNamedList(updateRequest.getParams());
if (updateRequest.getCommitWithin() != -1) {
params.add("commitWithin", updateRequest.getCommitWithin());
}
List<List<NamedList>> doclist = new ArrayList<List<NamedList>>();
if (updateRequest.getDocuments() != null) {
for (SolrInputDocument doc : updateRequest.getDocuments()) {
doclist.add(solrInputDocumentToList(doc));
}
}
nl.add("params", params);// 0: params
nl.add("delById", updateRequest.getDeleteById());
nl.add("delByQ", updateRequest.getDeleteQuery());
nl.add("docs", doclist.iterator());
codec.marshal(nl, os);
}
/**
* Reads a NamedList from the given InputStream, converts it into a SolrInputDocument and passes it to the given
* StreamingDocumentHandler
*
* @param is the InputStream from which to read
* @param handler an instance of StreamingDocumentHandler to which SolrInputDocuments are streamed one by one
*
* @return the UpdateRequest
*
* @throws IOException in case of an exception while reading from the input stream or unmarshalling
*/
public UpdateRequest unmarshal(InputStream is, final StreamingDocumentHandler handler) throws IOException {
final UpdateRequest updateRequest = new UpdateRequest();
List<List<NamedList>> doclist;
List<String> delById;
List<String> delByQ;
final NamedList[] namedList = new NamedList[1];
JavaBinCodec codec = new JavaBinCodec() {
public NamedList readNamedList(FastInputStream dis) throws IOException {
int sz = readSize(dis);
NamedList nl = new NamedList();
if (namedList[0] == null) {
namedList[0] = nl;
}
for (int i = 0; i < sz; i++) {
String name = (String) readVal(dis);
Object val = readVal(dis);
nl.add(name, val);
}
return nl;
}
public List readIterator(FastInputStream fis) throws IOException {
NamedList params = (NamedList) namedList[0].getVal(0);
updateRequest.setParams(namedListToSolrParams(params));
if (handler == null) return super.readIterator(fis);
while (true) {
Object o = readVal(fis);
if (o == END_OBJ) break;
handler.document(listToSolrInputDocument((List<NamedList>) o), updateRequest);
}
return Collections.EMPTY_LIST;
}
};
codec.unmarshal(is);
delById = (List<String>) namedList[0].get("delById");
delByQ = (List<String>) namedList[0].get("delByQ");
doclist = (List<List<NamedList>>) namedList[0].get("docs");
if (doclist != null && !doclist.isEmpty()) {
List<SolrInputDocument> solrInputDocs = new ArrayList<SolrInputDocument>();
for (List<NamedList> n : doclist) {
solrInputDocs.add(listToSolrInputDocument(n));
}
updateRequest.add(solrInputDocs);
}
if (delById != null) {
for (String s : delById) {
updateRequest.deleteById(s);
}
}
if (delByQ != null) {
for (String s : delByQ) {
updateRequest.deleteByQuery(s);
}
}
return updateRequest;
}
private List<NamedList> solrInputDocumentToList(SolrInputDocument doc) {
List<NamedList> l = new ArrayList<NamedList>();
NamedList nl = new NamedList();
nl.add("boost", doc.getDocumentBoost() == 1.0f ? null : doc.getDocumentBoost());
l.add(nl);
Iterator<SolrInputField> it = doc.iterator();
while (it.hasNext()) {
nl = new NamedList();
SolrInputField field = it.next();
nl.add("name", field.getName());
nl.add("val", field.getValue());
nl.add("boost", field.getBoost() == 1.0f ? null : field.getBoost());
l.add(nl);
}
return l;
}
private SolrInputDocument listToSolrInputDocument(List<NamedList> namedList) {
SolrInputDocument doc = new SolrInputDocument();
for (int i = 0; i < namedList.size(); i++) {
NamedList nl = namedList.get(i);
if (i == 0) {
doc.setDocumentBoost(nl.getVal(0) == null ? 1.0f : (Float) nl.getVal(0));
} else {
doc.addField((String) nl.getVal(0),
nl.getVal(1),
nl.getVal(2) == null ? 1.0f : (Float) nl.getVal(2));
}
}
return doc;
}
private NamedList solrParamsToNamedList(SolrParams params) {
if (params == null) return new NamedList();
Iterator<String> it = params.getParameterNamesIterator();
NamedList nl = new NamedList();
while (it.hasNext()) {
String s = it.next();
nl.add(s, params.getParams(s));
}
return nl;
}
private ModifiableSolrParams namedListToSolrParams(NamedList nl) {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
for (int i = 0; i < nl.size(); i++) {
List<String> l = (List) nl.getVal(i);
if (l != null)
solrParams.add(nl.getName(i),
(String[]) l.toArray(new String[l.size()]));
}
return solrParams;
}
public static interface StreamingDocumentHandler {
public void document(SolrInputDocument document, UpdateRequest req);
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.request;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.common.util.ContentStream;
import java.io.IOException;
import java.util.Collection;
/**
* A RequestWriter is used to write requests to Solr.
* <p/>
* A subclass can override the methods in this class to supply a custom format in which a request can be sent.
* @since solr 1.4
* @version $Id$
*/
public class RequestWriter {
public Collection<ContentStream> getContentStreams(SolrRequest req) throws IOException {
return req.getContentStreams();
}
public String getPath(SolrRequest req) {
return req.getPath();
}
}

View File

@ -252,7 +252,16 @@ public class UpdateRequest extends SolrRequest
public List<SolrInputDocument> getDocuments() { public List<SolrInputDocument> getDocuments() {
return documents; return documents;
} }
public List<String> getDeleteById() {
return deleteById;
}
public List<String> getDeleteQuery() {
return deleteQuery;
}
public boolean isWaitFlush() { public boolean isWaitFlush() {
return params != null && params.getBool(UpdateParams.WAIT_FLUSH, false); return params != null && params.getBool(UpdateParams.WAIT_FLUSH, false);
} }

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.embedded;
import org.apache.solr.client.solrj.LargeVolumeTestBase;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
/**
* @version $Id$
* @see org.apache.solr.client.solrj.impl.BinaryRequestWriter
* @see org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec
* @since solr 1.4
*/
public class LargeVolumeBinaryJettyTest extends LargeVolumeTestBase {
SolrServer server;
JettySolrRunner jetty;
int port = 0;
static final String context = "/example";
@Override
public void setUp() throws Exception {
super.setUp();
jetty = new JettySolrRunner(context, 0);
jetty.start();
port = jetty.getLocalPort();
server = this.createNewSolrServer();
}
@Override
public void tearDown() throws Exception {
super.tearDown();
jetty.stop(); // stop the server
}
@Override
protected SolrServer getSolrServer() {
return server;
}
@Override
protected SolrServer createNewSolrServer() {
try {
// setup the server...
String url = "http://localhost:" + port + context;
CommonsHttpSolrServer s = new CommonsHttpSolrServer(url);
s.setRequestWriter(new BinaryRequestWriter());
s.setConnectionTimeout(100); // 1/10th sec
s.setDefaultMaxConnectionsPerHost(100);
s.setMaxTotalConnections(100);
return s;
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.request;
import junit.framework.Assert;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
/**
* Test for UpdateRequestCodec
*
* @since solr 1.4
* @version $Id$
* @see org.apache.solr.client.solrj.request.UpdateRequest
*/
public class TestUpdateRequestCodec {
@Test
public void simple() throws IOException {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.deleteById("*:*");
updateRequest.deleteById("id:5");
updateRequest.deleteByQuery("2*");
updateRequest.deleteByQuery("1*");
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", 1);
doc.addField("desc", "one", 2.0f);
doc.addField("desc", "1");
updateRequest.add(doc);
doc = new SolrInputDocument();
doc.addField("id", 2);
doc.setDocumentBoost(10.0f);
doc.addField("desc", "two", 3.0f);
doc.addField("desc", "2");
updateRequest.add(doc);
doc = new SolrInputDocument();
doc.addField("id", 3);
doc.addField("desc", "three", 3.0f);
doc.addField("desc", "3");
updateRequest.add(doc);
// updateRequest.setWaitFlush(true);
updateRequest.deleteById("2");
updateRequest.deleteByQuery("id:3");
JavaBinUpdateRequestCodec codec = new JavaBinUpdateRequestCodec();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
codec.marshal(updateRequest, baos);
final List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
JavaBinUpdateRequestCodec.StreamingDocumentHandler handler = new JavaBinUpdateRequestCodec.StreamingDocumentHandler() {
public void document(SolrInputDocument document, UpdateRequest req) {
Assert.assertNotNull(req.getParams());
// Assert.assertEquals(Boolean.TRUE, req.getParams().getBool(UpdateParams.WAIT_FLUSH));
docs.add(document);
}
};
UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler);
Assert.assertNull(updateUnmarshalled.getDocuments());
for (SolrInputDocument document : docs) {
updateUnmarshalled.add(document);
}
for (int i = 0; i < updateRequest.getDocuments().size(); i++) {
SolrInputDocument inDoc = updateRequest.getDocuments().get(i);
SolrInputDocument outDoc = updateUnmarshalled.getDocuments().get(i);
compareDocs(inDoc, outDoc);
}
Assert.assertEquals(updateUnmarshalled.getDeleteById().get(0) , updateRequest.getDeleteById().get(0));
Assert.assertEquals(updateUnmarshalled.getDeleteQuery().get(0) , updateRequest.getDeleteQuery().get(0));
}
private void compareDocs(SolrInputDocument docA, SolrInputDocument docB) {
Assert.assertEquals(docA.getDocumentBoost(), docB.getDocumentBoost());
for (String s : docA.getFieldNames()) {
SolrInputField fldA = docA.getField(s);
SolrInputField fldB = docB.getField(s);
Assert.assertEquals(fldA.getValue(), fldB.getValue());
Assert.assertEquals(fldA.getBoost(), fldB.getBoost());
}
}
}