diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 730b3ebf82f..c1283f70691 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -157,6 +157,8 @@ Bug Fixes > SOLR-4850, cores defined as loadOnStartup=true, transient=false can't be searched (Erick Erickson) +* SOLR-4923: Commits to non leaders as part of a request that also contain updates + can execute out of order. (hossman, Mark Miller) Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 47d57a1c604..37976eb0f42 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -1131,15 +1131,37 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } - @Override public void processCommit(CommitUpdateCommand cmd) throws IOException { updateCommand = cmd; - + List nodes = null; + boolean singleLeader = false; if (zkEnabled) { zkCheck(); + + nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor() + .getCloudDescriptor().getCollectionName()); + if (isLeader && nodes.size() == 1) { + singleLeader = true; + } } + if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) { + doLocalCommit(cmd); + } else if (zkEnabled) { + ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + if (!req.getParams().getBool(COMMIT_END_POINT, false)) { + params.set(COMMIT_END_POINT, true); + + if (nodes != null) { + cmdDistrib.distribCommit(cmd, nodes, params); + finish(); + } + } + } + } + + private void doLocalCommit(CommitUpdateCommand cmd) throws IOException { if (vinfo != null) { vinfo.lockForUpdate(); } @@ -1156,23 +1178,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { vinfo.unlockForUpdate(); } } - // TODO: we should consider this? commit everyone in the current collection - - if (zkEnabled) { - ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); - if (!req.getParams().getBool(COMMIT_END_POINT, false)) { - params.set(COMMIT_END_POINT, true); - - String coreNodeName = zkController.getCoreNodeName(req.getCore().getCoreDescriptor()); - List nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor() - .getCloudDescriptor().getCollectionName(), coreNodeName); - - if (nodes != null) { - cmdDistrib.distribCommit(cmd, nodes, params); - finish(); - } - } - } } @Override @@ -1184,7 +1189,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { - private List getCollectionUrls(SolrQueryRequest req, String collection, String coreNodeName) { + private List getCollectionUrls(SolrQueryRequest req, String collection) { ClusterState clusterState = req.getCore().getCoreDescriptor() .getCoreContainer().getZkController().getClusterState(); List urls = new ArrayList(); @@ -1200,7 +1205,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { for (Entry entry : shardMap.entrySet()) { ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue()); - if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !entry.getKey().equals(coreNodeName)) { + if (clusterState.liveNodesContain(nodeProps.getNodeName())) { urls.add(new StdNode(nodeProps)); } } diff --git a/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java index 2b191931116..bc37e9f9681 100644 --- a/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java @@ -152,7 +152,7 @@ public class AliasIntegrationTest extends AbstractFullDistribZkTestBase { createAlias("testalias", "collection2,collection1"); // search with new cloud client - CloudSolrServer cloudSolrServer = new CloudSolrServer(zkServer.getZkAddress()); + CloudSolrServer cloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); query = new SolrQuery("*:*"); query.set("collection", "testalias"); res = cloudSolrServer.query(query); diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java index 76c61763b4d..a9dadd1440b 100644 --- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java @@ -160,7 +160,26 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { waitForRecoveriesToFinish(false); + handle.clear(); + handle.put("QTime", SKIPVAL); + handle.put("timestamp", SKIPVAL); + del("*:*"); + queryAndCompareShards(params("q", "*:*", "distrib", "false", "sanity_check", "is_empty")); + + // ask every individual replica of every shard to update+commit the same doc id + // with an incrementing counter on each update+commit + int foo_i_counter = 0; + for (SolrServer server : clients) { + foo_i_counter++; + indexDoc(server, params("commit", "true"), // SOLR-4923 + sdoc(id,1, i1,100, tlong,100, "foo_i", foo_i_counter)); + // after every update+commit, check all the shards consistency + queryAndCompareShards(params("q", "id:1", "distrib", "false", + "sanity_check", "non_distrib_id_1_lookup")); + queryAndCompareShards(params("q", "id:1", + "sanity_check", "distrib_id_1_lookup")); + } indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men" ,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d); @@ -195,10 +214,10 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { } commit(); - - handle.clear(); - handle.put("QTime", SKIPVAL); - handle.put("timestamp", SKIPVAL); + queryAndCompareShards(params("q", "*:*", + "sort", "id desc", + "distrib", "false", + "sanity_check", "is_empty")); // random value sort for (String f : fieldNames) { @@ -1079,7 +1098,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { if (commondCloudSolrServer == null) { synchronized(this) { try { - commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress()); + commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION); commondCloudSolrServer.getLbServer().setConnectionTimeout(15000); commondCloudSolrServer.getLbServer().setSoTimeout(30000); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java index f5fd166e58f..1fdf6912843 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java @@ -91,6 +91,13 @@ public class CloudSolrServer extends SolrServer { this.lbServer = new LBHttpSolrServer(myClient); this.updatesToLeaders = true; } + + public CloudSolrServer(String zkHost, boolean updatesToLeaders) throws MalformedURLException { + this.zkHost = zkHost; + this.myClient = HttpClientUtil.createClient(null); + this.lbServer = new LBHttpSolrServer(myClient); + this.updatesToLeaders = updatesToLeaders; +} /** * @param zkHost The client endpoint of the zookeeper quorum containing the cloud state, diff --git a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java index 743240aa23b..1a8ebf43d53 100644 --- a/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java +++ b/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java @@ -45,6 +45,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; @@ -424,6 +425,9 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 { indexDoc(doc); } + /** + * Indexes the document in both the control client, and a randomly selected client + */ protected void indexDoc(SolrInputDocument doc) throws IOException, SolrServerException { controlClient.add(doc); @@ -432,6 +436,17 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 { client.add(doc); } + /** + * Indexes the document in both the control client and the specified client asserting + * that the respones are equivilent + */ + protected UpdateResponse indexDoc(SolrServer server, SolrParams params, SolrInputDocument... sdocs) throws IOException, SolrServerException { + UpdateResponse controlRsp = add(controlClient, params, sdocs); + UpdateResponse specificRsp = add(server, params, sdocs); + compareSolrResponses(specificRsp, controlRsp); + return specificRsp; + } + protected UpdateResponse add(SolrServer server, SolrParams params, SolrInputDocument... sdocs) throws IOException, SolrServerException { UpdateRequest ureq = new UpdateRequest(); ureq.setParams(new ModifiableSolrParams(params)); @@ -546,6 +561,9 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 { } public QueryResponse queryAndCompare(SolrParams params, SolrServer... servers) throws SolrServerException { + return queryAndCompare(params, Arrays.asList(servers)); + } + public QueryResponse queryAndCompare(SolrParams params, Iterable servers) throws SolrServerException { QueryResponse first = null; for (SolrServer server : servers) { QueryResponse rsp = server.query(new ModifiableSolrParams(params)); @@ -783,8 +801,14 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 { return null; } + protected void compareSolrResponses(SolrResponse a, SolrResponse b) { + String cmp = compare(a.getResponse(), b.getResponse(), flags, handle); + if (cmp != null) { + log.error("Mismatched responses:\n" + a + "\n" + b); + Assert.fail(cmp); + } + } protected void compareResponses(QueryResponse a, QueryResponse b) { - String cmp; if (System.getProperty("remove.version.field") != null) { // we don't care if one has a version and the other doesnt - // control vs distrib @@ -800,11 +824,7 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 { } } } - cmp = compare(a.getResponse(), b.getResponse(), flags, handle); - if (cmp != null) { - log.error("Mismatched responses:\n" + a + "\n" + b); - Assert.fail(cmp); - } + compareSolrResponses(a, b); } @Test diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java index 26a26820b18..c2db59feb0a 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java @@ -230,7 +230,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes protected CloudSolrServer createCloudClient(String defaultCollection) throws MalformedURLException { - CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress()); + CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); if (defaultCollection != null) server.setDefaultCollection(defaultCollection); server.getLbServer().getHttpClient().getParams() .setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000); @@ -803,13 +803,68 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes } } + /** + * Executes a query against each live and active replica of the specified shard + * and aserts that the results are identical. + * + * @see #queryAndCompare + */ + public QueryResponse queryAndCompareReplicas(SolrParams params, String shard) + throws Exception { + + ArrayList shardClients = new ArrayList(7); + + updateMappingsFromZk(jettys, clients); + ZkStateReader zkStateReader = cloudClient.getZkStateReader(); + List solrJetties = shardToJetty.get(shard); + assertNotNull("no jetties found for shard: " + shard, solrJetties); + + + for (CloudJettyRunner cjetty : solrJetties) { + ZkNodeProps props = cjetty.info; + String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP); + boolean active = props.getStr(ZkStateReader.STATE_PROP).equals(ZkStateReader.ACTIVE); + boolean live = zkStateReader.getClusterState().liveNodesContain(nodeName); + if (active && live) { + shardClients.add(cjetty.client.solrClient); + } + } + return queryAndCompare(params, shardClients); + } + + /** + * For each Shard, executes a query against each live and active replica of that shard + * and asserts that the results are identical for each replica of the same shard. + * Because results are not compared between replicas of different shards, this method + * should be safe for comparing the results of any query, even if it contains + * "distrib=false", because the replicas should all be identical. + * + * @see AbstractFullDistribZkTestBase#queryAndCompareReplicas(SolrParams, String) + */ + public void queryAndCompareShards(SolrParams params) throws Exception { + + updateMappingsFromZk(jettys, clients); + List shards = new ArrayList(shardToJetty.keySet()); + for (String shard : shards) { + queryAndCompareReplicas(params, shard); + } + } + + /** + * Returns a non-null string if replicas within the same shard do not have a + * consistent number of documents. + */ protected void checkShardConsistency(String shard) throws Exception { checkShardConsistency(shard, false, false); } - /* Returns a non-null string if replicas within the same shard are not consistent. - * If expectFailure==false, the exact differences found will be logged since this would be an unexpected failure. - * verbose causes extra debugging into to be displayed, even if everything is consistent. + /** + * Returns a non-null string if replicas within the same shard do not have a + * consistent number of documents. + * If expectFailure==false, the exact differences found will be logged since + * this would be an unexpected failure. + * verbose causes extra debugging into to be displayed, even if everything is + * consistent. */ protected String checkShardConsistency(String shard, boolean expectFailure, boolean verbose) throws Exception { @@ -1505,7 +1560,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes if (commondCloudSolrServer == null) { synchronized(this) { try { - commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress()); + commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION); commondCloudSolrServer.connect(); } catch (MalformedURLException e) {