SOLR-1038 -- Enhance CommonsHttpSolrServer to add docs in batch using an iterator API

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@755882 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2009-03-19 09:15:10 +00:00
parent bf48eceb2e
commit 54819167be
7 changed files with 201 additions and 26 deletions

View File

@ -184,6 +184,8 @@ New Features
range filter in Lucene contrib-queries and can be dramatically faster than normal range queries.
(Uwe Schindler, shalin)
31. SOLR-1038: Enhance CommonsHttpSolrServer to add docs in batch using an iterator API (Noble Paul via shalin)
Optimizations
----------------------

View File

@ -41,7 +41,8 @@ public class BinaryRequestWriter extends RequestWriter {
UpdateRequest updateRequest = (UpdateRequest) req;
if (isNull(updateRequest.getDocuments()) &&
isNull(updateRequest.getDeleteById()) &&
isNull(updateRequest.getDeleteQuery())) {
isNull(updateRequest.getDeleteQuery())
&& (updateRequest.getDocIterator() == null) ) {
return null;
}
List<ContentStream> l = new ArrayList<ContentStream>();

View File

@ -22,10 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;
@ -51,13 +48,13 @@ import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.DefaultSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.*;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
@ -601,4 +598,26 @@ public class CommonsHttpSolrServer extends SolrServer
public void setRequestWriter(RequestWriter requestWriter) {
this.requestWriter = requestWriter;
}
/**
* Adds the documents supplied by the given iterator. A commit is called after all the documents are added.
* If an exception is thrown, commit is not called.
*
* @param docIterator the iterator which returns SolrInputDocument instances
* @param commitParams additional parameters such as optimize, waitFlush, waitSearcher
*
* @return the response from the SolrServer
*/
public UpdateResponse addAndCommit(Iterator<SolrInputDocument> docIterator, SolrParams commitParams)
throws SolrServerException, IOException {
UpdateRequest req = new UpdateRequest();
req.setDocIterator(docIterator);
if (commitParams instanceof ModifiableSolrParams) {
req.setParams((ModifiableSolrParams) commitParams);
} else if (commitParams != null) {
req.setParams(new ModifiableSolrParams(commitParams));
}
req.setParam(UpdateParams.COMMIT, "true");
return req.process(this);
}
}

View File

