SOLR-11730: Add simulated tests for nodeAdded / nodeLost dynamic in a large cluster.

Plus some other fixes:
* Fix leader election throttle and cluster state versioning in the simulator.
* PolicyHelper was still using a static ThreadLocal field, use ObjectCache isntead.
This commit is contained in:
Andrzej Bialecki 2017-12-22 12:54:48 +01:00
parent 67e1b4a19b
commit 091f45dd7b
14 changed files with 479 additions and 155 deletions

View File

@ -70,7 +70,7 @@ public class ActionThrottle {
long diff = timeSource.getTime() - lastActionStartedAt;
int diffMs = (int) TimeUnit.MILLISECONDS.convert(diff, TimeUnit.NANOSECONDS);
long minNsBetweenActions = TimeUnit.NANOSECONDS.convert(minMsBetweenActions, TimeUnit.MILLISECONDS);
log.info("The last {} attempt started {}ms ago.", name, diffMs);
log.debug("The last {} attempt started {}ms ago.", name, diffMs);
int sleep = 0;
if (diffMs > 0 && diff < minNsBetweenActions) {

View File

@ -74,7 +74,7 @@ public class ExecutePlanAction extends TriggerActionBase {
req.setWaitForFinalState(true);
String asyncId = event.getSource() + '/' + event.getId() + '/' + counter;
String znode = saveAsyncId(cloudManager.getDistribStateManager(), event, asyncId);
log.debug("Saved requestId: {} in znode: {}", asyncId, znode);
log.trace("Saved requestId: {} in znode: {}", asyncId, znode);
// TODO: find a better way of using async calls using dataProvider API !!!
req.setAsyncId(asyncId);
SolrResponse asyncResponse = cloudManager.request(req);
@ -132,7 +132,7 @@ public class ExecutePlanAction extends TriggerActionBase {
statusResponse = (CollectionAdminRequest.RequestStatusResponse)cloudManager.request(CollectionAdminRequest.requestStatus(requestId));
state = statusResponse.getRequestStatus();
if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED) {
log.debug("Task with requestId={} finished with state={} in {}s", requestId, state, i * 5);
log.trace("Task with requestId={} finished with state={} in {}s", requestId, state, i * 5);
cloudManager.request(CollectionAdminRequest.deleteAsyncId(requestId));
return statusResponse;
} else if (state == RequestStatusState.NOT_FOUND) {
@ -156,7 +156,7 @@ public class ExecutePlanAction extends TriggerActionBase {
throw e;
}
if (i > 0 && i % 5 == 0) {
log.debug("Task with requestId={} still not complete after {}s. Last state={}", requestId, i * 5, state);
log.trace("Task with requestId={} still not complete after {}s. Last state={}", requestId, i * 5, state);
}
cloudManager.getTimeSource().sleep(5000);
}

View File

@ -163,7 +163,7 @@ public class NodeLostTrigger extends TriggerBase {
removeMarker(n);
});
} else {
log.debug("NodeLostTrigger listener for lost nodes: {} is not ready, will try later", nodeNames);
log.debug("NodeLostTrigger processor for lost nodes: {} is not ready, will try later", nodeNames);
}
} else {
nodeNames.forEach(n -> {

View File

@ -37,6 +37,10 @@ public class LiveNodesSet {
return Collections.unmodifiableSet(set);
}
public int size() {
return set.size();
}
public void registerLiveNodesListener(LiveNodesListener listener) {
listeners.add(listener);
}

View File

@ -27,7 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
@ -96,10 +96,10 @@ public class SimCloudManager implements SolrCloudManager {
private TimeSource timeSource;
private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
private final ExecutorService simCloudManagerPool;
private final Map<String, AtomicLong> opCounts = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> opCounts = new ConcurrentSkipListMap<>();
private ExecutorService simCloudManagerPool;
private Overseer.OverseerThread triggerThread;
private ThreadGroup triggerThreadGroup;
private SolrResourceLoader loader;
@ -327,7 +327,8 @@ public class SimCloudManager implements SolrCloudManager {
/**
* Simulate the effect of restarting Overseer leader - in this case this means restarting the
* OverseerTriggerThread and optionally killing a node.
* OverseerTriggerThread and optionally killing a node. All background tasks currently in progress
* will be interrupted.
* @param killNodeId optional nodeId to kill. If null then don't kill any node, just restart the thread
*/
public void simRestartOverseer(String killNodeId) throws Exception {
@ -335,9 +336,17 @@ public class SimCloudManager implements SolrCloudManager {
triggerThread.interrupt();
IOUtils.closeQuietly(triggerThread);
if (killNodeId != null) {
simRemoveNode(killNodeId, true);
simRemoveNode(killNodeId, false);
}
objectCache.clear();
try {
simCloudManagerPool.shutdownNow();
} catch (Exception e) {
// ignore
}
simCloudManagerPool = ExecutorUtil.newMDCAwareFixedThreadPool(200, new DefaultSolrThreadFactory("simCloudManagerPool"));
OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this,
new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build());
triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
@ -378,6 +387,10 @@ public class SimCloudManager implements SolrCloudManager {
return opCounts;
}
public void simResetOpCounts() {
opCounts.clear();
}
/**
* Get the number of processed operations of a specified type.
* @param op operation name, eg. MOVEREPLICA
@ -497,7 +510,7 @@ public class SimCloudManager implements SolrCloudManager {
if (action == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
}
LOG.debug("Invoking Collection Action :{} with params {}", action.toLower(), req.getParams().toQueryString());
LOG.trace("Invoking Collection Action :{} with params {}", action.toLower(), req.getParams().toQueryString());
NamedList results = new NamedList();
rsp.setResponse(results);
incrementCount(action.name());

View File

@ -30,7 +30,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -72,12 +71,17 @@ import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CommonParams.NAME;
/**
@ -108,7 +112,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
private final ReentrantLock lock = new ReentrantLock();
private final ActionThrottle leaderThrottle;
private final Map<String, Map<String, ActionThrottle>> leaderThrottles = new ConcurrentHashMap<>();
// default map of: operation -> delay
private final Map<String, Long> defaultOpDelays = new HashMap<>();
@ -116,7 +120,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
private final Map<String, Map<String, Long>> opDelays = new ConcurrentHashMap<>();
private volatile int clusterStateVersion = -1;
private volatile int clusterStateVersion = 0;
private Map<String, Object> lastSavedProperties = null;
private AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
@ -133,7 +137,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getSimDistribStateManager();
this.leaderThrottle = new ActionThrottle("leader", 5000, cloudManager.getTimeSource());
// names are CollectionAction operation names, delays are in ms (simulated time)
defaultOpDelays.put(CollectionParams.CollectionAction.MOVEREPLICA.name(), 5000L);
defaultOpDelays.put(CollectionParams.CollectionAction.DELETEREPLICA.name(), 5000L);
@ -191,10 +194,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
/**
* Reset the leader election throttle.
* Reset the leader election throttles.
*/
public void simResetLeaderThrottle() {
leaderThrottle.reset();
public void simResetLeaderThrottles() {
leaderThrottles.clear();
}
private ActionThrottle getThrottle(String collection, String shard) {
return leaderThrottles.computeIfAbsent(collection, coll -> new ConcurrentHashMap<>())
.computeIfAbsent(shard, s -> new ActionThrottle("leader", 5000, cloudManager.getTimeSource()));
}
/**
@ -225,30 +233,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
nodeReplicaMap.putIfAbsent(nodeId, new ArrayList<>());
}
// utility class to run leader election in a separate thread and with throttling
// Note: leader election is a no-op if a shard leader already exists for each shard
private class LeaderElection implements Callable<Boolean> {
Collection<String> collections;
boolean saveClusterState;
LeaderElection(Collection<String> collections, boolean saveClusterState) {
this.collections = collections;
this.saveClusterState = saveClusterState;
}
@Override
public Boolean call() {
leaderThrottle.minimumWaitBetweenActions();
leaderThrottle.markAttemptingAction();
try {
simRunLeaderElection(collections, saveClusterState);
} catch (Exception e) {
return false;
}
return true;
}
}
/**
* Remove node from a cluster. This is equivalent to a situation when a node is lost.
* All replicas that were assigned to this node are marked as DOWN.
@ -273,7 +257,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId);
}
if (!collections.isEmpty()) {
cloudManager.submit(new LeaderElection(collections, true));
simRunLeaderElection(collections, true);
}
return res;
} finally {
@ -326,7 +310,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
if (!collections.isEmpty()) {
collectionsStatesRef.set(null);
cloudManager.submit(new LeaderElection(collections, true));
simRunLeaderElection(collections, true);
return true;
} else {
return false;
@ -431,7 +415,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk - 10);
if (runLeaderElection) {
cloudManager.submit(new LeaderElection(Collections.singleton(replicaInfo.getCollection()), true));
simRunLeaderElection(Collections.singleton(replicaInfo.getCollection()), true);
}
} finally {
lock.unlock();
@ -468,7 +452,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk + 10);
}
LOG.trace("-- simRemoveReplica {}", ri);
cloudManager.submit(new LeaderElection(Collections.singleton(ri.getCollection()), true));
simRunLeaderElection(Collections.singleton(ri.getCollection()), true);
return;
}
}
@ -482,14 +466,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* Save clusterstate.json to {@link DistribStateManager}.
* @return saved state
*/
private ClusterState saveClusterState(ClusterState state) throws IOException {
private synchronized ClusterState saveClusterState(ClusterState state) throws IOException {
byte[] data = Utils.toJSON(state);
try {
VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_STATE);
int version = oldData != null ? oldData.getVersion() : -1;
Assert.assertEquals(clusterStateVersion, version + 1);
stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
LOG.trace("-- saved cluster state version=" + clusterStateVersion +
", zkVersion=" + (version + 1) + ", {}", state);
clusterStateVersion++;
} catch (Exception e) {
throw new IOException(e);
}
@ -515,75 +499,89 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param collections list of affected collections
* @param saveClusterState if true then save cluster state regardless of changes.
*/
private synchronized void simRunLeaderElection(Collection<String> collections, boolean saveClusterState) throws Exception {
private void simRunLeaderElection(Collection<String> collections, boolean saveClusterState) throws Exception {
ClusterState state = getClusterState();
AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
if (saveClusterState) {
collectionsStatesRef.set(null);
}
state.forEachCollection(dc -> {
if (!collections.contains(dc.getName())) {
return;
}
dc.getSlices().forEach(s -> {
Replica leader = s.getLeader();
if (leader == null || !liveNodes.contains(leader.getNodeName())) {
LOG.trace("Running leader election for " + dc.getName() + " / " + s.getName());
if (s.getReplicas().isEmpty()) { // no replicas - punt
return;
dc.getSlices().forEach(s ->
cloudManager.submit(() -> {
simRunLeaderElection(dc.getName(), s, saveClusterState);
return true;
})
);
});
}
private void simRunLeaderElection(String collection, Slice s, boolean saveClusterState) throws Exception {
AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
Replica leader = s.getLeader();
if (leader == null || !liveNodes.contains(leader.getNodeName())) {
LOG.trace("Running leader election for " + collection + " / " + s.getName());
if (s.getReplicas().isEmpty()) { // no replicas - punt
return;
}
ActionThrottle lt = getThrottle(collection, s.getName());
lt.minimumWaitBetweenActions();
lt.markAttemptingAction();
// mark all replicas as non-leader (probably not necessary) and collect all active and live
List<ReplicaInfo> active = new ArrayList<>();
s.getReplicas().forEach(r -> {
AtomicReference<ReplicaInfo> riRef = new AtomicReference<>();
// find our ReplicaInfo for this replica
nodeReplicaMap.get(r.getNodeName()).forEach(info -> {
if (info.getName().equals(r.getName())) {
riRef.set(info);
}
// mark all replicas as non-leader (probably not necessary) and collect all active and live
List<ReplicaInfo> active = new ArrayList<>();
s.getReplicas().forEach(r -> {
AtomicReference<ReplicaInfo> riRef = new AtomicReference<>();
// find our ReplicaInfo for this replica
nodeReplicaMap.get(r.getNodeName()).forEach(info -> {
if (info.getName().equals(r.getName())) {
riRef.set(info);
}
});
ReplicaInfo ri = riRef.get();
if (ri == null) {
throw new IllegalStateException("-- could not find ReplicaInfo for replica " + r);
}
synchronized (ri) {
if (ri.getVariables().remove(ZkStateReader.LEADER_PROP) != null) {
stateChanged.set(true);
}
if (r.isActive(liveNodes.get())) {
active.add(ri);
} else { // if it's on a node that is not live mark it down
if (!liveNodes.contains(r.getNodeName())) {
ri.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
}
}
}
});
if (active.isEmpty()) {
LOG.warn("-- can't find any active replicas for " + dc.getName() + " / " + s.getName());
return;
});
ReplicaInfo ri = riRef.get();
if (ri == null) {
throw new IllegalStateException("-- could not find ReplicaInfo for replica " + r);
}
synchronized (ri) {
if (ri.getVariables().remove(ZkStateReader.LEADER_PROP) != null) {
stateChanged.set(true);
}
// pick first active one
ReplicaInfo ri = null;
for (ReplicaInfo a : active) {
if (!a.getType().equals(Replica.Type.PULL)) {
ri = a;
break;
if (r.isActive(liveNodes.get())) {
active.add(ri);
} else { // if it's on a node that is not live mark it down
if (!liveNodes.contains(r.getNodeName())) {
ri.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
stateChanged.set(true);
}
}
if (ri == null) {
LOG.warn("-- can't find any suitable replica type for " + dc.getName() + " / " + s.getName());
return;
}
synchronized (ri) {
ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
}
stateChanged.set(true);
LOG.debug("-- elected new leader for " + dc.getName() + " / " + s.getName() + ": " + ri);
} else {
LOG.trace("-- already has leader for {} / {}", dc.getName(), s.getName());
}
});
});
if (saveClusterState || stateChanged.get()) {
if (active.isEmpty()) {
LOG.warn("-- can't find any active replicas for " + collection + " / " + s.getName());
return;
}
// pick first active one
ReplicaInfo ri = null;
for (ReplicaInfo a : active) {
if (!a.getType().equals(Replica.Type.PULL)) {
ri = a;
break;
}
}
if (ri == null) {
LOG.warn("-- can't find any suitable replica type for " + collection + " / " + s.getName());
return;
}
synchronized (ri) {
ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
}
stateChanged.set(true);
LOG.debug("-- elected new leader for " + collection + " / " + s.getName() + ": " + ri);
} else {
LOG.trace("-- already has leader for {} / {}", collection, s.getName());
}
if (stateChanged.get()) {
collectionsStatesRef.set(null);
}
}
@ -618,6 +616,12 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
// calculate expected number of positions
int numTlogReplicas = props.getInt(TLOG_REPLICAS, 0);
int numNrtReplicas = props.getInt(NRT_REPLICAS, props.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1));
int numPullReplicas = props.getInt(PULL_REPLICAS, 0);
int totalReplicas = shardNames.size() * (numNrtReplicas + numPullReplicas + numTlogReplicas);
Assert.assertEquals("unexpected number of replica positions", totalReplicas, replicaPositions.size());
final CountDownLatch finalStateLatch = new CountDownLatch(replicaPositions.size());
AtomicInteger replicaNum = new AtomicInteger(1);
replicaPositions.forEach(pos -> {
@ -652,7 +656,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
});
});
cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
simRunLeaderElection(Collections.singleton(collectionName), true);
if (waitForFinalState) {
boolean finished = finalStateLatch.await(cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, 60, TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
@ -678,6 +682,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
try {
collProperties.remove(collection);
sliceProperties.remove(collection);
leaderThrottles.remove(collection);
opDelay(collection, CollectionParams.CollectionAction.DELETE.name());
@ -716,9 +721,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
nodeReplicaMap.clear();
collProperties.clear();
sliceProperties.clear();
cloudManager.getSimNodeStateProvider().simGetAllNodeValues().forEach((n, values) -> {
values.put("cores", 0);
});
leaderThrottles.clear();
cloudManager.getSimNodeStateProvider().simGetAllNodeValues().forEach((n, values) -> values.put("cores", 0));
collectionsStatesRef.set(null);
} finally {
lock.unlock();
@ -831,7 +835,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
});
Map<String, Object> colProps = collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());
cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
simRunLeaderElection(Collections.singleton(collectionName), true);
results.add("success", "");
} finally {
lock.unlock();
@ -899,7 +903,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
simAddReplica(replicaPosition.node, ri, false);
}
cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
simRunLeaderElection(Collections.singleton(collectionName), true);
results.add("success", "");
}
@ -1201,7 +1205,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
lock.lock();
collectionsStatesRef.set(null);
clusterStateVersion++;
saveClusterState.set(true);
try {
Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();

View File

@ -26,7 +26,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.cloud.autoscaling.NodeStateProvider;
@ -149,6 +151,35 @@ public class SimNodeStateProvider implements NodeStateProvider {
}
}
/**
* Remove values that correspond to dead nodes. If values contained a 'nodeRole'
* key then /roles.json is updated.
*/
public void simRemoveDeadNodes() {
Set<String> myNodes = new HashSet<>(nodeValues.keySet());
myNodes.removeAll(liveNodesSet.get());
AtomicBoolean updateRoles = new AtomicBoolean(false);
myNodes.forEach(n -> {
LOG.debug("- removing dead node values: " + n);
Map<String, Object> vals = nodeValues.remove(n);
if (vals.containsKey("nodeRole")) {
updateRoles.set(true);
}
});
if (updateRoles.get()) {
saveRoles();
}
}
/**
* Return a set of nodes that are not live but their values are still present.
*/
public Set<String> simGetDeadNodes() {
Set<String> myNodes = new TreeSet<>(nodeValues.keySet());
myNodes.removeAll(liveNodesSet.get());
return myNodes;
}
/**
* Get all node values.
*/

View File

@ -20,15 +20,22 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.DocCollection;
@ -63,9 +70,11 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
/** The cluster. */
protected static SimCloudManager cluster;
protected static int clusterNodeCount = 0;
protected static void configureCluster(int nodeCount, TimeSource timeSource) throws Exception {
cluster = SimCloudManager.createCluster(nodeCount, timeSource);
clusterNodeCount = nodeCount;
}
@AfterClass
@ -76,12 +85,91 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
cluster = null;
}
@Override
public void tearDown() throws Exception {
super.tearDown();
if (cluster != null) {
log.info("\n");
log.info("#############################################");
log.info("############ FINAL CLUSTER STATS ############");
log.info("#############################################\n");
log.info("## Live nodes:\t\t" + cluster.getLiveNodesSet().size());
int emptyNodes = 0;
int maxReplicas = 0;
int minReplicas = Integer.MAX_VALUE;
Map<String, Map<Replica.State, AtomicInteger>> replicaStates = new TreeMap<>();
int numReplicas = 0;
for (String node : cluster.getLiveNodesSet().get()) {
List<ReplicaInfo> replicas = cluster.getSimClusterStateProvider().simGetReplicaInfos(node);
numReplicas += replicas.size();
if (replicas.size() > maxReplicas) {
maxReplicas = replicas.size();
}
if (minReplicas > replicas.size()) {
minReplicas = replicas.size();
}
for (ReplicaInfo ri : replicas) {
replicaStates.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>())
.computeIfAbsent(ri.getState(), s -> new AtomicInteger())
.incrementAndGet();
}
if (replicas.isEmpty()) {
emptyNodes++;
}
}
if (minReplicas == Integer.MAX_VALUE) {
minReplicas = 0;
}
log.info("## Empty nodes:\t" + emptyNodes);
Set<String> deadNodes = cluster.getSimNodeStateProvider().simGetDeadNodes();
log.info("## Dead nodes:\t\t" + deadNodes.size());
deadNodes.forEach(n -> log.info("##\t\t" + n));
log.info("## Collections:\t" + cluster.getSimClusterStateProvider().simListCollections());
log.info("## Max replicas per node:\t" + maxReplicas);
log.info("## Min replicas per node:\t" + minReplicas);
log.info("## Total replicas:\t\t" + numReplicas);
replicaStates.forEach((c, map) -> {
AtomicInteger repCnt = new AtomicInteger();
map.forEach((s, cnt) -> repCnt.addAndGet(cnt.get()));
log.info("## * " + c + "\t\t" + repCnt.get());
map.forEach((s, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-12s %4d", s, cnt.get())));
});
log.info("######### Final Solr op counts ##########");
cluster.simGetOpCounts().forEach((k, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-14s %4d", k, cnt.get())));
log.info("######### Autoscaling event counts ###########");
TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>();
for (SolrInputDocument d : cluster.simGetSystemCollection()) {
if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
continue;
}
counts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
.computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
.incrementAndGet();
}
counts.forEach((trigger, map) -> {
log.info("## * Trigger: " + trigger);
map.forEach((s, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-11s %4d", s, cnt.get())));
});
}
}
@Override
public void setUp() throws Exception {
super.setUp();
if (cluster != null) {
// clear any persisted auto scaling configuration
// clear any persisted configuration
cluster.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), -1);
cluster.getDistribStateManager().setData(ZkStateReader.ROLES, Utils.toJSON(new HashMap<>()), -1);
// restore the expected number of nodes
int currentSize = cluster.getLiveNodesSet().size();
if (currentSize < clusterNodeCount) {
int addCnt = clusterNodeCount - currentSize;
while (addCnt-- > 0) {
cluster.simAddNode();
}
} else if (currentSize > clusterNodeCount) {
cluster.simRemoveRandomNodes(currentSize - clusterNodeCount, true, random());
}
// clean any persisted trigger state or events
removeChildren(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
removeChildren(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
@ -89,8 +177,12 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
removeChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
cluster.getSimClusterStateProvider().simDeleteAllCollections();
cluster.simClearSystemCollection();
cluster.getSimClusterStateProvider().simResetLeaderThrottle();
// clear any dead nodes
cluster.getSimNodeStateProvider().simRemoveDeadNodes();
cluster.getSimClusterStateProvider().simResetLeaderThrottles();
cluster.simRestartOverseer(null);
cluster.getTimeSource().sleep(5000);
cluster.simResetOpCounts();
}
}

View File

@ -81,15 +81,6 @@ public class TestComputePlanAction extends SimSolrCloudTestCase {
triggerFiredLatch = new CountDownLatch(1);
actionContextPropsRef.set(null);
if (cluster.getClusterStateProvider().getLiveNodes().size() > NODE_COUNT) {
// stop some to get to original state
int numJetties = cluster.getClusterStateProvider().getLiveNodes().size();
for (int i = 0; i < numJetties - NODE_COUNT; i++) {
String node = cluster.getSimClusterStateProvider().simGetRandomNode(random());
cluster.getSimClusterStateProvider().simRemoveNode(node);
}
}
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<10', 'node':'#ANY'}," +
@ -202,7 +193,7 @@ public class TestComputePlanAction extends SimSolrCloudTestCase {
"'waitFor' : '1s'," +
"'enabled' : true," +
"'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
"{'name':'test','class':'" + TestComputePlanAction.AssertingTriggerAction.class.getName() + "'}]" +
"{'name':'test','class':'" + AssertingTriggerAction.class.getName() + "'}]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
@ -245,7 +236,7 @@ public class TestComputePlanAction extends SimSolrCloudTestCase {
Map context = actionContextPropsRef.get();
assertNotNull(context);
List<SolrRequest> operations = (List<SolrRequest>) context.get("operations");
assertNotNull("The operations computed by ComputePlanAction should not be null " + actionContextPropsRef.get(), operations);
assertNotNull("The operations computed by ComputePlanAction should not be null " + actionContextPropsRef.get() + "\nevent: " + eventRef.get(), operations);
operations.forEach(solrRequest -> log.info(solrRequest.getParams().toString()));
assertEquals("ComputePlanAction should have computed exactly 2 operation", 2, operations.size());

View File

@ -46,7 +46,6 @@ import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel;
import org.apache.solr.common.util.TimeSource;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@ -66,19 +65,6 @@ public class TestExecutePlanAction extends SimSolrCloudTestCase {
configureCluster(NODE_COUNT, TimeSource.get("simTime:50"));
}
@Before
public void setUp() throws Exception {
super.setUp();
if (cluster.getClusterStateProvider().getLiveNodes().size() < NODE_COUNT) {
// start some to get to original state
int numJetties = cluster.getClusterStateProvider().getLiveNodes().size();
for (int i = 0; i < NODE_COUNT - numJetties; i++) {
cluster.simAddNode();
}
}
}
@After
public void printState() throws Exception {
log.info("-------------_ FINAL STATE --------------");

View File

@ -42,6 +42,7 @@ import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
import org.apache.solr.cloud.autoscaling.CapturedEvent;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.NamedList;
@ -79,15 +80,10 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
@Before
public void setupTest() throws Exception {
waitForSeconds = 1 + random().nextInt(3);
waitForSeconds = 5;
triggerFiredCount.set(0);
triggerFiredLatch = new CountDownLatch(1);
listenerEvents.clear();
while (cluster.getClusterStateProvider().getLiveNodes().size() < NUM_NODES) {
// perhaps a test stopped a node but didn't start it back
// lets start a node
cluster.simAddNode();
}
}
public static class TestTriggerListener extends TriggerListenerBase {
@ -163,6 +159,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", 5, 5, 5, 5);
create.setMaxShardsPerNode(1);
create.setAutoAddReplicas(false);
create.setCreateNodeSet(String.join(",", nodes));
create.process(solrClient);
@ -196,7 +193,6 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
}
log.info("Ready after " + waitForState(collectionName, 30 * nodes.size(), TimeUnit.SECONDS, clusterShape(5, 15)) + "ms");
log.info("OP COUNTS: " + cluster.simGetOpCounts());
long newMoveReplicaOps = cluster.simGetOpCount(CollectionParams.CollectionAction.MOVEREPLICA.name());
log.info("==== Flaky replicas: {}. Additional MOVEREPLICA count: {}", flakyReplicas, (newMoveReplicaOps - moveReplicaOps));
// flaky nodes lead to a number of MOVEREPLICA that is non-zero but lower than the number of flaky replicas
@ -205,6 +201,208 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
newMoveReplicaOps - moveReplicaOps < flakyReplicas);
}
@Test
public void testAddNode() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger'," +
"'event' : 'nodeAdded'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
"'actions' : [" +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
"]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
// create a collection with more than 1 replica per node
String collectionName = "testNodeAdded";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", NUM_NODES / 10, NUM_NODES / 10, NUM_NODES / 10, NUM_NODES / 10);
create.setMaxShardsPerNode(5);
create.setAutoAddReplicas(false);
create.process(solrClient);
log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 10, NUM_NODES / 10 * 3)) + " ms");
int numAddNode = NUM_NODES / 5;
List<String> addNodesList = new ArrayList<>(numAddNode);
for (int i = 0; i < numAddNode; i++) {
addNodesList.add(cluster.simAddNode());
cluster.getTimeSource().sleep(5000);
}
List<SolrInputDocument> systemColl = cluster.simGetSystemCollection();
int startedEventPos = -1;
for (int i = 0; i < systemColl.size(); i++) {
SolrInputDocument d = systemColl.get(i);
if (!"node_added_trigger".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("NODEADDED".equals(d.getFieldValue("event.type_s")) &&
"STARTED".equals(d.getFieldValue("stage_s"))) {
startedEventPos = i;
break;
}
}
assertTrue("no STARTED event", startedEventPos > -1);
SolrInputDocument startedEvent = systemColl.get(startedEventPos);
int ignored = 0;
int lastIgnoredPos = startedEventPos;
for (int i = startedEventPos + 1; i < systemColl.size(); i++) {
SolrInputDocument d = systemColl.get(i);
if (!"node_added_trigger".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("NODEADDED".equals(d.getFieldValue("event.type_s"))) {
if ("IGNORED".equals(d.getFieldValue("stage_s"))) {
ignored++;
lastIgnoredPos = i;
}
}
}
assertTrue("no IGNORED events", ignored > 0);
// make sure some replicas have been moved
assertTrue("no MOVEREPLICA ops?", cluster.simGetOpCount("MOVEREPLICA") > 0);
log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 10, NUM_NODES / 10 * 3)) + " ms");
int count = 50;
SolrInputDocument finishedEvent = null;
long lastNumOps = cluster.simGetOpCount("MOVEREPLICA");
while (count-- > 0) {
cluster.getTimeSource().sleep(150000);
long currentNumOps = cluster.simGetOpCount("MOVEREPLICA");
if (currentNumOps == lastNumOps) {
int size = systemColl.size() - 1;
for (int i = size; i > lastIgnoredPos; i--) {
SolrInputDocument d = systemColl.get(i);
if (!"node_added_trigger".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("SUCCEEDED".equals(d.getFieldValue("stage_s"))) {
finishedEvent = d;
break;
}
}
break;
} else {
lastNumOps = currentNumOps;
}
}
assertTrue("did not finish processing changes", finishedEvent != null);
long delta = (Long)finishedEvent.getFieldValue("event.time_l") - (Long)startedEvent.getFieldValue("event.time_l");
log.info("#### System stabilized after " + TimeUnit.NANOSECONDS.toMillis(delta) + " ms");
assertTrue("unexpected number of MOVEREPLICA ops", cluster.simGetOpCount("MOVEREPLICA") > 1);
}
@Test
public void testNodeLost() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_lost_trigger'," +
"'event' : 'nodeLost'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
"'actions' : [" +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
"]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
// create a collection with 1 replica per node
String collectionName = "testNodeLost";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", NUM_NODES / 5, NUM_NODES / 10);
create.setMaxShardsPerNode(5);
create.setAutoAddReplicas(false);
create.process(solrClient);
log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 5, NUM_NODES / 10)) + " ms");
// start killing nodes
int numNodes = NUM_NODES / 5;
List<String> nodes = new ArrayList<>(cluster.getLiveNodesSet().get());
for (int i = 0; i < numNodes; i++) {
// this may also select a node where a replica is moved to, so the total number of
// MOVEREPLICA may vary
cluster.simRemoveNode(nodes.get(i), false);
cluster.getTimeSource().sleep(4000);
}
List<SolrInputDocument> systemColl = cluster.simGetSystemCollection();
int startedEventPos = -1;
for (int i = 0; i < systemColl.size(); i++) {
SolrInputDocument d = systemColl.get(i);
if (!"node_lost_trigger".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("NODELOST".equals(d.getFieldValue("event.type_s")) &&
"STARTED".equals(d.getFieldValue("stage_s"))) {
startedEventPos = i;
break;
}
}
assertTrue("no STARTED event: " + systemColl, startedEventPos > -1);
SolrInputDocument startedEvent = systemColl.get(startedEventPos);
int ignored = 0;
int lastIgnoredPos = startedEventPos;
for (int i = startedEventPos + 1; i < systemColl.size(); i++) {
SolrInputDocument d = systemColl.get(i);
if (!"node_lost_trigger".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("NODELOST".equals(d.getFieldValue("event.type_s"))) {
if ("IGNORED".equals(d.getFieldValue("stage_s"))) {
ignored++;
lastIgnoredPos = i;
}
}
}
assertTrue("no IGNORED events", ignored > 0);
// make sure some replicas have been moved
assertTrue("no MOVEREPLICA ops?", cluster.simGetOpCount("MOVEREPLICA") > 0);
log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 5, NUM_NODES / 10)) + " ms");
int count = 50;
SolrInputDocument finishedEvent = null;
long lastNumOps = cluster.simGetOpCount("MOVEREPLICA");
while (count-- > 0) {
cluster.getTimeSource().sleep(150000);
long currentNumOps = cluster.simGetOpCount("MOVEREPLICA");
if (currentNumOps == lastNumOps) {
int size = systemColl.size() - 1;
for (int i = size; i > lastIgnoredPos; i--) {
SolrInputDocument d = systemColl.get(i);
if (!"node_lost_trigger".equals(d.getFieldValue("event.source_s"))) {
continue;
}
if ("SUCCEEDED".equals(d.getFieldValue("stage_s"))) {
finishedEvent = d;
break;
}
}
break;
} else {
lastNumOps = currentNumOps;
}
}
assertTrue("did not finish processing changes", finishedEvent != null);
long delta = (Long)finishedEvent.getFieldValue("event.time_l") - (Long)startedEvent.getFieldValue("event.time_l");
log.info("#### System stabilized after " + TimeUnit.NANOSECONDS.toMillis(delta) + " ms");
long ops = cluster.simGetOpCount("MOVEREPLICA");
assertTrue("unexpected number of MOVEREPLICA ops: " + ops, ops >= 40);
}
@Test
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11714")
public void testSearchRate() throws Exception {
@ -255,7 +453,6 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
// simulate search traffic
cluster.getSimClusterStateProvider().simSetShardValue(collectionName, "shard1", metricName, 40, true);
Thread.sleep(1000000000);
// boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
// assertTrue("The trigger did not fire at all", await);
// wait for listener to capture the SUCCEEDED stage

View File

@ -137,7 +137,7 @@ public class TestNodeAddedTrigger extends SimSolrCloudTestCase {
return true;
});
trigger.run(); // first run should detect the new node
cluster.simRemoveNode(newNode, true);
cluster.simRemoveNode(newNode, false);
int counter = 0;
do {
trigger.run();

View File

@ -81,8 +81,8 @@ public class TestNodeLostTrigger extends SimSolrCloudTestCase {
Iterator<String> it = cluster.getLiveNodesSet().get().iterator();
String lostNodeName1 = it.next();
String lostNodeName2 = it.next();
cluster.simRemoveNode(lostNodeName1, true);
cluster.simRemoveNode(lostNodeName2, true);
cluster.simRemoveNode(lostNodeName1, false);
cluster.simRemoveNode(lostNodeName2, false);
timeSource.sleep(1000);
AtomicBoolean fired = new AtomicBoolean(false);
@ -223,7 +223,7 @@ public class TestNodeLostTrigger extends SimSolrCloudTestCase {
trigger.run(); // starts tracking live nodes
// stop the newly created node
cluster.simRemoveNode(newNode, true);
cluster.simRemoveNode(newNode, false);
AtomicInteger callCount = new AtomicInteger(0);
AtomicBoolean fired = new AtomicBoolean(false);
@ -263,7 +263,7 @@ public class TestNodeLostTrigger extends SimSolrCloudTestCase {
trigger.run();
// stop the newly created node
cluster.simRemoveNode(newNode, true);
cluster.simRemoveNode(newNode, false);
trigger.run(); // this run should detect the lost node
trigger.close(); // close the old trigger

View File

@ -47,9 +47,15 @@ import static org.apache.solr.common.util.Utils.time;
import static org.apache.solr.common.util.Utils.timeElapsed;
public class PolicyHelper {
private static ThreadLocal<Map<String, String>> policyMapping = new ThreadLocal<>();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String POLICY_MAPPING_KEY = "PolicyHelper.policyMapping";
private static ThreadLocal<Map<String, String>> getPolicyMapping(SolrCloudManager cloudManager) {
return (ThreadLocal<Map<String, String>>)cloudManager.getObjectCache()
.computeIfAbsent(POLICY_MAPPING_KEY, k -> new ThreadLocal<>());
}
public static List<ReplicaPosition> getReplicaLocations(String collName, AutoScalingConfig autoScalingConfig,
SolrCloudManager cloudManager,
Map<String, String> optionalPolicyMapping,
@ -59,6 +65,7 @@ public class PolicyHelper {
int pullReplicas,
List<String> nodesList) {
List<ReplicaPosition> positions = new ArrayList<>();
ThreadLocal<Map<String, String>> policyMapping = getPolicyMapping(cloudManager);
ClusterStateProvider stateProvider = new DelegatingClusterStateProvider(cloudManager.getClusterStateProvider()) {
@Override
public String getPolicyNameByCollection(String coll) {