mirror of
https://github.com/apache/lucene.git
synced 2025-02-28 05:19:17 +00:00
SOLR-7280: In cloud-mode sort the cores smartly before loading & limit threads to improve cluster stability
This commit is contained in:
parent
4f45226174
commit
6c1b75b06b
@ -147,6 +147,9 @@ Bug Fixes
|
||||
* SOLR-9287: Including 'score' in the 'fl' param when doing an RTG no longer causes an NPE
|
||||
(hossman, Ishan Chattopadhyaya)
|
||||
|
||||
* SOLR-7280: In cloud-mode sort the cores smartly before loading & limit threads to improve cluster stability
|
||||
(noble, Erick Erickson, shalin)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
|
@ -115,7 +115,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
* <p>
|
||||
* TODO: exceptions during close on attempts to update cloud state
|
||||
*/
|
||||
public final class ZkController {
|
||||
public class ZkController {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
static final int WAIT_DOWN_STATES_TIMEOUT_SECONDS = 60;
|
||||
|
@ -25,6 +25,8 @@ import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH
|
||||
import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
|
||||
import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
|
||||
import static org.apache.solr.common.params.CommonParams.ZK_PATH;
|
||||
import static org.apache.solr.core.NodeConfig.NodeConfigBuilder.DEFAULT_CORE_LOAD_THREADS;
|
||||
import static org.apache.solr.core.NodeConfig.NodeConfigBuilder.DEFAULT_CORE_LOAD_THREADS_IN_CLOUD;
|
||||
import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -33,6 +35,8 @@ import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
@ -349,7 +353,7 @@ public class CoreContainer {
|
||||
}
|
||||
if (builder.getAuthSchemeRegistryProvider() != null) {
|
||||
httpClientBuilder.setAuthSchemeRegistryProvider(new AuthSchemeRegistryProvider() {
|
||||
|
||||
|
||||
@Override
|
||||
public Lookup<AuthSchemeProvider> getAuthSchemeRegistry() {
|
||||
return builder.getAuthSchemeRegistryProvider().getAuthSchemeRegistry();
|
||||
@ -485,17 +489,20 @@ public class CoreContainer {
|
||||
containerProperties.putAll(cfg.getSolrProperties());
|
||||
|
||||
// setup executor to load cores in parallel
|
||||
// do not limit the size of the executor in zk mode since cores may try and wait for each other.
|
||||
ExecutorService coreLoadExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(
|
||||
( zkSys.getZkController() == null ? cfg.getCoreLoadThreadCount() : Integer.MAX_VALUE ),
|
||||
cfg.getCoreLoadThreadCount(isZooKeeperAware() ? DEFAULT_CORE_LOAD_THREADS_IN_CLOUD : DEFAULT_CORE_LOAD_THREADS),
|
||||
new DefaultSolrThreadFactory("coreLoadExecutor") );
|
||||
final List<Future<SolrCore>> futures = new ArrayList<Future<SolrCore>>();
|
||||
final List<Future<SolrCore>> futures = new ArrayList<>();
|
||||
try {
|
||||
|
||||
List<CoreDescriptor> cds = coresLocator.discover(this);
|
||||
if (isZooKeeperAware()) {
|
||||
//sort the cores if it is in SolrCloud. In standalone node the order does not matter
|
||||
CoreSorter coreComparator = new CoreSorter().init(this);
|
||||
cds = new ArrayList<>(cds);//make a copy
|
||||
Collections.sort(cds, coreComparator::compare);
|
||||
}
|
||||
checkForDuplicateCoreNames(cds);
|
||||
|
||||
|
||||
for (final CoreDescriptor cd : cds) {
|
||||
if (cd.isTransient() || !cd.isLoadOnStartup()) {
|
||||
solrCores.putDynamicDescriptor(cd.getName(), cd);
|
||||
@ -1258,6 +1265,10 @@ public class CoreContainer {
|
||||
return authenticationPlugin == null ? null : authenticationPlugin.plugin;
|
||||
}
|
||||
|
||||
public NodeConfig getNodeConfig() {
|
||||
return cfg;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class CloserThread extends Thread {
|
||||
|
186
solr/core/src/java/org/apache/solr/core/CoreSorter.java
Normal file
186
solr/core/src/java/org/apache/solr/core/CoreSorter.java
Normal file
@ -0,0 +1,186 @@
|
||||
package org.apache.solr.core;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* This is a utility class that sorts cores in such a way as to minimize other cores
|
||||
* waiting for replicas in the current node. This helps in avoiding leaderVote timeouts
|
||||
* happening in other nodes of the cluster
|
||||
*
|
||||
*/
|
||||
public class CoreSorter {
|
||||
Map<String, CountsForEachShard> shardsVsReplicaCounts = new LinkedHashMap<>();
|
||||
CoreContainer cc;
|
||||
private static final CountsForEachShard zero = new CountsForEachShard(0, 0, 0);
|
||||
|
||||
public final static Comparator<CountsForEachShard> countsComparator = (c1, c2) -> {
|
||||
if (c1 == null) c1 = zero;//just to avoid NPE
|
||||
if (c2 == null) c2 = zero;
|
||||
if (c1.totalReplicasInDownNodes < c2.totalReplicasInDownNodes) {
|
||||
//Prioritize replicas with least no:of down nodes waiting.
|
||||
//It's better to bring up a node that is a member of a shard
|
||||
//with 0 down nodes than 1 down node because it will make the shard
|
||||
// complete earlier and avoid waiting by the other live nodes
|
||||
if (c1.totalReplicasInLiveNodes > 0) {
|
||||
//means nobody else is waiting for this , so no need to prioritize
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
if (c2.totalReplicasInDownNodes < c1.totalReplicasInDownNodes) {
|
||||
//same is the above, just to take care of the case where c2 has to be prioritized
|
||||
if (c2.totalReplicasInLiveNodes > 0) {
|
||||
//means nobody else is waiting for this , so no need to priotitize
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
//Prioritize replicas where most no:of other nodes are waiting for
|
||||
// For example if 1 other replicas are waiting for this replica, then
|
||||
// prioritize that over the replica were zero other nodes are waiting
|
||||
if (c1.totalReplicasInLiveNodes > c2.totalReplicasInLiveNodes) return -1;
|
||||
if (c2.totalReplicasInLiveNodes > c1.totalReplicasInLiveNodes) return 1;
|
||||
|
||||
//If all else is same. prioritize fewer replicas I have because that will complete the
|
||||
//quorum for shard faster. If I have only one replica for a shard I can finish it faster
|
||||
// than a shard with 2 replicas in this node
|
||||
if (c1.myReplicas < c2.myReplicas) return -1;
|
||||
if (c2.myReplicas < c1.myReplicas) return 1;
|
||||
//if everything is same return 0
|
||||
return 0;
|
||||
};
|
||||
|
||||
|
||||
public CoreSorter init(CoreContainer cc) {
|
||||
this.cc = cc;
|
||||
if (cc == null || !cc.isZooKeeperAware()) {
|
||||
return this;
|
||||
}
|
||||
String myNodeName = getNodeName();
|
||||
ClusterState state = cc.getZkController().getClusterState();
|
||||
for (CloudDescriptor cloudDescriptor : getCloudDescriptors()) {
|
||||
String coll = cloudDescriptor.getCollectionName();
|
||||
String sliceName = getShardName(cloudDescriptor);
|
||||
if (shardsVsReplicaCounts.containsKey(sliceName)) continue;
|
||||
CountsForEachShard c = new CountsForEachShard(0, 0, 0);
|
||||
for (Replica replica : getReplicas(state, coll, cloudDescriptor.getShardId())) {
|
||||
if (replica.getNodeName().equals(myNodeName)) {
|
||||
c.myReplicas++;
|
||||
} else {
|
||||
Set<String> liveNodes = state.getLiveNodes();
|
||||
if (liveNodes.contains(replica.getNodeName())) {
|
||||
c.totalReplicasInLiveNodes++;
|
||||
} else {
|
||||
c.totalReplicasInDownNodes++;
|
||||
}
|
||||
}
|
||||
}
|
||||
shardsVsReplicaCounts.put(sliceName, c);
|
||||
}
|
||||
|
||||
return this;
|
||||
|
||||
}
|
||||
|
||||
|
||||
public int compare(CoreDescriptor cd1, CoreDescriptor cd2) {
|
||||
String s1 = getShardName(cd1.getCloudDescriptor());
|
||||
String s2 = getShardName(cd2.getCloudDescriptor());
|
||||
if (s1 == null || s2 == null) return cd1.getName().compareTo(cd2.getName());
|
||||
CountsForEachShard c1 = shardsVsReplicaCounts.get(s1);
|
||||
CountsForEachShard c2 = shardsVsReplicaCounts.get(s2);
|
||||
int result = countsComparator.compare(c1, c2);
|
||||
return result == 0 ? s1.compareTo(s2) : result;
|
||||
}
|
||||
|
||||
|
||||
static class CountsForEachShard {
|
||||
public int totalReplicasInDownNodes = 0, myReplicas = 0, totalReplicasInLiveNodes = 0;
|
||||
|
||||
public CountsForEachShard(int totalReplicasInDownNodes, int totalReplicasInLiveNodes,int myReplicas) {
|
||||
this.totalReplicasInDownNodes = totalReplicasInDownNodes;
|
||||
this.myReplicas = myReplicas;
|
||||
this.totalReplicasInLiveNodes = totalReplicasInLiveNodes;
|
||||
}
|
||||
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof CountsForEachShard) {
|
||||
CountsForEachShard that = (CountsForEachShard) obj;
|
||||
return that.totalReplicasInDownNodes == totalReplicasInDownNodes && that.myReplicas == myReplicas;
|
||||
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "down : " + totalReplicasInDownNodes + " , up : " + totalReplicasInLiveNodes + " my : " + myReplicas;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
static String getShardName(CloudDescriptor cd) {
|
||||
return cd == null ?
|
||||
null :
|
||||
cd.getCollectionName()
|
||||
+ "_"
|
||||
+ cd.getShardId();
|
||||
}
|
||||
|
||||
|
||||
String getNodeName() {
|
||||
return cc.getNodeConfig().getNodeName();
|
||||
}
|
||||
|
||||
/**Return all replicas for a given collection+slice combo
|
||||
*/
|
||||
Collection<Replica> getReplicas(ClusterState cs, String coll, String slice) {
|
||||
DocCollection c = cs.getCollectionOrNull(coll);
|
||||
if (c == null) return emptyList();
|
||||
Slice s = c.getSlice(slice);
|
||||
if (s == null) return emptyList();
|
||||
return s.getReplicas();
|
||||
}
|
||||
|
||||
|
||||
/**return cloud descriptors for all cores in this node
|
||||
*/
|
||||
Collection<CloudDescriptor> getCloudDescriptors() {
|
||||
return cc.getCores()
|
||||
.stream()
|
||||
.map((core) -> core.getCoreDescriptor().getCloudDescriptor())
|
||||
.collect(toList());
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -50,7 +50,7 @@ public class NodeConfig {
|
||||
|
||||
private final CloudConfig cloudConfig;
|
||||
|
||||
private final int coreLoadThreads;
|
||||
private final Integer coreLoadThreads;
|
||||
|
||||
private final int transientCacheSize;
|
||||
|
||||
@ -64,7 +64,7 @@ public class NodeConfig {
|
||||
PluginInfo shardHandlerFactoryConfig, UpdateShardHandlerConfig updateShardHandlerConfig,
|
||||
String coreAdminHandlerClass, String collectionsAdminHandlerClass,
|
||||
String infoHandlerClass, String configSetsHandlerClass,
|
||||
LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, int coreLoadThreads,
|
||||
LogWatcherConfig logWatcherConfig, CloudConfig cloudConfig, Integer coreLoadThreads,
|
||||
int transientCacheSize, boolean useSchemaCache, String managementPath, SolrResourceLoader loader,
|
||||
Properties solrProperties, PluginInfo[] backupRepositoryPlugins) {
|
||||
this.nodeName = nodeName;
|
||||
@ -87,7 +87,7 @@ public class NodeConfig {
|
||||
this.solrProperties = solrProperties;
|
||||
this.backupRepositoryPlugins = backupRepositoryPlugins;
|
||||
|
||||
if (this.cloudConfig != null && this.coreLoadThreads < 2) {
|
||||
if (this.cloudConfig != null && this.getCoreLoadThreadCount(NodeConfigBuilder.DEFAULT_CORE_LOAD_THREADS) < 2) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"SolrCloud requires a value of at least 2 for coreLoadThreads (configured value = " + this.coreLoadThreads + ")");
|
||||
}
|
||||
@ -109,8 +109,8 @@ public class NodeConfig {
|
||||
return updateShardHandlerConfig;
|
||||
}
|
||||
|
||||
public int getCoreLoadThreadCount() {
|
||||
return coreLoadThreads;
|
||||
public int getCoreLoadThreadCount(int def) {
|
||||
return coreLoadThreads == null ? def : coreLoadThreads;
|
||||
}
|
||||
|
||||
public String getSharedLibDirectory() {
|
||||
@ -185,7 +185,7 @@ public class NodeConfig {
|
||||
private String configSetsHandlerClass = DEFAULT_CONFIGSETSHANDLERCLASS;
|
||||
private LogWatcherConfig logWatcherConfig = new LogWatcherConfig(true, null, null, 50);
|
||||
private CloudConfig cloudConfig;
|
||||
private int coreLoadThreads = DEFAULT_CORE_LOAD_THREADS;
|
||||
private Integer coreLoadThreads;
|
||||
private int transientCacheSize = DEFAULT_TRANSIENT_CACHE_SIZE;
|
||||
private boolean useSchemaCache = false;
|
||||
private String managementPath;
|
||||
@ -195,7 +195,9 @@ public class NodeConfig {
|
||||
private final SolrResourceLoader loader;
|
||||
private final String nodeName;
|
||||
|
||||
private static final int DEFAULT_CORE_LOAD_THREADS = 3;
|
||||
public static final int DEFAULT_CORE_LOAD_THREADS = 3;
|
||||
//No:of core load threads in cloud mode is set to a default of 24
|
||||
public static final int DEFAULT_CORE_LOAD_THREADS_IN_CLOUD = 24;
|
||||
|
||||
private static final int DEFAULT_TRANSIENT_CACHE_SIZE = Integer.MAX_VALUE;
|
||||
|
||||
|
246
solr/core/src/test/org/apache/solr/core/CoreSorterTest.java
Normal file
246
solr/core/src/test/org/apache/solr/core/CoreSorterTest.java
Normal file
@ -0,0 +1,246 @@
|
||||
package org.apache.solr.core;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CoreSorter.CountsForEachShard;
|
||||
import org.apache.solr.util.MockCoreContainer;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.apache.solr.core.CoreSorter.getShardName;
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.expectLastCall;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.reset;
|
||||
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
public class CoreSorterTest extends SolrTestCaseJ4 {
|
||||
Map<String, Boolean> nodes = new LinkedHashMap<>();
|
||||
Set<String> liveNodes = new HashSet<>();
|
||||
|
||||
public void testComparator() {
|
||||
List<CountsForEachShard> l = new ArrayList<>();
|
||||
// DN LIV MY
|
||||
l.add(new CountsForEachShard(1, 3, 1));
|
||||
l.add(new CountsForEachShard(0, 3, 2));
|
||||
l.add(new CountsForEachShard(0, 3, 3));
|
||||
l.add(new CountsForEachShard(0, 3, 4));
|
||||
l.add(new CountsForEachShard(1, 0, 2));
|
||||
l.add(new CountsForEachShard(1, 0, 1));
|
||||
l.add(new CountsForEachShard(2, 5, 1));
|
||||
l.add(new CountsForEachShard(2, 4, 2));
|
||||
l.add(new CountsForEachShard(2, 3, 3));
|
||||
|
||||
List<CountsForEachShard> expected = Arrays.asList(
|
||||
new CountsForEachShard(0, 3, 2),
|
||||
new CountsForEachShard(0, 3, 3),
|
||||
new CountsForEachShard(0, 3, 4),
|
||||
new CountsForEachShard(1, 3, 1),
|
||||
new CountsForEachShard(2, 5, 1),
|
||||
new CountsForEachShard(2, 4, 2),
|
||||
new CountsForEachShard(2, 3, 3),
|
||||
new CountsForEachShard(1, 0, 1),
|
||||
new CountsForEachShard(1, 0, 2)
|
||||
|
||||
);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
List<CountsForEachShard> copy = new ArrayList<>(l);
|
||||
Collections.shuffle(copy);
|
||||
Collections.sort(copy, CoreSorter.countsComparator);
|
||||
for (int j = 0; j < copy.size(); j++) {
|
||||
assertEquals(expected.get(j), copy.get(j));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testSort() throws Exception {
|
||||
CoreContainer mockCC = getMockContainer();
|
||||
MockCoreSorter coreSorter = (MockCoreSorter) new MockCoreSorter().init(mockCC);
|
||||
List<CoreDescriptor> copy = new ArrayList<>(coreSorter.getLocalCores());
|
||||
Collections.sort(copy, coreSorter::compare);
|
||||
List<CountsForEachShard> l = copy.stream()
|
||||
.map(CoreDescriptor::getCloudDescriptor)
|
||||
.map(it -> coreSorter.shardsVsReplicaCounts.get(getShardName(it)))
|
||||
.collect(toList());
|
||||
for (int i = 1; i < l.size(); i++) {
|
||||
CountsForEachShard curr = l.get(i);
|
||||
CountsForEachShard prev = l.get(i-1);
|
||||
assertTrue(CoreSorter.countsComparator.compare(prev, curr) < 1);
|
||||
}
|
||||
|
||||
for (CountsForEachShard c : l) {
|
||||
System.out.println(c);
|
||||
}
|
||||
}
|
||||
|
||||
private CoreContainer getMockContainer() {
|
||||
CoreContainer mockCC = createMock(CoreContainer.class);
|
||||
ZkController mockZKC = createMock(ZkController.class);
|
||||
ClusterState mockClusterState = createMock(ClusterState.class);
|
||||
reset(mockCC, mockZKC, mockClusterState);
|
||||
mockCC.isZooKeeperAware();
|
||||
expectLastCall().andAnswer(() -> Boolean.TRUE).anyTimes();
|
||||
mockCC.getZkController();
|
||||
expectLastCall().andAnswer(() -> mockZKC).anyTimes();
|
||||
mockClusterState.getLiveNodes();
|
||||
expectLastCall().andAnswer(() -> liveNodes).anyTimes();
|
||||
mockZKC.getClusterState();
|
||||
expectLastCall().andAnswer(() -> mockClusterState).anyTimes();
|
||||
replay(mockCC, mockZKC, mockClusterState);
|
||||
return mockCC;
|
||||
}
|
||||
|
||||
static class ReplicaInfo {
|
||||
final int coll, slice, replica;
|
||||
final String replicaName;
|
||||
CloudDescriptor cd;
|
||||
|
||||
ReplicaInfo(int coll, int slice, int replica) {
|
||||
this.coll = coll;
|
||||
this.slice = slice;
|
||||
this.replica = replica;
|
||||
replicaName = "coll_" + coll + "_" + slice + "_" + replica;
|
||||
Properties p = new Properties();
|
||||
p.setProperty(CoreDescriptor.CORE_SHARD, "shard_" + slice);
|
||||
p.setProperty(CoreDescriptor.CORE_COLLECTION, "coll_" + slice);
|
||||
p.setProperty(CoreDescriptor.CORE_NODE_NAME, replicaName);
|
||||
cd = new CloudDescriptor(replicaName, p, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof ReplicaInfo) {
|
||||
ReplicaInfo replicaInfo = (ReplicaInfo) obj;
|
||||
return replicaInfo.replicaName.equals(replicaName);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return replicaName.hashCode();
|
||||
}
|
||||
|
||||
CloudDescriptor getCloudDescriptor() {
|
||||
return cd;
|
||||
|
||||
}
|
||||
|
||||
public Replica getReplica(String node) {
|
||||
return new Replica(replicaName, Utils.makeMap("core", replicaName, "node_name", node));
|
||||
}
|
||||
|
||||
public boolean equals(String coll, String slice) {
|
||||
return cd.getCollectionName().equals(coll) && slice.equals(cd.getShardId());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class MockCoreSorter extends CoreSorter {
|
||||
int numColls = 1 + random().nextInt(3);
|
||||
int numReplicas = 2 + random().nextInt(2);
|
||||
int numShards = 50 + random().nextInt(10);
|
||||
String myNodeName;
|
||||
Collection<CloudDescriptor> myCores = new ArrayList<>();
|
||||
List<CoreDescriptor> localCores = new ArrayList<>();
|
||||
|
||||
Map<ReplicaInfo, String> replicaPositions = new LinkedHashMap<>();//replicaname vs. nodename
|
||||
|
||||
public MockCoreSorter() {
|
||||
int totalNodes = 50 + random().nextInt(10);
|
||||
int myNode = random().nextInt(totalNodes);
|
||||
List<String> nodeNames = new ArrayList<>();
|
||||
for (int i = 0; i < totalNodes; i++) {
|
||||
String s = "192.168.1." + i + ":8983_solr";
|
||||
if (i == myNode) myNodeName = s;
|
||||
boolean on = random().nextInt(100) < 70;
|
||||
nodes.put(s,
|
||||
on);//70% chance that the node is up;
|
||||
nodeNames.add(s);
|
||||
if(on) liveNodes.add(s);
|
||||
}
|
||||
|
||||
for (int i = 0; i < numColls; i++) {
|
||||
for (int j = 0; j < numShards; j++) {
|
||||
for (int k = 0; k < numReplicas; k++) {
|
||||
ReplicaInfo ri = new ReplicaInfo(i, j, k);
|
||||
replicaPositions.put(ri, nodeNames.get(random().nextInt(totalNodes)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (Map.Entry<ReplicaInfo, String> e : replicaPositions.entrySet()) {
|
||||
if (e.getValue().equals(myNodeName)) {
|
||||
myCores.add(e.getKey().getCloudDescriptor());
|
||||
localCores.add(new MockCoreContainer.MockCoreDescriptor() {
|
||||
@Override
|
||||
public CloudDescriptor getCloudDescriptor() {
|
||||
return e.getKey().getCloudDescriptor();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
String getNodeName() {
|
||||
return myNodeName;
|
||||
}
|
||||
|
||||
@Override
|
||||
Collection<CloudDescriptor> getCloudDescriptors() {
|
||||
return myCores;
|
||||
|
||||
}
|
||||
|
||||
public List<CoreDescriptor> getLocalCores() {
|
||||
return localCores;
|
||||
}
|
||||
|
||||
@Override
|
||||
Collection<Replica> getReplicas(ClusterState cs, String coll, String slice) {
|
||||
List<Replica> r = new ArrayList<>();
|
||||
for (Map.Entry<ReplicaInfo, String> e : replicaPositions.entrySet()) {
|
||||
if (e.getKey().equals(coll, slice)) {
|
||||
r.add(e.getKey().getReplica(e.getValue()));
|
||||
}
|
||||
}
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -73,7 +73,7 @@ public class TestSolrXml extends SolrTestCaseJ4 {
|
||||
assertEquals("collection handler class", "testCollectionsHandler", cfg.getCollectionsHandlerClass());
|
||||
assertEquals("info handler class", "testInfoHandler", cfg.getInfoHandlerClass());
|
||||
assertEquals("config set handler class", "testConfigSetsHandler", cfg.getConfigSetsHandlerClass());
|
||||
assertEquals("core load threads", 11, cfg.getCoreLoadThreadCount());
|
||||
assertEquals("core load threads", 11, cfg.getCoreLoadThreadCount(0));
|
||||
assertThat("core root dir", cfg.getCoreRootDirectory().toString(), containsString("testCoreRootDirectory"));
|
||||
assertEquals("distrib conn timeout", 22, cfg.getUpdateShardHandlerConfig().getDistributedConnectionTimeout());
|
||||
assertEquals("distrib socket timeout", 33, cfg.getUpdateShardHandlerConfig().getDistributedSocketTimeout());
|
||||
|
Loading…
x
Reference in New Issue
Block a user