SOLR-10285: Skip LEADER messages when there are leader only shards

This commit is contained in:
Cao Manh Dat 2017-10-03 09:00:02 +07:00
parent 5b3a5152bd
commit fd2b4f3f86
3 changed files with 112 additions and 12 deletions

View File

@ -171,6 +171,8 @@ Optimizations
* SOLR-8344: Decide default when requested fields are both column and row stored. (Cao Manh Dat, David Smiley)
* SOLR-10285: Skip LEADER messages when there are leader only shards (Cao Manh Dat, Joshua Humphries)
Other Changes
----------------------

View File

@ -115,6 +115,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
protected String shardId;
protected String collection;
protected LeaderElector leaderElector;
protected ZkStateReader zkStateReader;
private Integer leaderZkNodeParentVersion;
// Prevents a race between cancelling and becoming leader.
@ -128,6 +129,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
collection, shardId), props, zkStateReader.getZkClient());
this.leaderElector = leaderElector;
this.zkClient = zkStateReader.getZkClient();
this.zkStateReader = zkStateReader;
this.shardId = shardId;
this.collection = collection;
}
@ -216,14 +218,24 @@ class ShardLeaderElectionContextBase extends ElectionContext {
}
assert shardId != null;
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION,
OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
ZkStateReader.CORE_NAME_PROP,
leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
boolean isAlreadyLeader = false;
if (zkStateReader.getClusterState() != null &&
zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() < 2) {
Replica leader = zkStateReader.getLeader(collection, shardId);
if (leader != null
&& leader.getBaseUrl().equals(leaderProps.get(ZkStateReader.BASE_URL_PROP))
&& leader.getCoreName().equals(leaderProps.get(ZkStateReader.CORE_NAME_PROP))) {
isAlreadyLeader = true;
}
}
if (!isAlreadyLeader) {
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.BASE_URL_PROP, leaderProps.get(ZkStateReader.BASE_URL_PROP),
ZkStateReader.CORE_NAME_PROP, leaderProps.get(ZkStateReader.CORE_NAME_PROP));
Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
}
}
public LeaderElector getLeaderElector() {
@ -309,10 +321,12 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
log.debug("Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}", shardId, weAreReplacement, leaderVoteWait);
// clear the leader in clusterstate
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
if (zkController.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() > 1) {
// Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica.
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
}
boolean allReplicasInLine = false;
if (!weAreReplacement) {

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.junit.BeforeClass;
public class TestShardsWithSingleReplica extends SolrCloudTestCase {
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
configureCluster(3)
.addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
}
public void testSkipLeaderOperations() throws Exception {
String overseerLeader = getOverseerLeader();
List<JettySolrRunner> notOverseerNodes = cluster.getJettySolrRunners()
.stream()
.filter(solrRunner -> !solrRunner.getNodeName().equals(overseerLeader))
.collect(Collectors.toList());
String collection = "collection1";
CollectionAdminRequest
.createCollection(collection, 2, 1)
.setCreateNodeSet(notOverseerNodes
.stream()
.map(JettySolrRunner::getNodeName)
.collect(Collectors.joining(","))
)
.process(cluster.getSolrClient());
for (JettySolrRunner solrRunner : notOverseerNodes) {
cluster.stopJettySolrRunner(solrRunner);
}
waitForState("Expected empty liveNodes", collection,
(liveNodes, collectionState) -> liveNodes.size() == 1);
CollectionAdminResponse resp = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
for (JettySolrRunner solrRunner : notOverseerNodes) {
cluster.startJettySolrRunner(solrRunner);
}
waitForState("Expected 2x1 for collection: " + collection, collection,
clusterShape(2, 1));
CollectionAdminResponse resp2 = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
assertEquals(getNumLeaderOpeations(resp), getNumLeaderOpeations(resp2));
}
private int getNumLeaderOpeations(CollectionAdminResponse resp) {
return (int) resp.getResponse().findRecursive("overseer_operations", "leader", "requests");
}
private String getOverseerLeader() throws IOException, SolrServerException {
CollectionAdminResponse resp = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
return (String) resp.getResponse().get("leader");
}
}