SOLR-4923: Commits to non leaders as part of a request that also contain updates can execute out of order.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1493779 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-06-17 14:31:22 +00:00
parent 26416ba2e3
commit fc9c0a8f14
7 changed files with 146 additions and 38 deletions

View File

@ -157,6 +157,8 @@ Bug Fixes
> SOLR-4850, cores defined as loadOnStartup=true, transient=false can't be searched
(Erick Erickson)
* SOLR-4923: Commits to non-leaders as part of a request that also contains updates
can execute out of order. (hossman, Mark Miller)
Other Changes
----------------------

View File

@ -1131,15 +1131,37 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
/**
 * Processes a commit, either applying it to the local core or forwarding it
 * to the nodes of the collection.
 *
 * <p>The commit is applied locally (via {@code doLocalCommit}) when any of the
 * following holds:
 * <ul>
 *   <li>ZooKeeper mode is not enabled;</li>
 *   <li>the request carries {@code COMMIT_END_POINT=true}, i.e. it is already
 *       the forwarded form of a commit;</li>
 *   <li>this core is the leader and the collection lookup returned exactly one
 *       node.</li>
 * </ul>
 * Otherwise the commit is sent to the looked-up nodes with
 * {@code COMMIT_END_POINT=true} set on a filtered copy of the request
 * parameters, and {@code finish()} is called afterwards.
 *
 * @param cmd the commit command to process
 * @throws IOException if the local commit or the distribution fails
 */
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
  updateCommand = cmd;

  if (!zkEnabled) {
    doLocalCommit(cmd);
    return;
  }

  zkCheck();

  List<Node> nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
      .getCloudDescriptor().getCollectionName());

  // The lookup result is treated as nullable further down, so the size check
  // must be null-safe as well instead of throwing a NullPointerException.
  boolean singleLeader = isLeader && nodes != null && nodes.size() == 1;
  boolean commitEndPoint = req.getParams().getBool(COMMIT_END_POINT, false);

  if (commitEndPoint || singleLeader) {
    doLocalCommit(cmd);
    return;
  }

  // From here on the request is known not to be a commit end point, so the
  // flag is set unconditionally on the copy that gets forwarded.
  ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
  params.set(COMMIT_END_POINT, true);

  // NOTE(review): when no nodes are returned the commit is neither applied
  // locally nor distributed (unchanged from before) - confirm this is intended.
  if (nodes != null) {
    cmdDistrib.distribCommit(cmd, nodes, params);
    finish();
  }
}
private void doLocalCommit(CommitUpdateCommand cmd) throws IOException {
if (vinfo != null) {
vinfo.lockForUpdate();
}
@ -1156,23 +1178,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
vinfo.unlockForUpdate();
}
}
// TODO: we should consider this? commit everyone in the current collection
if (zkEnabled) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
if (!req.getParams().getBool(COMMIT_END_POINT, false)) {
params.set(COMMIT_END_POINT, true);
String coreNodeName = zkController.getCoreNodeName(req.getCore().getCoreDescriptor());
List<Node> nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
.getCloudDescriptor().getCollectionName(), coreNodeName);
if (nodes != null) {
cmdDistrib.distribCommit(cmd, nodes, params);
finish();
}
}
}
}
@Override
@ -1184,7 +1189,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private List<Node> getCollectionUrls(SolrQueryRequest req, String collection, String coreNodeName) {
private List<Node> getCollectionUrls(SolrQueryRequest req, String collection) {
ClusterState clusterState = req.getCore().getCoreDescriptor()
.getCoreContainer().getZkController().getClusterState();
List<Node> urls = new ArrayList<Node>();
@ -1200,7 +1205,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
for (Entry<String,Replica> entry : shardMap.entrySet()) {
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !entry.getKey().equals(coreNodeName)) {
if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
urls.add(new StdNode(nodeProps));
}
}

View File

@ -152,7 +152,7 @@ public class AliasIntegrationTest extends AbstractFullDistribZkTestBase {
createAlias("testalias", "collection2,collection1");
// search with new cloud client
CloudSolrServer cloudSolrServer = new CloudSolrServer(zkServer.getZkAddress());
CloudSolrServer cloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
query = new SolrQuery("*:*");
query.set("collection", "testalias");
res = cloudSolrServer.query(query);

View File

@ -160,7 +160,26 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
waitForRecoveriesToFinish(false);
handle.clear();
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
del("*:*");
queryAndCompareShards(params("q", "*:*", "distrib", "false", "sanity_check", "is_empty"));
// ask every individual replica of every shard to update+commit the same doc id
// with an incrementing counter on each update+commit
int foo_i_counter = 0;
for (SolrServer server : clients) {
foo_i_counter++;
indexDoc(server, params("commit", "true"), // SOLR-4923
sdoc(id,1, i1,100, tlong,100, "foo_i", foo_i_counter));
// after every update+commit, check all the shards consistency
queryAndCompareShards(params("q", "id:1", "distrib", "false",
"sanity_check", "non_distrib_id_1_lookup"));
queryAndCompareShards(params("q", "id:1",
"sanity_check", "distrib_id_1_lookup"));
}
indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men"
,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
@ -195,10 +214,10 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
}
commit();
handle.clear();
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
queryAndCompareShards(params("q", "*:*",
"sort", "id desc",
"distrib", "false",
"sanity_check", "is_empty"));
// random value sort
for (String f : fieldNames) {
@ -1079,7 +1098,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
if (commondCloudSolrServer == null) {
synchronized(this) {
try {
commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress());
commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION);
commondCloudSolrServer.getLbServer().setConnectionTimeout(15000);
commondCloudSolrServer.getLbServer().setSoTimeout(30000);

View File

@ -91,6 +91,13 @@ public class CloudSolrServer extends SolrServer {
this.lbServer = new LBHttpSolrServer(myClient);
this.updatesToLeaders = true;
}
/**
 * Creates a client for the given ZooKeeper host with an explicit
 * {@code updatesToLeaders} setting. A new HTTP client is created through
 * {@code HttpClientUtil.createClient(null)} and wrapped in a new
 * {@code LBHttpSolrServer}.
 *
 * @param zkHost the ZooKeeper host string stored on this client
 * @param updatesToLeaders value stored in the {@code updatesToLeaders} field
 *        (presumably whether updates are sent to shard leaders - confirm
 *        against the code that reads the field)
 * @throws MalformedURLException declared by the signature
 */
public CloudSolrServer(String zkHost, boolean updatesToLeaders) throws MalformedURLException {
  // Plain configuration values first, then the HTTP plumbing that builds on them.
  this.updatesToLeaders = updatesToLeaders;
  this.zkHost = zkHost;
  this.myClient = HttpClientUtil.createClient(null);
  this.lbServer = new LBHttpSolrServer(this.myClient);
}
/**
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,

View File

@ -45,6 +45,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
@ -424,6 +425,9 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
indexDoc(doc);
}
/**
* Indexes the document in both the control client, and a randomly selected client
*/
protected void indexDoc(SolrInputDocument doc) throws IOException, SolrServerException {
controlClient.add(doc);
@ -432,6 +436,17 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
client.add(doc);
}
/**
 * Indexes the documents in both the control client and the specified client,
 * asserting that the responses are equivalent.
 *
 * <p>The control client is updated first, then the specified server; both
 * updates go through {@code add} with the same parameters.
 *
 * @param server the client to index into in addition to the control client
 * @param params request parameters applied to both update requests
 * @param sdocs the documents to index
 * @return the response received from {@code server}
 */
protected UpdateResponse indexDoc(SolrServer server, SolrParams params, SolrInputDocument... sdocs) throws IOException, SolrServerException {
  final UpdateResponse expected = add(controlClient, params, sdocs);
  final UpdateResponse actual = add(server, params, sdocs);

  compareSolrResponses(actual, expected);
  return actual;
}
protected UpdateResponse add(SolrServer server, SolrParams params, SolrInputDocument... sdocs) throws IOException, SolrServerException {
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams(params));
@ -546,6 +561,9 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
}
/**
 * Varargs convenience overload: wraps the given servers in a list view and
 * delegates to {@code queryAndCompare(SolrParams, Iterable)}.
 *
 * @param params the query parameters
 * @param servers the servers to query
 * @return whatever the {@code Iterable} overload returns
 */
public QueryResponse queryAndCompare(SolrParams params, SolrServer... servers) throws SolrServerException {
  // Typed as Iterable so the call below resolves to the Iterable overload.
  final Iterable<SolrServer> serverList = Arrays.asList(servers);
  return queryAndCompare(params, serverList);
}
public QueryResponse queryAndCompare(SolrParams params, Iterable<SolrServer> servers) throws SolrServerException {
QueryResponse first = null;
for (SolrServer server : servers) {
QueryResponse rsp = server.query(new ModifiableSolrParams(params));
@ -783,8 +801,14 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
return null;
}
/**
 * Compares the payloads of two responses using {@code compare} with this
 * test's {@code flags} and {@code handle} settings. If {@code compare}
 * reports a difference, both responses are logged at error level and the
 * test fails with the difference as its message.
 *
 * @param a the first response
 * @param b the second response
 */
protected void compareSolrResponses(SolrResponse a, SolrResponse b) {
  final String mismatch = compare(a.getResponse(), b.getResponse(), flags, handle);
  if (mismatch == null) {
    return;
  }

  log.error("Mismatched responses:\n" + a + "\n" + b);
  Assert.fail(mismatch);
}
protected void compareResponses(QueryResponse a, QueryResponse b) {
String cmp;
if (System.getProperty("remove.version.field") != null) {
// we don't care if one has a version and the other doesn't -
// control vs distrib
@ -800,11 +824,7 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
}
}
}
cmp = compare(a.getResponse(), b.getResponse(), flags, handle);
if (cmp != null) {
log.error("Mismatched responses:\n" + a + "\n" + b);
Assert.fail(cmp);
}
compareSolrResponses(a, b);
}
@Test

View File

@ -230,7 +230,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
protected CloudSolrServer createCloudClient(String defaultCollection)
throws MalformedURLException {
CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
if (defaultCollection != null) server.setDefaultCollection(defaultCollection);
server.getLbServer().getHttpClient().getParams()
.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000);
@ -803,13 +803,68 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
}
/**
 * Executes a query against each live and active replica of the specified shard
 * and asserts that the results are identical.
 *
 * <p>The jetty/client mappings are refreshed from ZooKeeper before the
 * replicas are selected. A replica is queried only if its recorded state
 * equals {@code ZkStateReader.ACTIVE} and its node name is contained in the
 * live nodes of the current cluster state; a replica with no recorded state
 * is treated as not active.
 *
 * @param params the query parameters sent to every selected replica
 * @param shard the name of the shard whose replicas are compared
 * @return the result of {@code queryAndCompare} over the selected replicas
 * @see #queryAndCompare
 */
