mirror of https://github.com/apache/lucene.git
SOLR-14486: Autoscaling simulation framework should stop using /clusterstate.json.
This commit is contained in:
parent
44fc5b989a
commit
57b7d8a8db
|
@ -71,6 +71,9 @@ Other Changes
|
|||
If you have security concerns or other reasons to disable the Admin UI, you can modify `SOLR_ADMIN_UI_DISABLED`
|
||||
`solr.in.sh`/`solr.in.cmd` at start. (marcussorealheis)
|
||||
|
||||
* SOLR-14486: Autoscaling simulation framework no longer creates /clusterstate.json (format 1),
|
||||
instead it creates individual per-collection /state.json files (format 2). (ab)
|
||||
|
||||
================== 8.6.0 ==================
|
||||
|
||||
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
|
||||
|
|
|
@ -181,7 +181,6 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
if (distribStateManager == null) {
|
||||
this.stateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode());
|
||||
// init common paths
|
||||
stateManager.makePath(ZkStateReader.CLUSTER_STATE);
|
||||
stateManager.makePath(ZkStateReader.CLUSTER_PROPS);
|
||||
stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
|
||||
stateManager.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
|
||||
|
|
|
@ -48,7 +48,9 @@ import java.util.stream.Collectors;
|
|||
import com.google.common.util.concurrent.AtomicDouble;
|
||||
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
|
||||
import org.apache.solr.client.solrj.cloud.DistribStateManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||
|
@ -98,6 +100,7 @@ import org.apache.solr.core.SolrInfoBean;
|
|||
import org.apache.solr.metrics.SolrMetricManager;
|
||||
import org.apache.solr.update.SolrIndexSplitter;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -119,8 +122,8 @@ import static org.apache.solr.common.params.CommonParams.NAME;
|
|||
* <li>using autoscaling policy for replica placements</li>
|
||||
* <li>maintaining and up-to-date list of /live_nodes and nodeAdded / nodeLost markers</li>
|
||||
* <li>running a simulated leader election on collection changes (with throttling), when needed</li>
|
||||
* <li>maintaining an up-to-date /clusterstate.json (single file format), which also tracks replica states,
|
||||
* leader election changes, replica property changes, etc. Note: this file is only written,
|
||||
* <li>maintaining an up-to-date /state.json per-collection files, which also track replica states,
|
||||
* leader election changes, replica property changes, etc. Note: these files are only written,
|
||||
* but never read by the framework!</li>
|
||||
* <li>maintaining an up-to-date /clusterprops.json. Note: this file is only written, but never read by the
|
||||
* framework!</li>
|
||||
|
@ -153,12 +156,131 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
private final Map<String, Map<String, Long>> opDelays = new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
private volatile int clusterStateVersion = 0;
|
||||
private volatile String overseerLeader = null;
|
||||
|
||||
private volatile Map<String, Object> lastSavedProperties = null;
|
||||
|
||||
private final AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
|
||||
private class CachedCollectionRef {
|
||||
private final String name;
|
||||
private int zkVersion;
|
||||
private DocCollection coll;
|
||||
ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
CachedCollectionRef(String name, int zkVersion) {
|
||||
this.name = name;
|
||||
this.zkVersion = zkVersion;
|
||||
}
|
||||
|
||||
public DocCollection getColl() throws InterruptedException, IOException {
|
||||
DocCollection dc = coll;
|
||||
if (dc != null) {
|
||||
return dc;
|
||||
}
|
||||
lock.lock();
|
||||
try {
|
||||
if (coll != null) {
|
||||
return coll;
|
||||
} else {
|
||||
Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
|
||||
nodeReplicaMap.forEach((n, replicas) -> {
|
||||
synchronized (replicas) {
|
||||
replicas.forEach(ri -> {
|
||||
if (!ri.getCollection().equals(name)) {
|
||||
return;
|
||||
}
|
||||
Map<String, Object> props;
|
||||
synchronized (ri) {
|
||||
props = new HashMap<>(ri.getVariables());
|
||||
}
|
||||
props.put(ZkStateReader.NODE_NAME_PROP, n);
|
||||
props.put(ZkStateReader.CORE_NAME_PROP, ri.getCore());
|
||||
props.put(ZkStateReader.REPLICA_TYPE, ri.getType().toString());
|
||||
props.put(ZkStateReader.STATE_PROP, ri.getState().toString());
|
||||
Replica r = new Replica(ri.getName(), props, ri.getCollection(), ri.getShard());
|
||||
collMap.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
|
||||
.computeIfAbsent(ri.getShard(), s -> new HashMap<>())
|
||||
.put(ri.getName(), r);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// add empty slices
|
||||
sliceProperties.forEach((c, perSliceProps) -> {
|
||||
if (!c.equals(name)) {
|
||||
return;
|
||||
}
|
||||
perSliceProps.forEach((slice, props) -> {
|
||||
collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>()).computeIfAbsent(slice, s -> new ConcurrentHashMap<>());
|
||||
});
|
||||
});
|
||||
// add empty collections
|
||||
collProperties.keySet().forEach(c -> {
|
||||
if (!c.equals(name)) {
|
||||
return;
|
||||
}
|
||||
collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>());
|
||||
});
|
||||
|
||||
Map<String, Map<String, Replica>> shards = collMap.get(name);
|
||||
Map<String, Slice> slices = new HashMap<>();
|
||||
shards.forEach((s, replicas) -> {
|
||||
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(name, c -> new ConcurrentHashMap<>()).computeIfAbsent(s, sl -> new ConcurrentHashMap<>());
|
||||
Slice slice = new Slice(s, replicas, sliceProps, name);
|
||||
slices.put(s, slice);
|
||||
});
|
||||
Map<String, Object> collProps = collProperties.computeIfAbsent(name, c -> new ConcurrentHashMap<>());
|
||||
Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
|
||||
DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
|
||||
String path = ZkStateReader.getCollectionPath(name);
|
||||
coll = new DocCollection(name, slices, collProps, router, zkVersion + 1, path);
|
||||
try {
|
||||
SimDistribStateManager stateManager = cloudManager.getSimDistribStateManager();
|
||||
byte[] data = Utils.toJSON(Collections.singletonMap(name, coll));
|
||||
if (!stateManager.hasData(path)) {
|
||||
try {
|
||||
stateManager.makePath(path, data, CreateMode.PERSISTENT, true);
|
||||
} catch (AlreadyExistsException e) {
|
||||
// try updating
|
||||
stateManager.setData(path, data, zkVersion);
|
||||
}
|
||||
} else {
|
||||
stateManager.setData(path, data, zkVersion);
|
||||
}
|
||||
// verify version
|
||||
VersionedData vd = stateManager.getData(path);
|
||||
assert vd.getVersion() == zkVersion + 1;
|
||||
zkVersion++;
|
||||
} catch (KeeperException | BadVersionException e) {
|
||||
// should never happen?
|
||||
throw new RuntimeException("error saving " + coll, e);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return coll;
|
||||
}
|
||||
|
||||
public int getZkVersion() {
|
||||
lock.lock();
|
||||
try {
|
||||
return zkVersion;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void invalidate() {
|
||||
lock.lock();
|
||||
try {
|
||||
coll = null;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final Map<String, CachedCollectionRef> collectionsStatesRef = new ConcurrentHashMap<>();
|
||||
|
||||
private final Random bulkUpdateRandom = new Random(0);
|
||||
|
||||
|
@ -207,6 +329,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
sliceProperties.clear();
|
||||
nodeReplicaMap.clear();
|
||||
liveNodes.clear();
|
||||
collectionsStatesRef.clear();
|
||||
for (String nodeId : stateManager.listData(ZkStateReader.LIVE_NODES_ZKNODE)) {
|
||||
if (stateManager.hasData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId)) {
|
||||
stateManager.removeData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, -1);
|
||||
|
@ -223,6 +346,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
createEphemeralLiveNode(nodeId);
|
||||
}
|
||||
initialState.forEachCollection(dc -> {
|
||||
// DocCollection will be created later
|
||||
collectionsStatesRef.put(dc.getName(), new CachedCollectionRef(dc.getName(), dc.getZNodeVersion()));
|
||||
collProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>()).putAll(dc.getProperties());
|
||||
opDelays.computeIfAbsent(dc.getName(), Utils.NEW_HASHMAP_FUN).putAll(defaultOpDelays);
|
||||
dc.getSlices().forEach(s -> {
|
||||
|
@ -248,7 +373,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
});
|
||||
});
|
||||
});
|
||||
collectionsStatesRef.set(null);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -287,8 +411,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
return nodes.get(random.nextInt(nodes.size()));
|
||||
}
|
||||
|
||||
// todo: maybe hook up DistribStateManager /clusterstate.json watchers?
|
||||
|
||||
private ReplicaInfo getReplicaInfo(Replica r) {
|
||||
final List<ReplicaInfo> list = nodeReplicaMap.computeIfAbsent
|
||||
(r.getNodeName(), Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN);
|
||||
|
@ -331,8 +453,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
// mark every replica on that node as down
|
||||
boolean res = liveNodes.remove(nodeId);
|
||||
setReplicaStates(nodeId, Replica.State.DOWN, collections);
|
||||
if (!collections.isEmpty()) {
|
||||
collectionsStatesRef.set(null);
|
||||
for (String collection : collections) {
|
||||
collectionsStatesRef.get(collection).invalidate();;
|
||||
}
|
||||
// remove ephemeral nodes
|
||||
stateManager.getRoot().removeEphemeralChildren(nodeId);
|
||||
|
@ -363,7 +485,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
try {
|
||||
Set<String> myNodes = new HashSet<>(nodeReplicaMap.keySet());
|
||||
myNodes.removeAll(liveNodes.get());
|
||||
collectionsStatesRef.set(null);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -452,7 +573,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
try {
|
||||
setReplicaStates(nodeId, Replica.State.ACTIVE, collections);
|
||||
if (!collections.isEmpty()) {
|
||||
collectionsStatesRef.set(null);
|
||||
collections.forEach(c -> collectionsStatesRef.get(c).invalidate());
|
||||
simRunLeaderElection(collections, true);
|
||||
return true;
|
||||
} else {
|
||||
|
@ -604,7 +725,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
cloudManager.getMetricManager().registerGauge(null, registry,
|
||||
() -> replicaSize, "", true, Type.CORE_IDX.metricsAttribute);
|
||||
// at this point nuke our cached DocCollection state
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.get(replicaInfo.getCollection()).invalidate();
|
||||
log.trace("-- simAddReplica {}", replicaInfo);
|
||||
if (runLeaderElection) {
|
||||
simRunLeaderElection(replicaInfo.getCollection(), replicaInfo.getShard(), true);
|
||||
|
@ -633,7 +754,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
colShardReplicaMap.computeIfAbsent(ri.getCollection(), c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
|
||||
.remove(ri);
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.get(ri.getCollection()).invalidate();
|
||||
|
||||
opDelay(ri.getCollection(), CollectionParams.CollectionAction.DELETEREPLICA.name());
|
||||
|
||||
|
@ -668,26 +789,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save clusterstate.json to {@link DistribStateManager}.
|
||||
* @return saved state
|
||||
*/
|
||||
private ClusterState saveClusterState(ClusterState state) throws IOException {
|
||||
ensureNotClosed();
|
||||
byte[] data = Utils.toJSON(state);
|
||||
try {
|
||||
VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_STATE);
|
||||
int version = oldData != null ? oldData.getVersion() : 0;
|
||||
assert clusterStateVersion == version : "local clusterStateVersion out of sync";
|
||||
stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
|
||||
log.debug("** saved cluster state version {}", version);
|
||||
clusterStateVersion++;
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delay an operation by a configured amount.
|
||||
* @param collection collection name
|
||||
|
@ -725,7 +826,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
if (saveClusterState) {
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
collectionsStatesRef.set(null);
|
||||
collections.forEach(c -> collectionsStatesRef.get(c).invalidate());
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -865,13 +966,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("-- elected new leader for {} / {} (currentVersion={}): {}", collection,
|
||||
s.getName(), clusterStateVersion, ri);
|
||||
s.getName(), col.getZNodeVersion(), ri);
|
||||
}
|
||||
stateChanged.set(true);
|
||||
}
|
||||
} finally {
|
||||
if (stateChanged.get() || saveState) {
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.get(collection).invalidate();
|
||||
}
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -889,7 +990,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
boolean waitForFinalState = props.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
|
||||
final String collectionName = props.getStr(NAME);
|
||||
log.debug("-- simCreateCollection {}, currentVersion={}", collectionName, clusterStateVersion);
|
||||
log.debug("-- simCreateCollection {}", collectionName);
|
||||
|
||||
String router = props.getStr("router.name", DocRouter.DEFAULT_NAME);
|
||||
String policy = props.getStr(Policy.POLICY);
|
||||
|
@ -903,12 +1004,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
CreateCollectionCmd.checkReplicaTypes(props);
|
||||
|
||||
// always force getting fresh state
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
collectionsStatesRef.set(null);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
final ClusterState clusterState = getClusterState();
|
||||
|
||||
String withCollection = props.getStr(CollectionAdminParams.WITH_COLLECTION);
|
||||
|
@ -962,8 +1057,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
CollectionAdminParams.COLOCATED_WITH, collectionName);
|
||||
cmd = new CollectionMutator(cloudManager).modifyCollection(clusterState,message);
|
||||
}
|
||||
// force recreation of collection states
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.put(collectionName, new CachedCollectionRef(collectionName, 0));
|
||||
|
||||
} finally {
|
||||
lock.unlock();
|
||||
|
@ -1043,7 +1137,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
// force recreation of collection states
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.get(collectionName).invalidate();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -1057,7 +1151,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
}
|
||||
results.add("success", "");
|
||||
log.debug("-- finished createCollection {}, currentVersion={}", collectionName, clusterStateVersion);
|
||||
log.debug("-- finished createCollection {}", collectionName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1106,7 +1200,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
}
|
||||
});
|
||||
collectionsStatesRef.set(null);
|
||||
cloudManager.getDistribStateManager().removeRecursively(ZkStateReader.getCollectionPath(collection), true, true);
|
||||
collectionsStatesRef.remove(collection);
|
||||
results.add("success", "");
|
||||
} catch (Exception e) {
|
||||
log.warn("Exception", e);
|
||||
|
@ -1121,7 +1216,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
public void simDeleteAllCollections() throws Exception {
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.keySet().forEach(name -> {
|
||||
try {
|
||||
cloudManager.getDistribStateManager().removeRecursively(ZkStateReader.getCollectionPath(name), true, true);
|
||||
} catch (Exception e) {
|
||||
log.error("Unable to delete collection state.json");
|
||||
}
|
||||
});
|
||||
|
||||
collProperties.clear();
|
||||
sliceProperties.clear();
|
||||
|
@ -1468,7 +1569,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
|
||||
// invalidate cached state
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.get(collectionName).invalidate();
|
||||
} finally {
|
||||
SplitShardCmd.unlockForSplit(cloudManager, collectionName, sliceName.get());
|
||||
lock.unlock();
|
||||
|
@ -1516,7 +1617,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
}
|
||||
});
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.get(collectionName).invalidate();
|
||||
results.add("success", "");
|
||||
} catch (Exception e) {
|
||||
results.add("failure", e.toString());
|
||||
|
@ -2004,7 +2105,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
props.clear();
|
||||
props.putAll(properties);
|
||||
}
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.get(coll).invalidate();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -2025,7 +2126,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
} else {
|
||||
props.put(key, value);
|
||||
}
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.get(coll).invalidate();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -2046,7 +2147,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
if (properties != null) {
|
||||
sliceProps.putAll(properties);
|
||||
}
|
||||
collectionsStatesRef.set(null);
|
||||
collectionsStatesRef.get(coll).invalidate();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -2247,7 +2348,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
lock.lockInterruptibly();
|
||||
try {
|
||||
final Map<String, Map<String, Object>> stats = new TreeMap<>();
|
||||
collectionsStatesRef.set(null);
|
||||
ClusterState state = getClusterState();
|
||||
state.forEachCollection(coll -> {
|
||||
Map<String, Object> perColl = new LinkedHashMap<>();
|
||||
|
@ -2286,7 +2386,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
continue;
|
||||
}
|
||||
AtomicLong buffered = (AtomicLong)sliceProperties.get(coll.getName()).get(s.getName()).get(BUFFERED_UPDATES);
|
||||
AtomicLong buffered = (AtomicLong)sliceProperties
|
||||
.getOrDefault(coll.getName(), Collections.emptyMap())
|
||||
.getOrDefault(s.getName(), Collections.emptyMap()).get(BUFFERED_UPDATES);
|
||||
if (buffered != null) {
|
||||
bufferedDocs += buffered.get();
|
||||
}
|
||||
|
@ -2389,7 +2491,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
lock.lockInterruptibly();
|
||||
try {
|
||||
Map<String, DocCollection> states = getCollectionStates();
|
||||
ClusterState state = new ClusterState(clusterStateVersion, liveNodes.get(), states);
|
||||
ClusterState state = new ClusterState(0, liveNodes.get(), states);
|
||||
return state;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
|
@ -2399,65 +2501,18 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
}
|
||||
|
||||
// this method uses a simple cache in collectionsStatesRef. Operations that modify
|
||||
// cluster state should always reset this cache so that the changes become visible
|
||||
private Map<String, DocCollection> getCollectionStates() throws IOException, InterruptedException {
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
Map<String, DocCollection> collectionStates = collectionsStatesRef.get();
|
||||
if (collectionStates != null) {
|
||||
Map<String, DocCollection> collectionStates = new HashMap<>();
|
||||
collectionsStatesRef.forEach((name, cached) -> {
|
||||
try {
|
||||
collectionStates.put(name, cached.getColl());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("error building collection " + name + " state", e);
|
||||
}
|
||||
});
|
||||
return collectionStates;
|
||||
}
|
||||
collectionsStatesRef.set(null);
|
||||
log.debug("** creating new collection states, currentVersion={}", clusterStateVersion);
|
||||
Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
|
||||
nodeReplicaMap.forEach((n, replicas) -> {
|
||||
synchronized (replicas) {
|
||||
replicas.forEach(ri -> {
|
||||
Map<String, Object> props;
|
||||
synchronized (ri) {
|
||||
props = new HashMap<>(ri.getVariables());
|
||||
}
|
||||
props.put(ZkStateReader.NODE_NAME_PROP, n);
|
||||
props.put(ZkStateReader.CORE_NAME_PROP, ri.getCore());
|
||||
props.put(ZkStateReader.REPLICA_TYPE, ri.getType().toString());
|
||||
props.put(ZkStateReader.STATE_PROP, ri.getState().toString());
|
||||
Replica r = new Replica(ri.getName(), props, ri.getCollection(), ri.getShard());
|
||||
collMap.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
|
||||
.computeIfAbsent(ri.getShard(), s -> new HashMap<>())
|
||||
.put(ri.getName(), r);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// add empty slices
|
||||
sliceProperties.forEach((c, perSliceProps) -> {
|
||||
perSliceProps.forEach((slice, props) -> {
|
||||
collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>()).computeIfAbsent(slice, s -> new ConcurrentHashMap<>());
|
||||
});
|
||||
});
|
||||
// add empty collections
|
||||
collProperties.keySet().forEach(c -> {
|
||||
collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>());
|
||||
});
|
||||
|
||||
Map<String, DocCollection> res = new HashMap<>();
|
||||
collMap.forEach((coll, shards) -> {
|
||||
Map<String, Slice> slices = new HashMap<>();
|
||||
shards.forEach((s, replicas) -> {
|
||||
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>()).computeIfAbsent(s, sl -> new ConcurrentHashMap<>());
|
||||
Slice slice = new Slice(s, replicas, sliceProps, coll);
|
||||
slices.put(s, slice);
|
||||
});
|
||||
Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
|
||||
Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
|
||||
DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
|
||||
DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
|
||||
res.put(coll, dc);
|
||||
});
|
||||
saveClusterState(new ClusterState(clusterStateVersion, liveNodes.get(), res));
|
||||
collectionsStatesRef.set(res);
|
||||
return res;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
@ -55,8 +56,27 @@ public class SnapshotClusterStateProvider implements ClusterStateProvider {
|
|||
liveNodes = Set.copyOf((Collection<String>)snapshot.getOrDefault("liveNodes", Collections.emptySet()));
|
||||
clusterProperties = (Map<String, Object>)snapshot.getOrDefault("clusterProperties", Collections.emptyMap());
|
||||
Map<String, Object> stateMap = new HashMap<>((Map<String, Object>)snapshot.getOrDefault("clusterState", Collections.emptyMap()));
|
||||
Number version = (Number)stateMap.remove("version");
|
||||
clusterState = ClusterState.load(version != null ? version.intValue() : null, stateMap, liveNodes, ZkStateReader.CLUSTER_STATE);
|
||||
Map<String, DocCollection> collectionStates = new HashMap<>();
|
||||
// back-compat with format = 1
|
||||
Integer stateVersion = Integer.valueOf(String.valueOf(stateMap.getOrDefault("version", 0)));
|
||||
stateMap.remove("version");
|
||||
stateMap.forEach((name, state) -> {
|
||||
Map<String, Object> mutableState = (Map<String, Object>)state;
|
||||
Map<String, Object> collMap = (Map<String, Object>) mutableState.get(name);
|
||||
if (collMap == null) {
|
||||
// snapshot in format 1
|
||||
collMap = mutableState;
|
||||
mutableState = Collections.singletonMap(name, state);
|
||||
}
|
||||
Integer version = Integer.parseInt(String.valueOf(collMap.getOrDefault("zNodeVersion", stateVersion)));
|
||||
String path = String.valueOf(collMap.getOrDefault("zNode", ZkStateReader.getCollectionPath(name)));
|
||||
collMap.remove("zNodeVersion");
|
||||
collMap.remove("zNode");
|
||||
byte[] data = Utils.toJSON(mutableState);
|
||||
ClusterState collState = ClusterState.load(version, data, Collections.emptySet(), path);
|
||||
collectionStates.put(name, collState.getCollection(name));
|
||||
});
|
||||
clusterState = new ClusterState(stateVersion, liveNodes, collectionStates);
|
||||
}
|
||||
|
||||
public Map<String, Object> getSnapshot() {
|
||||
|
@ -67,14 +87,18 @@ public class SnapshotClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
Map<String, Object> stateMap = new HashMap<>();
|
||||
snapshot.put("clusterState", stateMap);
|
||||
stateMap.put("version", clusterState.getZNodeVersion());
|
||||
clusterState.forEachCollection(coll -> {
|
||||
CharArr out = new CharArr();
|
||||
JSONWriter writer = new JSONWriter(out, 2);
|
||||
coll.write(writer);
|
||||
String json = out.toString();
|
||||
try {
|
||||
stateMap.put(coll.getName(), Utils.fromJSON(json.getBytes("UTF-8")));
|
||||
Map<String, Object> collMap = new LinkedHashMap<>((Map<String, Object>)Utils.fromJSON(json.getBytes("UTF-8")));
|
||||
collMap.put("zNodeVersion", coll.getZNodeVersion());
|
||||
collMap.put("zNode", coll.getZNode());
|
||||
// format compatible with the real /state.json, which uses a mini-ClusterState
|
||||
// consisting of a single collection
|
||||
stateMap.put(coll.getName(), Collections.singletonMap(coll.getName(), collMap));
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
throw new RuntimeException("should not happen!", e);
|
||||
}
|
||||
|
|
|
@ -229,8 +229,8 @@ public class TestSnapshotCloudManager extends SolrCloudTestCase {
|
|||
Pattern.compile("/autoscaling/triggerState/.*"),
|
||||
// some triggers may have run after the snapshot was taken
|
||||
Pattern.compile("/autoscaling/events/.*"),
|
||||
// we always use format 1 in SimClusterStateProvider
|
||||
Pattern.compile("/clusterstate\\.json"),
|
||||
Pattern.compile("/collections/[^/]+?/state.json"),
|
||||
// depending on the startup sequence leaders may differ
|
||||
Pattern.compile("/collections/[^/]+?/leader_elect/.*"),
|
||||
Pattern.compile("/collections/[^/]+?/leaders/.*"),
|
||||
|
@ -255,6 +255,14 @@ public class TestSnapshotCloudManager extends SolrCloudTestCase {
|
|||
.filter(STATE_FILTER_FUN).collect(Collectors.toList()));
|
||||
Collections.sort(treeOne);
|
||||
Collections.sort(treeTwo);
|
||||
if (!treeOne.equals(treeTwo)) {
|
||||
List<String> t1 = new ArrayList<>(treeOne);
|
||||
t1.removeAll(treeTwo);
|
||||
log.warn("Only in tree one: " + t1);
|
||||
List<String> t2 = new ArrayList<>(treeTwo);
|
||||
t2.removeAll(treeOne);
|
||||
log.warn("Only in tree two: " + t2);
|
||||
}
|
||||
assertEquals(treeOne, treeTwo);
|
||||
for (String path : treeOne) {
|
||||
VersionedData vd1 = one.getData(path);
|
||||
|
|
Loading…
Reference in New Issue