@ -27,10 +27,7 @@ import org.apache.solr.common.util.NamedList;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.*;
/**
* Provides methods for marshalling an UpdateRequest to a NamedList which can be serialized in the javabin format and
@ -51,25 +48,33 @@ public class JavaBinUpdateRequestCodec {
* @throws IOException in case of an exception during marshalling or writing to the stream
*/
public void marshal(UpdateRequest updateRequest, OutputStream os) throws IOException {
JavaBinCodec codec = new JavaBinCodec();
NamedList nl = new NamedList();
NamedList params = solrParamsToNamedList(updateRequest.getParams());
if (updateRequest.getCommitWithin() != -1) {
params.add("commitWithin", updateRequest.getCommitWithin());
}
List<List<NamedList>> doclist = new ArrayList<List<NamedList>>();
if (updateRequest.getDocuments() != null) {
for (SolrInputDocument doc : updateRequest.getDocuments()) {
doclist.add(solrInputDocumentToList(doc));
}
Iterator<SolrInputDocument> docIter = null;
if (updateRequest.getDocuments() != null) {
docIter = updateRequest.getDocuments().iterator();
}
if(updateRequest.getDocIterator() != null){
docIter = updateRequest.getDocIterator();
}
nl.add("params", params);// 0: params
nl.add("delById", updateRequest.getDeleteById());
nl.add("delByQ", updateRequest.getDeleteQuery());
nl.add("docs", doclist.iterator());
codec.marshal(nl, os);
nl.add("docs", docIter);
new JavaBinCodec(){
public void writeMap(Map val) throws IOException {
if (val instanceof SolrInputDocument) {
writeVal(solrInputDocumentToList((SolrInputDocument) val));
} else {
super.writeMap(val);
}
}
}.marshal(nl, os);
}
/**

View File

@ -51,7 +51,8 @@ public class RequestWriter {
private boolean isEmpty(UpdateRequest updateRequest) {
return isNull(updateRequest.getDocuments()) &&
isNull(updateRequest.getDeleteById()) &&
isNull(updateRequest.getDeleteQuery());
isNull(updateRequest.getDeleteQuery()) &&
updateRequest.getDocIterator() == null;
}
public String getPath(SolrRequest req) {

View File

@ -23,6 +23,7 @@ import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Iterator;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
@ -48,6 +49,7 @@ public class UpdateRequest extends SolrRequest
};
private List<SolrInputDocument> documents = null;
private Iterator<SolrInputDocument> docIterator = null;
private List<String> deleteById = null;
private List<String> deleteQuery = null;
@ -165,6 +167,10 @@ public class UpdateRequest extends SolrRequest
this.params = params;
}
public void setDocIterator(Iterator<SolrInputDocument> docIterator) {
this.docIterator = docIterator;
}
//--------------------------------------------------------------------------
//--------------------------------------------------------------------------
@ -188,18 +194,28 @@ public class UpdateRequest extends SolrRequest
* @since solr 1.4
*/
public void writeXML( Writer writer ) throws IOException {
if( documents != null && documents.size() > 0 ) {
if( (documents != null && documents.size() > 0) || docIterator != null) {
if( commitWithin > 0 ) {
writer.write("<add commitWithin=\""+commitWithin+"\">");
}
else {
writer.write("<add>");
}
if(documents != null) {
for (SolrInputDocument doc : documents) {
if (doc != null) {
ClientUtils.writeXML(doc, writer);
}
}
}
if (docIterator != null) {
while (docIterator.hasNext()) {
SolrInputDocument doc = docIterator.next();
if (doc != null) {
ClientUtils.writeXML(doc, writer);
}
}
}
writer.write("</add>");
}
@ -253,6 +269,9 @@ public class UpdateRequest extends SolrRequest
return documents;
}
public Iterator<SolrInputDocument> getDocIterator() {
return docIterator;
}
public List<String> getDeleteById() {
return deleteById;

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.common.SolrInputDocument;
import java.util.Iterator;
import java.io.IOException;
/**
* Test for SOLR-1038
*
* @since solr 1.4
* @version $Id$
*/
public class TestBatchUpdate extends SolrExampleTestBase {
static final int numdocs = 1000;
SolrServer server;
JettySolrRunner jetty;
int port = 0;
static final String context = "/example";
public void testWithXml() throws Exception {
CommonsHttpSolrServer commonsHttpSolrServer = (CommonsHttpSolrServer) getSolrServer();
commonsHttpSolrServer.setRequestWriter(new RequestWriter());
commonsHttpSolrServer.deleteByQuery( "*:*" ); // delete everything!
doIt(commonsHttpSolrServer);
}
public void testWithBinary()throws Exception{
CommonsHttpSolrServer commonsHttpSolrServer = (CommonsHttpSolrServer) getSolrServer();
commonsHttpSolrServer.setRequestWriter(new BinaryRequestWriter());
commonsHttpSolrServer.deleteByQuery( "*:*" ); // delete everything!
doIt(commonsHttpSolrServer);
}
private void doIt(CommonsHttpSolrServer commonsHttpSolrServer) throws SolrServerException, IOException {
final int[] counter = new int[1];
counter[0] = 0;
commonsHttpSolrServer.addAndCommit(new Iterator<SolrInputDocument>() {
public boolean hasNext() {
return counter[0] < numdocs;
}
public SolrInputDocument next() {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "" + (++counter[0]));
doc.addField("cat", "foocat");
return doc;
}
public void remove() {
//do nothing
}
}, null);
SolrQuery query = new SolrQuery("*:*");
QueryResponse response = commonsHttpSolrServer.query(query);
assertEquals(0, response.getStatus());
assertEquals(numdocs, response.getResults().getNumFound());
}
@Override public void setUp() throws Exception
{
super.setUp();
jetty = new JettySolrRunner( context, 0 );
jetty.start();
port = jetty.getLocalPort();
server = this.createNewSolrServer();
}
@Override public void tearDown() throws Exception
{
super.tearDown();
jetty.stop(); // stop the server
}
@Override
protected SolrServer getSolrServer()
{
return server;
}
@Override
protected SolrServer createNewSolrServer()
{
try {
// setup the server...
String url = "http://localhost:"+port+context;
CommonsHttpSolrServer s = new CommonsHttpSolrServer( url );
s.setConnectionTimeout(100); // 1/10th sec
s.setDefaultMaxConnectionsPerHost(100);
s.setMaxTotalConnections(100);
return s;
}
catch( Exception ex ) {
throw new RuntimeException( ex );
}
}
}