SOLR-14342: Improve core loading order in SolrCloud.

Makes collections available sooner and reduces leaderVoteWait timeouts in large SolrCloud clusters.
This supersedes and corrects an earlier, incomplete attempt at the same improvement.
Fixes #1366
This commit is contained in:
David Smiley 2020-03-26 23:44:20 -04:00
parent d1601f6fdf
commit a0b0c710b5
4 changed files with 182 additions and 225 deletions

View File

@ -66,6 +66,9 @@ Improvements
* SOLR-14260: Make SolrJ ConnectionSocketFactory pluggable via SocketFactoryRegistryProvider setting on HttpClientUtil
(Andy Throgmorton via David Smiley)
* SOLR-14342: Load cores in an order that makes collections available sooner and reduces leaderVoteWait timeouts in
large SolrCloud clusters. (David Smiley)
Optimizations
---------------------
* SOLR-8306: Do not collect expand documents when expand.rows=0 (Marshall Sanders, Amelia Henderson)

View File

@ -755,12 +755,7 @@ public class CoreContainer {
final List<Future<SolrCore>> futures = new ArrayList<>();
try {
List<CoreDescriptor> cds = coresLocator.discover(this);
if (isZooKeeperAware()) {
//sort the cores if it is in SolrCloud. In standalone mode the order does not matter
CoreSorter coreComparator = new CoreSorter().init(this);
cds = new ArrayList<>(cds);//make a copy
Collections.sort(cds, coreComparator::compare);
}
cds = CoreSorter.sortCores(this, cds);
checkForDuplicateCoreNames(cds);
status |= CORE_DISCOVERY_COMPLETE;

View File

@ -19,8 +19,10 @@ package org.apache.solr.core;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.solr.cloud.CloudDescriptor;
@ -37,12 +39,11 @@ import static java.util.stream.Collectors.toList;
* waiting for replicas in the current node. This helps avoid leaderVoteWait timeouts
* occurring in other nodes of the cluster
*/
public class CoreSorter {
Map<String, CountsForEachShard> shardsVsReplicaCounts = new LinkedHashMap<>();
CoreContainer cc;
public final class CoreSorter implements Comparator<CoreDescriptor> {
private static final CountsForEachShard zero = new CountsForEachShard(0, 0, 0);
public final static Comparator<CountsForEachShard> countsComparator = (c1, c2) -> {
static final Comparator<CountsForEachShard> countsComparator = (c1, c2) -> {
if (c1 == null) c1 = zero;//just to avoid NPE
if (c2 == null) c2 = zero;
if (c1.totalReplicasInDownNodes < c2.totalReplicasInDownNodes) {
@ -78,15 +79,24 @@ public class CoreSorter {
return 0;
};
public CoreSorter init(CoreContainer cc) {
this.cc = cc;
if (cc == null || !cc.isZooKeeperAware()) {
return this;
/** Primary entry-point to sort the cores. */
public static List<CoreDescriptor> sortCores(CoreContainer coreContainer, List<CoreDescriptor> descriptors) {
//sort the cores if it is in SolrCloud. In standalone mode the order does not matter
if (coreContainer.isZooKeeperAware()) {
return descriptors.stream()
.sorted(new CoreSorter().init(coreContainer, descriptors))
.collect(toList()); // new list
}
String myNodeName = getNodeName();
return descriptors;
}
private final Map<String, CountsForEachShard> shardsVsReplicaCounts = new HashMap<>();
CoreSorter init(CoreContainer cc, Collection<CoreDescriptor> coreDescriptors) {
String myNodeName = cc.getNodeConfig().getNodeName();
ClusterState state = cc.getZkController().getClusterState();
for (CloudDescriptor cloudDescriptor : getCloudDescriptors()) {
for (CoreDescriptor coreDescriptor : coreDescriptors) {
CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor();
String coll = cloudDescriptor.getCollectionName();
String sliceName = getShardName(cloudDescriptor);
if (shardsVsReplicaCounts.containsKey(sliceName)) continue;
@ -110,7 +120,6 @@ public class CoreSorter {
}
public int compare(CoreDescriptor cd1, CoreDescriptor cd2) {
String s1 = getShardName(cd1.getCloudDescriptor());
String s2 = getShardName(cd2.getCloudDescriptor());
@ -121,7 +130,6 @@ public class CoreSorter {
return result == 0 ? s1.compareTo(s2) : result;
}
static class CountsForEachShard {
public int totalReplicasInDownNodes = 0, myReplicas = 0, totalReplicasInLiveNodes = 0;
@ -131,21 +139,26 @@ public class CoreSorter {
this.totalReplicasInLiveNodes = totalReplicasInLiveNodes;
}
public boolean equals(Object obj) {
if (obj instanceof CountsForEachShard) {
CountsForEachShard that = (CountsForEachShard) obj;
return that.totalReplicasInDownNodes == totalReplicasInDownNodes && that.myReplicas == myReplicas;
}
return false;
}
@Override
public String toString() {
return "down : " + totalReplicasInDownNodes + " , up : " + totalReplicasInLiveNodes + " my : " + myReplicas;
}
// for tests
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CountsForEachShard that = (CountsForEachShard) o;
return totalReplicasInDownNodes == that.totalReplicasInDownNodes &&
myReplicas == that.myReplicas &&
totalReplicasInLiveNodes == that.totalReplicasInLiveNodes;
}
@Override
public int hashCode() {
return Objects.hash(totalReplicasInDownNodes, myReplicas, totalReplicasInLiveNodes);
}
}
static String getShardName(CloudDescriptor cd) {
@ -156,14 +169,9 @@ public class CoreSorter {
+ cd.getShardId();
}
String getNodeName() {
return cc.getNodeConfig().getNodeName();
}
/**Return all replicas for a given collection+slice combo
*/
Collection<Replica> getReplicas(ClusterState cs, String coll, String slice) {
private Collection<Replica> getReplicas(ClusterState cs, String coll, String slice) {
DocCollection c = cs.getCollectionOrNull(coll);
if (c == null) return emptyList();
Slice s = c.getSlice(slice);
@ -171,15 +179,4 @@ public class CoreSorter {
return s.getReplicas();
}
/**return cloud descriptors for all cores in this node
*/
Collection<CloudDescriptor> getCloudDescriptors() {
return cc.getCores()
.stream()
.map((core) -> core.getCoreDescriptor().getCloudDescriptor())
.collect(toList());
}
}

View File

@ -18,221 +18,183 @@ package org.apache.solr.core;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.core.CoreSorter.CountsForEachShard;
import org.apache.solr.util.MockCoreContainer;
import org.junit.Test;
import static java.util.stream.Collectors.toList;
import static org.apache.solr.core.CoreSorter.getShardName;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class CoreSorterTest extends SolrTestCaseJ4 {
Map<String, Boolean> nodes = new LinkedHashMap<>();
Set<String> liveNodes = new HashSet<>();
private static final List<CountsForEachShard> inputCounts = Arrays.asList(
// DOWN LIVE MY
new CountsForEachShard(1, 3, 1),
new CountsForEachShard(0, 3, 2),
new CountsForEachShard(0, 3, 3),
new CountsForEachShard(0, 3, 4),
new CountsForEachShard(1, 0, 2),
new CountsForEachShard(1, 0, 1),
new CountsForEachShard(2, 5, 1),
new CountsForEachShard(2, 4, 2),
new CountsForEachShard(2, 3, 3)
);
private static final List<CountsForEachShard> expectedCounts = Arrays.asList(
new CountsForEachShard(0, 3, 2),
new CountsForEachShard(0, 3, 3),
new CountsForEachShard(0, 3, 4),
new CountsForEachShard(1, 3, 1),
new CountsForEachShard(2, 5, 1),
new CountsForEachShard(2, 4, 2),
new CountsForEachShard(2, 3, 3),
new CountsForEachShard(1, 0, 1),
new CountsForEachShard(1, 0, 2)
);
@Test
public void testComparator() {
List<CountsForEachShard> l = new ArrayList<>();
// DOWN LIVE MY
l.add(new CountsForEachShard(1, 3, 1));
l.add(new CountsForEachShard(0, 3, 2));
l.add(new CountsForEachShard(0, 3, 3));
l.add(new CountsForEachShard(0, 3, 4));
l.add(new CountsForEachShard(1, 0, 2));
l.add(new CountsForEachShard(1, 0, 1));
l.add(new CountsForEachShard(2, 5, 1));
l.add(new CountsForEachShard(2, 4, 2));
l.add(new CountsForEachShard(2, 3, 3));
List<CountsForEachShard> expected = Arrays.asList(
new CountsForEachShard(0, 3, 2),
new CountsForEachShard(0, 3, 3),
new CountsForEachShard(0, 3, 4),
new CountsForEachShard(1, 3, 1),
new CountsForEachShard(2, 5, 1),
new CountsForEachShard(2, 4, 2),
new CountsForEachShard(2, 3, 3),
new CountsForEachShard(1, 0, 1),
new CountsForEachShard(1, 0, 2)
);
for (int i = 0; i < 10; i++) {
List<CountsForEachShard> copy = new ArrayList<>(l);
List<CountsForEachShard> copy = new ArrayList<>(inputCounts);
Collections.shuffle(copy, random());
Collections.sort(copy, CoreSorter.countsComparator);
for (int j = 0; j < copy.size(); j++) {
assertEquals(expected.get(j), copy.get(j));
assertEquals(expectedCounts.get(j), copy.get(j));
}
}
}
public void testSort() throws Exception {
CoreContainer mockCC = getMockContainer();
MockCoreSorter coreSorter = (MockCoreSorter) new MockCoreSorter().init(mockCC);
List<CoreDescriptor> copy = new ArrayList<>(coreSorter.getLocalCores());
Collections.sort(copy, coreSorter::compare);
List<CountsForEachShard> l = copy.stream()
.map(CoreDescriptor::getCloudDescriptor)
.map(it -> coreSorter.shardsVsReplicaCounts.get(getShardName(it)))
.collect(toList());
for (int i = 1; i < l.size(); i++) {
CountsForEachShard curr = l.get(i);
CountsForEachShard prev = l.get(i-1);
assertTrue(CoreSorter.countsComparator.compare(prev, curr) < 1);
}
for (CountsForEachShard c : l) {
System.out.println(c);
}
}
private CoreContainer getMockContainer() {
@Test
public void integrationTest() {
assumeWorkingMockito();
List<CountsForEachShard> perShardCounts = new ArrayList<>(inputCounts);
Collections.shuffle(perShardCounts, random());
// compute nodes, some live, some down
final int maxNodesOfAType = perShardCounts.stream() // not too important how many we have, but lets have plenty
.mapToInt(c -> c.totalReplicasInLiveNodes + c.totalReplicasInDownNodes + c.myReplicas).max().getAsInt();
List<String> liveNodes = IntStream.range(0, maxNodesOfAType).mapToObj(i -> "192.168.0." + i + "_8983").collect(Collectors.toList());
Collections.shuffle(liveNodes, random());
String thisNode = liveNodes.get(0);
List<String> otherLiveNodes = liveNodes.subList(1, liveNodes.size());
List<String> downNodes = IntStream.range(0, maxNodesOfAType).mapToObj(i -> "192.168.1." + i + "_8983").collect(Collectors.toList());
// divide into two collections
int numCol1 = random().nextInt(perShardCounts.size());
Map<String,List<CountsForEachShard>> collToCounts = new HashMap<>();
collToCounts.put("col1", perShardCounts.subList(0, numCol1));
collToCounts.put("col2", perShardCounts.subList(numCol1, perShardCounts.size()));
Map<String,DocCollection> collToState = new HashMap<>();
Map<CountsForEachShard, List<CoreDescriptor>> myCountsToDescs = new HashMap<>();
for (Map.Entry<String, List<CountsForEachShard>> entry : collToCounts.entrySet()) {
String collection = entry.getKey();
List<CountsForEachShard> collCounts = entry.getValue();
Map<String, Slice> sliceMap = new HashMap<>(collCounts.size());
for (CountsForEachShard shardCounts : collCounts) {
String slice = "s" + shardCounts.hashCode();
List<Replica> replicas = new ArrayList<>();
for (int myRepNum = 0; myRepNum < shardCounts.myReplicas; myRepNum++) {
addNewReplica(replicas, collection, slice, Collections.singletonList(thisNode));
// save this mapping for later
myCountsToDescs.put(shardCounts, replicas.stream().map(this::newCoreDescriptor).collect(Collectors.toList()));
}
for (int myRepNum = 0; myRepNum < shardCounts.totalReplicasInLiveNodes; myRepNum++) {
addNewReplica(replicas, collection, slice, otherLiveNodes);
}
for (int myRepNum = 0; myRepNum < shardCounts.totalReplicasInDownNodes; myRepNum++) {
addNewReplica(replicas, collection, slice, downNodes);
}
Map<String, Replica> replicaMap = replicas.stream().collect(Collectors.toMap(Replica::getName, Function.identity()));
sliceMap.put(slice, new Slice(slice, replicaMap, map(), collection));
}
DocCollection col = new DocCollection(collection, sliceMap, map(), DocRouter.DEFAULT);
collToState.put(collection, col);
}
// reverse map
Map<CoreDescriptor, CountsForEachShard> myDescsToCounts = new HashMap<>();
for (Map.Entry<CountsForEachShard, List<CoreDescriptor>> entry : myCountsToDescs.entrySet()) {
for (CoreDescriptor descriptor : entry.getValue()) {
CountsForEachShard prev = myDescsToCounts.put(descriptor, entry.getKey());
assert prev == null; // sanity check
}
}
assert myCountsToDescs.size() == perShardCounts.size(); // just a sanity check
CoreContainer mockCC = mock(CoreContainer.class);
ZkController mockZKC = mock(ZkController.class);
ClusterState mockClusterState = mock(ClusterState.class);
when(mockCC.isZooKeeperAware()).thenReturn(true);
when(mockCC.getZkController()).thenReturn(mockZKC);
when(mockClusterState.getLiveNodes()).thenReturn(liveNodes);
when(mockZKC.getClusterState()).thenReturn(mockClusterState);
return mockCC;
}
{
when(mockCC.isZooKeeperAware()).thenReturn(true);
static class ReplicaInfo {
final int coll, slice, replica;
final String replicaName;
CloudDescriptor cd;
ReplicaInfo(int coll, int slice, int replica) {
this.coll = coll;
this.slice = slice;
this.replica = replica;
replicaName = "coll_" + coll + "_" + slice + "_" + replica;
Properties p = new Properties();
p.setProperty(CoreDescriptor.CORE_SHARD, "shard_" + slice);
p.setProperty(CoreDescriptor.CORE_COLLECTION, "coll_" + slice);
p.setProperty(CoreDescriptor.CORE_NODE_NAME, replicaName);
cd = new CloudDescriptor(null, replicaName, p);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ReplicaInfo) {
ReplicaInfo replicaInfo = (ReplicaInfo) obj;
return replicaInfo.replicaName.equals(replicaName);
}
return false;
}
@Override
public int hashCode() {
return replicaName.hashCode();
}
CloudDescriptor getCloudDescriptor() {
return cd;
}
public Replica getReplica(String node) {
return new Replica(replicaName, Utils.makeMap("core", replicaName, "node_name", node), cd.getCollectionName(), cd.getShardId());
}
public boolean equals(String coll, String slice) {
return cd.getCollectionName().equals(coll) && slice.equals(cd.getShardId());
}
}
class MockCoreSorter extends CoreSorter {
int numColls = 1 + random().nextInt(3);
int numReplicas = 2 + random().nextInt(2);
int numShards = 50 + random().nextInt(10);
String myNodeName;
Collection<CloudDescriptor> myCores = new ArrayList<>();
List<CoreDescriptor> localCores = new ArrayList<>();
Map<ReplicaInfo, String> replicaPositions = new LinkedHashMap<>();//replicaname vs. nodename
public MockCoreSorter() {
int totalNodes = 50 + random().nextInt(10);
int myNode = random().nextInt(totalNodes);
List<String> nodeNames = new ArrayList<>();
for (int i = 0; i < totalNodes; i++) {
String s = "192.168.1." + i + ":8983_solr";
if (i == myNode) myNodeName = s;
boolean on = random().nextInt(100) < 70;
nodes.put(s,
on);//70% chance that the node is up;
nodeNames.add(s);
if(on) liveNodes.add(s);
}
for (int i = 0; i < numColls; i++) {
for (int j = 0; j < numShards; j++) {
for (int k = 0; k < numReplicas; k++) {
ReplicaInfo ri = new ReplicaInfo(i, j, k);
replicaPositions.put(ri, nodeNames.get(random().nextInt(totalNodes)));
ZkController mockZKC = mock(ZkController.class);
when(mockCC.getZkController()).thenReturn(mockZKC);
{
ClusterState mockClusterState = mock(ClusterState.class);
when(mockZKC.getClusterState()).thenReturn(mockClusterState);
{
when(mockClusterState.getLiveNodes()).thenReturn(new HashSet<>(liveNodes));
for (Map.Entry<String, DocCollection> entry : collToState.entrySet()) {
when(mockClusterState.getCollectionOrNull(entry.getKey())).thenReturn(entry.getValue());
}
}
}
for (Map.Entry<ReplicaInfo, String> e : replicaPositions.entrySet()) {
if (e.getValue().equals(myNodeName)) {
myCores.add(e.getKey().getCloudDescriptor());
localCores.add(new MockCoreContainer.MockCoreDescriptor() {
@Override
public CloudDescriptor getCloudDescriptor() {
return e.getKey().getCloudDescriptor();
}
});
NodeConfig mockNodeConfig = mock(NodeConfig.class);
when(mockNodeConfig.getNodeName()).thenReturn(thisNode);
when(mockCC.getNodeConfig()).thenReturn(mockNodeConfig);
}
List<CoreDescriptor> myDescs = new ArrayList<>(myDescsToCounts.keySet());
for (int i = 0; i < 10; i++) {
Collections.shuffle(myDescs, random());
List<CoreDescriptor> resultDescs = CoreSorter.sortCores(mockCC, myDescs);
// map descriptors back to counts, removing consecutive duplicates
List<CountsForEachShard> resultCounts = new ArrayList<>();
CountsForEachShard lastCounts = null;
for (CoreDescriptor resultDesc : resultDescs) {
CountsForEachShard counts = myDescsToCounts.get(resultDesc);
if (counts != lastCounts) {
resultCounts.add(counts);
}
lastCounts = counts;
}
}
@Override
String getNodeName() {
return myNodeName;
}
@Override
Collection<CloudDescriptor> getCloudDescriptors() {
return myCores;
}
public List<CoreDescriptor> getLocalCores() {
return localCores;
}
@Override
Collection<Replica> getReplicas(ClusterState cs, String coll, String slice) {
List<Replica> r = new ArrayList<>();
for (Map.Entry<ReplicaInfo, String> e : replicaPositions.entrySet()) {
if (e.getKey().equals(coll, slice)) {
r.add(e.getKey().getReplica(e.getValue()));
}
}
return r;
assertEquals(expectedCounts, resultCounts);
}
}
private CoreDescriptor newCoreDescriptor(Replica r) {
Map<String,String> props = map(
CoreDescriptor.CORE_SHARD, r.getSlice(),
CoreDescriptor.CORE_COLLECTION, r.getCollection(),
CoreDescriptor.CORE_NODE_NAME, r.getNodeName()
);
return new CoreDescriptor(r.getCoreName(), TEST_PATH(), props , null, mock(ZkController.class));
}
protected Replica addNewReplica(List<Replica> replicaList, String collection, String slice, List<String> possibleNodes) {
String replica = "r" + replicaList.size();
String node = possibleNodes.get(random().nextInt(possibleNodes.size())); // place on a random node
Replica r = new Replica(replica, map("core", replica, "node_name", node), collection, slice);
replicaList.add(r);
return r;
}
}