SOLR-9736: Solr resolves the collection name against the first available leader or first replica of the first slice

This commit is contained in:
Shalin Shekhar Mangar 2016-11-15 14:18:43 +05:30
parent b57a5e41f8
commit 0d290ae136
4 changed files with 268 additions and 32 deletions

View File

@ -152,6 +152,10 @@ Bug Fixes
* SOLR-9751: PreAnalyzedField can cause managed schema corruption. (Steve Rowe)
* SOLR-9736: Solr resolves the collection name against the first available leader or first replica
of the first slice. This puts undue pressure on leader cores and likely on the wrong ones. This is
fixed to randomly pick a leader on updates or a replica core otherwise. (Cao Manh Dat via shalin)
Other Changes
----------------------

View File

@ -191,7 +191,7 @@ public class HttpSolrCall {
return queryParams;
}
private void init() throws Exception {
void init() throws Exception {
//The states of client that is invalid in this request
Aliases aliases = null;
String corename = "";
@ -271,7 +271,11 @@ public class HttpSolrCall {
if (core == null && cores.isZooKeeperAware()) {
// we couldn't find the core - lets make sure a collection was not specified instead
core = getCoreByCollection(corename);
boolean isPreferLeader = false;
if (path.endsWith("/update") || path.contains("/update/")) {
isPreferLeader = true;
}
core = getCoreByCollection(corename, isPreferLeader);
if (core != null) {
// we found a core, update the path
path = path.substring(idx);
@ -753,7 +757,7 @@ public class HttpSolrCall {
return result;
}
private SolrCore getCoreByCollection(String collectionName) {
private SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) {
ZkStateReader zkStateReader = cores.getZkController().getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
@ -761,37 +765,27 @@ public class HttpSolrCall {
if (collection == null) {
return null;
}
Map<String, Slice> slices = collection.getActiveSlicesMap();
if (slices == null) {
return null;
}
Set<String> liveNodes = clusterState.getLiveNodes();
// look for a core on this node
Set<Map.Entry<String, Slice>> entries = slices.entrySet();
SolrCore core = null;
//Hitting the leaders is useful when it's an update request.
//For queries it doesn't matter and hence we don't distinguish here.
for (Map.Entry<String, Slice> entry : entries) {
// first see if we have the leader
Replica leaderProps = collection.getLeader(entry.getKey());
if (leaderProps != null && liveNodes.contains(leaderProps.getNodeName()) && leaderProps.getState() == Replica.State.ACTIVE) {
core = checkProps(leaderProps);
if (core != null) {
return core;
}
}
if (isPreferLeader) {
List<Replica> leaderReplicas = collection.getLeaderReplicas(cores.getZkController().getNodeName());
SolrCore core = randomlyGetSolrCore(liveNodes, leaderReplicas);
if (core != null) return core;
}
// check everyone then
Map<String, Replica> shards = entry.getValue().getReplicasMap();
Set<Map.Entry<String, Replica>> shardEntries = shards.entrySet();
for (Map.Entry<String, Replica> shardEntry : shardEntries) {
Replica zkProps = shardEntry.getValue();
if (liveNodes.contains(zkProps.getNodeName()) && zkProps.getState() == Replica.State.ACTIVE) {
core = checkProps(zkProps);
if (core != null) {
return core;
}
List<Replica> replicas = collection.getReplicas(cores.getZkController().getNodeName());
return randomlyGetSolrCore(liveNodes, replicas);
}
private SolrCore randomlyGetSolrCore(Set<String> liveNodes, List<Replica> replicas) {
if (replicas != null) {
RandomIterator<Replica> it = new RandomIterator<>(random, replicas);
while (it.hasNext()) {
Replica replica = it.next();
if (liveNodes.contains(replica.getNodeName()) && replica.getState() == Replica.State.ACTIVE) {
SolrCore core = checkProps(replica);
if (core != null) return core;
}
}
}
@ -1027,4 +1021,35 @@ public class HttpSolrCall {
static final String CONNECTION_HEADER = "Connection";
static final String TRANSFER_ENCODING_HEADER = "Transfer-Encoding";
static final String CONTENT_LENGTH_HEADER = "Content-Length";
/**
* A faster method for randomly picking items when you do not need to
* consume all items.
*/
private static class RandomIterator<E> implements Iterator<E> {
private Random rand;
private ArrayList<E> elements;
private int size;
public RandomIterator(Random rand, Collection<E> elements) {
this.rand = rand;
this.elements = new ArrayList<>(elements);
this.size = elements.size();
}
@Override
public boolean hasNext() {
return size > 0;
}
@Override
public E next() {
int idx = rand.nextInt(size);
E e1 = elements.get(idx);
E e2 = elements.get(size-1);
elements.set(idx,e2);
size--;
return e1;
}
}
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.servlet;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.junit.BeforeClass;
import org.junit.Test;
public class HttpSolrCallGetCoreTest extends SolrCloudTestCase {
private static final String COLLECTION = "collection1";
private static final int NUM_SHARD = 3;
private static final int REPLICA_FACTOR = 2;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(1)
.addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
CollectionAdminRequest
.createCollection(COLLECTION, "config", NUM_SHARD, REPLICA_FACTOR)
.setMaxShardsPerNode(NUM_SHARD * REPLICA_FACTOR)
.process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 30);
}
@Test
public void test() throws Exception {
assertCoreChosen(NUM_SHARD, new TestRequest("/collection1/update"));
assertCoreChosen(NUM_SHARD, new TestRequest("/collection1/update/json"));
assertCoreChosen(NUM_SHARD * REPLICA_FACTOR, new TestRequest("/collection1/select"));
}
private void assertCoreChosen(int numCores, TestRequest testRequest) {
JettySolrRunner jettySolrRunner = cluster.getJettySolrRunner(0);
Set<String> coreNames = new HashSet<>();
SolrDispatchFilter dispatchFilter = jettySolrRunner.getSolrDispatchFilter();
for (int i = 0; i < NUM_SHARD * REPLICA_FACTOR * 20; i++) {
if (coreNames.size() == numCores) return;
HttpSolrCall httpSolrCall = new HttpSolrCall(dispatchFilter, dispatchFilter.getCores(), testRequest, new TestResponse(), false);
try {
httpSolrCall.init();
} catch (Exception e) {
} finally {
coreNames.add(httpSolrCall.core.getName());
httpSolrCall.destroy();
}
}
assertEquals(numCores, coreNames.size());
}
private static class TestResponse extends Response {
public TestResponse() {
super(null, null);
}
@Override
public ServletOutputStream getOutputStream() throws IOException {
return new ServletOutputStream() {
@Override
public boolean isReady() {
return true;
}
@Override
public void setWriteListener(WriteListener writeListener) {
}
@Override
public void write(int b) throws IOException {
}
};
}
@Override
public boolean isCommitted() {
return true;
}
}
private static class TestRequest extends Request {
private String path;
public TestRequest(String path) {
super(null, null);
this.path = path;
}
@Override
public String getQueryString() {
return "wt=json&version=2";
}
@Override
public String getContentType() {
return "application/json";
}
@Override
public String getServletPath() {
return path;
}
@Override
public String getRequestURI() {
return path;
}
@Override
public ServletInputStream getInputStream() throws IOException {
return new ServletInputStream() {
@Override
public boolean isFinished() {
return true;
}
@Override
public boolean isReady() {
return true;
}
@Override
public void setReadListener(ReadListener readListener) {
}
@Override
public int read() throws IOException {
return 0;
}
};
}
}
}

View File

@ -51,6 +51,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
private final String name;
private final Map<String, Slice> slices;
private final Map<String, Slice> activeSlices;
private final Map<String, List<Replica>> nodeNameReplicas;
private final Map<String, List<Replica>> nodeNameLeaderReplicas;
private final DocRouter router;
private final String znode;
@ -76,6 +78,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
this.slices = slices;
this.activeSlices = new HashMap<>();
this.nodeNameLeaderReplicas = new HashMap<>();
this.nodeNameReplicas = new HashMap<>();
this.replicationFactor = (Integer) verifyProp(props, REPLICATION_FACTOR);
this.maxShardsPerNode = (Integer) verifyProp(props, MAX_SHARDS_PER_NODE);
Boolean autoAddReplicas = (Boolean) verifyProp(props, AUTO_ADD_REPLICAS);
@ -86,14 +90,36 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
while (iter.hasNext()) {
Map.Entry<String, Slice> slice = iter.next();
if (slice.getValue().getState() == Slice.State.ACTIVE)
if (slice.getValue().getState() == Slice.State.ACTIVE) {
this.activeSlices.put(slice.getKey(), slice.getValue());
}
for (Replica replica : slice.getValue()) {
addNodeNameReplica(replica);
}
}
this.router = router;
this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
assert name != null && slices != null;
}
private void addNodeNameReplica(Replica replica) {
List<Replica> replicas = nodeNameReplicas.get(replica.getNodeName());
if (replicas == null) {
replicas = new ArrayList<>();
nodeNameReplicas.put(replica.getNodeName(), replicas);
}
replicas.add(replica);
if (replica.getStr(Slice.LEADER) != null) {
List<Replica> leaderReplicas = nodeNameLeaderReplicas.get(replica.getNodeName());
if (leaderReplicas == null) {
leaderReplicas = new ArrayList<>();
nodeNameLeaderReplicas.put(replica.getNodeName(), leaderReplicas);
}
leaderReplicas.add(replica);
}
}
public static Object verifyProp(Map<String, Object> props, String propName) {
Object o = props.get(propName);
if (o == null) return null;
@ -160,6 +186,20 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
return activeSlices;
}
/**
* Get the list of replicas hosted on the given node or <code>null</code> if none.
*/
public List<Replica> getReplicas(String nodeName) {
return nodeNameReplicas.get(nodeName);
}
/**
* Get the list of all leaders hosted on the given node or <code>null</code> if none.
*/
public List<Replica> getLeaderReplicas(String nodeName) {
return nodeNameLeaderReplicas.get(nodeName);
}
public int getZNodeVersion(){
return znodeVersion;
}