diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index 630ec605829..9f98eb52f05 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -92,8 +92,8 @@ public class SolrCmdDistributor { public void finish() { // piggyback on any outstanding adds or deletes if possible. - flushAdds(1, null, null); - flushDeletes(1, null, null); + flushAdds(1); + flushDeletes(1); checkResponses(true); } @@ -108,11 +108,11 @@ public class SolrCmdDistributor { } } - public void distribAdd(AddUpdateCommand cmd, List nodes, ModifiableSolrParams commitParams) throws IOException { + public void distribAdd(AddUpdateCommand cmd, List nodes, ModifiableSolrParams params) throws IOException { checkResponses(false); // make sure any pending deletes are flushed - flushDeletes(1, null, null); + flushDeletes(1); // TODO: this is brittle // need to make a clone since these commands may be reused @@ -124,7 +124,7 @@ public class SolrCmdDistributor { clone.setVersion(cmd.getVersion()); AddRequest addRequest = new AddRequest(); addRequest.cmd = clone; - addRequest.params = commitParams; + addRequest.params = params; for (Node node : nodes) { List alist = adds.get(node); @@ -135,7 +135,7 @@ public class SolrCmdDistributor { alist.add(addRequest); } - flushAdds(maxBufferedAddsPerServer, null, null); + flushAdds(maxBufferedAddsPerServer); } public void distribCommit(CommitUpdateCommand cmd, List nodes, @@ -168,7 +168,7 @@ public class SolrCmdDistributor { private void doDelete(DeleteUpdateCommand cmd, List nodes, ModifiableSolrParams params) throws IOException { - flushAdds(1, null, null); + flushAdds(1); DeleteUpdateCommand clonedCmd = clone(cmd); DeleteRequest deleteRequest = new DeleteRequest(); @@ -184,7 +184,7 @@ public class SolrCmdDistributor { dlist.add(deleteRequest); } - flushDeletes(maxBufferedDeletesPerServer, null, null); + flushDeletes(maxBufferedDeletesPerServer); } void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) { @@ -193,7 +193,7 @@ public class SolrCmdDistributor { : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher); } - boolean flushAdds(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams commitParams) { + boolean flushAdds(int limit) { // check for pending deletes Set removeNodes = new HashSet(); @@ -205,8 +205,6 @@ public class SolrCmdDistributor { UpdateRequestExt ureq = new UpdateRequestExt(); - addCommit(ureq, ccmd); - ModifiableSolrParams combinedParams = new ModifiableSolrParams(); for (AddRequest aReq : alist) { @@ -216,7 +214,6 @@ public class SolrCmdDistributor { ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); } - if (commitParams != null) combinedParams.add(commitParams); if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams()); ureq.getParams().add(combinedParams); @@ -232,7 +229,7 @@ public class SolrCmdDistributor { return true; } - boolean flushDeletes(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams commitParams) { + boolean flushDeletes(int limit) { // check for pending deletes Set removeNodes = new HashSet(); @@ -242,8 +239,6 @@ public class SolrCmdDistributor { if (dlist == null || dlist.size() < limit) return false; UpdateRequestExt ureq = new UpdateRequestExt(); - addCommit(ureq, ccmd); - ModifiableSolrParams combinedParams = new ModifiableSolrParams(); for (DeleteRequest dReq : dlist) { @@ -255,7 +250,6 @@ public class SolrCmdDistributor { ureq.deleteByQuery(cmd.query); } - if (commitParams != null) combinedParams.add(commitParams); if (ureq.getParams() == null) ureq .setParams(new ModifiableSolrParams()); ureq.getParams().add(combinedParams); diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index e6fa2d07cf4..4ef01214cb8 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -60,7 +60,6 @@ import org.apache.solr.update.UpdateHandler; import org.apache.solr.update.UpdateLog; import org.apache.solr.update.VersionBucket; import org.apache.solr.update.VersionInfo; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -273,6 +272,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (isLeader) { params.set(SEEN_LEADER, true); } + params.remove("commit"); // this will be distributed from the local commit cmdDistrib.distribAdd(cmd, nodes, params); } @@ -493,6 +493,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (isLeader) { params.set(SEEN_LEADER, true); } + params.remove("commit"); // we already will have forwarded this from our local commit cmdDistrib.distribDelete(cmd, nodes, params); } @@ -569,6 +570,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } + params.remove("commit"); // this will be distributed from the local commit cmdDistrib.distribDelete(cmd, leaders, params); if (!leaderForAnyShard) { diff --git a/solr/core/src/test-files/books_numeric_ids.csv b/solr/core/src/test-files/books_numeric_ids.csv new file mode 100644 index 00000000000..817e8b769cf --- /dev/null +++ b/solr/core/src/test-files/books_numeric_ids.csv @@ -0,0 +1,11 @@ +id,cat,name,price,inStock,author_t,series_t,sequence_i,genre_s +0553573403,book,A Game of Thrones,7.99,true,George R.R. Martin,"A Song of Ice and Fire",1,fantasy +0553579908,book,A Clash of Kings,7.99,true,George R.R. Martin,"A Song of Ice and Fire",2,fantasy +0553573429,book,A Storm of Swords,7.99,true,George R.R. Martin,"A Song of Ice and Fire",3,fantasy +0553293354,book,Foundation,7.99,true,Isaac Asimov,Foundation Novels,1,scifi +0812521390,book,The Black Company,6.99,false,Glen Cook,The Chronicles of The Black Company,1,fantasy +0812550706,book,Ender's Game,6.99,true,Orson Scott Card,Ender,1,scifi +0441385532,book,Jhereg,7.95,false,Steven Brust,Vlad Taltos,1,fantasy +0380014300,book,Nine Princes In Amber,6.99,true,Roger Zelazny,the Chronicles of Amber,1,fantasy +0805080481,book,The Book of Three,5.99,true,Lloyd Alexander,The Chronicles of Prydain,1,fantasy +0805080499,book,The Black Cauldron,5.99,true,Lloyd Alexander,The Chronicles of Prydain,2,fantasy diff --git a/solr/core/src/test-files/solr/conf/schema.xml b/solr/core/src/test-files/solr/conf/schema.xml index a7f3edc9c8f..7cb97e59d9e 100644 --- a/solr/core/src/test-files/solr/conf/schema.xml +++ b/solr/core/src/test-files/solr/conf/schema.xml @@ -538,6 +538,11 @@ + + + + + diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java index 4668f027a87..3efa76376da 100644 --- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java @@ -39,13 +39,17 @@ import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CloudSolrServer; import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer; +import org.apache.solr.client.solrj.request.AbstractUpdateRequest; +import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest.Create; +import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; import org.apache.solr.update.SolrCmdDistributor.Request; import org.apache.solr.util.DefaultSolrThreadFactory; @@ -274,12 +278,46 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase { testANewCollectionInOneInstance(); testSearchByCollectionName(); testANewCollectionInOneInstanceWithManualShardAssignement(); + testNumberOfCommitsWithCommitAfterAdd(); + // Thread.sleep(10000000000L); if (DEBUG) { super.printLayout(); } } + private void testNumberOfCommitsWithCommitAfterAdd() + throws MalformedURLException, SolrServerException, IOException { + long startCommits = getNumCommits((CommonsHttpSolrServer) clients.get(0)); + + ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update/csv"); + up.addFile(getFile("books_numeric_ids.csv")); + up.setCommitWithin(900000); + up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true); + NamedList result = clients.get(0).request(up); + + long endCommits = getNumCommits((CommonsHttpSolrServer) clients.get(0)); + + assertEquals(startCommits + 1L, endCommits); + } + + private Long getNumCommits(CommonsHttpSolrServer solrServer) throws MalformedURLException, + SolrServerException, IOException { + CommonsHttpSolrServer server = new CommonsHttpSolrServer(solrServer.getBaseURL()); + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set("qt", "/admin/mbeans?key=updateHandler&stats=true"); + // use generic request to avoid extra processing of queries + QueryRequest req = new QueryRequest(params); + NamedList resp = server.request(req); + NamedList mbeans = (NamedList) resp.get("solr-mbeans"); + NamedList uhandlerCat = (NamedList) mbeans.get("UPDATEHANDLER"); + NamedList uhandler = (NamedList) uhandlerCat.get("updateHandler"); + NamedList stats = (NamedList) uhandler.get("stats"); + Long commits = (Long) stats.get("commits"); + System.out.println("resp:" + resp); + return commits; + } + private void testANewCollectionInOneInstanceWithManualShardAssignement() throws Exception { List collectionClients = new ArrayList(); SolrServer client = clients.get(0);