SOLR-12923: Fix some issues w/concurrency and exception swallowing in SimClusterStateProvider/SimCloudManager

There are 3 tightly related bug fixes in these changes:

1) ConcurrentModificationExceptions were being thrown by some SimClusterStateProvider methods when
   creating collections/replicas, due to the use of plain ArrayLists in nodeReplicaMap. These ArrayLists
   were changed to synchronizedList wrappers (see the first sketch after this list).
2) The exceptions from #1 were being swallowed/hidden by code using SimCloudManager.submit() w/o checking
   the resulting Future object. (As a result, tests waiting for a particular ClusterShape would time out
   regardless of how long they waited.) To protect against "silent" failures like this,
   SimCloudManager.submit() has been updated to wrap all input Callables such that any uncaught errors
   will be logged and "counted." SimSolrCloudTestCase will ensure a suite-level failure if any such
   failures are counted.
3) The changes in #2 exposed additional concurrency problems with the Callables involved in leader
   election: these would frequently throw IllegalStateExceptions due to assumptions about the
   state/existence of replicas when the Callables were created vs. when they were later run -- notably,
   a Callable may have been created holding a reference to a Slice, but by the time that Callable was
   run, the collection (or a node, etc.) referred to by that Slice may have been deleted (see the second
   sketch after this list). While fixing this, the leader election logic was also cleaned up such that
   adding a replica only triggers leader election for that shard, not every shard in the collection.
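
A minimal sketch of the pattern behind #1, using hypothetical stand-in types (the real map values are
lists of ReplicaInfo): Collections.synchronizedList makes individual add/remove calls thread-safe, but
iteration is still not atomic, so every loop must hold the list's own monitor -- which is what the new
synchronized(replicas) blocks in the diff below do.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SynchronizedListSketch {
      // hypothetical stand-in for nodeReplicaMap: a concurrent map of synchronized lists
      private final Map<String, List<String>> nodeReplicaMap = new ConcurrentHashMap<>();

      public void addReplica(String node, String replica) {
        // individual calls on a synchronizedList are thread-safe on their own
        nodeReplicaMap.computeIfAbsent(
            node, n -> Collections.synchronizedList(new ArrayList<>())).add(replica);
      }

      public boolean hasReplica(String node, String replica) {
        final List<String> replicas = nodeReplicaMap.computeIfAbsent(
            node, n -> Collections.synchronizedList(new ArrayList<>()));
        synchronized (replicas) { // iteration is NOT atomic: hold the list's monitor
          for (String r : replicas) {
            if (r.equals(replica)) {
              return true;
            }
          }
        }
        return false;
      }
    }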

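A second sketch, for #3 (an illustration of the fix, not the committed code; it assumes the provider's
own getClusterState() accessor, which appears in the diff below): the deferred Callable captures only
the collection and shard names, then re-resolves the live objects when it finally runs, so a deletion
that happens in the meantime becomes a clean no-op instead of an IllegalStateException.

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import org.apache.solr.common.cloud.ClusterState;
    import org.apache.solr.common.cloud.DocCollection;
    import org.apache.solr.common.cloud.Slice;

    abstract class DeferredElectionSketch {
      // assumed: the provider's live-state accessor (SimClusterStateProvider has one)
      abstract ClusterState getClusterState() throws IOException;

      Callable<Boolean> electionTask(final String collection, final String slice) {
        return () -> {
          // re-resolve at run time; never trust object references captured at submit time
          final DocCollection col = getClusterState().getCollectionOrNull(collection);
          if (col == null) {
            return false; // collection was deleted since submission: clean no-op
          }
          final Slice s = col.getSlice(slice);
          if (s == null) {
            return false; // shard was deleted since submission: clean no-op
          }
          // ... safe to run the election against the freshly resolved Slice ...
          return true;
        };
      }
    }
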
While auditing this code, all usage of SimClusterStateProvider.lock was also cleaned up to remove any
risky point where an exception could be thrown after acquiring the lock but before entering the
try/finally that guarantees it is unlocked (see the sketch below).
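
A sketch of that lock-hygiene rule (the method names here are hypothetical): nothing that can throw may
sit between the acquire call and the try block, otherwise the finally clause is never reached and the
lock leaks.

    import java.util.concurrent.locks.ReentrantLock;

    public class LockHygieneSketch {
      private final ReentrantLock lock = new ReentrantLock();

      // hypothetical work that can throw, standing in for map lookups, validation, etc.
      private void mayThrow() { }

      public void risky() throws InterruptedException {
        lock.lockInterruptibly();
        mayThrow(); // BAD: a throw here happens before the try below is entered,
                    // so the finally never runs and the lock is never released
        try {
          // ... guarded work ...
        } finally {
          lock.unlock();
        }
      }

      public void safe() throws InterruptedException {
        lock.lockInterruptibly(); // nothing between acquisition and the try block
        try {
          mayThrow(); // any failure still releases the lock via the finally
        } finally {
          lock.unlock();
        }
      }
    }
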
Chris Hostetter 2019-03-14 22:27:48 -07:00
parent fbd05167f4
commit 76babf876a
6 changed files with 367 additions and 211 deletions

View File

@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -124,7 +125,12 @@ public class SimCloudManager implements SolrCloudManager {
private final Map<String, Map<String, AtomicInteger>> eventCounts = new ConcurrentHashMap<>();
private final MockSearchableSolrClient solrClient;
private final Map<String, AtomicLong> opCounts = new ConcurrentSkipListMap<>();
/**
* @see #submit
* @see #getBackgroundTaskFailureCount
* @see LoggingCallable
*/
private final AtomicLong backgroundTaskFailureCounter = new AtomicLong(0);
private ExecutorService simCloudManagerPool;
private Overseer.OverseerThread triggerThread;
@ -565,13 +571,26 @@ public class SimCloudManager implements SolrCloudManager {
/**
* Submit a task to execute in a thread pool.
* Every callable submitted will be wrapped such that errors not handled w/in the callable
* will be logged and counted for later assertions.
*
* @param callable task to execute
* @return future to obtain results
* @see #getBackgroundTaskFailureCount
*/
public <T> Future<T> submit(Callable<T> callable) {
return simCloudManagerPool.submit(callable);
return simCloudManagerPool.submit(new LoggingCallable(backgroundTaskFailureCounter, callable));
}
/**
* Returns a total count of the number of tasks submitted to {@link #submit} that have failed
* with any throwable other than <code>InterruptedException</code>
*
* @see #submit
*/
public long getBackgroundTaskFailureCount() {
return backgroundTaskFailureCounter.get();
}
// ---------- type-safe methods to obtain simulator components ----------
public SimClusterStateProvider getSimClusterStateProvider() {
return clusterStateProvider;
@ -650,8 +669,16 @@ public class SimCloudManager implements SolrCloudManager {
@Override
public SolrResponse request(SolrRequest req) throws IOException {
try {
Future<SolrResponse> rsp = submit(() -> simHandleSolrRequest(req));
return rsp.get();
// NOTE: we're doing 2 odd things here:
// 1) rather than calling simHandleSolrRequest directly, we're submitting it to the
// executor service and immediately waiting on the Future.
// - This can introduce delays if there are a lot of existing background tasks submitted
// 2) we use simCloudManagerPool directly, instead of using the public submit() method
// - this is because there may be "user level" errors (ie: bad input) deliberately generated
// by the testcase. we're going to immediately catch & re-throw any exceptions, so we don't
// need/want to be wrapped in a LoggingCallable w/getBackgroundTaskFailureCount() tracking
Future<SolrResponse> rsp = simCloudManagerPool.submit(() -> simHandleSolrRequest(req));
return rsp.get(120, TimeUnit.SECONDS); // longer than this and something is seriously wrong
} catch (Exception e) {
throw new IOException(e);
}
@ -885,6 +912,10 @@ public class SimCloudManager implements SolrCloudManager {
@Override
public void close() throws IOException {
// make sure we shut down the pool first, so any active background tasks get interrupted
// before we start closing resources they may be using.
simCloudManagerPool.shutdownNow();
if (metricsHistoryHandler != null) {
IOUtils.closeQuietly(metricsHistoryHandler);
}
@ -900,7 +931,6 @@ public class SimCloudManager implements SolrCloudManager {
Thread.currentThread().interrupt();
}
IOUtils.closeQuietly(objectCache);
simCloudManagerPool.shutdownNow();
}
/**
@ -910,4 +940,37 @@ public class SimCloudManager implements SolrCloudManager {
public OverseerTriggerThread getOverseerTriggerThread() {
return ((OverseerTriggerThread) triggerThread.getThread());
}
/**
* Wrapper for any Callable that will log a warn/error in the event of InterruptedException/Throwable.
* Also increments the passed-in counter so the CloudManager can later report total errors programmatically.
*
* @see #submit
* @see #getBackgroundTaskFailureCount
*/
private static final class LoggingCallable<T> implements Callable<T> {
final AtomicLong failCounter;
final Callable<T> inner;
public LoggingCallable(final AtomicLong failCounter, final Callable<T> inner) {
assert null != failCounter;
assert null != inner;
this.failCounter = failCounter;
this.inner = inner;
}
public T call() throws Exception {
try {
return inner.call();
} catch (InterruptedException ignored) {
log.warn("Callable interupted", ignored);
throw ignored;
} catch (Throwable t) {
failCounter.incrementAndGet();
log.error("Callable failed", t);
throw t;
}
}
}
}
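
As a hypothetical usage sketch (not part of this commit) of the new submit() contract: the failure is
still propagated through the Future, but it is now also logged and counted, so a test that never
inspects the Future can no longer miss it. This assumes a SimCloudManager in scope and JUnit's
assertEquals/fail.

    Future<Boolean> f = cloudManager.submit(() -> {
      throw new IllegalStateException("simulated background failure");
    });
    try {
      f.get(); // the LoggingCallable wrapper re-throws, so the Future still reports it
      fail("expected the background task to fail");
    } catch (ExecutionException expected) {
      // the wrapper already logged the error and incremented the counter
    }
    assertEquals(1L, cloudManager.getBackgroundTaskFailureCount()); // assuming no prior failures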

View File

@ -217,7 +217,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
s.getReplicas().forEach(r -> {
ReplicaInfo ri = new ReplicaInfo(r.getName(), r.getCoreName(), dc.getName(), s.getName(), r.getType(), r.getNodeName(), r.getProperties());
if (liveNodes.get().contains(r.getNodeName())) {
nodeReplicaMap.computeIfAbsent(r.getNodeName(), Utils.NEW_ARRAYLIST_FUN).add(ri);
nodeReplicaMap.computeIfAbsent(r.getNodeName(), Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN).add(ri);
}
});
});
@ -264,13 +264,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
// todo: maybe hook up DistribStateManager /clusterstate.json watchers?
private ReplicaInfo getReplicaInfo(Replica r) {
List<ReplicaInfo> list = nodeReplicaMap.get(r.getNodeName());
if (list == null) {
return null;
}
for (ReplicaInfo ri : list) {
if (r.getCoreName().equals(ri.getCore())) {
return ri;
final List<ReplicaInfo> list = nodeReplicaMap.computeIfAbsent
(r.getNodeName(), Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN);
synchronized (list) {
for (ReplicaInfo ri : list) {
if (r.getCoreName().equals(ri.getCore())) {
return ri;
}
}
}
return null;
@ -286,7 +286,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
throw new Exception("Node " + nodeId + " already exists");
}
createEphemeralLiveNode(nodeId);
nodeReplicaMap.putIfAbsent(nodeId, new ArrayList<>());
nodeReplicaMap.computeIfAbsent(nodeId, Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN);
liveNodes.add(nodeId);
updateOverseerLeader();
}
@ -378,8 +378,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
// this method needs to be called under a lock
private void setReplicaStates(String nodeId, Replica.State state, Set<String> changedCollections) {
List<ReplicaInfo> replicas = nodeReplicaMap.get(nodeId);
if (replicas != null) {
List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN);
synchronized (replicas) {
replicas.forEach(r -> {
r.getVariables().put(ZkStateReader.STATE_PROP, state.toString());
if (state != Replica.State.ACTIVE) {
@ -412,13 +412,16 @@ public class SimClusterStateProvider implements ClusterStateProvider {
liveNodes.add(nodeId);
createEphemeralLiveNode(nodeId);
Set<String> collections = new HashSet<>();
lock.lockInterruptibly();
try {
setReplicaStates(nodeId, Replica.State.RECOVERING, collections);
} finally {
lock.unlock();
}
cloudManager.getTimeSource().sleep(1000);
lock.lockInterruptibly();
try {
setReplicaStates(nodeId, Replica.State.ACTIVE, collections);
@ -499,42 +502,44 @@ public class SimClusterStateProvider implements ClusterStateProvider {
*/
public void simAddReplica(String nodeId, ReplicaInfo replicaInfo, boolean runLeaderElection) throws Exception {
ensureNotClosed();
// make sure SolrCore name is unique across cluster and coreNodeName within collection
for (Map.Entry<String, List<ReplicaInfo>> e : nodeReplicaMap.entrySet()) {
for (ReplicaInfo ri : e.getValue()) {
if (ri.getCore().equals(replicaInfo.getCore())) {
throw new Exception("Duplicate SolrCore name for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
}
if (ri.getName().equals(replicaInfo.getName()) && ri.getCollection().equals(replicaInfo.getCollection())) {
throw new Exception("Duplicate coreNode name for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
}
}
}
if (!liveNodes.contains(nodeId)) {
throw new Exception("Target node " + nodeId + " is not live: " + liveNodes);
}
// verify info
if (replicaInfo.getCore() == null) {
throw new Exception("Missing core: " + replicaInfo);
}
// XXX replica info is not supposed to have this as a variable
replicaInfo.getVariables().remove(ZkStateReader.SHARD_ID_PROP);
if (replicaInfo.getName() == null) {
throw new Exception("Missing name: " + replicaInfo);
}
if (replicaInfo.getNode() == null) {
throw new Exception("Missing node: " + replicaInfo);
}
if (!replicaInfo.getNode().equals(nodeId)) {
throw new Exception("Wrong node (not " + nodeId + "): " + replicaInfo);
}
lock.lockInterruptibly();
try {
// make sure SolrCore name is unique across cluster and coreNodeName within collection
for (Map.Entry<String, List<ReplicaInfo>> e : nodeReplicaMap.entrySet()) {
final List<ReplicaInfo> replicas = e.getValue();
synchronized (replicas) {
for (ReplicaInfo ri : replicas) {
if (ri.getCore().equals(replicaInfo.getCore())) {
throw new Exception("Duplicate SolrCore name for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
}
if (ri.getName().equals(replicaInfo.getName()) && ri.getCollection().equals(replicaInfo.getCollection())) {
throw new Exception("Duplicate coreNode name for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
}
}
}
}
if (!liveNodes.contains(nodeId)) {
throw new Exception("Target node " + nodeId + " is not live: " + liveNodes);
}
// verify info
if (replicaInfo.getCore() == null) {
throw new Exception("Missing core: " + replicaInfo);
}
// XXX replica info is not supposed to have this as a variable
replicaInfo.getVariables().remove(ZkStateReader.SHARD_ID_PROP);
if (replicaInfo.getName() == null) {
throw new Exception("Missing name: " + replicaInfo);
}
if (replicaInfo.getNode() == null) {
throw new Exception("Missing node: " + replicaInfo);
}
if (!replicaInfo.getNode().equals(nodeId)) {
throw new Exception("Wrong node (not " + nodeId + "): " + replicaInfo);
}
opDelay(replicaInfo.getCollection(), CollectionParams.CollectionAction.ADDREPLICA.name());
List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, n -> new ArrayList<>());
// mark replica as active
replicaInfo.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
// add a property expected in Policy calculations, if missing
@ -543,8 +548,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicaInfo.getVariables().put(Variable.coreidxsize,
new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)));
}
replicas.add(replicaInfo);
nodeReplicaMap.computeIfAbsent(nodeId, Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN).add(replicaInfo);
colShardReplicaMap.computeIfAbsent(replicaInfo.getCollection(), c -> new ConcurrentHashMap<>())
.computeIfAbsent(replicaInfo.getShard(), s -> new ArrayList<>())
.add(replicaInfo);
@ -575,7 +579,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
collectionsStatesRef.set(null);
log.trace("-- simAddReplica {}", replicaInfo);
if (runLeaderElection) {
simRunLeaderElection(Collections.singleton(replicaInfo.getCollection()), true);
simRunLeaderElection(replicaInfo.getCollection(), replicaInfo.getShard(), true);
}
} finally {
lock.unlock();
@ -589,35 +593,40 @@ public class SimClusterStateProvider implements ClusterStateProvider {
*/
public void simRemoveReplica(String nodeId, String coreNodeName) throws Exception {
ensureNotClosed();
lock.lockInterruptibly();
List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, n -> new ArrayList<>());
try {
for (int i = 0; i < replicas.size(); i++) {
if (coreNodeName.equals(replicas.get(i).getName())) {
ReplicaInfo ri = replicas.remove(i);
colShardReplicaMap.computeIfAbsent(ri.getCollection(), c -> new ConcurrentHashMap<>())
final List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent
(nodeId, Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN);
synchronized (replicas) {
for (int i = 0; i < replicas.size(); i++) {
if (coreNodeName.equals(replicas.get(i).getName())) {
ReplicaInfo ri = replicas.remove(i);
colShardReplicaMap.computeIfAbsent(ri.getCollection(), c -> new ConcurrentHashMap<>())
.computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
.remove(ri);
collectionsStatesRef.set(null);
collectionsStatesRef.set(null);
opDelay(ri.getCollection(), CollectionParams.CollectionAction.DELETEREPLICA.name());
opDelay(ri.getCollection(), CollectionParams.CollectionAction.DELETEREPLICA.name());
// update the number of cores in node values, if node is live
if (liveNodes.contains(nodeId)) {
Integer cores = (Integer)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.CORES);
if (cores == null || cores == 0) {
throw new Exception("Unexpected value of 'cores' (" + cores + ") on node: " + nodeId);
// update the number of cores in node values, if node is live
if (liveNodes.contains(nodeId)) {
Integer cores = (Integer)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.CORES);
if (cores == null || cores == 0) {
throw new Exception("Unexpected value of 'cores' (" + cores + ") on node: " + nodeId);
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores - 1);
Integer disk = (Integer)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.DISK);
if (disk == null || disk == 0) {
throw new Exception("Unexpected value of 'freedisk' (" + disk + ") on node: " + nodeId);
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk + 1);
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores - 1);
Integer disk = (Integer)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.DISK);
if (disk == null || disk == 0) {
throw new Exception("Unexpected value of 'freedisk' (" + disk + ") on node: " + nodeId);
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk + 1);
log.trace("-- simRemoveReplica {}", ri);
simRunLeaderElection(ri.getCollection(), ri.getShard(), true);
return;
}
log.trace("-- simRemoveReplica {}", ri);
simRunLeaderElection(Collections.singleton(ri.getCollection()), true);
return;
}
}
throw new Exception("Replica " + coreNodeName + " not found on node " + nodeId);
@ -677,51 +686,70 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
ClusterState state = getClusterState();
state.forEachCollection(dc -> {
if (!collections.contains(dc.getName())) {
return;
}
dc.getSlices().forEach(s -> {
if (s.getState() == Slice.State.INACTIVE) {
log.trace("-- slice state is {}, skip leader election {} / {}", s.getState(), dc.getName(), s.getName());
if (!collections.contains(dc.getName())) {
return;
}
if (s.getState() != Slice.State.ACTIVE) {
log.trace("-- slice state is {}, but I will run leader election {} / {}", s.getState(), dc.getName(), s.getName());
}
if (s.getLeader() != null) {
log.trace("-- already has leader {} / {}", dc.getName(), s.getName());
return;
}
if (s.getReplicas().isEmpty()) {
log.trace("-- no replicas in {} / {}", dc.getName(), s.getName());
return;
}
log.trace("-- submit leader election for {} / {}", dc.getName(), s.getName());
cloudManager.submit(() -> {
simRunLeaderElection(dc.getName(), s, saveClusterState);
return true;
});
dc.getSlices().forEach(s -> {
log.trace("-- submit leader election for {} / {}", dc.getName(), s.getName());
cloudManager.submit(() -> {
simRunLeaderElection(dc.getName(), s.getName(), saveClusterState);
return true;
});
});
});
});
}
private void simRunLeaderElection(String collection, Slice s, boolean saveState) throws Exception {
AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
private void simRunLeaderElection(final String collection, final String slice,
final boolean saveState) throws Exception {
log.trace("Attempting leader election ({} / {})", collection, slice);
final AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
lock.lockInterruptibly();
try {
Replica leader = s.getLeader();
if (leader == null || !liveNodes.contains(leader.getNodeName())) {
log.trace("Running leader election for {} / {}", collection, s.getName());
if (s.getReplicas().isEmpty()) { // no replicas - punt
log.trace("-- no replicas in {} / {}", collection, s.getName());
return;
}
ActionThrottle lt = getThrottle(collection, s.getName());
synchronized (lt) {
// collect all active and live
List<ReplicaInfo> active = new ArrayList<>();
AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);
s.getReplicas().forEach(r -> {
final ClusterState state = getClusterState();
final DocCollection col = state.getCollectionOrNull(collection);
if (null == col) {
log.trace("-- collection does not exist (anymore), skipping leader election ({} / {})",
collection, slice);
return;
}
final Slice s = col.getSlice(slice);
if (null == s) {
log.trace("-- slice does not exist, skipping leader election ({} / {})",
collection, slice);
return;
}
if (s.getState() == Slice.State.INACTIVE) {
log.trace("-- slice state is {}, skipping leader election ({} / {})",
s.getState(), collection, slice);
return;
}
if (s.getReplicas().isEmpty()) {
log.trace("-- no replicas, skipping leader election ({} / {})", collection, slice);
return;
}
final Replica leader = s.getLeader();
if (null != leader && liveNodes.contains(leader.getNodeName())) {
log.trace("-- already has livenode leader, skipping leader election {} / {}",
collection, slice);
return;
}
if (s.getState() != Slice.State.ACTIVE) {
log.trace("-- slice state is {}, but I will run leader election anyway ({} / {})",
s.getState(), collection, slice);
}
log.debug("Running leader election ({} / {})", collection, slice);
ActionThrottle lt = getThrottle(collection, s.getName());
synchronized (lt) {
// collect all active and live
List<ReplicaInfo> active = new ArrayList<>();
AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);
s.getReplicas().forEach(r -> {
// find our ReplicaInfo for this replica
ReplicaInfo ri = getReplicaInfo(r);
if (ri == null) {
@ -746,39 +774,36 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
});
if (alreadyHasLeader.get()) {
log.trace("-- already has leader {} / {}: {}", collection, s.getName(), s);
return;
}
if (active.isEmpty()) {
log.warn("Can't find any active replicas for {} / {}: {}", collection, s.getName(), s);
log.debug("-- liveNodes: {}", liveNodes.get());
return;
}
// pick first active one
ReplicaInfo ri = null;
for (ReplicaInfo a : active) {
if (!a.getType().equals(Replica.Type.PULL)) {
ri = a;
break;
}
}
if (ri == null) {
log.warn("-- can't find any suitable replica type for {} / {}: {}", collection, s.getName(), s);
return;
}
// now mark the leader election throttle
lt.minimumWaitBetweenActions();
lt.markAttemptingAction();
synchronized (ri) {
ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
}
log.debug("-- elected new leader for {} / {} (currentVersion={}): {}", collection,
s.getName(), clusterStateVersion, ri);
stateChanged.set(true);
if (alreadyHasLeader.get()) {
log.trace("-- already has leader {} / {}: {}", collection, s.getName(), s);
return;
}
} else {
log.trace("-- already has leader for {} / {}", collection, s.getName());
if (active.isEmpty()) {
log.warn("Can't find any active replicas for {} / {}: {}", collection, s.getName(), s);
log.debug("-- liveNodes: {}", liveNodes.get());
return;
}
// pick first active one
ReplicaInfo ri = null;
for (ReplicaInfo a : active) {
if (!a.getType().equals(Replica.Type.PULL)) {
ri = a;
break;
}
}
if (ri == null) {
log.warn("-- can't find any suitable replica type for {} / {}: {}", collection, s.getName(), s);
return;
}
// now mark the leader election throttle
lt.minimumWaitBetweenActions();
lt.markAttemptingAction();
synchronized (ri) {
ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
}
log.debug("-- elected new leader for {} / {} (currentVersion={}): {}", collection,
s.getName(), clusterStateVersion, ri);
stateChanged.set(true);
}
} finally {
if (stateChanged.get() || saveState) {
@ -837,8 +862,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
final String withCollectionShard = wcShard;
lock.lockInterruptibly();
ZkWriteCommand cmd = ZkWriteCommand.noop();
lock.lockInterruptibly();
try {
cmd = new ClusterStateMutator(cloudManager).createCollection(clusterState, props);
if (cmd.noop) {
@ -979,6 +1005,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (async != null) {
results.add(CoreAdminParams.REQUESTID, async);
}
lock.lockInterruptibly();
try {
collProperties.remove(collection);
@ -991,26 +1018,28 @@ public class SimClusterStateProvider implements ClusterStateProvider {
opDelays.remove(collection);
nodeReplicaMap.forEach((n, replicas) -> {
for (Iterator<ReplicaInfo> it = replicas.iterator(); it.hasNext(); ) {
ReplicaInfo ri = it.next();
if (ri.getCollection().equals(collection)) {
it.remove();
// update the number of cores in node values
Integer cores = (Integer) cloudManager.getSimNodeStateProvider().simGetNodeValue(n, "cores");
if (cores != null) { // node is still up
if (cores == 0) {
throw new RuntimeException("Unexpected value of 'cores' (" + cores + ") on node: " + n);
}
try {
cloudManager.getSimNodeStateProvider().simSetNodeValue(n, "cores", cores - 1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("interrupted");
synchronized (replicas) {
for (Iterator<ReplicaInfo> it = replicas.iterator(); it.hasNext(); ) {
ReplicaInfo ri = it.next();
if (ri.getCollection().equals(collection)) {
it.remove();
// update the number of cores in node values
Integer cores = (Integer) cloudManager.getSimNodeStateProvider().simGetNodeValue(n, "cores");
if (cores != null) { // node is still up
if (cores == 0) {
throw new RuntimeException("Unexpected value of 'cores' (" + cores + ") on node: " + n);
}
try {
cloudManager.getSimNodeStateProvider().simSetNodeValue(n, "cores", cores - 1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("interrupted");
}
}
}
}
}
}
});
});
collectionsStatesRef.set(null);
results.add("success", "");
} catch (Exception e) {
@ -1025,8 +1054,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
*/
public void simDeleteAllCollections() throws Exception {
lock.lockInterruptibly();
collectionsStatesRef.set(null);
try {
collectionsStatesRef.set(null);
collProperties.clear();
sliceProperties.clear();
leaderThrottles.clear();
@ -1144,15 +1174,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
OverseerCollectionMessageHandler.CREATE_NODE_SET, message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET)
);
try {
simAddReplica(addReplicasProps, results);
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
// this also takes care of leader election
simAddReplica(addReplicasProps, results);
} catch (Exception e) {
throw new RuntimeException(e);
}
collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());
simRunLeaderElection(Collections.singleton(collectionName), true);
results.add("success", "");
} finally {
lock.unlock();
@ -1373,14 +1402,16 @@ public class SimClusterStateProvider implements ClusterStateProvider {
sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).remove(sliceName);
colShardReplicaMap.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).remove(sliceName);
nodeReplicaMap.forEach((n, replicas) -> {
Iterator<ReplicaInfo> it = replicas.iterator();
while (it.hasNext()) {
ReplicaInfo ri = it.next();
if (ri.getCollection().equals(collectionName) && ri.getShard().equals(sliceName)) {
it.remove();
synchronized (replicas) {
Iterator<ReplicaInfo> it = replicas.iterator();
while (it.hasNext()) {
ReplicaInfo ri = it.next();
if (ri.getCollection().equals(collectionName) && ri.getShard().equals(sliceName)) {
it.remove();
}
}
}
}
});
});
collectionsStatesRef.set(null);
results.add("success", "");
} catch (Exception e) {
@ -1791,9 +1822,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param value property value
*/
public void simSetCollectionProperty(String coll, String key, String value) throws Exception {
Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
lock.lockInterruptibly();
try {
final Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
if (value == null) {
props.remove(key);
} else {
@ -1812,9 +1843,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param properties slice properties
*/
public void simSetSliceProperties(String coll, String slice, Map<String, Object> properties) throws Exception {
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new HashMap<>()).computeIfAbsent(slice, s -> new HashMap<>());
lock.lockInterruptibly();
try {
final Map<String, Object> sliceProps = sliceProperties.computeIfAbsent
(coll, c -> new HashMap<>()).computeIfAbsent(slice, s -> new HashMap<>());
sliceProps.clear();
if (properties != null) {
sliceProps.putAll(properties);
@ -1954,13 +1986,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @return copy of the list of replicas on that node, or empty list if none
*/
public List<ReplicaInfo> simGetReplicaInfos(String node) {
List<ReplicaInfo> replicas = nodeReplicaMap.get(node);
if (replicas == null) {
return Collections.emptyList();
} else {
// make a defensive copy to avoid ConcurrentModificationException
return Arrays.asList(replicas.toArray(new ReplicaInfo[0]));
}
final List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent
(node, Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN);
// make a defensive copy to avoid ConcurrentModificationException
return Arrays.asList(replicas.toArray(new ReplicaInfo[replicas.size()]));
}
public List<ReplicaInfo> simGetReplicaInfos(String collection, String shard) {
@ -1983,9 +2012,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
public Map<String, Map<String, Object>> simGetCollectionStats() throws IOException, InterruptedException {
Map<String, Map<String, Object>> stats = new TreeMap<>();
lock.lockInterruptibly();
try {
final Map<String, Map<String, Object>> stats = new TreeMap<>();
collectionsStatesRef.set(null);
ClusterState state = getClusterState();
state.forEachCollection(coll -> {
@ -2119,39 +2148,46 @@ public class SimClusterStateProvider implements ClusterStateProvider {
@Override
public ClusterState getClusterState() throws IOException {
ensureNotClosed();
Map<String, DocCollection> states = getCollectionStates();
ClusterState state = new ClusterState(clusterStateVersion, liveNodes.get(), states);
return state;
lock.lock();
try {
Map<String, DocCollection> states = getCollectionStates();
ClusterState state = new ClusterState(clusterStateVersion, liveNodes.get(), states);
return state;
} finally {
lock.unlock();
}
}
// this method uses a simple cache in collectionsStatesRef. Operations that modify
// cluster state should always reset this cache so that the changes become visible
private Map<String, DocCollection> getCollectionStates() throws IOException {
Map<String, DocCollection> collectionStates = collectionsStatesRef.get();
if (collectionStates != null) {
return collectionStates;
}
lock.lock();
collectionsStatesRef.set(null);
log.debug("** creating new collection states, currentVersion={}", clusterStateVersion);
try {
Map<String, DocCollection> collectionStates = collectionsStatesRef.get();
if (collectionStates != null) {
return collectionStates;
}
collectionsStatesRef.set(null);
log.debug("** creating new collection states, currentVersion={}", clusterStateVersion);
Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
nodeReplicaMap.forEach((n, replicas) -> {
replicas.forEach(ri -> {
Map<String, Object> props;
synchronized (ri) {
props = new HashMap<>(ri.getVariables());
synchronized (replicas) {
replicas.forEach(ri -> {
Map<String, Object> props;
synchronized (ri) {
props = new HashMap<>(ri.getVariables());
}
props.put(ZkStateReader.NODE_NAME_PROP, n);
props.put(ZkStateReader.CORE_NAME_PROP, ri.getCore());
props.put(ZkStateReader.REPLICA_TYPE, ri.getType().toString());
props.put(ZkStateReader.STATE_PROP, ri.getState().toString());
Replica r = new Replica(ri.getName(), props);
collMap.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
.computeIfAbsent(ri.getShard(), s -> new HashMap<>())
.put(ri.getName(), r);
});
}
props.put(ZkStateReader.NODE_NAME_PROP, n);
props.put(ZkStateReader.CORE_NAME_PROP, ri.getCore());
props.put(ZkStateReader.REPLICA_TYPE, ri.getType().toString());
props.put(ZkStateReader.STATE_PROP, ri.getState().toString());
Replica r = new Replica(ri.getName(), props);
collMap.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
.computeIfAbsent(ri.getShard(), s -> new HashMap<>())
.put(ri.getName(), r);
});
});
// add empty slices
sliceProperties.forEach((c, perSliceProps) -> {

View File

@ -32,6 +32,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,12 +59,29 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
clusterNodeCount = nodeCount;
}
@After
private void checkBackgroundTaskFailureCount() {
if (cluster != null) {
assertBackgroundTaskFailureCount(cluster);
}
}
protected static void assertBackgroundTaskFailureCount(SimCloudManager c) {
assert null != c;
assertEquals("Cluster had background tasks submitted which failed",
0, c.getBackgroundTaskFailureCount());
}
@AfterClass
public static void shutdownCluster() throws Exception {
if (cluster != null) {
cluster.close();
try {
cluster.close();
assertBackgroundTaskFailureCount(cluster);
} finally {
cluster = null;
}
}
cluster = null;
}
protected static void assertAutoscalingUpdateComplete() throws Exception {

View File

@ -46,8 +46,10 @@ import org.slf4j.LoggerFactory;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
/**
*
* TODO: test can easily fail w/a count that is too low by a small amount (unrelated to BATCH_SIZE)
* TODO: test should not need arbitrary sleep calls if code + test are both working properly w/o concurrency bugs
*/
@org.apache.lucene.util.LuceneTestCase.AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-12923")
@TimeoutSuite(millis = 48 * 3600 * 1000)
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=INFO;org.apache.client.solrj.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.ComputePlanAction=INFO;org.apache.solr.cloud.autoscaling.ExecutePlanAction=DEBUG;org.apache.solr.cloud.autoscaling.ScheduledTriggers=DEBUG")
//@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=INFO;org.apache.client.solrj.cloud.autoscaling=DEBUG;org.apache.solr.cloud.CloudTestUtils=TRACE")

View File

@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
@ -246,6 +247,41 @@ public class TestSimLargeCluster extends SimSolrCloudTestCase {
newMoveReplicaOps - moveReplicaOps < flakyReplicas);
}
@Test
public void testCreateLargeSimCollections() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
final int numCollections = atLeast(10);
for (int i = 0; i < numCollections; i++) {
final int numShards = TestUtil.nextInt(random(), 5, 20);
final int nReps = TestUtil.nextInt(random(), 10, 25);
final int tReps = TestUtil.nextInt(random(), 10, 25);
final int pReps = TestUtil.nextInt(random(), 10, 25);
final int repsPerShard = (nReps + tReps + pReps);
final int totalCores = repsPerShard * numShards;
final int maxShardsPerNode = atLeast(2) + (totalCores / NUM_NODES);
final String name = "large_sim_collection" + i;
final CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection
(name, "conf", numShards, nReps, tReps, pReps);
create.setMaxShardsPerNode(maxShardsPerNode);
create.setAutoAddReplicas(false);
log.info("CREATE: {}", create);
create.process(solrClient);
// Since our current goal is to try and find situations where cores are just flat out missing
// no matter how long we wait, let's be excessive and generous in our timeout.
// (REMINDER: this uses the cluster's timesource, and ADDREPLICA has a hardcoded delay of 500ms)
CloudTestUtils.waitForState(cluster, name, totalCores, TimeUnit.SECONDS,
CloudTestUtils.clusterShape(numShards, repsPerShard, false, true));
final CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(name);
log.info("DELETE: {}", delete);
delete.process(solrClient);
}
}
@Test
public void testAddNode() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();

View File

@ -87,6 +87,7 @@ public class Utils {
public static final Function NEW_LINKED_HASHMAP_FUN = o -> new LinkedHashMap<>();
public static final Function NEW_ATOMICLONG_FUN = o -> new AtomicLong();
public static final Function NEW_ARRAYLIST_FUN = o -> new ArrayList<>();
public static final Function NEW_SYNCHRONIZED_ARRAYLIST_FUN = o -> Collections.synchronizedList(new ArrayList<>());
public static final Function NEW_HASHSET_FUN = o -> new HashSet<>();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());