From 54819167be0e06ee00c36eaba90866902c8a6904 Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Thu, 19 Mar 2009 09:15:10 +0000 Subject: [PATCH] SOLR-1038 -- Enhance CommonsHttpSolrServer to add docs in batch using an iterator API git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@755882 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../solrj/impl/BinaryRequestWriter.java | 3 +- .../solrj/impl/CommonsHttpSolrServer.java | 35 +++-- .../request/JavaBinUpdateRequestCodec.java | 29 ++-- .../client/solrj/request/RequestWriter.java | 3 +- .../client/solrj/request/UpdateRequest.java | 27 +++- .../solr/client/solrj/TestBatchUpdate.java | 128 ++++++++++++++++++ 7 files changed, 201 insertions(+), 26 deletions(-) create mode 100644 src/test/org/apache/solr/client/solrj/TestBatchUpdate.java diff --git a/CHANGES.txt b/CHANGES.txt index a34fa6e4ddd..7d2b9175561 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -184,6 +184,8 @@ New Features range filter in Lucene contrib-queries and can be dramatically faster than normal range queries. (Uwe Schindler, shalin) +31. SOLR-1038: Enhance CommonsHttpSolrServer to add docs in batch using an iterator API (Noble Paul via shalin) + Optimizations ---------------------- diff --git a/src/solrj/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java b/src/solrj/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java index 2ab2c97d77f..a2fbddc77d1 100644 --- a/src/solrj/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java +++ b/src/solrj/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java @@ -41,7 +41,8 @@ public class BinaryRequestWriter extends RequestWriter { UpdateRequest updateRequest = (UpdateRequest) req; if (isNull(updateRequest.getDocuments()) && isNull(updateRequest.getDeleteById()) && - isNull(updateRequest.getDeleteQuery())) { + isNull(updateRequest.getDeleteQuery()) + && (updateRequest.getDocIterator() == null) ) { return null; } List l = new ArrayList(); diff --git a/src/solrj/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java b/src/solrj/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java index 3654c063002..9128d881a9e 100644 --- a/src/solrj/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java +++ b/src/solrj/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java @@ -22,10 +22,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.MalformedURLException; import java.net.URL; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import java.util.zip.GZIPInputStream; import java.util.zip.InflaterInputStream; @@ -51,13 +48,13 @@ import org.apache.solr.client.solrj.ResponseParser; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.client.solrj.request.RequestWriter; +import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrException; -import org.apache.solr.common.params.CommonParams; -import org.apache.solr.common.params.DefaultSolrParams; -import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.*; import org.apache.solr.common.util.ContentStream; import org.apache.solr.common.util.NamedList; import org.slf4j.Logger; @@ -601,4 +598,26 @@ public class CommonsHttpSolrServer extends SolrServer public void setRequestWriter(RequestWriter requestWriter) { this.requestWriter = requestWriter; } + + /** + * Adds the documents supplied by the given iterator. A commit is called after all the documents are added. + * If an exception is thrown, commit is not called. + * + * @param docIterator the iterator which returns SolrInputDocument instances + * @param commitParams additional parameters such as optimize, waitFlush, waitSearcher + * + * @return the response from the SolrServer + */ + public UpdateResponse addAndCommit(Iterator docIterator, SolrParams commitParams) + throws SolrServerException, IOException { + UpdateRequest req = new UpdateRequest(); + req.setDocIterator(docIterator); + if (commitParams instanceof ModifiableSolrParams) { + req.setParams((ModifiableSolrParams) commitParams); + } else if (commitParams != null) { + req.setParams(new ModifiableSolrParams(commitParams)); + } + req.setParam(UpdateParams.COMMIT, "true"); + return req.process(this); + } } diff --git a/src/solrj/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java b/src/solrj/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java index 57cb7958089..bdb98664e84 100644 --- a/src/solrj/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java +++ b/src/solrj/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java @@ -27,10 +27,7 @@ import org.apache.solr.common.util.NamedList; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; +import java.util.*; /** * Provides methods for marshalling an UpdateRequest to a NamedList which can be serialized in the javabin format and @@ -51,25 +48,33 @@ public class JavaBinUpdateRequestCodec { * @throws IOException in case of an exception during marshalling or writing to the stream */ public void marshal(UpdateRequest updateRequest, OutputStream os) throws IOException { - JavaBinCodec codec = new JavaBinCodec(); NamedList nl = new NamedList(); NamedList params = solrParamsToNamedList(updateRequest.getParams()); if (updateRequest.getCommitWithin() != -1) { params.add("commitWithin", updateRequest.getCommitWithin()); } - List> doclist = new ArrayList>(); - if (updateRequest.getDocuments() != null) { - for (SolrInputDocument doc : updateRequest.getDocuments()) { - doclist.add(solrInputDocumentToList(doc)); - } + Iterator docIter = null; + if (updateRequest.getDocuments() != null) { + docIter = updateRequest.getDocuments().iterator(); + } + if(updateRequest.getDocIterator() != null){ + docIter = updateRequest.getDocIterator(); } nl.add("params", params);// 0: params nl.add("delById", updateRequest.getDeleteById()); nl.add("delByQ", updateRequest.getDeleteQuery()); - nl.add("docs", doclist.iterator()); - codec.marshal(nl, os); + nl.add("docs", docIter); + new JavaBinCodec(){ + public void writeMap(Map val) throws IOException { + if (val instanceof SolrInputDocument) { + writeVal(solrInputDocumentToList((SolrInputDocument) val)); + } else { + super.writeMap(val); + } + } + }.marshal(nl, os); } /** diff --git a/src/solrj/org/apache/solr/client/solrj/request/RequestWriter.java b/src/solrj/org/apache/solr/client/solrj/request/RequestWriter.java index 9e6574b79a5..9104fa89075 100644 --- a/src/solrj/org/apache/solr/client/solrj/request/RequestWriter.java +++ b/src/solrj/org/apache/solr/client/solrj/request/RequestWriter.java @@ -51,7 +51,8 @@ public class RequestWriter { private boolean isEmpty(UpdateRequest updateRequest) { return isNull(updateRequest.getDocuments()) && isNull(updateRequest.getDeleteById()) && - isNull(updateRequest.getDeleteQuery()); + isNull(updateRequest.getDeleteQuery()) && + updateRequest.getDocIterator() == null; } public String getPath(SolrRequest req) { diff --git a/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java b/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java index aad6ca83b55..a1af0b9775b 100644 --- a/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java +++ b/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java @@ -23,6 +23,7 @@ import java.io.Writer; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Iterator; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServerException; @@ -48,6 +49,7 @@ public class UpdateRequest extends SolrRequest }; private List documents = null; + private Iterator docIterator = null; private List deleteById = null; private List deleteQuery = null; @@ -165,6 +167,10 @@ public class UpdateRequest extends SolrRequest this.params = params; } + public void setDocIterator(Iterator docIterator) { + this.docIterator = docIterator; + } + //-------------------------------------------------------------------------- //-------------------------------------------------------------------------- @@ -188,16 +194,26 @@ public class UpdateRequest extends SolrRequest * @since solr 1.4 */ public void writeXML( Writer writer ) throws IOException { - if( documents != null && documents.size() > 0 ) { + if( (documents != null && documents.size() > 0) || docIterator != null) { if( commitWithin > 0 ) { writer.write(""); } else { writer.write(""); } - for (SolrInputDocument doc : documents ) { - if( doc != null ) { - ClientUtils.writeXML( doc, writer ); + if(documents != null) { + for (SolrInputDocument doc : documents) { + if (doc != null) { + ClientUtils.writeXML(doc, writer); + } + } + } + if (docIterator != null) { + while (docIterator.hasNext()) { + SolrInputDocument doc = docIterator.next(); + if (doc != null) { + ClientUtils.writeXML(doc, writer); + } } } writer.write(""); @@ -253,6 +269,9 @@ public class UpdateRequest extends SolrRequest return documents; } + public Iterator getDocIterator() { + return docIterator; + } public List getDeleteById() { return deleteById; diff --git a/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java b/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java new file mode 100644 index 00000000000..cd22579e722 --- /dev/null +++ b/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj; + +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer; +import org.apache.solr.client.solrj.impl.BinaryRequestWriter; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.request.RequestWriter; +import org.apache.solr.common.SolrInputDocument; + +import java.util.Iterator; +import java.io.IOException; + +/** + * Test for SOLR-1038 + * + * @since solr 1.4 + * @version $Id$ + */ +public class TestBatchUpdate extends SolrExampleTestBase { + static final int numdocs = 1000; + + SolrServer server; + JettySolrRunner jetty; + + int port = 0; + static final String context = "/example"; + + public void testWithXml() throws Exception { + CommonsHttpSolrServer commonsHttpSolrServer = (CommonsHttpSolrServer) getSolrServer(); + commonsHttpSolrServer.setRequestWriter(new RequestWriter()); + commonsHttpSolrServer.deleteByQuery( "*:*" ); // delete everything! + doIt(commonsHttpSolrServer); + } + + public void testWithBinary()throws Exception{ + CommonsHttpSolrServer commonsHttpSolrServer = (CommonsHttpSolrServer) getSolrServer(); + commonsHttpSolrServer.setRequestWriter(new BinaryRequestWriter()); + commonsHttpSolrServer.deleteByQuery( "*:*" ); // delete everything! + doIt(commonsHttpSolrServer); + } + + private void doIt(CommonsHttpSolrServer commonsHttpSolrServer) throws SolrServerException, IOException { + final int[] counter = new int[1]; + counter[0] = 0; + commonsHttpSolrServer.addAndCommit(new Iterator() { + + public boolean hasNext() { + return counter[0] < numdocs; + } + + public SolrInputDocument next() { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "" + (++counter[0])); + doc.addField("cat", "foocat"); + return doc; + } + + public void remove() { + //do nothing + + } + }, null); + SolrQuery query = new SolrQuery("*:*"); + QueryResponse response = commonsHttpSolrServer.query(query); + assertEquals(0, response.getStatus()); + assertEquals(numdocs, response.getResults().getNumFound()); + } + + + + + @Override public void setUp() throws Exception + { + super.setUp(); + + jetty = new JettySolrRunner( context, 0 ); + jetty.start(); + port = jetty.getLocalPort(); + + server = this.createNewSolrServer(); + } + + @Override public void tearDown() throws Exception + { + super.tearDown(); + jetty.stop(); // stop the server + } + + + @Override + protected SolrServer getSolrServer() + { + return server; + } + + @Override + protected SolrServer createNewSolrServer() + { + try { + // setup the server... + String url = "http://localhost:"+port+context; + CommonsHttpSolrServer s = new CommonsHttpSolrServer( url ); + s.setConnectionTimeout(100); // 1/10th sec + s.setDefaultMaxConnectionsPerHost(100); + s.setMaxTotalConnections(100); + return s; + } + catch( Exception ex ) { + throw new RuntimeException( ex ); + } + } +}