diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index af249be75c2..d876a25624b 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -147,6 +147,9 @@ Bug Fixes * SOLR-9287: Including 'score' in the 'fl' param when doing an RTG no longer causes an NPE (hossman, Ishan Chattopadhyaya) +* SOLR-7280: In cloud-mode sort the cores smartly before loading & limit threads to improve cluster stability + (noble, Erick Erickson, shalin) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 3e4cbe50b44..f613141f17d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -115,7 +115,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; *

* TODO: exceptions during close on attempts to update cloud state */ -public final class ZkController { +public class ZkController { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); static final int WAIT_DOWN_STATES_TIMEOUT_SECONDS = 60; diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index cd05bbd3e05..21f495ccb05 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -25,6 +25,8 @@ import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH; import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH; import static org.apache.solr.common.params.CommonParams.ZK_PATH; +import static org.apache.solr.core.NodeConfig.NodeConfigBuilder.DEFAULT_CORE_LOAD_THREADS; +import static org.apache.solr.core.NodeConfig.NodeConfigBuilder.DEFAULT_CORE_LOAD_THREADS_IN_CLOUD; import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP; import java.io.IOException; @@ -33,6 +35,8 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -349,7 +353,7 @@ public class CoreContainer { } if (builder.getAuthSchemeRegistryProvider() != null) { httpClientBuilder.setAuthSchemeRegistryProvider(new AuthSchemeRegistryProvider() { - + @Override public Lookup getAuthSchemeRegistry() { return builder.getAuthSchemeRegistryProvider().getAuthSchemeRegistry(); @@ -485,17 +489,20 @@ public class CoreContainer { containerProperties.putAll(cfg.getSolrProperties()); // setup executor to load cores in parallel - // do not limit the size of the executor in zk mode since 
cores may try and wait for each other. ExecutorService coreLoadExecutor = ExecutorUtil.newMDCAwareFixedThreadPool( - ( zkSys.getZkController() == null ? cfg.getCoreLoadThreadCount() : Integer.MAX_VALUE ), + cfg.getCoreLoadThreadCount(isZooKeeperAware() ? DEFAULT_CORE_LOAD_THREADS_IN_CLOUD : DEFAULT_CORE_LOAD_THREADS), new DefaultSolrThreadFactory("coreLoadExecutor") ); - final List> futures = new ArrayList>(); + final List> futures = new ArrayList<>(); try { - List cds = coresLocator.discover(this); + if (isZooKeeperAware()) { + //sort the cores if it is in SolrCloud. In standalone node the order does not matter + CoreSorter coreComparator = new CoreSorter().init(this); + cds = new ArrayList<>(cds);//make a copy + Collections.sort(cds, coreComparator::compare); + } checkForDuplicateCoreNames(cds); - for (final CoreDescriptor cd : cds) { if (cd.isTransient() || !cd.isLoadOnStartup()) { solrCores.putDynamicDescriptor(cd.getName(), cd); @@ -1258,6 +1265,10 @@ public class CoreContainer { return authenticationPlugin == null ? 
null : authenticationPlugin.plugin; } + public NodeConfig getNodeConfig() { + return cfg; + } + } class CloserThread extends Thread { diff --git a/solr/core/src/java/org/apache/solr/core/CoreSorter.java b/solr/core/src/java/org/apache/solr/core/CoreSorter.java new file mode 100644 index 00000000000..807400975d2 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/core/CoreSorter.java @@ -0,0 +1,186 @@ +package org.apache.solr.core; + +import java.util.Collection; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; + +import static java.util.Collections.emptyList; +import static java.util.stream.Collectors.toList; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This is a utility class that sorts cores in such a way as to minimize other cores + * waiting for replicas in the current node. 
This helps in avoiding leaderVote timeouts + * happening in other nodes of the cluster + * + */ +public class CoreSorter { + Map shardsVsReplicaCounts = new LinkedHashMap<>(); + CoreContainer cc; + private static final CountsForEachShard zero = new CountsForEachShard(0, 0, 0); + + public final static Comparator countsComparator = (c1, c2) -> { + if (c1 == null) c1 = zero;//just to avoid NPE + if (c2 == null) c2 = zero; + if (c1.totalReplicasInDownNodes < c2.totalReplicasInDownNodes) { + //Prioritize replicas with least no:of down nodes waiting. + //It's better to bring up a node that is a member of a shard + //with 0 down nodes than 1 down node because it will make the shard + // complete earlier and avoid waiting by the other live nodes + if (c1.totalReplicasInLiveNodes > 0) { + //means nobody else is waiting for this , so no need to prioritize + return -1; + } + } + if (c2.totalReplicasInDownNodes < c1.totalReplicasInDownNodes) { + //same as the above, just to take care of the case where c2 has to be prioritized + if (c2.totalReplicasInLiveNodes > 0) { + //means nobody else is waiting for this , so no need to prioritize + return 1; + } + } + + //Prioritize replicas where most no:of other nodes are waiting for + // For example if 1 other replica is waiting for this replica, then + // prioritize that over the replica where zero other nodes are waiting + if (c1.totalReplicasInLiveNodes > c2.totalReplicasInLiveNodes) return -1; + if (c2.totalReplicasInLiveNodes > c1.totalReplicasInLiveNodes) return 1; + + //If all else is same, prioritize fewer replicas I have because that will complete the + //quorum for shard faster. 
If I have only one replica for a shard I can finish it faster + // than a shard with 2 replicas in this node + if (c1.myReplicas < c2.myReplicas) return -1; + if (c2.myReplicas < c1.myReplicas) return 1; + //if everything is same return 0 + return 0; + }; + + + public CoreSorter init(CoreContainer cc) { + this.cc = cc; + if (cc == null || !cc.isZooKeeperAware()) { + return this; + } + String myNodeName = getNodeName(); + ClusterState state = cc.getZkController().getClusterState(); + for (CloudDescriptor cloudDescriptor : getCloudDescriptors()) { + String coll = cloudDescriptor.getCollectionName(); + String sliceName = getShardName(cloudDescriptor); + if (shardsVsReplicaCounts.containsKey(sliceName)) continue; + CountsForEachShard c = new CountsForEachShard(0, 0, 0); + for (Replica replica : getReplicas(state, coll, cloudDescriptor.getShardId())) { + if (replica.getNodeName().equals(myNodeName)) { + c.myReplicas++; + } else { + Set liveNodes = state.getLiveNodes(); + if (liveNodes.contains(replica.getNodeName())) { + c.totalReplicasInLiveNodes++; + } else { + c.totalReplicasInDownNodes++; + } + } + } + shardsVsReplicaCounts.put(sliceName, c); + } + + return this; + + } + + + public int compare(CoreDescriptor cd1, CoreDescriptor cd2) { + String s1 = getShardName(cd1.getCloudDescriptor()); + String s2 = getShardName(cd2.getCloudDescriptor()); + if (s1 == null || s2 == null) return cd1.getName().compareTo(cd2.getName()); + CountsForEachShard c1 = shardsVsReplicaCounts.get(s1); + CountsForEachShard c2 = shardsVsReplicaCounts.get(s2); + int result = countsComparator.compare(c1, c2); + return result == 0 ? 
s1.compareTo(s2) : result; + } + + + static class CountsForEachShard { + public int totalReplicasInDownNodes = 0, myReplicas = 0, totalReplicasInLiveNodes = 0; + + public CountsForEachShard(int totalReplicasInDownNodes, int totalReplicasInLiveNodes,int myReplicas) { + this.totalReplicasInDownNodes = totalReplicasInDownNodes; + this.myReplicas = myReplicas; + this.totalReplicasInLiveNodes = totalReplicasInLiveNodes; + } + + public boolean equals(Object obj) { + if (obj instanceof CountsForEachShard) { + CountsForEachShard that = (CountsForEachShard) obj; + return that.totalReplicasInDownNodes == totalReplicasInDownNodes && that.myReplicas == myReplicas; + + } + return false; + } + + @Override + public String toString() { + return "down : " + totalReplicasInDownNodes + " , up : " + totalReplicasInLiveNodes + " my : " + myReplicas; + } + + + } + + static String getShardName(CloudDescriptor cd) { + return cd == null ? + null : + cd.getCollectionName() + + "_" + + cd.getShardId(); + } + + + String getNodeName() { + return cc.getNodeConfig().getNodeName(); + } + + /**Return all replicas for a given collection+slice combo + */ + Collection getReplicas(ClusterState cs, String coll, String slice) { + DocCollection c = cs.getCollectionOrNull(coll); + if (c == null) return emptyList(); + Slice s = c.getSlice(slice); + if (s == null) return emptyList(); + return s.getReplicas(); + } + + + /**return cloud descriptors for all cores in this node + */ + Collection getCloudDescriptors() { + return cc.getCores() + .stream() + .map((core) -> core.getCoreDescriptor().getCloudDescriptor()) + .collect(toList()); + } + + +} diff --git a/solr/core/src/java/org/apache/solr/core/NodeConfig.java b/solr/core/src/java/org/apache/solr/core/NodeConfig.java index e72fbc92cc9..3db453bc861 100644 --- a/solr/core/src/java/org/apache/solr/core/NodeConfig.java +++ b/solr/core/src/java/org/apache/solr/core/NodeConfig.java @@ -50,7 +50,7 @@ public class NodeConfig { private final CloudConfig 
cloudConfig; - private final int coreLoadThreads; + private final Integer coreLoadThreads; private final int transientCacheSize; @@ -64,7 +64,7 @@ public class NodeConfig { PluginInfo shardHandlerFactoryConfig, UpdateShardHandlerConfig updateShardHandlerConfig, String coreAdminHandlerClass, String collectionsAdminHandlerClass, String infoHandlerClass, String configSetsHandlerClass, - LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, int coreLoadThreads, + LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, Integer coreLoadThreads, int transientCacheSize, boolean useSchemaCache, String managementPath, SolrResourceLoader loader, Properties solrProperties, PluginInfo[] backupRepositoryPlugins) { this.nodeName = nodeName; @@ -87,7 +87,7 @@ public class NodeConfig { this.solrProperties = solrProperties; this.backupRepositoryPlugins = backupRepositoryPlugins; - if (this.cloudConfig != null && this.coreLoadThreads < 2) { + if (this.cloudConfig != null && this.getCoreLoadThreadCount(NodeConfigBuilder.DEFAULT_CORE_LOAD_THREADS) < 2) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "SolrCloud requires a value of at least 2 for coreLoadThreads (configured value = " + this.coreLoadThreads + ")"); } @@ -109,8 +109,8 @@ public class NodeConfig { return updateShardHandlerConfig; } - public int getCoreLoadThreadCount() { - return coreLoadThreads; + public int getCoreLoadThreadCount(int def) { + return coreLoadThreads == null ? 
def : coreLoadThreads; } public String getSharedLibDirectory() { @@ -185,7 +185,7 @@ public class NodeConfig { private String configSetsHandlerClass = DEFAULT_CONFIGSETSHANDLERCLASS; private LogWatcherConfig logWatcherConfig = new LogWatcherConfig(true, null, null, 50); private CloudConfig cloudConfig; - private int coreLoadThreads = DEFAULT_CORE_LOAD_THREADS; + private Integer coreLoadThreads; private int transientCacheSize = DEFAULT_TRANSIENT_CACHE_SIZE; private boolean useSchemaCache = false; private String managementPath; @@ -195,7 +195,9 @@ public class NodeConfig { private final SolrResourceLoader loader; private final String nodeName; - private static final int DEFAULT_CORE_LOAD_THREADS = 3; + public static final int DEFAULT_CORE_LOAD_THREADS = 3; + //No:of core load threads in cloud mode is set to a default of 24 + public static final int DEFAULT_CORE_LOAD_THREADS_IN_CLOUD = 24; private static final int DEFAULT_TRANSIENT_CACHE_SIZE = Integer.MAX_VALUE; diff --git a/solr/core/src/test/org/apache/solr/core/CoreSorterTest.java b/solr/core/src/test/org/apache/solr/core/CoreSorterTest.java new file mode 100644 index 00000000000..5b550bf9f00 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/core/CoreSorterTest.java @@ -0,0 +1,246 @@ +package org.apache.solr.core; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.cloud.ZkController; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.CoreSorter.CountsForEachShard; +import org.apache.solr.util.MockCoreContainer; + +import static 
java.util.stream.Collectors.toList; +import static org.apache.solr.core.CoreSorter.getShardName; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; + + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +public class CoreSorterTest extends SolrTestCaseJ4 { + Map nodes = new LinkedHashMap<>(); + Set liveNodes = new HashSet<>(); + + public void testComparator() { + List l = new ArrayList<>(); + // DN LIV MY + l.add(new CountsForEachShard(1, 3, 1)); + l.add(new CountsForEachShard(0, 3, 2)); + l.add(new CountsForEachShard(0, 3, 3)); + l.add(new CountsForEachShard(0, 3, 4)); + l.add(new CountsForEachShard(1, 0, 2)); + l.add(new CountsForEachShard(1, 0, 1)); + l.add(new CountsForEachShard(2, 5, 1)); + l.add(new CountsForEachShard(2, 4, 2)); + l.add(new CountsForEachShard(2, 3, 3)); + + List expected = Arrays.asList( + new CountsForEachShard(0, 3, 2), + new CountsForEachShard(0, 3, 3), + new CountsForEachShard(0, 3, 4), + new CountsForEachShard(1, 3, 1), + new CountsForEachShard(2, 5, 1), + new CountsForEachShard(2, 4, 2), + new CountsForEachShard(2, 3, 3), + new CountsForEachShard(1, 0, 1), + new CountsForEachShard(1, 0, 2) + + ); + + for (int i = 0; i < 10; i++) { + List copy = new ArrayList<>(l); + Collections.shuffle(copy); + Collections.sort(copy, CoreSorter.countsComparator); + for (int j = 0; j < copy.size(); j++) { + assertEquals(expected.get(j), copy.get(j)); + } + } + } + + public void testSort() throws Exception { + CoreContainer mockCC = getMockContainer(); + MockCoreSorter coreSorter = (MockCoreSorter) new MockCoreSorter().init(mockCC); + List copy = new ArrayList<>(coreSorter.getLocalCores()); + Collections.sort(copy, coreSorter::compare); + List l = copy.stream() + .map(CoreDescriptor::getCloudDescriptor) + .map(it -> coreSorter.shardsVsReplicaCounts.get(getShardName(it))) + .collect(toList()); + for (int i = 1; i < l.size(); i++) { + CountsForEachShard curr = l.get(i); + CountsForEachShard prev = l.get(i-1); + assertTrue(CoreSorter.countsComparator.compare(prev, curr) < 1); + } + + for (CountsForEachShard c : l) { + System.out.println(c); + } + } + + private CoreContainer getMockContainer() { + CoreContainer mockCC = 
createMock(CoreContainer.class); + ZkController mockZKC = createMock(ZkController.class); + ClusterState mockClusterState = createMock(ClusterState.class); + reset(mockCC, mockZKC, mockClusterState); + mockCC.isZooKeeperAware(); + expectLastCall().andAnswer(() -> Boolean.TRUE).anyTimes(); + mockCC.getZkController(); + expectLastCall().andAnswer(() -> mockZKC).anyTimes(); + mockClusterState.getLiveNodes(); + expectLastCall().andAnswer(() -> liveNodes).anyTimes(); + mockZKC.getClusterState(); + expectLastCall().andAnswer(() -> mockClusterState).anyTimes(); + replay(mockCC, mockZKC, mockClusterState); + return mockCC; + } + + static class ReplicaInfo { + final int coll, slice, replica; + final String replicaName; + CloudDescriptor cd; + + ReplicaInfo(int coll, int slice, int replica) { + this.coll = coll; + this.slice = slice; + this.replica = replica; + replicaName = "coll_" + coll + "_" + slice + "_" + replica; + Properties p = new Properties(); + p.setProperty(CoreDescriptor.CORE_SHARD, "shard_" + slice); + p.setProperty(CoreDescriptor.CORE_COLLECTION, "coll_" + slice); + p.setProperty(CoreDescriptor.CORE_NODE_NAME, replicaName); + cd = new CloudDescriptor(replicaName, p, null); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ReplicaInfo) { + ReplicaInfo replicaInfo = (ReplicaInfo) obj; + return replicaInfo.replicaName.equals(replicaName); + } + return false; + } + + + @Override + public int hashCode() { + return replicaName.hashCode(); + } + + CloudDescriptor getCloudDescriptor() { + return cd; + + } + + public Replica getReplica(String node) { + return new Replica(replicaName, Utils.makeMap("core", replicaName, "node_name", node)); + } + + public boolean equals(String coll, String slice) { + return cd.getCollectionName().equals(coll) && slice.equals(cd.getShardId()); + } + } + + + class MockCoreSorter extends CoreSorter { + int numColls = 1 + random().nextInt(3); + int numReplicas = 2 + random().nextInt(2); + int numShards = 50 + 
random().nextInt(10); + String myNodeName; + Collection myCores = new ArrayList<>(); + List localCores = new ArrayList<>(); + + Map replicaPositions = new LinkedHashMap<>();//replicaname vs. nodename + + public MockCoreSorter() { + int totalNodes = 50 + random().nextInt(10); + int myNode = random().nextInt(totalNodes); + List nodeNames = new ArrayList<>(); + for (int i = 0; i < totalNodes; i++) { + String s = "192.168.1." + i + ":8983_solr"; + if (i == myNode) myNodeName = s; + boolean on = random().nextInt(100) < 70; + nodes.put(s, + on);//70% chance that the node is up; + nodeNames.add(s); + if(on) liveNodes.add(s); + } + + for (int i = 0; i < numColls; i++) { + for (int j = 0; j < numShards; j++) { + for (int k = 0; k < numReplicas; k++) { + ReplicaInfo ri = new ReplicaInfo(i, j, k); + replicaPositions.put(ri, nodeNames.get(random().nextInt(totalNodes))); + } + } + } + + for (Map.Entry e : replicaPositions.entrySet()) { + if (e.getValue().equals(myNodeName)) { + myCores.add(e.getKey().getCloudDescriptor()); + localCores.add(new MockCoreContainer.MockCoreDescriptor() { + @Override + public CloudDescriptor getCloudDescriptor() { + return e.getKey().getCloudDescriptor(); + } + }); + } + } + } + + @Override + String getNodeName() { + return myNodeName; + } + + @Override + Collection getCloudDescriptors() { + return myCores; + + } + + public List getLocalCores() { + return localCores; + } + + @Override + Collection getReplicas(ClusterState cs, String coll, String slice) { + List r = new ArrayList<>(); + for (Map.Entry e : replicaPositions.entrySet()) { + if (e.getKey().equals(coll, slice)) { + r.add(e.getKey().getReplica(e.getValue())); + } + } + return r; + } + } + + +} diff --git a/solr/core/src/test/org/apache/solr/core/TestSolrXml.java b/solr/core/src/test/org/apache/solr/core/TestSolrXml.java index 4343efec7c6..e005c9fe3da 100644 --- a/solr/core/src/test/org/apache/solr/core/TestSolrXml.java +++ b/solr/core/src/test/org/apache/solr/core/TestSolrXml.java @@ -73,7 
+73,7 @@ public class TestSolrXml extends SolrTestCaseJ4 { assertEquals("collection handler class", "testCollectionsHandler", cfg.getCollectionsHandlerClass()); assertEquals("info handler class", "testInfoHandler", cfg.getInfoHandlerClass()); assertEquals("config set handler class", "testConfigSetsHandler", cfg.getConfigSetsHandlerClass()); - assertEquals("core load threads", 11, cfg.getCoreLoadThreadCount()); + assertEquals("core load threads", 11, cfg.getCoreLoadThreadCount(0)); assertThat("core root dir", cfg.getCoreRootDirectory().toString(), containsString("testCoreRootDirectory")); assertEquals("distrib conn timeout", 22, cfg.getUpdateShardHandlerConfig().getDistributedConnectionTimeout()); assertEquals("distrib socket timeout", 33, cfg.getUpdateShardHandlerConfig().getDistributedSocketTimeout());