public QueryResponse queryAndCompareReplicas(SolrParams params, String shard)
    throws Exception {
  List<SolrServer> shardClients = new ArrayList<SolrServer>(7);

  updateMappingsFromZk(jettys, clients);
  ZkStateReader zkStateReader = cloudClient.getZkStateReader();
  List<CloudJettyRunner> solrJetties = shardToJetty.get(shard);
  assertNotNull("no jetties found for shard: " + shard, solrJetties);

  for (CloudJettyRunner cjetty : solrJetties) {
    ZkNodeProps props = cjetty.info;
    String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP);
    // Constant-first comparison: a missing state property means "not active"
    // rather than a NullPointerException.
    boolean active = ZkStateReader.ACTIVE.equals(props.getStr(ZkStateReader.STATE_PROP));
    boolean live = zkStateReader.getClusterState().liveNodesContain(nodeName);
    if (active && live) {
      shardClients.add(cjetty.client.solrClient);
    }
  }
  return queryAndCompare(params, shardClients);
}
/**
 * For every shard, queries each live and active replica of that shard and
 * asserts that all replicas of the same shard return identical results.
 * Results are never compared across different shards, so this method should
 * be safe for any query, including ones that contain "distrib=false",
 * because the replicas of one shard should all be identical.
 *
 * @param params the query parameters sent to each replica
 * @see AbstractFullDistribZkTestBase#queryAndCompareReplicas(SolrParams, String)
 */
