mirror of https://github.com/apache/lucene.git
SOLR-3194: Attaching a commit to an update request results in too many commits on each node.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1296631 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fbe25c380d
commit
b2257563e3
|
@ -92,8 +92,8 @@ public class SolrCmdDistributor {
|
||||||
public void finish() {
|
public void finish() {
|
||||||
|
|
||||||
// piggyback on any outstanding adds or deletes if possible.
|
// piggyback on any outstanding adds or deletes if possible.
|
||||||
flushAdds(1, null, null);
|
flushAdds(1);
|
||||||
flushDeletes(1, null, null);
|
flushDeletes(1);
|
||||||
|
|
||||||
checkResponses(true);
|
checkResponses(true);
|
||||||
}
|
}
|
||||||
|
@ -108,11 +108,11 @@ public class SolrCmdDistributor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams commitParams) throws IOException {
|
public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
|
||||||
checkResponses(false);
|
checkResponses(false);
|
||||||
|
|
||||||
// make sure any pending deletes are flushed
|
// make sure any pending deletes are flushed
|
||||||
flushDeletes(1, null, null);
|
flushDeletes(1);
|
||||||
|
|
||||||
// TODO: this is brittle
|
// TODO: this is brittle
|
||||||
// need to make a clone since these commands may be reused
|
// need to make a clone since these commands may be reused
|
||||||
|
@ -124,7 +124,7 @@ public class SolrCmdDistributor {
|
||||||
clone.setVersion(cmd.getVersion());
|
clone.setVersion(cmd.getVersion());
|
||||||
AddRequest addRequest = new AddRequest();
|
AddRequest addRequest = new AddRequest();
|
||||||
addRequest.cmd = clone;
|
addRequest.cmd = clone;
|
||||||
addRequest.params = commitParams;
|
addRequest.params = params;
|
||||||
|
|
||||||
for (Node node : nodes) {
|
for (Node node : nodes) {
|
||||||
List<AddRequest> alist = adds.get(node);
|
List<AddRequest> alist = adds.get(node);
|
||||||
|
@ -135,7 +135,7 @@ public class SolrCmdDistributor {
|
||||||
alist.add(addRequest);
|
alist.add(addRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
flushAdds(maxBufferedAddsPerServer, null, null);
|
flushAdds(maxBufferedAddsPerServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
|
public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
|
||||||
|
@ -168,7 +168,7 @@ public class SolrCmdDistributor {
|
||||||
private void doDelete(DeleteUpdateCommand cmd, List<Node> nodes,
|
private void doDelete(DeleteUpdateCommand cmd, List<Node> nodes,
|
||||||
ModifiableSolrParams params) throws IOException {
|
ModifiableSolrParams params) throws IOException {
|
||||||
|
|
||||||
flushAdds(1, null, null);
|
flushAdds(1);
|
||||||
|
|
||||||
DeleteUpdateCommand clonedCmd = clone(cmd);
|
DeleteUpdateCommand clonedCmd = clone(cmd);
|
||||||
DeleteRequest deleteRequest = new DeleteRequest();
|
DeleteRequest deleteRequest = new DeleteRequest();
|
||||||
|
@ -184,7 +184,7 @@ public class SolrCmdDistributor {
|
||||||
dlist.add(deleteRequest);
|
dlist.add(deleteRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
flushDeletes(maxBufferedDeletesPerServer, null, null);
|
flushDeletes(maxBufferedDeletesPerServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
|
void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
|
||||||
|
@ -193,7 +193,7 @@ public class SolrCmdDistributor {
|
||||||
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
|
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean flushAdds(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams commitParams) {
|
boolean flushAdds(int limit) {
|
||||||
// check for pending deletes
|
// check for pending deletes
|
||||||
|
|
||||||
Set<Node> removeNodes = new HashSet<Node>();
|
Set<Node> removeNodes = new HashSet<Node>();
|
||||||
|
@ -205,8 +205,6 @@ public class SolrCmdDistributor {
|
||||||
|
|
||||||
UpdateRequestExt ureq = new UpdateRequestExt();
|
UpdateRequestExt ureq = new UpdateRequestExt();
|
||||||
|
|
||||||
addCommit(ureq, ccmd);
|
|
||||||
|
|
||||||
ModifiableSolrParams combinedParams = new ModifiableSolrParams();
|
ModifiableSolrParams combinedParams = new ModifiableSolrParams();
|
||||||
|
|
||||||
for (AddRequest aReq : alist) {
|
for (AddRequest aReq : alist) {
|
||||||
|
@ -216,7 +214,6 @@ public class SolrCmdDistributor {
|
||||||
ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
|
ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (commitParams != null) combinedParams.add(commitParams);
|
|
||||||
if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
|
if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
|
||||||
ureq.getParams().add(combinedParams);
|
ureq.getParams().add(combinedParams);
|
||||||
|
|
||||||
|
@ -232,7 +229,7 @@ public class SolrCmdDistributor {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean flushDeletes(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams commitParams) {
|
boolean flushDeletes(int limit) {
|
||||||
// check for pending deletes
|
// check for pending deletes
|
||||||
|
|
||||||
Set<Node> removeNodes = new HashSet<Node>();
|
Set<Node> removeNodes = new HashSet<Node>();
|
||||||
|
@ -242,8 +239,6 @@ public class SolrCmdDistributor {
|
||||||
if (dlist == null || dlist.size() < limit) return false;
|
if (dlist == null || dlist.size() < limit) return false;
|
||||||
UpdateRequestExt ureq = new UpdateRequestExt();
|
UpdateRequestExt ureq = new UpdateRequestExt();
|
||||||
|
|
||||||
addCommit(ureq, ccmd);
|
|
||||||
|
|
||||||
ModifiableSolrParams combinedParams = new ModifiableSolrParams();
|
ModifiableSolrParams combinedParams = new ModifiableSolrParams();
|
||||||
|
|
||||||
for (DeleteRequest dReq : dlist) {
|
for (DeleteRequest dReq : dlist) {
|
||||||
|
@ -255,7 +250,6 @@ public class SolrCmdDistributor {
|
||||||
ureq.deleteByQuery(cmd.query);
|
ureq.deleteByQuery(cmd.query);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (commitParams != null) combinedParams.add(commitParams);
|
|
||||||
if (ureq.getParams() == null) ureq
|
if (ureq.getParams() == null) ureq
|
||||||
.setParams(new ModifiableSolrParams());
|
.setParams(new ModifiableSolrParams());
|
||||||
ureq.getParams().add(combinedParams);
|
ureq.getParams().add(combinedParams);
|
||||||
|
|
|
@ -60,7 +60,6 @@ import org.apache.solr.update.UpdateHandler;
|
||||||
import org.apache.solr.update.UpdateLog;
|
import org.apache.solr.update.UpdateLog;
|
||||||
import org.apache.solr.update.VersionBucket;
|
import org.apache.solr.update.VersionBucket;
|
||||||
import org.apache.solr.update.VersionInfo;
|
import org.apache.solr.update.VersionInfo;
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -273,6 +272,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
||||||
if (isLeader) {
|
if (isLeader) {
|
||||||
params.set(SEEN_LEADER, true);
|
params.set(SEEN_LEADER, true);
|
||||||
}
|
}
|
||||||
|
params.remove("commit"); // this will be distributed from the local commit
|
||||||
cmdDistrib.distribAdd(cmd, nodes, params);
|
cmdDistrib.distribAdd(cmd, nodes, params);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -493,6 +493,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
||||||
if (isLeader) {
|
if (isLeader) {
|
||||||
params.set(SEEN_LEADER, true);
|
params.set(SEEN_LEADER, true);
|
||||||
}
|
}
|
||||||
|
params.remove("commit"); // we already will have forwarded this from our local commit
|
||||||
cmdDistrib.distribDelete(cmd, nodes, params);
|
cmdDistrib.distribDelete(cmd, nodes, params);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -569,6 +570,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
params.remove("commit"); // this will be distributed from the local commit
|
||||||
cmdDistrib.distribDelete(cmd, leaders, params);
|
cmdDistrib.distribDelete(cmd, leaders, params);
|
||||||
|
|
||||||
if (!leaderForAnyShard) {
|
if (!leaderForAnyShard) {
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
id,cat,name,price,inStock,author_t,series_t,sequence_i,genre_s
|
||||||
|
0553573403,book,A Game of Thrones,7.99,true,George R.R. Martin,"A Song of Ice and Fire",1,fantasy
|
||||||
|
0553579908,book,A Clash of Kings,7.99,true,George R.R. Martin,"A Song of Ice and Fire",2,fantasy
|
||||||
|
0553573429,book,A Storm of Swords,7.99,true,George R.R. Martin,"A Song of Ice and Fire",3,fantasy
|
||||||
|
0553293354,book,Foundation,7.99,true,Isaac Asimov,Foundation Novels,1,scifi
|
||||||
|
0812521390,book,The Black Company,6.99,false,Glen Cook,The Chronicles of The Black Company,1,fantasy
|
||||||
|
0812550706,book,Ender's Game,6.99,true,Orson Scott Card,Ender,1,scifi
|
||||||
|
0441385532,book,Jhereg,7.95,false,Steven Brust,Vlad Taltos,1,fantasy
|
||||||
|
0380014300,book,Nine Princes In Amber,6.99,true,Roger Zelazny,the Chronicles of Amber,1,fantasy
|
||||||
|
0805080481,book,The Book of Three,5.99,true,Lloyd Alexander,The Chronicles of Prydain,1,fantasy
|
||||||
|
0805080499,book,The Black Cauldron,5.99,true,Lloyd Alexander,The Chronicles of Prydain,2,fantasy
|
|
|
@ -538,6 +538,11 @@
|
||||||
|
|
||||||
<field name="nullfirst" type="string" indexed="true" stored="true" sortMissingFirst="true" multiValued="false"/>
|
<field name="nullfirst" type="string" indexed="true" stored="true" sortMissingFirst="true" multiValued="false"/>
|
||||||
|
|
||||||
|
|
||||||
|
<field name="cat" type="string" indexed="true" stored="true" multiValued="true"/>
|
||||||
|
<field name="price" type="float" indexed="true" stored="true"/>
|
||||||
|
<field name="inStock" type="boolean" indexed="true" stored="true" />
|
||||||
|
|
||||||
<field name="subword" type="subword" indexed="true" stored="true"/>
|
<field name="subword" type="subword" indexed="true" stored="true"/>
|
||||||
<field name="subword_offsets" type="subword" indexed="true" stored="true" termOffsets="true"/>
|
<field name="subword_offsets" type="subword" indexed="true" stored="true" termOffsets="true"/>
|
||||||
<field name="numericsubword" type="numericsubword" indexed="true" stored="true"/>
|
<field name="numericsubword" type="numericsubword" indexed="true" stored="true"/>
|
||||||
|
|
|
@ -39,13 +39,17 @@ import org.apache.solr.client.solrj.SolrServer;
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
||||||
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
|
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
|
||||||
|
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
||||||
|
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
|
||||||
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
|
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
|
||||||
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
import org.apache.solr.common.SolrInputDocument;
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
import org.apache.solr.common.cloud.Slice;
|
import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.CommonParams;
|
import org.apache.solr.common.params.CommonParams;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.update.SolrCmdDistributor.Request;
|
import org.apache.solr.update.SolrCmdDistributor.Request;
|
||||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||||
|
|
||||||
|
@ -274,12 +278,46 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
|
||||||
testANewCollectionInOneInstance();
|
testANewCollectionInOneInstance();
|
||||||
testSearchByCollectionName();
|
testSearchByCollectionName();
|
||||||
testANewCollectionInOneInstanceWithManualShardAssignement();
|
testANewCollectionInOneInstanceWithManualShardAssignement();
|
||||||
|
testNumberOfCommitsWithCommitAfterAdd();
|
||||||
|
|
||||||
// Thread.sleep(10000000000L);
|
// Thread.sleep(10000000000L);
|
||||||
if (DEBUG) {
|
if (DEBUG) {
|
||||||
super.printLayout();
|
super.printLayout();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testNumberOfCommitsWithCommitAfterAdd()
|
||||||
|
throws MalformedURLException, SolrServerException, IOException {
|
||||||
|
long startCommits = getNumCommits((CommonsHttpSolrServer) clients.get(0));
|
||||||
|
|
||||||
|
ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update/csv");
|
||||||
|
up.addFile(getFile("books_numeric_ids.csv"));
|
||||||
|
up.setCommitWithin(900000);
|
||||||
|
up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
|
||||||
|
NamedList<Object> result = clients.get(0).request(up);
|
||||||
|
|
||||||
|
long endCommits = getNumCommits((CommonsHttpSolrServer) clients.get(0));
|
||||||
|
|
||||||
|
assertEquals(startCommits + 1L, endCommits);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Long getNumCommits(CommonsHttpSolrServer solrServer) throws MalformedURLException,
|
||||||
|
SolrServerException, IOException {
|
||||||
|
CommonsHttpSolrServer server = new CommonsHttpSolrServer(solrServer.getBaseURL());
|
||||||
|
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||||
|
params.set("qt", "/admin/mbeans?key=updateHandler&stats=true");
|
||||||
|
// use generic request to avoid extra processing of queries
|
||||||
|
QueryRequest req = new QueryRequest(params);
|
||||||
|
NamedList<Object> resp = server.request(req);
|
||||||
|
NamedList mbeans = (NamedList) resp.get("solr-mbeans");
|
||||||
|
NamedList uhandlerCat = (NamedList) mbeans.get("UPDATEHANDLER");
|
||||||
|
NamedList uhandler = (NamedList) uhandlerCat.get("updateHandler");
|
||||||
|
NamedList stats = (NamedList) uhandler.get("stats");
|
||||||
|
Long commits = (Long) stats.get("commits");
|
||||||
|
System.out.println("resp:" + resp);
|
||||||
|
return commits;
|
||||||
|
}
|
||||||
|
|
||||||
private void testANewCollectionInOneInstanceWithManualShardAssignement() throws Exception {
|
private void testANewCollectionInOneInstanceWithManualShardAssignement() throws Exception {
|
||||||
List<SolrServer> collectionClients = new ArrayList<SolrServer>();
|
List<SolrServer> collectionClients = new ArrayList<SolrServer>();
|
||||||
SolrServer client = clients.get(0);
|
SolrServer client = clients.get(0);
|
||||||
|
|
Loading…
Reference in New Issue