public void queryAndCompareShards(SolrParams params) throws Exception {
  updateMappingsFromZk(jettys, clients);

  // Work on a copy of the shard names; queryAndCompareReplicas refreshes the
  // mappings again (presumably this could touch shardToJetty - confirm).
  final List<String> shardNames = new ArrayList<String>(shardToJetty.keySet());
  for (final String shardName : shardNames) {
    queryAndCompareReplicas(params, shardName);
  }
}
/**
 * Runs the consistency check for the replicas of the given shard by
 * delegating to {@code checkShardConsistency(shard, false, false)}, i.e. with
 * {@code expectFailure} and {@code verbose} both disabled.
 *
 * <p>This overload returns nothing: the message string produced by the
 * three-argument overload is discarded, so an inconsistency is not reported
 * to the caller through this method.
 *
 * @param shard the name of the shard to check
 */
protected void checkShardConsistency(String shard) throws Exception {
checkShardConsistency(shard, false, false);
}
/* Returns a non-null string if replicas within the same shard are not consistent.
* If expectFailure==false, the exact differences found will be logged since this would be an unexpected failure.
* verbose causes extra debugging into to be displayed, even if everything is consistent.
/**
* Returns a non-null string if replicas within the same shard do not have a
* consistent number of documents.
* If expectFailure==false, the exact differences found will be logged since
* this would be an unexpected failure.
* verbose causes extra debugging info to be displayed, even if everything is
* consistent.
*/
protected String checkShardConsistency(String shard, boolean expectFailure, boolean verbose)
throws Exception {
@ -1505,7 +1560,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
if (commondCloudSolrServer == null) {
synchronized(this) {
try {
commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress());
commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION);
commondCloudSolrServer.connect();
} catch (MalformedURLException e) {