mirror of https://github.com/apache/lucene.git

SOLR-13440: Support saving/restoring autoscaling state for repeatable simulations.

This commit is contained in:
parent 9189472d70
commit f2c18bacf2

@@ -83,6 +83,8 @@ New Features

* SOLR-13047: Add facet2D Streaming Expression (Nazerke Seidan, Joel Bernstein)

* SOLR-13440: Support saving/restoring autoscaling state for repeatable simulations. (ab)

Other Changes
----------------------
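Not part of the commit: a minimal usage sketch of how the snapshot APIs introduced below might be combined for a repeatable simulation. The class name RepeatableSimSketch, the /tmp path and the live/config arguments are illustrative assumptions; the SnapshotCloudManager, SimCloudManager and TimeSource calls are the ones added or used in this change.

    import java.io.File;
    import org.apache.solr.client.solrj.cloud.SolrCloudManager;
    import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
    import org.apache.solr.cloud.autoscaling.sim.SimCloudManager;
    import org.apache.solr.cloud.autoscaling.sim.SnapshotCloudManager;
    import org.apache.solr.common.util.TimeSource;

    public class RepeatableSimSketch {
      // "live" and "config" are supplied by the caller; the snapshot directory is an arbitrary example path
      static SimCloudManager snapshotAndReplay(SolrCloudManager live, AutoScalingConfig config) throws Exception {
        // capture the current cluster + autoscaling state into a read-only snapshot
        SnapshotCloudManager snapshot = new SnapshotCloudManager(live, config);
        // persist it as one JSON file per state section (clusterState.json, nodeState.json, ...)
        snapshot.saveSnapshot(new File("/tmp/autoscaling-snapshot"), true);
        // later (possibly in another JVM): read the snapshot back and build a simulated cluster from it
        SnapshotCloudManager restored = SnapshotCloudManager.readSnapshot(new File("/tmp/autoscaling-snapshot"));
        return SimCloudManager.createCluster(restored, config, TimeSource.get("simTime:50"));
      }
    }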
@@ -0,0 +1,94 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud.autoscaling.sim;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;

import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.common.util.Pair;

/**
 * A queue factory implementation that does nothing.
 */
public class NoopDistributedQueueFactory implements DistributedQueueFactory {

  public static final DistributedQueueFactory INSTANCE = new NoopDistributedQueueFactory();

  @Override
  public DistributedQueue makeQueue(String path) throws IOException {
    return NoopDistributedQueue.INSTANCE;
  }

  @Override
  public void removeQueue(String path) throws IOException {

  }

  private static final class NoopDistributedQueue implements DistributedQueue {
    static final DistributedQueue INSTANCE = new NoopDistributedQueue();

    @Override
    public byte[] peek() throws Exception {
      return new byte[0];
    }

    @Override
    public byte[] peek(boolean block) throws Exception {
      return new byte[0];
    }

    @Override
    public byte[] peek(long wait) throws Exception {
      return new byte[0];
    }

    @Override
    public byte[] poll() throws Exception {
      return new byte[0];
    }

    @Override
    public byte[] remove() throws Exception {
      return new byte[0];
    }

    @Override
    public byte[] take() throws Exception {
      return new byte[0];
    }

    @Override
    public void offer(byte[] data) throws Exception {

    }

    @Override
    public Map<String, Object> getStats() {
      return Collections.emptyMap();
    }

    @Override
    public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws Exception {
      return Collections.emptyList();
    }
  }
}
@@ -22,9 +22,9 @@ import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;

@@ -52,12 +52,12 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CollectionApiMapping;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;

@@ -173,19 +173,27 @@ public class SimCloudManager implements SolrCloudManager {
   * @param timeSource time source to use.
   */
  public SimCloudManager(TimeSource timeSource) throws Exception {
    this.stateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode());
    this(timeSource, null);
  }

  SimCloudManager(TimeSource timeSource, SimDistribStateManager distribStateManager) throws Exception {
    this.loader = new SolrResourceLoader();
    // init common paths
    stateManager.makePath(ZkStateReader.CLUSTER_STATE);
    stateManager.makePath(ZkStateReader.CLUSTER_PROPS);
    stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
    stateManager.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
    stateManager.makePath(ZkStateReader.ROLES);
    stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
    stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
    stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
    stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
    stateManager.makePath(Overseer.OVERSEER_ELECT);
    if (distribStateManager == null) {
      this.stateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode());
      // init common paths
      stateManager.makePath(ZkStateReader.CLUSTER_STATE);
      stateManager.makePath(ZkStateReader.CLUSTER_PROPS);
      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
      stateManager.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
      stateManager.makePath(ZkStateReader.ROLES);
      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
      stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
      stateManager.makePath(Overseer.OVERSEER_ELECT);
    } else {
      this.stateManager = distribStateManager;
    }

    // register common metrics
    metricTag = Integer.toHexString(hashCode());

@@ -296,22 +304,27 @@ public class SimCloudManager implements SolrCloudManager {
    return cloudManager;
  }

  public static SimCloudManager createCluster(SolrCloudManager other, TimeSource timeSource) throws Exception {
    SimCloudManager cloudManager = new SimCloudManager(timeSource);
  public static SimCloudManager createCluster(SolrCloudManager other, AutoScalingConfig config, TimeSource timeSource) throws Exception {
    SimDistribStateManager distribStateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode());
    distribStateManager.copyFrom(other.getDistribStateManager(), false);
    SimCloudManager cloudManager = new SimCloudManager(timeSource, distribStateManager);
    if (config != null) {
      cloudManager.getSimDistribStateManager().simSetAutoScalingConfig(config);
    } else {
      config = cloudManager.getDistribStateManager().getAutoScalingConfig();
    }
    Set<String> nodeTags = new HashSet<>(SimUtils.COMMON_NODE_TAGS);
    nodeTags.addAll(config.getPolicy().getParams());
    Set<String> replicaTags = new HashSet<>(SimUtils.COMMON_REPLICA_TAGS);
    replicaTags.addAll(config.getPolicy().getPerReplicaAttributes());
    cloudManager.getSimClusterStateProvider().copyFrom(other.getClusterStateProvider());
    List<String> replicaTags = Arrays.asList(
        Variable.Type.CORE_IDX.metricsAttribute,
        "QUERY./select.requests",
        "UPDATE./update.requests"
    );
    Set<String> nodeTags = createNodeValues("unused:1234_solr").keySet();
    for (String node : other.getClusterStateProvider().getLiveNodes()) {
      SimClusterStateProvider simClusterStateProvider = cloudManager.getSimClusterStateProvider();
      cloudManager.getSimNodeStateProvider().simSetNodeValues(node, other.getNodeStateProvider().getNodeValues(node, nodeTags));
      Map<String, Map<String, List<ReplicaInfo>>> infos = other.getNodeStateProvider().getReplicaInfo(node, replicaTags);
      simClusterStateProvider.simSetReplicaValues(node, infos, true);
    }
    cloudManager.getSimDistribStateManager().copyFrom(other.getDistribStateManager(), false);
    SimUtils.checkConsistency(cloudManager, config);
    return cloudManager;
  }
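A rough illustration (not from this commit) of driving the reworked createCluster(...) above; sourceManager is assumed to be either a live SolrCloudManager or a SnapshotCloudManager read back from disk, and passing a null config falls back to the autoscaling config found in the copied state:

    // build a simulated cluster that mirrors sourceManager's nodes, replicas and autoscaling config
    SimCloudManager sim = SimCloudManager.createCluster(sourceManager, null, TimeSource.get("simTime:50"));
    // the copied state is verified via SimUtils.checkConsistency, so a partial snapshot fails fast here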
@@ -721,13 +734,6 @@ public class SimCloudManager implements SolrCloudManager {
    count.incrementAndGet();
  }

  private static final Map<String, String> v2v1Mapping = new HashMap<>();
  static {
    for (CollectionApiMapping.Meta meta : CollectionApiMapping.Meta.values()) {
      if (meta.action != null) v2v1Mapping.put(meta.commandName, meta.action.toLower());
    }
  }

  /**
   * Handler method for autoscaling requests. NOTE: only a specific subset of autoscaling requests is
   * supported!

@@ -835,31 +841,8 @@ public class SimCloudManager implements SolrCloudManager {
    if (!(req instanceof CollectionAdminRequest)) {
      // maybe a V2Request?
      if (req instanceof V2Request) {
        Map<String, Object> reqMap = new HashMap<>();
        ((V2Request)req).toMap(reqMap);
        String path = (String)reqMap.get("path");
        if (!path.startsWith("/c/") || path.length() < 4) {
          throw new UnsupportedOperationException("Unsupported V2 request path: " + reqMap);
        }
        Map<String, Object> cmd = (Map<String, Object>)reqMap.get("command");
        if (cmd.size() != 1) {
          throw new UnsupportedOperationException("Unsupported multi-command V2 request: " + reqMap);
        }
        a = cmd.keySet().iterator().next();
        params = new ModifiableSolrParams();
        ((ModifiableSolrParams)params).add(CollectionAdminParams.COLLECTION, path.substring(3));
        if (req.getParams() != null) {
          ((ModifiableSolrParams)params).add(req.getParams());
        }
        Map<String, Object> reqParams = (Map<String, Object>)cmd.get(a);
        for (Map.Entry<String, Object> e : reqParams.entrySet()) {
          ((ModifiableSolrParams)params).add(e.getKey(), e.getValue().toString());
        }
        // re-map from v2 to v1 action
        a = v2v1Mapping.get(a);
        if (a == null) {
          throw new UnsupportedOperationException("Unsupported V2 request: " + reqMap);
        }
        params = SimUtils.v2AdminRequestToV1Params((V2Request)req);
        a = params.get(CoreAdminParams.ACTION);
      } else {
        throw new UnsupportedOperationException("Only some CollectionAdminRequest-s are supported: " + req.getClass().getName() + ": " + req.getPath() + " " + req.getParams());
      }
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import com.google.common.util.concurrent.AtomicDouble;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;

@@ -199,6 +200,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
    lock.lockInterruptibly();
    try {
      collProperties.clear();
      colShardReplicaMap.clear();
      sliceProperties.clear();
      nodeReplicaMap.clear();
      liveNodes.clear();

@@ -209,6 +211,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
        if (stateManager.hasData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId)) {
          stateManager.removeData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, -1);
        }
        if (stateManager.hasData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId)) {
          stateManager.removeData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId, -1);
        }
      }
      liveNodes.addAll(initialState.getLiveNodes());
      for (String nodeId : liveNodes.get()) {

@@ -220,10 +225,22 @@ public class SimClusterStateProvider implements ClusterStateProvider {
        dc.getSlices().forEach(s -> {
          sliceProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>())
              .computeIfAbsent(s.getName(), Utils.NEW_HASHMAP_FUN).putAll(s.getProperties());
          Replica leader = s.getLeader();
          s.getReplicas().forEach(r -> {
            ReplicaInfo ri = new ReplicaInfo(r.getName(), r.getCoreName(), dc.getName(), s.getName(), r.getType(), r.getNodeName(), r.getProperties());
            Map<String, Object> props = new HashMap<>(r.getProperties());
            if (leader != null && r.getName().equals(leader.getName())) {
              props.put("leader", "true");
            }
            ReplicaInfo ri = new ReplicaInfo(r.getName(), r.getCoreName(), dc.getName(), s.getName(), r.getType(), r.getNodeName(), props);
            if (leader != null && r.getName().equals(leader.getName())) {
              ri.getVariables().put("leader", "true");
            }
            if (liveNodes.get().contains(r.getNodeName())) {
              nodeReplicaMap.computeIfAbsent(r.getNodeName(), Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN).add(ri);
              colShardReplicaMap.computeIfAbsent(ri.getCollection(), name -> new ConcurrentHashMap<>())
                  .computeIfAbsent(ri.getShard(), shard -> new ArrayList<>()).add(ri);
            } else {
              log.warn("- dropping replica because its node " + r.getNodeName() + " is not live: " + r);
            }
          });
        });

@@ -274,7 +291,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
        (r.getNodeName(), Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN);
    synchronized (list) {
      for (ReplicaInfo ri : list) {
        if (r.getCoreName().equals(ri.getCore())) {
        if (r.getCoreName().equals(ri.getCore()) && r.getName().equals(ri.getName())) {
          return ri;
        }
      }

@@ -562,14 +579,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
      Map<String, Object> values = cloudManager.getSimNodeStateProvider().simGetAllNodeValues()
          .computeIfAbsent(nodeId, id -> new ConcurrentHashMap<>(SimCloudManager.createNodeValues(id)));
      // update the number of cores and freedisk in node values
      Integer cores = (Integer)values.get(ImplicitSnitch.CORES);
      Number cores = (Number)values.get(ImplicitSnitch.CORES);
      if (cores == null) {
        cores = 0;
      }
      cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores + 1);
      cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores.intValue() + 1);
      Number disk = (Number)values.get(ImplicitSnitch.DISK);
      if (disk == null) {
        disk = SimCloudManager.DEFAULT_FREE_DISK;
        throw new Exception("Missing '" + ImplicitSnitch.DISK + "' in node metrics for node " + nodeId);
        //disk = SimCloudManager.DEFAULT_FREE_DISK;
      }
      long replicaSize = ((Number)replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute)).longValue();
      Number replicaSizeGB = (Number)Type.CORE_IDX.convertVal(replicaSize);

@@ -598,7 +616,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
   * @param nodeId node id
   * @param coreNodeName coreNodeName
   */
  public void simRemoveReplica(String nodeId, String coreNodeName) throws Exception {
  public void simRemoveReplica(String nodeId, String collection, String coreNodeName) throws Exception {
    ensureNotClosed();

    lock.lockInterruptibly();

@@ -607,7 +625,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
        (nodeId, Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN);
    synchronized (replicas) {
      for (int i = 0; i < replicas.size(); i++) {
        if (coreNodeName.equals(replicas.get(i).getName())) {
        if (collection.equals(replicas.get(i).getCollection()) && coreNodeName.equals(replicas.get(i).getName())) {
          ReplicaInfo ri = replicas.remove(i);
          colShardReplicaMap.computeIfAbsent(ri.getCollection(), c -> new ConcurrentHashMap<>())
              .computeIfAbsent(ri.getShard(), s -> new ArrayList<>())

@@ -618,11 +636,11 @@ public class SimClusterStateProvider implements ClusterStateProvider {

          // update the number of cores in node values, if node is live
          if (liveNodes.contains(nodeId)) {
            Integer cores = (Integer)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.CORES);
            if (cores == null || cores == 0) {
            Number cores = (Number)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.CORES);
            if (cores == null || cores.intValue() == 0) {
              throw new Exception("Unexpected value of 'cores' (" + cores + ") on node: " + nodeId);
            }
            cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores - 1);
            cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores.intValue() - 1);
            Number disk = (Number)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.DISK);
            if (disk == null || disk.doubleValue() == 0.0) {
              throw new Exception("Unexpected value of 'freedisk' (" + disk + ") on node: " + nodeId);

@@ -1038,13 +1056,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
          if (ri.getCollection().equals(collection)) {
            it.remove();
            // update the number of cores in node values
            Integer cores = (Integer) cloudManager.getSimNodeStateProvider().simGetNodeValue(n, "cores");
            Number cores = (Number) cloudManager.getSimNodeStateProvider().simGetNodeValue(n, "cores");
            if (cores != null) { // node is still up
              if (cores == 0) {
              if (cores.intValue() == 0) {
                throw new RuntimeException("Unexpected value of 'cores' (" + cores + ") on node: " + n);
              }
              try {
                cloudManager.getSimNodeStateProvider().simSetNodeValue(n, "cores", cores - 1);
                cloudManager.getSimNodeStateProvider().simSetNodeValue(n, "cores", cores.intValue() - 1);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted");

@@ -1089,6 +1107,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
    }
  }

  private static final Set<String> NO_COPY_PROPS = new HashSet<>(Arrays.asList(
      ZkStateReader.CORE_NODE_NAME_PROP,
      ZkStateReader.NODE_NAME_PROP,
      ZkStateReader.BASE_URL_PROP,
      ZkStateReader.CORE_NAME_PROP
  ));

  /**
   * Move replica. This uses a similar algorithm as {@link org.apache.solr.cloud.api.collections.MoveReplicaCmd} <code>moveNormalReplica(...)</code> method.
   * @param message operation details

@@ -1123,17 +1148,35 @@ public class SimClusterStateProvider implements ClusterStateProvider {
    }

    opDelay(collection, CollectionParams.CollectionAction.MOVEREPLICA.name());
    ReplicaInfo ri = getReplicaInfo(replica);
    if (ri != null) {
      if (ri.getVariable(Type.CORE_IDX.tagName) != null) {
        // simulate very large replicas - add additional delay of 5s / GB
        long sizeInGB = ((Number)ri.getVariable(Type.CORE_IDX.tagName)).longValue();
        long opDelay = opDelays.getOrDefault(ri.getCollection(), Collections.emptyMap())
            .getOrDefault(CollectionParams.CollectionAction.MOVEREPLICA.name(), defaultOpDelays.get(CollectionParams.CollectionAction.MOVEREPLICA.name()));
        opDelay = TimeUnit.MILLISECONDS.toSeconds(opDelay);
        if (sizeInGB > opDelay) {
          // add 5s per each GB above the threshold
          cloudManager.getTimeSource().sleep(TimeUnit.SECONDS.toMillis(sizeInGB - opDelay) * 5);
        }
      }
    }

    // TODO: for now simulate moveNormalReplica sequence, where we first add new replica and then delete the old one

    String newSolrCoreName = Assign.buildSolrCoreName(stateManager, coll, slice.getName(), replica.getType());
    String coreNodeName = Assign.assignCoreNodeName(stateManager, coll);
    ReplicaInfo newReplica = new ReplicaInfo(coreNodeName, newSolrCoreName, collection, slice.getName(), replica.getType(), targetNode, null);
    // copy other properties
    Map<String, Object> props = replica.getProperties().entrySet().stream()
        .filter(e -> !NO_COPY_PROPS.contains(e.getKey()))
        .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
    ReplicaInfo newReplica = new ReplicaInfo(coreNodeName, newSolrCoreName, collection, slice.getName(), replica.getType(), targetNode, props);
    log.debug("-- new replica: " + newReplica);
    // xxx should run leader election here already?
    simAddReplica(targetNode, newReplica, false);
    // this will trigger leader election
    simRemoveReplica(replica.getNodeName(), replica.getName());
    simRemoveReplica(replica.getNodeName(), collection, replica.getName());
    results.add("success", "");
  }

@@ -1996,13 +2039,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {

  public void simSetReplicaValues(String node, Map<String, Map<String, List<ReplicaInfo>>> source, boolean overwrite) {
    List<ReplicaInfo> infos = nodeReplicaMap.get(node);
    Map<String, ReplicaInfo> infoMap = new HashMap<>();
    infos.forEach(ri -> infoMap.put(ri.getName(), ri));
    if (infos == null) {
      throw new RuntimeException("Node not present: " + node);
    }
    // core_node_name is not unique across collections
    Map<String, Map<String, ReplicaInfo>> infoMap = new HashMap<>();
    infos.forEach(ri -> infoMap.computeIfAbsent(ri.getCollection(), Utils.NEW_HASHMAP_FUN).put(ri.getName(), ri));
    source.forEach((coll, shards) -> shards.forEach((shard, replicas) -> replicas.forEach(r -> {
      ReplicaInfo target = infoMap.get(r.getName());
      ReplicaInfo target = infoMap.getOrDefault(coll, Collections.emptyMap()).get(r.getName());
      if (target == null) {
        throw new RuntimeException("Unable to find simulated replica of " + r);
      }

@@ -2041,6 +2085,18 @@ public class SimClusterStateProvider implements ClusterStateProvider {
    }
  }

  public ReplicaInfo simGetReplicaInfo(String collection, String coreNode) {
    Map<String, List<ReplicaInfo>> shardsReplicas = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>());
    for (List<ReplicaInfo> replicas : shardsReplicas.values()) {
      for (ReplicaInfo ri : replicas) {
        if (ri.getName().equals(coreNode)) {
          return ri;
        }
      }
    }
    return null;
  }

  /**
   * List collections.
   * @return list of existing collections.

@@ -75,7 +75,8 @@ public class SimDistribStateManager implements DistribStateManager {
    private int version = 0;
    private int seq = 0;
    private final CreateMode mode;
    private final String clientId;
    // copyFrom needs to modify this
    private String owner;
    private final String path;
    private final String name;
    private final Node parent;

@@ -84,16 +85,16 @@ public class SimDistribStateManager implements DistribStateManager {
    Set<Watcher> dataWatches = ConcurrentHashMap.newKeySet();
    Set<Watcher> childrenWatches = ConcurrentHashMap.newKeySet();

    Node(Node parent, String name, String path, CreateMode mode, String clientId) {
    Node(Node parent, String name, String path, CreateMode mode, String owner) {
      this.parent = parent;
      this.name = name;
      this.path = path;
      this.mode = mode;
      this.clientId = clientId;
      this.owner = owner;
    }

    Node(Node parent, String name, String path, byte[] data, CreateMode mode, String clientId) {
      this(parent, name, path, mode, clientId);
    Node(Node parent, String name, String path, byte[] data, CreateMode mode, String owner) {
      this(parent, name, path, mode, owner);
      this.data = data;
    }

@@ -136,7 +137,7 @@ public class SimDistribStateManager implements DistribStateManager {
    public VersionedData getData(Watcher w) {
      dataLock.lock();
      try {
        VersionedData res = new VersionedData(version, data, mode, clientId);
        VersionedData res = new VersionedData(version, data, mode, owner);
        if (w != null && !dataWatches.contains(w)) {
          dataWatches.add(w);
        }

@@ -195,7 +196,7 @@ public class SimDistribStateManager implements DistribStateManager {
          continue;
        }
        if ((CreateMode.EPHEMERAL == n.mode || CreateMode.EPHEMERAL_SEQUENTIAL == n.mode) &&
            id.equals(n.clientId)) {
            id.equals(n.owner)) {
          removeChild(n.name, -1);
        } else {
          n.removeEphemeralChildren(id);

@@ -208,7 +209,7 @@ public class SimDistribStateManager implements DistribStateManager {
  private final ReentrantLock multiLock = new ReentrantLock();

  public static Node createNewRootNode() {
    return new Node(null, "", "/", CreateMode.PERSISTENT, "__root__");
    return new Node(null, "", "/", CreateMode.PERSISTENT, "0");
  }

  private final ExecutorService watchersPool;

@@ -243,6 +244,7 @@ public class SimDistribStateManager implements DistribStateManager {
   */
  public void copyFrom(DistribStateManager other, boolean failOnExists) throws InterruptedException, IOException, KeeperException, AlreadyExistsException, BadVersionException {
    List<String> tree = other.listTree("/");
    log.info("- copying " + tree.size() + " resources...");
    // check if any node exists
    for (String path : tree) {
      if (hasData(path) && failOnExists) {

@@ -256,9 +258,10 @@ public class SimDistribStateManager implements DistribStateManager {
      } else {
        makePath(path, data.getData(), data.getMode(), failOnExists);
      }
      // hack: set the version to be the same as the source
      // hack: set the version and owner to be the same as the source
      Node n = traverse(path, false, CreateMode.PERSISTENT);
      n.version = data.getVersion();
      n.owner = data.getOwner();
    }
  }

@@ -328,6 +331,9 @@ public class SimDistribStateManager implements DistribStateManager {
      return null;
    }
    throttleOrError(path);
    if (path.equals("/")) {
      return root;
    }
    if (path.charAt(0) == '/') {
      path = path.substring(1);
    }

@@ -365,7 +371,8 @@ public class SimDistribStateManager implements DistribStateManager {
    }

    fullChildPath.append(nodeName);
    Node child = new Node(parentNode, nodeName, fullChildPath.toString(), data, mode, id);
    String owner = mode == CreateMode.EPHEMERAL || mode == CreateMode.EPHEMERAL_SEQUENTIAL ? id : "0";
    Node child = new Node(parentNode, nodeName, fullChildPath.toString(), data, mode, owner);

    if (attachToParent) {
      parentNode.setChild(nodeName, child);
|
@@ -317,6 +317,7 @@ public class SimNodeStateProvider implements NodeStateProvider {
      for (ReplicaInfo r : replicas) {
        Map<String, List<ReplicaInfo>> perCollection = res.computeIfAbsent(r.getCollection(), Utils.NEW_HASHMAP_FUN);
        List<ReplicaInfo> perShard = perCollection.computeIfAbsent(r.getShard(), Utils.NEW_ARRAYLIST_FUN);
        // XXX filter out some properties?
        perShard.add(r);
      }
      return res;
|
@@ -0,0 +1,351 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud.autoscaling.sim;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.Row;
import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.request.CollectionApiMapping;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;

/**
 * Various utility methods useful for autoscaling simulations and snapshots.
 */
public class SimUtils {

  public static final Set<String> COMMON_REPLICA_TAGS = new HashSet<>(Arrays.asList(
      Variable.Type.CORE_IDX.metricsAttribute,
      Variable.Type.CORE_IDX.tagName,
      "QUERY./select.requests",
      "UPDATE./update.requests"
  ));

  public static final Set<String> COMMON_NODE_TAGS = new HashSet<>(Arrays.asList(
      Variable.Type.CORES.tagName,
      Variable.Type.FREEDISK.tagName,
      Variable.Type.NODE.tagName,
      Variable.Type.NODE_ROLE.tagName,
      Variable.Type.TOTALDISK.tagName,
      Variable.Type.DISKTYPE.tagName,
      Variable.Type.HEAPUSAGE.tagName,
      Variable.Type.HOST.tagName,
      Variable.Type.IP_1.tagName,
      Variable.Type.IP_2.tagName,
      Variable.Type.IP_3.tagName,
      Variable.Type.IP_4.tagName,
      Variable.Type.PORT.tagName,
      Variable.Type.SYSLOADAVG.tagName,
      "withCollection"
  ));

  /**
   * Check consistency of data in a {@link SolrCloudManager}. This may be needed when constructing a simulated
   * instance from potentially inconsistent data (eg. partial snapshots taken at different points in time).
   * @param solrCloudManager source manager
   * @param config optional {@link AutoScalingConfig} instance used to determine what node and replica metrics to check.
   */
  public static void checkConsistency(SolrCloudManager solrCloudManager, AutoScalingConfig config) throws Exception {
    if (config == null) {
      config = solrCloudManager.getDistribStateManager().getAutoScalingConfig();
    }
    Set<String> replicaTags = new HashSet<>(COMMON_REPLICA_TAGS);
    replicaTags.addAll(config.getPolicy().getPerReplicaAttributes());

    // verify replicas are consistent and data is available
    Map<String, Map<String, Replica>> allReplicas = new HashMap<>();
    solrCloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
      coll.getReplicas().forEach(r -> {
        if (allReplicas.containsKey(r.getName())) {
          throw new RuntimeException("duplicate core_node name in clusterState: " + allReplicas.get(r.getName()) + " versus " + r);
        } else {
          allReplicas.computeIfAbsent(coll.getName(), c -> new HashMap<>()).put(r.getName(), r);
        }
      });
    });
    Map<String, Map<String, ReplicaInfo>> allReplicaInfos = new HashMap<>();
    solrCloudManager.getClusterStateProvider().getLiveNodes().forEach(n -> {
      Map<String, Map<String, List<ReplicaInfo>>> infos = solrCloudManager.getNodeStateProvider().getReplicaInfo(n, replicaTags);
      infos.forEach((coll, shards) -> shards.forEach((shard, replicas) -> replicas.forEach(r -> {
        if (allReplicaInfos.containsKey(r.getName())) {
          throw new RuntimeException("duplicate core_node name in NodeStateProvider: " + allReplicaInfos.get(r.getName()) + " versus " + r);
        } else {
          allReplicaInfos.computeIfAbsent(coll, c -> new HashMap<>()).put(r.getName(), r);
        }
      })));
    });
    if (!allReplicaInfos.keySet().equals(allReplicas.keySet())) {
      Set<String> notInClusterState = allReplicaInfos.keySet().stream()
          .filter(k -> !allReplicas.containsKey(k))
          .collect(Collectors.toSet());
      Set<String> notInNodeProvider = allReplicas.keySet().stream()
          .filter(k -> !allReplicaInfos.containsKey(k))
          .collect(Collectors.toSet());
      throw new RuntimeException("Mismatched replica data between ClusterState and NodeStateProvider:\n\t" +
          "collection not in ClusterState: " + notInClusterState + "\n\t" +
          "collection not in NodeStateProvider: " + notInNodeProvider);
    }
    allReplicaInfos.keySet().forEach(collection -> {
      Set<String> infosCores = allReplicaInfos.getOrDefault(collection, Collections.emptyMap()).keySet();
      Set<String> csCores = allReplicas.getOrDefault(collection, Collections.emptyMap()).keySet();
      if (!infosCores.equals(csCores)) {
        Set<String> notInClusterState = infosCores.stream()
            .filter(k -> !csCores.contains(k))
            .collect(Collectors.toSet());
        Set<String> notInNodeProvider = csCores.stream()
            .filter(k -> !infosCores.contains(k))
            .collect(Collectors.toSet());
        throw new RuntimeException("Mismatched replica data between ClusterState and NodeStateProvider:\n\t" +
            "replica not in ClusterState: " + notInClusterState + "\n\t" +
            "replica not in NodeStateProvider: " + notInNodeProvider);
      }
    });
    // verify all replicas have size info
    allReplicaInfos.forEach((coll, replicas) -> replicas.forEach((core, ri) -> {
      Number size = (Number) ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute);
      if (size == null) {
        size = (Number) ri.getVariable(Variable.Type.CORE_IDX.tagName);
        if (size == null) {
//          for (String node : solrCloudManager.getClusterStateProvider().getLiveNodes()) {
//            log.error("Check for missing values: {}: {}", node, solrCloudManager.getNodeStateProvider().getReplicaInfo(node, SnapshotNodeStateProvider.REPLICA_TAGS));
//          }
          throw new RuntimeException("missing replica size information: " + ri);
        }
      }
    }
    ));
  }

  /**
   * Calculate statistics of node / collection and replica layouts for the provided {@link SolrCloudManager}.
   * @param cloudManager manager
   * @param config autoscaling config, or null if the one from the provided manager should be used
   * @param verbose if true then add more details about replicas.
   * @return a map containing detailed statistics
   */
  public static Map<String, Object> calculateStats(SolrCloudManager cloudManager, AutoScalingConfig config, boolean verbose) throws Exception {
    ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
    Map<String, Map<String, Number>> collStats = new TreeMap<>();
    Policy.Session session = config.getPolicy().createSession(cloudManager);
    clusterState.forEachCollection(coll -> {
      Map<String, Number> perColl = collStats.computeIfAbsent(coll.getName(), n -> new LinkedHashMap<>());
      AtomicInteger numCores = new AtomicInteger();
      HashMap<String, Map<String, AtomicInteger>> nodes = new HashMap<>();
      coll.getSlices().forEach(s -> {
        numCores.addAndGet(s.getReplicas().size());
        s.getReplicas().forEach(r -> {
          nodes.computeIfAbsent(r.getNodeName(), n -> new HashMap<>())
              .computeIfAbsent(s.getName(), slice -> new AtomicInteger()).incrementAndGet();
        });
      });
      int maxCoresPerNode = 0;
      int minCoresPerNode = 0;
      int maxActualShardsPerNode = 0;
      int minActualShardsPerNode = 0;
      int maxShardReplicasPerNode = 0;
      int minShardReplicasPerNode = 0;
      if (!nodes.isEmpty()) {
        minCoresPerNode = Integer.MAX_VALUE;
        minActualShardsPerNode = Integer.MAX_VALUE;
        minShardReplicasPerNode = Integer.MAX_VALUE;
        for (Map<String, AtomicInteger> counts : nodes.values()) {
          int total = counts.values().stream().mapToInt(c -> c.get()).sum();
          for (AtomicInteger count : counts.values()) {
            if (count.get() > maxShardReplicasPerNode) {
              maxShardReplicasPerNode = count.get();
            }
            if (count.get() < minShardReplicasPerNode) {
              minShardReplicasPerNode = count.get();
            }
          }
          if (total > maxCoresPerNode) {
            maxCoresPerNode = total;
          }
          if (total < minCoresPerNode) {
            minCoresPerNode = total;
          }
          if (counts.size() > maxActualShardsPerNode) {
            maxActualShardsPerNode = counts.size();
          }
          if (counts.size() < minActualShardsPerNode) {
            minActualShardsPerNode = counts.size();
          }
        }
      }
      perColl.put("activeShards", coll.getActiveSlices().size());
      perColl.put("inactiveShards", coll.getSlices().size() - coll.getActiveSlices().size());
      perColl.put("rf", coll.getReplicationFactor());
      perColl.put("maxShardsPerNode", coll.getMaxShardsPerNode());
      perColl.put("maxActualShardsPerNode", maxActualShardsPerNode);
      perColl.put("minActualShardsPerNode", minActualShardsPerNode);
      perColl.put("maxShardReplicasPerNode", maxShardReplicasPerNode);
      perColl.put("minShardReplicasPerNode", minShardReplicasPerNode);
      perColl.put("numCores", numCores.get());
      perColl.put("numNodes", nodes.size());
      perColl.put("maxCoresPerNode", maxCoresPerNode);
      perColl.put("minCoresPerNode", minCoresPerNode);
    });
    Map<String, Map<String, Object>> nodeStats = new TreeMap<>();
    Map<Integer, AtomicInteger> coreStats = new TreeMap<>();
    List<Row> rows = session.getSortedNodes();
    // check consistency
    if (rows.size() != clusterState.getLiveNodes().size()) {
      throw new Exception("Mismatch between autoscaling matrix size (" + rows.size() + ") and liveNodes size (" + clusterState.getLiveNodes().size() + ")");
    }
    for (Row row : rows) {
      Map<String, Object> nodeStat = nodeStats.computeIfAbsent(row.node, n -> new LinkedHashMap<>());
      nodeStat.put("isLive", row.isLive());
      nodeStat.put("freedisk", row.getVal("freedisk", 0));
      nodeStat.put("totaldisk", row.getVal("totaldisk", 0));
      int cores = ((Number)row.getVal("cores", 0)).intValue();
      nodeStat.put("cores", cores);
      coreStats.computeIfAbsent(cores, num -> new AtomicInteger()).incrementAndGet();
      Map<String, Map<String, Map<String, Object>>> collReplicas = new TreeMap<>();
      // check consistency
      AtomicInteger rowCores = new AtomicInteger();
      row.forEachReplica(ri -> rowCores.incrementAndGet());
      if (cores != rowCores.get()) {
        throw new Exception("Mismatch between autoscaling matrix row replicas (" + rowCores.get() + ") and number of cores (" + cores + ")");
      }
      row.forEachReplica(ri -> {
        Map<String, Object> perReplica = collReplicas.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>())
            .computeIfAbsent(ri.getCore().substring(ri.getCollection().length() + 1), core -> new LinkedHashMap<>());
//        if (ri.getVariable(Variable.Type.CORE_IDX.tagName) != null) {
//          perReplica.put(Variable.Type.CORE_IDX.tagName, ri.getVariable(Variable.Type.CORE_IDX.tagName));
//        }
        if (ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute) != null) {
          perReplica.put(Variable.Type.CORE_IDX.metricsAttribute, ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute));
          if (ri.getVariable(Variable.Type.CORE_IDX.tagName) != null) {
            perReplica.put(Variable.Type.CORE_IDX.tagName, ri.getVariable(Variable.Type.CORE_IDX.tagName));
          } else {
            perReplica.put(Variable.Type.CORE_IDX.tagName,
                Variable.Type.CORE_IDX.convertVal(ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute)));
          }
        }
        perReplica.put("coreNode", ri.getName());
        if (ri.isLeader || ri.getBool("leader", false)) {
          perReplica.put("leader", true);
          Double totalSize = (Double)collStats.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
              .computeIfAbsent("avgShardSize", size -> 0.0);
          Number riSize = (Number)ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute);
          if (riSize != null) {
            totalSize += riSize.doubleValue();
            collStats.get(ri.getCollection()).put("avgShardSize", totalSize);
            Double max = (Double)collStats.get(ri.getCollection()).get("maxShardSize");
            if (max == null) max = 0.0;
            if (riSize.doubleValue() > max) {
              collStats.get(ri.getCollection()).put("maxShardSize", riSize.doubleValue());
            }
            Double min = (Double)collStats.get(ri.getCollection()).get("minShardSize");
            if (min == null) min = Double.MAX_VALUE;
            if (riSize.doubleValue() < min) {
              collStats.get(ri.getCollection()).put("minShardSize", riSize.doubleValue());
            }
          } else {
            throw new RuntimeException("ReplicaInfo without size information: " + ri);
          }
        }
        if (verbose) {
          nodeStat.put("replicas", collReplicas);
        }
      });
    }

    // calculate average per shard and convert the units
    for (Map<String, Number> perColl : collStats.values()) {
      Number avg = perColl.get("avgShardSize");
      if (avg != null) {
        avg = avg.doubleValue() / perColl.get("activeShards").doubleValue();
        perColl.put("avgShardSize", (Number)Variable.Type.CORE_IDX.convertVal(avg));
      }
      Number num = perColl.get("maxShardSize");
      if (num != null) {
        perColl.put("maxShardSize", (Number)Variable.Type.CORE_IDX.convertVal(num));
      }
      num = perColl.get("minShardSize");
      if (num != null) {
        perColl.put("minShardSize", (Number)Variable.Type.CORE_IDX.convertVal(num));
      }
    }
    Map<String, Object> stats = new LinkedHashMap<>();
    stats.put("coresPerNodes", coreStats);
    stats.put("sortedNodeStats", nodeStats);
    stats.put("collectionStats", collStats);
    return stats;
  }

  private static final Map<String, String> v2v1Mapping = new HashMap<>();
  static {
    for (CollectionApiMapping.Meta meta : CollectionApiMapping.Meta.values()) {
      if (meta.action != null) v2v1Mapping.put(meta.commandName, meta.action.toLower());
    }
  }

  /**
   * Convert a V2 {@link org.apache.solr.client.solrj.request.CollectionAdminRequest} to regular {@link org.apache.solr.common.params.SolrParams}
   * @param req request
   * @return payload converted to V1 params
   */
  public static ModifiableSolrParams v2AdminRequestToV1Params(V2Request req) {
    Map<String, Object> reqMap = new HashMap<>();
    ((V2Request)req).toMap(reqMap);
    String path = (String)reqMap.get("path");
    if (!path.startsWith("/c/") || path.length() < 4) {
      throw new UnsupportedOperationException("Unsupported V2 request path: " + reqMap);
    }
    Map<String, Object> cmd = (Map<String, Object>)reqMap.get("command");
    if (cmd.size() != 1) {
      throw new UnsupportedOperationException("Unsupported multi-command V2 request: " + reqMap);
    }
    String a = cmd.keySet().iterator().next();
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.add(CollectionAdminParams.COLLECTION, path.substring(3));
    if (req.getParams() != null) {
      params.add(req.getParams());
    }
    Map<String, Object> reqParams = (Map<String, Object>)cmd.get(a);
    for (Map.Entry<String, Object> e : reqParams.entrySet()) {
      params.add(e.getKey(), e.getValue().toString());
    }
    // re-map from v2 to v1 action
    a = v2v1Mapping.get(a);
    if (a == null) {
      throw new UnsupportedOperationException("Unsupported V2 request: " + reqMap);
    }
    params.add(CoreAdminParams.ACTION, a);
    return params;
  }
}
@@ -0,0 +1,237 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.cloud.autoscaling.sim;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.commons.io.IOUtils;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ObjectCache;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Read-only snapshot of another {@link SolrCloudManager}.
 */
public class SnapshotCloudManager implements SolrCloudManager {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  private ObjectCache objectCache = new ObjectCache();
  private SnapshotClusterStateProvider clusterStateProvider;
  private SnapshotNodeStateProvider nodeStateProvider;
  private SnapshotDistribStateManager distribStateManager;
  private TimeSource timeSource;

  public static final String MANAGER_STATE_KEY = "managerState";
  public static final String CLUSTER_STATE_KEY = "clusterState";
  public static final String NODE_STATE_KEY = "nodeState";
  public static final String DISTRIB_STATE_KEY = "distribState";
  public static final String AUTOSCALING_STATE_KEY = "autoscalingState";
  public static final String STATISTICS_STATE_KEY = "statistics";

  private static final List<String> REQUIRED_KEYS = Arrays.asList(
      MANAGER_STATE_KEY,
      CLUSTER_STATE_KEY,
      NODE_STATE_KEY,
      DISTRIB_STATE_KEY
  );

  public SnapshotCloudManager(SolrCloudManager other, AutoScalingConfig config) throws Exception {
    this.timeSource = other.getTimeSource();
    this.clusterStateProvider = new SnapshotClusterStateProvider(other.getClusterStateProvider());
    this.nodeStateProvider = new SnapshotNodeStateProvider(other, config);
    this.distribStateManager = new SnapshotDistribStateManager(other.getDistribStateManager(), config);
    SimUtils.checkConsistency(this, config);
  }

  public SnapshotCloudManager(Map<String, Object> snapshot) throws Exception {
    Objects.requireNonNull(snapshot);
    init(
        (Map<String, Object>)snapshot.getOrDefault(MANAGER_STATE_KEY, Collections.emptyMap()),
        (Map<String, Object>)snapshot.getOrDefault(CLUSTER_STATE_KEY, Collections.emptyMap()),
        (Map<String, Object>)snapshot.getOrDefault(NODE_STATE_KEY, Collections.emptyMap()),
        (Map<String, Object>)snapshot.getOrDefault(DISTRIB_STATE_KEY, Collections.emptyMap())
    );
  }

  public void saveSnapshot(File targetDir, boolean withAutoscaling) throws Exception {
    Map<String, Object> snapshot = getSnapshot(withAutoscaling);
    targetDir.mkdirs();
    for (Map.Entry<String, Object> e : snapshot.entrySet()) {
      FileOutputStream out = new FileOutputStream(new File(targetDir, e.getKey() + ".json"));
      IOUtils.write(Utils.toJSON(e.getValue()), out);
      out.flush();
      out.close();
    }
  }

  public static SnapshotCloudManager readSnapshot(File sourceDir) throws Exception {
    if (!sourceDir.exists()) {
      throw new Exception("Source path doesn't exist: " + sourceDir);
    }
    if (!sourceDir.isDirectory()) {
      throw new Exception("Source path is not a directory: " + sourceDir);
    }
    Map<String, Object> snapshot = new HashMap<>();
    int validData = 0;
    for (String key : REQUIRED_KEYS) {
      File src = new File(sourceDir, key + ".json");
      if (src.exists()) {
        InputStream is = new FileInputStream(src);
        Map<String, Object> data = (Map<String, Object>)Utils.fromJSON(is);
        is.close();
        snapshot.put(key, data);
        validData++;
      }
    }
    if (validData < REQUIRED_KEYS.size()) {
      throw new Exception("Some data is missing - expected: " + REQUIRED_KEYS + ", found: " + snapshot.keySet());
    }
    return new SnapshotCloudManager(snapshot);
  }

  private void init(Map<String, Object> managerState, Map<String, Object> clusterState, Map<String, Object> nodeState,
                    Map<String, Object> distribState) throws Exception {
    Objects.requireNonNull(managerState);
    Objects.requireNonNull(clusterState);
    Objects.requireNonNull(nodeState);
    Objects.requireNonNull(distribState);
    this.timeSource = TimeSource.get((String)managerState.getOrDefault("timeSource", "simTime:50"));
    this.clusterStateProvider = new SnapshotClusterStateProvider(clusterState);
    this.nodeStateProvider = new SnapshotNodeStateProvider(nodeState);
    this.distribStateManager = new SnapshotDistribStateManager(distribState);

    SimUtils.checkConsistency(this, null);
  }

  public Map<String, Object> getSnapshot(boolean withAutoscaling) throws Exception {
    Map<String, Object> snapshot = new LinkedHashMap<>(4);
    Map<String, Object> managerState = new HashMap<>();
    managerState.put("timeSource", timeSource.toString());
    snapshot.put(MANAGER_STATE_KEY, managerState);

    snapshot.put(CLUSTER_STATE_KEY, clusterStateProvider.getSnapshot());
    snapshot.put(NODE_STATE_KEY, nodeStateProvider.getSnapshot());
    snapshot.put(DISTRIB_STATE_KEY, distribStateManager.getSnapshot());
    if (withAutoscaling) {
      AutoScalingConfig config = distribStateManager.getAutoScalingConfig();
      Policy.Session session = config.getPolicy().createSession(this);
      List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(config, this);
      Map<String, Object> diagnostics = new LinkedHashMap<>();
      PolicyHelper.getDiagnostics(session).toMap(diagnostics);
      List<Map<String, Object>> suggestionDetails = new ArrayList<>(suggestions.size());
      suggestions.forEach(s -> {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("suggestion", s);
        if (s.getOperation() != null) {
          SolrParams params = s.getOperation().getParams();
          if (s.getOperation() instanceof V2Request) {
            params = SimUtils.v2AdminRequestToV1Params((V2Request)s.getOperation());
          }
          ReplicaInfo info = nodeStateProvider.getReplicaInfo(
              params.get(CollectionAdminParams.COLLECTION), params.get("replica"));
          if (info == null) {
            log.warn("Can't find ReplicaInfo for suggested operation: " + s);
          } else {
            map.put("replica", info);
          }
        }
        suggestionDetails.add(map);
      });
      Map<String, Object> autoscaling = new LinkedHashMap<>();
      autoscaling.put("suggestions", suggestionDetails);
      autoscaling.put("diagnostics", diagnostics);
      snapshot.put(AUTOSCALING_STATE_KEY, autoscaling);
    }
    snapshot.put(STATISTICS_STATE_KEY, SimUtils.calculateStats(this, distribStateManager.getAutoScalingConfig(), true));
    return snapshot;
  }

  @Override
  public ClusterStateProvider getClusterStateProvider() {
    return clusterStateProvider;
  }

  @Override
  public NodeStateProvider getNodeStateProvider() {
    return nodeStateProvider;
  }

  @Override
  public DistribStateManager getDistribStateManager() {
    return distribStateManager;
  }

  @Override
  public DistributedQueueFactory getDistributedQueueFactory() {
    return NoopDistributedQueueFactory.INSTANCE;
  }

  @Override
  public ObjectCache getObjectCache() {
    return objectCache;
  }

  @Override
  public TimeSource getTimeSource() {
    return timeSource;
  }

  @Override
  public SolrResponse request(SolrRequest req) throws IOException {
    throw new UnsupportedOperationException("request");
  }

  @Override
  public byte[] httpRequest(String url, SolrRequest.METHOD method, Map<String, String> headers, String payload, int timeout, boolean followRedirects) throws IOException {
    throw new UnsupportedOperationException("httpRequest");
  }

  @Override
  public void close() throws IOException {

  }
}
|
@@ -0,0 +1,131 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud.autoscaling.sim;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.noggit.CharArr;
import org.noggit.JSONWriter;

/**
 * Read-only snapshot of another {@link ClusterStateProvider}.
 */
public class SnapshotClusterStateProvider implements ClusterStateProvider {

  final Set<String> liveNodes;
  final ClusterState clusterState;
  final Map<String, Object> clusterProperties;

  public SnapshotClusterStateProvider(ClusterStateProvider other) throws Exception {
    liveNodes = Collections.unmodifiableSet(new HashSet<>(other.getLiveNodes()));
    ClusterState otherState = other.getClusterState();
    clusterState = new ClusterState(otherState.getZNodeVersion(), liveNodes, otherState.getCollectionsMap());
    clusterProperties = new HashMap<>(other.getClusterProperties());
  }

  public SnapshotClusterStateProvider(Map<String, Object> snapshot) {
    Objects.requireNonNull(snapshot);
    liveNodes = Collections.unmodifiableSet(new HashSet<>((Collection<String>)snapshot.getOrDefault("liveNodes", Collections.emptySet())));
    clusterProperties = (Map<String, Object>)snapshot.getOrDefault("clusterProperties", Collections.emptyMap());
    Map<String, Object> stateMap = new HashMap<>((Map<String, Object>)snapshot.getOrDefault("clusterState", Collections.emptyMap()));
    Number version = (Number)stateMap.remove("version");
    clusterState = ClusterState.load(version != null ? version.intValue() : null, stateMap, liveNodes, ZkStateReader.CLUSTER_STATE);
  }

  public Map<String, Object> getSnapshot() {
    Map<String, Object> snapshot = new HashMap<>();
    snapshot.put("liveNodes", liveNodes);
    if (clusterProperties != null) {
      snapshot.put("clusterProperties", clusterProperties);
    }
    Map<String, Object> stateMap = new HashMap<>();
    snapshot.put("clusterState", stateMap);
    stateMap.put("version", clusterState.getZNodeVersion());
    clusterState.forEachCollection(coll -> {
      CharArr out = new CharArr();
      JSONWriter writer = new JSONWriter(out, 2);
      coll.write(writer);
      String json = out.toString();
      try {
        stateMap.put(coll.getName(), Utils.fromJSON(json.getBytes("UTF-8")));
      } catch (UnsupportedEncodingException e) {
        throw new RuntimeException("should not happen!", e);
      }
    });
    return snapshot;
  }

  @Override
  public ClusterState.CollectionRef getState(String collection) {
    return clusterState.getCollectionRef(collection);
  }

  @Override
  public Set<String> getLiveNodes() {
    return liveNodes;
  }

  @Override
  public List<String> resolveAlias(String alias) {
    throw new UnsupportedOperationException("resolveAlias");
  }

  @Override
  public Map<String, String> getAliasProperties(String alias) {
    throw new UnsupportedOperationException("getAliasProperties");
  }

  @Override
  public ClusterState getClusterState() throws IOException {
    return clusterState;
  }

  @Override
  public Map<String, Object> getClusterProperties() {
    return clusterProperties;
  }

  @Override
  public String getPolicyNameByCollection(String coll) {
    DocCollection collection = clusterState.getCollectionOrNull(coll);
    return collection == null ? null : (String)collection.getProperties().get("policy");
  }

  @Override
  public void connect() {
  }

  @Override
  public void close() throws IOException {
  }
}

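A minimal usage sketch of the snapshot round trip above, assuming a live ClusterStateProvider named original is available elsewhere (for example from CloudSolrClient.getClusterStateProvider()); the helper name is illustrative only:

  // Sketch only: capture the cluster state as plain maps and rebuild a detached, read-only copy.
  static SnapshotClusterStateProvider roundTrip(ClusterStateProvider original) throws Exception {
    SnapshotClusterStateProvider snap = new SnapshotClusterStateProvider(original);
    Map<String, Object> state = snap.getSnapshot();   // liveNodes, clusterProperties, clusterState
    return new SnapshotClusterStateProvider(state);   // equivalent provider rebuilt from the map
  }
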
@@ -0,0 +1,191 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud.autoscaling.sim;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.Base64;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Read-only snapshot of another {@link DistribStateManager}
 */
public class SnapshotDistribStateManager implements DistribStateManager {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  LinkedHashMap<String, VersionedData> dataMap = new LinkedHashMap<>();

  /**
   * Populate this instance from another {@link DistribStateManager} instance.
   * @param other another instance
   * @param config optional {@link AutoScalingConfig}, which will overwrite any existing config.
   */
  public SnapshotDistribStateManager(DistribStateManager other, AutoScalingConfig config) throws Exception {
    List<String> tree = other.listTree("/");
    log.debug("- copying {} resources from {}", tree.size(), other.getClass().getSimpleName());
    for (String path : tree) {
      dataMap.put(path, other.getData(path));
    }
    if (config != null) { // overwrite existing
      VersionedData vd = new VersionedData(config.getZkVersion(), Utils.toJSON(config), CreateMode.PERSISTENT, "0");
      dataMap.put(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, vd);
    }
  }

  /**
   * Populate this instance from a previously generated snapshot.
   * @param snapshot previous snapshot created using this class.
   */
  public SnapshotDistribStateManager(Map<String, Object> snapshot) {
    snapshot.forEach((path, value) -> {
      Map<String, Object> map = (Map<String, Object>)value;
      Number version = (Number)map.getOrDefault("version", 0);
      String owner = (String)map.get("owner");
      String modeStr = (String)map.getOrDefault("mode", CreateMode.PERSISTENT.toString());
      CreateMode mode = CreateMode.valueOf(modeStr);
      byte[] bytes = null;
      if (map.containsKey("data")) {
        bytes = Base64.base64ToByteArray((String)map.get("data"));
      }
      dataMap.put(path, new VersionedData(version.intValue(), bytes, mode, owner));
    });
    log.debug("- loaded snapshot of {} resources", dataMap.size());
  }

  /**
   * Create a snapshot of all content in this instance.
   */
  public Map<String, Object> getSnapshot() {
    Map<String, Object> snapshot = new LinkedHashMap<>();
    dataMap.forEach((path, vd) -> {
      Map<String, Object> data = new HashMap<>();
      vd.toMap(data);
      snapshot.put(path, data);
    });
    return snapshot;
  }

  @Override
  public boolean hasData(String path) throws IOException, KeeperException, InterruptedException {
    return dataMap.containsKey(path);
  }

  @Override
  public List<String> listData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
    return listData(path, null);
  }

  @Override
  public List<String> listTree(String path) {
    return dataMap.keySet().stream()
        .filter(p -> p.startsWith(path))
        .collect(Collectors.toList());
  }

  @Override
  public List<String> listData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
    final String prefix = path + "/";
    return dataMap.entrySet().stream()
        .filter(e -> e.getKey().startsWith(prefix))
        .map(e -> {
          String suffix = e.getKey().substring(prefix.length());
          int idx = suffix.indexOf('/');
          if (idx == -1) {
            return suffix;
          } else {
            return suffix.substring(0, idx);
          }
        })
        .collect(Collectors.toList());
  }

  @Override
  public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
    if (!dataMap.containsKey(path)) {
      throw new NoSuchElementException(path);
    }
    return dataMap.get(path);
  }

  @Override
  public void makePath(String path) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
    throw new UnsupportedOperationException("makePath");
  }

  @Override
  public void makePath(String path, byte[] data, CreateMode createMode, boolean failOnExists) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
    throw new UnsupportedOperationException("makePath");
  }

  @Override
  public String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
    throw new UnsupportedOperationException("createData");
  }

  @Override
  public void removeData(String path, int version) throws NoSuchElementException, IOException, NotEmptyException, KeeperException, InterruptedException, BadVersionException {
    throw new UnsupportedOperationException("removeData");
  }

  @Override
  public void setData(String path, byte[] data, int version) throws BadVersionException, NoSuchElementException, IOException, KeeperException, InterruptedException {
    throw new UnsupportedOperationException("setData");
  }

  @Override
  public List<OpResult> multi(Iterable<Op> ops) throws BadVersionException, NoSuchElementException, AlreadyExistsException, IOException, KeeperException, InterruptedException {
    throw new UnsupportedOperationException("multi");
  }

  @Override
  public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws InterruptedException, IOException {
    VersionedData vd = dataMap.get(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
    Map<String, Object> map = new HashMap<>();
    if (vd != null && vd.getData() != null && vd.getData().length > 0) {
      map = (Map<String, Object>) Utils.fromJSON(vd.getData());
      map.put(AutoScalingParams.ZK_VERSION, vd.getVersion());
    }
    return new AutoScalingConfig(map);
  }

  @Override
  public void close() throws IOException {
  }
}

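A similar sketch for the state manager above, assuming liveStateManager is a DistribStateManager taken from a running cluster; the restored instance serves the copied /autoscaling.json content back through getAutoScalingConfig() (helper name illustrative only):

  // Sketch only: copy all ZK-backed data, dump it to a plain map, and restore a read-only view.
  static AutoScalingConfig copyConfig(DistribStateManager liveStateManager) throws Exception {
    SnapshotDistribStateManager snap = new SnapshotDistribStateManager(liveStateManager, null);
    Map<String, Object> dump = snap.getSnapshot();   // path -> {version, mode, owner, data (Base64)}
    return new SnapshotDistribStateManager(dump).getAutoScalingConfig();
  }
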
@@ -0,0 +1,186 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud.autoscaling.sim;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.Variable;

/**
 * Read-only snapshot of another {@link NodeStateProvider}.
 */
public class SnapshotNodeStateProvider implements NodeStateProvider {
  private Map<String, Map<String, Object>> nodeValues = new LinkedHashMap<>();
  private Map<String, Map<String, Map<String, List<ReplicaInfo>>>> replicaInfos = new LinkedHashMap<>();

  private static double GB = 1024.0d * 1024.0d * 1024.0d;

  /**
   * Populate this instance from another instance of {@link SolrCloudManager}.
   * @param other another instance
   * @param config optional {@link AutoScalingConfig}, which will be used to determine what node and
   *               replica tags to retrieve. If this is null then the other instance's config will be used.
   */
  public SnapshotNodeStateProvider(SolrCloudManager other, AutoScalingConfig config) throws Exception {
    if (config == null) {
      config = other.getDistribStateManager().getAutoScalingConfig();
    }
    Set<String> nodeTags = new HashSet<>(SimUtils.COMMON_NODE_TAGS);
    nodeTags.addAll(config.getPolicy().getParams());
    Set<String> replicaTags = new HashSet<>(SimUtils.COMMON_REPLICA_TAGS);
    replicaTags.addAll(config.getPolicy().getPerReplicaAttributes());
    for (String node : other.getClusterStateProvider().getLiveNodes()) {
      nodeValues.put(node, new LinkedHashMap<>(other.getNodeStateProvider().getNodeValues(node, nodeTags)));
      Map<String, Map<String, List<ReplicaInfo>>> infos = other.getNodeStateProvider().getReplicaInfo(node, replicaTags);
      infos.forEach((collection, shards) -> {
        shards.forEach((shard, replicas) -> {
          replicas.forEach(r -> {
            List<ReplicaInfo> myReplicas = replicaInfos
                .computeIfAbsent(node, n -> new LinkedHashMap<>())
                .computeIfAbsent(collection, c -> new LinkedHashMap<>())
                .computeIfAbsent(shard, s -> new ArrayList<>());
            Map<String, Object> rMap = new LinkedHashMap<>();
            r.toMap(rMap);
            if (r.isLeader) { // ReplicaInfo.toMap doesn't write this!!!
              ((Map<String, Object>)rMap.values().iterator().next()).put("leader", "true");
            }
            ReplicaInfo ri = new ReplicaInfo(rMap);
            // put in "leader" again if present
            if (r.isLeader) {
              ri.getVariables().put("leader", "true");
            }
            // externally produced snapshots may not include the right units
            if (ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute) == null) {
              if (ri.getVariable(Variable.Type.CORE_IDX.tagName) != null) {
                Number indexSizeGB = (Number) ri.getVariable(Variable.Type.CORE_IDX.tagName);
                ri.getVariables().put(Variable.Type.CORE_IDX.metricsAttribute, indexSizeGB.doubleValue() * GB);
              } else {
                throw new RuntimeException("Missing size information for replica: " + ri);
              }
            }
            myReplicas.add(ri);
          });
        });
      });
    }
  }

  /**
   * Populate this instance from a previously generated snapshot.
   * @param snapshot previous snapshot created using this class.
   */
  public SnapshotNodeStateProvider(Map<String, Object> snapshot) {
    Objects.requireNonNull(snapshot);
    nodeValues = (Map<String, Map<String, Object>>)snapshot.getOrDefault("nodeValues", Collections.emptyMap());
    ((Map<String, Object>)snapshot.getOrDefault("replicaInfos", Collections.emptyMap())).forEach((node, v) -> {
      Map<String, Map<String, List<ReplicaInfo>>> perNode = replicaInfos.computeIfAbsent(node, n -> new LinkedHashMap<>());
      ((Map<String, Object>)v).forEach((collection, shards) -> {
        Map<String, List<ReplicaInfo>> perColl = perNode.computeIfAbsent(collection, c -> new LinkedHashMap<>());
        ((Map<String, Object>)shards).forEach((shard, replicas) -> {
          List<ReplicaInfo> infos = perColl.computeIfAbsent(shard, s -> new ArrayList<>());
          ((List<Map<String, Object>>)replicas).forEach(replicaMap -> {
            ReplicaInfo ri = new ReplicaInfo(new LinkedHashMap<>(replicaMap)); // constructor modifies this map
            if (ri.isLeader) {
              ri.getVariables().put("leader", "true");
            }
            // externally produced snapshots may not include the right units
            if (ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute) == null) {
              if (ri.getVariable(Variable.Type.CORE_IDX.tagName) != null) {
                Number indexSizeGB = (Number) ri.getVariable(Variable.Type.CORE_IDX.tagName);
                ri.getVariables().put(Variable.Type.CORE_IDX.metricsAttribute, indexSizeGB.doubleValue() * GB);
              } else {
                throw new RuntimeException("Missing size information for replica: " + ri);
              }
            }
            infos.add(ri);
          });
        });
      });
    });
  }

  /**
   * Create a snapshot of all node and replica tag values available from the original source, per the original
   * autoscaling configuration. Note:
   */
  public Map<String, Object> getSnapshot() {
    Map<String, Object> snapshot = new LinkedHashMap<>();
    snapshot.put("nodeValues", nodeValues);
    Map<String, Map<String, Map<String, List<Map<String, Object>>>>> replicaInfosMap = new LinkedHashMap<>();
    snapshot.put("replicaInfos", replicaInfosMap);
    replicaInfos.forEach((node, perNode) -> {
      perNode.forEach((collection, shards) -> {
        shards.forEach((shard, replicas) -> {
          replicas.forEach(r -> {
            List<Map<String, Object>> myReplicas = replicaInfosMap
                .computeIfAbsent(node, n -> new LinkedHashMap<>())
                .computeIfAbsent(collection, c -> new LinkedHashMap<>())
                .computeIfAbsent(shard, s -> new ArrayList<>());
            Map<String, Object> rMap = new LinkedHashMap<>();
            r.toMap(rMap);
            if (r.isLeader) { // ReplicaInfo.toMap doesn't write this!!!
              ((Map<String, Object>)rMap.values().iterator().next()).put("leader", "true");
            }
            myReplicas.add(rMap);
          });
        });
      });
    });
    return snapshot;
  }

  @Override
  public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
    return nodeValues.getOrDefault(node, Collections.emptyMap());
  }

  @Override
  public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
    return replicaInfos.getOrDefault(node, Collections.emptyMap());
  }

  public ReplicaInfo getReplicaInfo(String collection, String coreNode) {
    for (Map<String, Map<String, List<ReplicaInfo>>> perNode : replicaInfos.values()) {
      for (List<ReplicaInfo> perShard : perNode.getOrDefault(collection, Collections.emptyMap()).values()) {
        for (ReplicaInfo ri : perShard) {
          if (ri.getName().equals(coreNode)) {
            return ri;
          }
        }
      }
    }
    return null;
  }

  @Override
  public void close() throws IOException {
  }
}

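Putting the pieces together, the save/restore cycle that the AutoscalingTool changes and the TestSnapshotCloudManager test below rely on looks roughly like this (a sketch; realManager is assumed to be a SolrCloudManager for a live cluster, and the helper name is illustrative):

  // Sketch only: persist a full snapshot to a directory and rebuild a read-only manager from it,
  // mirroring the calls used in testPersistance() further below.
  static SolrCloudManager saveAndRestore(SolrCloudManager realManager, File dir) throws Exception {
    SnapshotCloudManager snapshot = new SnapshotCloudManager(realManager, null); // null: keep the cluster's own autoscaling config
    snapshot.saveSnapshot(dir, true);
    return SnapshotCloudManager.readSnapshot(dir);
  }
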
@@ -53,11 +53,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;

@@ -99,13 +97,11 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.Row;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.impl.CloudSolrClient;

@@ -116,9 +112,13 @@ import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.autoscaling.sim.NoopDistributedQueueFactory;
import org.apache.solr.cloud.autoscaling.sim.SimCloudManager;
import org.apache.solr.cloud.autoscaling.sim.SimUtils;
import org.apache.solr.cloud.autoscaling.sim.SnapshotCloudManager;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;

@@ -132,6 +132,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;

@@ -858,12 +859,10 @@ public class SolrCLI implements CLIO {
  }


  public static class AutoscalingTool extends SolrCloudTool {
  public static class AutoscalingTool extends ToolBase {
    static final String NODE_REDACTION_PREFIX = "N_";
    static final String COLL_REDACTION_PREFIX = "COLL_";

    private boolean verbose;

    public AutoscalingTool() {
      this(CLIO.getOutStream());
    }

@@ -911,6 +910,16 @@ public class SolrCLI implements CLIO {
          OptionBuilder
              .withDescription("Show summarized collection & node statistics.")
              .create("stats"),
          OptionBuilder
              .withDescription("Store autoscaling snapshot of the current cluster.")
              .withArgName("DIR")
              .hasArg()
              .create("save"),
          OptionBuilder
              .withDescription("Load autoscaling snapshot of the cluster instead of using the real one.")
              .withArgName("DIR")
              .hasArg()
              .create("load"),
          OptionBuilder
              .withDescription("Simulate execution of all suggestions.")
              .create("simulate"),

@@ -920,6 +929,12 @@ public class SolrCLI implements CLIO {
              .hasArg()
              .withLongOpt("iterations")
              .create("i"),
          OptionBuilder
              .withDescription("Save autoscaling snapshots at each step of simulated execution.")
              .withArgName("DIR")
              .withLongOpt("saveSimulated")
              .hasArg()
              .create("ss"),
          OptionBuilder
              .withDescription("Turn on all options to get all available information.")
              .create("all")

@@ -932,96 +947,109 @@ public class SolrCLI implements CLIO {
      return "autoscaling";
    }

    @Override
    protected void runCloudTool(CloudSolrClient cloudSolrClient, CommandLine cli) throws Exception {
      DistributedQueueFactory dummmyFactory = new DistributedQueueFactory() {
        @Override
        public DistributedQueue makeQueue(String path) throws IOException {
          throw new UnsupportedOperationException("makeQueue");
        }

        @Override
        public void removeQueue(String path) throws IOException {
          throw new UnsupportedOperationException("removeQueue");
        }
      };
      try (SolrClientCloudManager realCloudManager = new SolrClientCloudManager(dummmyFactory, cloudSolrClient)) {
        AutoScalingConfig config = null;
        HashSet<String> liveNodes = new HashSet<>();
        String configFile = cli.getOptionValue("a");
        if (configFile != null) {
          if (verbose) {
            log.info("- reading autoscaling config from " + configFile);
          }
          config = new AutoScalingConfig(IOUtils.toByteArray(new FileInputStream(configFile)));
        } else {
          if (verbose) {
            log.info("- reading autoscaling config from the cluster.");
          }
          config = realCloudManager.getDistribStateManager().getAutoScalingConfig();
        }
        // freeze the cluster state
        SimCloudManager cloudManager = SimCloudManager.createCluster(realCloudManager, TimeSource.get("simTime:50"));
        liveNodes.addAll(cloudManager.getClusterStateProvider().getLiveNodes());
        boolean withSuggestions = cli.hasOption("s");
        boolean withDiagnostics = cli.hasOption("d") || cli.hasOption("n");
        boolean withSortedNodes = cli.hasOption("n");
        boolean withClusterState = cli.hasOption("c");
        boolean withStats = cli.hasOption("stats");
        boolean redact = cli.hasOption("r");
        if (cli.hasOption("all")) {
          withSuggestions = true;
          withDiagnostics = true;
          withSortedNodes = true;
          withClusterState = true;
          withStats = true;
        }
        // prepare to redact also host names / IPs in base_url and other properties
        Set<String> redactNames = new HashSet<>();
        for (String nodeName : liveNodes) {
          String urlString = Utils.getBaseUrlForNodeName(nodeName, "http");
          try {
            URL u = new URL(urlString);
            // protocol format
            redactNames.add(u.getHost() + ":" + u.getPort());
            // node name format
            redactNames.add(u.getHost() + "_" + u.getPort() + "_");
          } catch (MalformedURLException e) {
            log.warn("Invalid URL for node name " + nodeName + ", replacing including protocol and path", e);
            redactNames.add(urlString);
            redactNames.add(Utils.getBaseUrlForNodeName(nodeName, "https"));
          }
        }
        // redact collection names too
        Set<String> redactCollections = new HashSet<>();
        ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
        clusterState.forEachCollection(coll -> redactCollections.add(coll.getName()));
        if (!withSuggestions && !withDiagnostics) {
          withSuggestions = true;
        }
        Map<String, Object> results = prepareResults(cloudManager, config, withClusterState,
            withStats, withSuggestions, withSortedNodes, withDiagnostics);
        if (cli.hasOption("simulate")) {
          String iterStr = cli.getOptionValue("i", "10");
          int iterations;
          try {
            iterations = Integer.parseInt(iterStr);
          } catch (Exception e) {
            log.warn("Invalid option 'i' value, using default 10:" + e);
            iterations = 10;
          }
          Map<String, Object> simulationResults = new HashMap<>();
          simulate(cloudManager, config, simulationResults, withClusterState,
              withStats, withSuggestions, withSortedNodes, withDiagnostics, iterations);
          results.put("simulation", simulationResults);
        }
        String data = Utils.toJSONString(results);
        if (redact) {
          data = RedactionUtils.redactNames(redactCollections, COLL_REDACTION_PREFIX, data);
          data = RedactionUtils.redactNames(redactNames, NODE_REDACTION_PREFIX, data);
        }
        stdout.println(data);
    protected void runImpl(CommandLine cli) throws Exception {
      raiseLogLevelUnlessVerbose(cli);
      SnapshotCloudManager cloudManager;
      AutoScalingConfig config = null;
      String configFile = cli.getOptionValue("a");
      if (configFile != null) {
        CLIO.err("- reading autoscaling config from " + configFile);
        config = new AutoScalingConfig(IOUtils.toByteArray(new FileInputStream(configFile)));
      }
      if (cli.hasOption("load")) {
        File sourceDir = new File(cli.getOptionValue("load"));
        CLIO.err("- loading autoscaling snapshot from " + sourceDir.getAbsolutePath());
        cloudManager = SnapshotCloudManager.readSnapshot(sourceDir);
        if (config == null) {
          CLIO.err("- reading autoscaling config from the snapshot.");
          config = cloudManager.getDistribStateManager().getAutoScalingConfig();
        }
      } else {
        String zkHost = cli.getOptionValue("zkHost", ZK_HOST);

        log.debug("Connecting to Solr cluster: " + zkHost);
        try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkHost), Optional.empty()).build()) {

          String collection = cli.getOptionValue("collection");
          if (collection != null)
            cloudSolrClient.setDefaultCollection(collection);

          cloudSolrClient.connect();
          try (SolrClientCloudManager realCloudManager = new SolrClientCloudManager(NoopDistributedQueueFactory.INSTANCE, cloudSolrClient)) {
            if (config == null) {
              CLIO.err("- reading autoscaling config from the cluster.");
              config = realCloudManager.getDistribStateManager().getAutoScalingConfig();
            }
            cloudManager = new SnapshotCloudManager(realCloudManager, config);
          }
        }
      }
      if (cli.hasOption("save")) {
        File targetDir = new File(cli.getOptionValue("save"));
        cloudManager.saveSnapshot(targetDir, true);
        CLIO.err("- saved autoscaling snapshot to " + targetDir.getAbsolutePath());
      }
      HashSet<String> liveNodes = new HashSet<>();
      liveNodes.addAll(cloudManager.getClusterStateProvider().getLiveNodes());
      boolean withSuggestions = cli.hasOption("s");
      boolean withDiagnostics = cli.hasOption("d") || cli.hasOption("n");
      boolean withSortedNodes = cli.hasOption("n");
      boolean withClusterState = cli.hasOption("c");
      boolean withStats = cli.hasOption("stats");
      boolean redact = cli.hasOption("r");
      if (cli.hasOption("all")) {
        withSuggestions = true;
        withDiagnostics = true;
        withSortedNodes = true;
        withClusterState = true;
        withStats = true;
      }
      // prepare to redact also host names / IPs in base_url and other properties
      Set<String> redactNames = new HashSet<>();
      for (String nodeName : liveNodes) {
        String urlString = Utils.getBaseUrlForNodeName(nodeName, "http");
        try {
          URL u = new URL(urlString);
          // protocol format
          redactNames.add(u.getHost() + ":" + u.getPort());
          // node name format
          redactNames.add(u.getHost() + "_" + u.getPort() + "_");
        } catch (MalformedURLException e) {
          log.warn("Invalid URL for node name " + nodeName + ", replacing including protocol and path", e);
          redactNames.add(urlString);
          redactNames.add(Utils.getBaseUrlForNodeName(nodeName, "https"));
        }
      }
      // redact collection names too
      Set<String> redactCollections = new HashSet<>();
      ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
      clusterState.forEachCollection(coll -> redactCollections.add(coll.getName()));
      if (!withSuggestions && !withDiagnostics) {
        withSuggestions = true;
      }
      Map<String, Object> results = prepareResults(cloudManager, config, withClusterState,
          withStats, withSuggestions, withSortedNodes, withDiagnostics);
      if (cli.hasOption("simulate")) {
        String iterStr = cli.getOptionValue("i", "10");
        String saveSimulated = cli.getOptionValue("saveSimulated");
        int iterations;
        try {
          iterations = Integer.parseInt(iterStr);
        } catch (Exception e) {
          log.warn("Invalid option 'i' value, using default 10:" + e);
          iterations = 10;
        }
        Map<String, Object> simulationResults = new HashMap<>();
        simulate(cloudManager, config, simulationResults, saveSimulated, withClusterState,
            withStats, withSuggestions, withSortedNodes, withDiagnostics, iterations);
        results.put("simulation", simulationResults);
      }
      String data = Utils.toJSONString(results);
      if (redact) {
        data = RedactionUtils.redactNames(redactCollections, COLL_REDACTION_PREFIX, data);
        data = RedactionUtils.redactNames(redactNames, NODE_REDACTION_PREFIX, data);
      }
      stdout.println(data);
    }

    private Map<String, Object> prepareResults(SolrCloudManager clientCloudManager,

@@ -1033,23 +1061,24 @@ public class SolrCLI implements CLIO {
                                               boolean withDiagnostics) throws Exception {
      Policy.Session session = config.getPolicy().createSession(clientCloudManager);
      ClusterState clusterState = clientCloudManager.getClusterStateProvider().getClusterState();
      if (verbose) {
        log.info("- calculating suggestions...");
      List<Suggester.SuggestionInfo> suggestions = Collections.emptyList();
      long start, end;
      if (withSuggestions) {
        CLIO.err("- calculating suggestions...");
        start = TimeSource.NANO_TIME.getTimeNs();
        suggestions = PolicyHelper.getSuggestions(config, clientCloudManager);
        end = TimeSource.NANO_TIME.getTimeNs();
        CLIO.err(" (took " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms)");
      }
      long start = TimeSource.NANO_TIME.getTimeNs();
      List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(config, clientCloudManager);
      long end = TimeSource.NANO_TIME.getTimeNs();
      if (verbose) {
        log.info(" (took " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms)");
        log.info("- calculating diagnostics...");
      }
      start = TimeSource.NANO_TIME.getTimeNs();
      MapWriter mw = PolicyHelper.getDiagnostics(session);
      Map<String, Object> diagnostics = new LinkedHashMap<>();
      mw.toMap(diagnostics);
      end = TimeSource.NANO_TIME.getTimeNs();
      if (verbose) {
        log.info(" (took " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms)");
      Map<String, Object> diagnostics = Collections.emptyMap();
      if (withDiagnostics) {
        CLIO.err("- calculating diagnostics...");
        start = TimeSource.NANO_TIME.getTimeNs();
        MapWriter mw = PolicyHelper.getDiagnostics(session);
        diagnostics = new LinkedHashMap<>();
        mw.toMap(diagnostics);
        end = TimeSource.NANO_TIME.getTimeNs();
        CLIO.err(" (took " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms)");
      }
      Map<String, Object> results = new LinkedHashMap<>();
      if (withClusterState) {

@@ -1060,131 +1089,7 @@ public class SolrCLI implements CLIO {
        results.put("CLUSTERSTATE", map);
      }
      if (withStats) {
        Map<String, Map<String, Number>> collStats = new TreeMap<>();
        clusterState.forEachCollection(coll -> {
          Map<String, Number> perColl = collStats.computeIfAbsent(coll.getName(), n -> new LinkedHashMap<>());
          AtomicInteger numCores = new AtomicInteger();
          HashMap<String, Map<String, AtomicInteger>> nodes = new HashMap<>();
          coll.getSlices().forEach(s -> {
            numCores.addAndGet(s.getReplicas().size());
            s.getReplicas().forEach(r -> {
              nodes.computeIfAbsent(r.getNodeName(), n -> new HashMap<>())
                  .computeIfAbsent(s.getName(), slice -> new AtomicInteger()).incrementAndGet();
            });
          });
          int maxCoresPerNode = 0;
          int minCoresPerNode = 0;
          int maxActualShardsPerNode = 0;
          int minActualShardsPerNode = 0;
          int maxShardReplicasPerNode = 0;
          int minShardReplicasPerNode = 0;
          if (!nodes.isEmpty()) {
            minCoresPerNode = Integer.MAX_VALUE;
            minActualShardsPerNode = Integer.MAX_VALUE;
            minShardReplicasPerNode = Integer.MAX_VALUE;
            for (Map<String, AtomicInteger> counts : nodes.values()) {
              int total = counts.values().stream().mapToInt(c -> c.get()).sum();
              for (AtomicInteger count : counts.values()) {
                if (count.get() > maxShardReplicasPerNode) {
                  maxShardReplicasPerNode = count.get();
                }
                if (count.get() < minShardReplicasPerNode) {
                  minShardReplicasPerNode = count.get();
                }
              }
              if (total > maxCoresPerNode) {
                maxCoresPerNode = total;
              }
              if (total < minCoresPerNode) {
                minCoresPerNode = total;
              }
              if (counts.size() > maxActualShardsPerNode) {
                maxActualShardsPerNode = counts.size();
              }
              if (counts.size() < minActualShardsPerNode) {
                minActualShardsPerNode = counts.size();
              }
            }
          }
          perColl.put("activeShards", coll.getActiveSlices().size());
          perColl.put("inactiveShards", coll.getSlices().size() - coll.getActiveSlices().size());
          perColl.put("rf", coll.getReplicationFactor());
          perColl.put("maxShardsPerNode", coll.getMaxShardsPerNode());
          perColl.put("maxActualShardsPerNode", maxActualShardsPerNode);
          perColl.put("minActualShardsPerNode", minActualShardsPerNode);
          perColl.put("maxShardReplicasPerNode", maxShardReplicasPerNode);
          perColl.put("minShardReplicasPerNode", minShardReplicasPerNode);
          perColl.put("numCores", numCores.get());
          perColl.put("numNodes", nodes.size());
          perColl.put("maxCoresPerNode", maxCoresPerNode);
          perColl.put("minCoresPerNode", minCoresPerNode);
        });
        Map<String, Map<String, Object>> nodeStats = new TreeMap<>();
        Map<Integer, AtomicInteger> coreStats = new TreeMap<>();
        for (Row row : session.getSortedNodes()) {
          Map<String, Object> nodeStat = nodeStats.computeIfAbsent(row.node, n -> new LinkedHashMap<>());
          nodeStat.put("isLive", row.isLive());
          nodeStat.put("freedisk", row.getVal("freedisk", 0));
          nodeStat.put("totaldisk", row.getVal("totaldisk", 0));
          int cores = ((Number)row.getVal("cores", 0)).intValue();
          nodeStat.put("cores", cores);
          coreStats.computeIfAbsent(cores, num -> new AtomicInteger()).incrementAndGet();
          Map<String, Map<String, Map<String, Object>>> collReplicas = new TreeMap<>();
          row.forEachReplica(ri -> {
            Map<String, Object> perReplica = collReplicas.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>())
                .computeIfAbsent(ri.getCore().substring(ri.getCollection().length() + 1), core -> new LinkedHashMap<>());
//            if (ri.getVariable(Variable.Type.CORE_IDX.tagName) != null) {
//              perReplica.put(Variable.Type.CORE_IDX.tagName, ri.getVariable(Variable.Type.CORE_IDX.tagName));
//            }
            if (ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute) != null) {
              perReplica.put(Variable.Type.CORE_IDX.metricsAttribute, ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute));
            }
            perReplica.put("coreNode", ri.getName());
            if (ri.getBool("leader", false)) {
              perReplica.put("leader", true);
              Double totalSize = (Double)collStats.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
                  .computeIfAbsent("avgShardSize", size -> 0.0);
              Number riSize = (Number)ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute);
              if (riSize != null) {
                totalSize += riSize.doubleValue();
                collStats.get(ri.getCollection()).put("avgShardSize", totalSize);
                Double max = (Double)collStats.get(ri.getCollection()).get("maxShardSize");
                if (max == null) max = 0.0;
                if (riSize.doubleValue() > max) {
                  collStats.get(ri.getCollection()).put("maxShardSize", riSize.doubleValue());
                }
                Double min = (Double)collStats.get(ri.getCollection()).get("minShardSize");
                if (min == null) min = Double.MAX_VALUE;
                if (riSize.doubleValue() < min) {
                  collStats.get(ri.getCollection()).put("minShardSize", riSize.doubleValue());
                }
              }
            }
            nodeStat.put("replicas", collReplicas);
          });
        }

        // calculate average per shard and convert the units
        for (Map<String, Number> perColl : collStats.values()) {
          Number avg = perColl.get("avgShardSize");
          if (avg != null) {
            avg = avg.doubleValue() / perColl.get("activeShards").doubleValue();
            perColl.put("avgShardSize", (Number)Variable.Type.CORE_IDX.convertVal(avg));
          }
          Number num = perColl.get("maxShardSize");
          if (num != null) {
            perColl.put("maxShardSize", (Number)Variable.Type.CORE_IDX.convertVal(num));
          }
          num = perColl.get("minShardSize");
          if (num != null) {
            perColl.put("minShardSize", (Number)Variable.Type.CORE_IDX.convertVal(num));
          }
        }
        Map<String, Object> stats = new LinkedHashMap<>();
        results.put("STATISTICS", stats);
        stats.put("coresPerNodes", coreStats);
        stats.put("nodeStats", nodeStats);
        stats.put("collectionStats", collStats);
        results.put("STATISTICS", SimUtils.calculateStats(clientCloudManager, config, verbose));
      }
      if (withSuggestions) {
        results.put("SUGGESTIONS", suggestions);

@@ -1198,43 +1103,126 @@ public class SolrCLI implements CLIO {
      return results;
    }

    private void simulate(SimCloudManager simCloudManager,

    private void simulate(SolrCloudManager cloudManager,
                          AutoScalingConfig config,
                          Map<String, Object> results,
                          String saveSimulated,
                          boolean withClusterState,
                          boolean withStats,
                          boolean withSuggestions,
                          boolean withSortedNodes,
                          boolean withDiagnostics, int iterations) throws Exception {
      int loop = iterations;
      File saveDir = null;
      if (saveSimulated != null) {
        saveDir = new File(saveSimulated);
        if (!saveDir.exists()) {
          if (!saveDir.mkdirs()) {
            throw new Exception("Unable to create 'saveSimulated' directory: " + saveDir.getAbsolutePath());
          }
        } else if (!saveDir.isDirectory()) {
          throw new Exception("'saveSimulated' path exists and is not a directory! " + saveDir.getAbsolutePath());
        }
      }
      int SPEED = 50;
      SimCloudManager simCloudManager = SimCloudManager.createCluster(cloudManager, config, TimeSource.get("simTime:" + SPEED));
      int loop = 0;
      List<Suggester.SuggestionInfo> suggestions = Collections.emptyList();
      Map<String, Object> intermediate = new LinkedHashMap<>();
      results.put("intermediateSuggestions", intermediate);
      while (loop-- > 0) {
      results.put("intermediate", intermediate);
      while (loop < iterations) {
        LinkedHashMap<String, Object> perStep = new LinkedHashMap<>();
        long start = TimeSource.NANO_TIME.getTimeNs();
        suggestions = PolicyHelper.getSuggestions(config, simCloudManager);
        log.info("-- step " + (iterations - loop) + ", " + suggestions.size() + " suggestions.");
        CLIO.err("-- step " + loop + ", " + suggestions.size() + " suggestions.");
        long end = TimeSource.NANO_TIME.getTimeNs();
        CLIO.err(" - calculated in " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms (real time ≈ simulated time)");
        if (suggestions.isEmpty()) {
          break;
        }
        intermediate.put("step" + (iterations - loop), suggestions);
        SnapshotCloudManager snapshotCloudManager = new SnapshotCloudManager(simCloudManager, config);
        if (saveDir != null) {
          File target = new File(saveDir, "step" + loop + "_start");
          snapshotCloudManager.saveSnapshot(target, true);
        }
        if (verbose) {
          Map<String, Object> snapshot = snapshotCloudManager.getSnapshot(false);
          snapshot.remove(SnapshotCloudManager.DISTRIB_STATE_KEY);
          snapshot.remove(SnapshotCloudManager.MANAGER_STATE_KEY);
          perStep.put("snapshotStart", snapshot);
        }
        intermediate.put("step" + loop, perStep);
        int unresolvedCount = 0;
        start = TimeSource.NANO_TIME.getTimeNs();
        List<Map<String, Object>> perStepOps = new ArrayList<>(suggestions.size());
        if (withSuggestions) {
          perStep.put("suggestions", suggestions);
          perStep.put("opDetails", perStepOps);
        }
        for (Suggester.SuggestionInfo suggestion : suggestions) {
          SolrRequest operation = suggestion.getOperation();
          if (operation == null) {
            unresolvedCount++;
            if (suggestion.getViolation() == null) {
              log.info(" - ignoring suggestion without violation and without operation: " + suggestion);
              CLIO.err(" - ignoring suggestion without violation and without operation: " + suggestion);
            }
            continue;
          }
          simCloudManager.request(operation);
          SolrParams params = operation.getParams();
          if (operation instanceof V2Request) {
            params = SimUtils.v2AdminRequestToV1Params((V2Request)operation);
          }
          Map<String, Object> paramsMap = new LinkedHashMap<>();
          params.toMap(paramsMap);
          ReplicaInfo info = simCloudManager.getSimClusterStateProvider().simGetReplicaInfo(
              params.get(CollectionAdminParams.COLLECTION), params.get("replica"));
          if (info == null) {
            CLIO.err("Could not find ReplicaInfo for params: " + params);
          } else if (verbose) {
            paramsMap.put("replicaInfo", info);
          } else if (info.getVariable(Variable.Type.CORE_IDX.tagName) != null) {
            paramsMap.put(Variable.Type.CORE_IDX.tagName, info.getVariable(Variable.Type.CORE_IDX.tagName));
          }
          if (withSuggestions) {
            perStepOps.add(paramsMap);
          }
          try {
            simCloudManager.request(operation);
          } catch (Exception e) {
            CLIO.err("Aborting - error executing suggestion " + suggestion + ": " + e);
            Map<String, Object> error = new HashMap<>();
            error.put("suggestion", suggestion);
            error.put("replicaInfo", info);
            error.put("exception", e);
            perStep.put("error", error);
            break;
          }
        }
        end = TimeSource.NANO_TIME.getTimeNs();
        long realTime = TimeUnit.NANOSECONDS.toMillis(end - start);
        long simTime = realTime * SPEED;
        CLIO.err(" - executed in " + realTime + " ms (real time), " + simTime + " ms (simulated time)");
        if (unresolvedCount == suggestions.size()) {
          log.info("--- aborting simulation, only unresolved violations remain");
          CLIO.err("--- aborting simulation, only unresolved violations remain");
          break;
        }
        if (withStats) {
          perStep.put("statsExecutionStop", SimUtils.calculateStats(simCloudManager, config, verbose));
        }
        snapshotCloudManager = new SnapshotCloudManager(simCloudManager, config);
        if (saveDir != null) {
          File target = new File(saveDir, "step" + loop + "_stop");
          snapshotCloudManager.saveSnapshot(target, true);
        }
        if (verbose) {
          Map<String, Object> snapshot = snapshotCloudManager.getSnapshot(false);
          snapshot.remove(SnapshotCloudManager.DISTRIB_STATE_KEY);
          snapshot.remove(SnapshotCloudManager.MANAGER_STATE_KEY);
          perStep.put("snapshotStop", snapshot);
        }
        loop++;
      }
      if (loop == 0 && !suggestions.isEmpty()) {
      if (loop == iterations && !suggestions.isEmpty()) {
        CLIO.err("### Failed to apply all suggestions in " + iterations + " steps. Remaining suggestions: " + suggestions + "\n");
      }
      results.put("finalState", prepareResults(simCloudManager, config, withClusterState, withStats,

@@ -107,7 +107,7 @@ public class TestSimClusterStateProvider extends SolrCloudTestCase {

    if (simulated) {
      // initialize simulated provider
      SimCloudManager simCloudManager = SimCloudManager.createCluster(realManager, TimeSource.get("simTime:10"));
      SimCloudManager simCloudManager = SimCloudManager.createCluster(realManager, null, TimeSource.get("simTime:10"));
//      simCloudManager.getSimClusterStateProvider().simSetClusterProperties(clusterProperties);
//      simCloudManager.getSimDistribStateManager().simSetAutoScalingConfig(autoScalingConfig);
//      nodeValues.forEach((n, values) -> {

@ -0,0 +1,212 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud.autoscaling.sim;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.solr.client.solrj.cloud.DistribStateManager;
|
||||
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.params.CollectionAdminParams;
|
||||
import org.apache.solr.common.util.TimeSource;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class TestSnapshotCloudManager extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static int NODE_COUNT = 3;
|
||||
|
||||
private static SolrCloudManager realManager;
|
||||
|
||||
// set up a real cluster as the source of test data
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(NODE_COUNT)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL, null, 1, 2, 0, 1)
|
||||
.process(cluster.getSolrClient());
|
||||
realManager = cluster.getJettySolrRunner(cluster.getJettySolrRunners().size() - 1).getCoreContainer()
|
||||
.getZkController().getSolrCloudManager();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSnapshots() throws Exception {
|
||||
SnapshotCloudManager snapshotCloudManager = new SnapshotCloudManager(realManager, null);
|
||||
Map<String, Object> snapshot = snapshotCloudManager.getSnapshot(true);
|
||||
SnapshotCloudManager snapshotCloudManager1 = new SnapshotCloudManager(snapshot);
|
||||
assertClusterStateEquals(realManager.getClusterStateProvider().getClusterState(), snapshotCloudManager.getClusterStateProvider().getClusterState());
|
||||
assertClusterStateEquals(realManager.getClusterStateProvider().getClusterState(), snapshotCloudManager1.getClusterStateProvider().getClusterState());
|
||||
// this will always fail because the metrics will be already different
|
||||
// assertNodeStateProvider(realManager, snapshotCloudManager);
|
||||
assertNodeStateProvider(snapshotCloudManager, snapshotCloudManager1);
|
||||
assertDistribStateManager(snapshotCloudManager.getDistribStateManager(), snapshotCloudManager1.getDistribStateManager());
|
||||
}
|
||||
|
||||
  @Test
  public void testPersistence() throws Exception {
    Path tmpPath = createTempDir();
    File tmpDir = tmpPath.toFile();
    SnapshotCloudManager snapshotCloudManager = new SnapshotCloudManager(realManager, null);
    snapshotCloudManager.saveSnapshot(tmpDir, true);
    SnapshotCloudManager snapshotCloudManager1 = SnapshotCloudManager.readSnapshot(tmpDir);
    assertClusterStateEquals(snapshotCloudManager.getClusterStateProvider().getClusterState(), snapshotCloudManager1.getClusterStateProvider().getClusterState());
    assertNodeStateProvider(snapshotCloudManager, snapshotCloudManager1);
    assertDistribStateManager(snapshotCloudManager.getDistribStateManager(), snapshotCloudManager1.getDistribStateManager());
  }

  @Test
  public void testSimulatorFromSnapshot() throws Exception {
    Path tmpPath = createTempDir();
    File tmpDir = tmpPath.toFile();
    SnapshotCloudManager snapshotCloudManager = new SnapshotCloudManager(realManager, null);
    snapshotCloudManager.saveSnapshot(tmpDir, true);
    SnapshotCloudManager snapshotCloudManager1 = SnapshotCloudManager.readSnapshot(tmpDir);
    try (SimCloudManager simCloudManager = SimCloudManager.createCluster(snapshotCloudManager1, null, TimeSource.get("simTime:50"))) {
      assertClusterStateEquals(snapshotCloudManager.getClusterStateProvider().getClusterState(), simCloudManager.getClusterStateProvider().getClusterState());
      assertNodeStateProvider(snapshotCloudManager, simCloudManager);
      assertDistribStateManager(snapshotCloudManager.getDistribStateManager(), simCloudManager.getDistribStateManager());
      ClusterState state = simCloudManager.getClusterStateProvider().getClusterState();
      Replica r = state.getCollection(CollectionAdminParams.SYSTEM_COLL).getReplicas().get(0);
      // pick a different node to move the replica to
      String target = null;
      for (String node : simCloudManager.getClusterStateProvider().getLiveNodes()) {
        if (!node.equals(r.getNodeName())) {
          target = node;
          break;
        }
      }
      if (target == null) {
        fail("can't find suitable target node for replica " + r + ", liveNodes=" + simCloudManager.getClusterStateProvider().getLiveNodes());
      }
      CollectionAdminRequest.MoveReplica moveReplica = CollectionAdminRequest
          .moveReplica(CollectionAdminParams.SYSTEM_COLL, r.getName(), target);
      log.info("-- moving replica {} to node {}", r.getName(), target);
      simCloudManager.simGetSolrClient().request(moveReplica);
    }
  }

  private static void assertNodeStateProvider(SolrCloudManager oneMgr, SolrCloudManager twoMgr) throws Exception {
    NodeStateProvider one = oneMgr.getNodeStateProvider();
    NodeStateProvider two = twoMgr.getNodeStateProvider();
    for (String node : oneMgr.getClusterStateProvider().getLiveNodes()) {
      Map<String, Object> oneVals = one.getNodeValues(node, SimUtils.COMMON_NODE_TAGS);
      Map<String, Object> twoVals = two.getNodeValues(node, SimUtils.COMMON_NODE_TAGS);
      oneVals = Utils.getDeepCopy(oneVals, 10, false, true);
      twoVals = Utils.getDeepCopy(twoVals, 10, false, true);
      assertEquals(Utils.toJSONString(oneVals), Utils.toJSONString(twoVals));
      Map<String, Map<String, List<ReplicaInfo>>> oneInfos = one.getReplicaInfo(node, SimUtils.COMMON_REPLICA_TAGS);
      Map<String, Map<String, List<ReplicaInfo>>> twoInfos = two.getReplicaInfo(node, SimUtils.COMMON_REPLICA_TAGS);
      assertEquals(Utils.fromJSON(Utils.toJSON(oneInfos)), Utils.fromJSON(Utils.toJSON(twoInfos)));
    }
  }

  // ignore these because SimCloudManager always modifies them
  private static final Set<Pattern> IGNORE_PATTERNS = new HashSet<>(Arrays.asList(
      Pattern.compile("/autoscaling/triggerState.*"),
      Pattern.compile("/clusterstate\\.json"), // different format in SimClusterStateProvider
      Pattern.compile("/collections/[^/]+?/leader_elect/.*"),
      Pattern.compile("/collections/[^/]+?/leaders/.*"),
      Pattern.compile("/live_nodes/.*")
  ));

  private static final Predicate<String> FILTER_FUN = p -> {
    for (Pattern pattern : IGNORE_PATTERNS) {
      if (pattern.matcher(p).matches()) {
        return false;
      }
    }
    return true;
  };

  private static void assertDistribStateManager(DistribStateManager one, DistribStateManager two) throws Exception {
    List<String> treeOne = new ArrayList<>(one.listTree("/").stream()
        .filter(FILTER_FUN).collect(Collectors.toList()));
    List<String> treeTwo = new ArrayList<>(two.listTree("/").stream()
        .filter(FILTER_FUN).collect(Collectors.toList()));
    Collections.sort(treeOne);
    Collections.sort(treeTwo);
    assertEquals(treeOne, treeTwo);
    for (String path : treeOne) {
      VersionedData vd1 = one.getData(path);
      VersionedData vd2 = two.getData(path);
      assertEquals(path, vd1, vd2);
    }
  }

  private static void assertClusterStateEquals(ClusterState one, ClusterState two) {
    assertEquals(one.getLiveNodes(), two.getLiveNodes());
    assertEquals(one.getCollectionsMap().keySet(), two.getCollectionsMap().keySet());
    one.forEachCollection(oneColl -> {
      DocCollection twoColl = two.getCollection(oneColl.getName());
      Map<String, Slice> oneSlices = oneColl.getSlicesMap();
      Map<String, Slice> twoSlices = twoColl.getSlicesMap();
      assertEquals(oneSlices.keySet(), twoSlices.keySet());
      oneSlices.forEach((s, slice) -> {
        Slice sTwo = twoSlices.get(s);
        for (Replica oneReplica : slice.getReplicas()) {
          Replica twoReplica = sTwo.getReplica(oneReplica.getName());
          assertNotNull(twoReplica);
          assertReplicaEquals(oneReplica, twoReplica);
        }
      });
    });
  }

  private static void assertReplicaEquals(Replica one, Replica two) {
    assertEquals(one.getName(), two.getName());
    assertEquals(one.getNodeName(), two.getNodeName());
    assertEquals(one.getState(), two.getState());
    assertEquals(one.getType(), two.getType());
    Map<String, Object> filteredPropsOne = one.getProperties().entrySet().stream()
        .filter(e -> !(e.getKey().startsWith("INDEX") || e.getKey().startsWith("QUERY") || e.getKey().startsWith("UPDATE")))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    Map<String, Object> filteredPropsTwo = two.getProperties().entrySet().stream()
        .filter(e -> !(e.getKey().startsWith("INDEX") || e.getKey().startsWith("QUERY") || e.getKey().startsWith("UPDATE")))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    assertEquals(filteredPropsOne, filteredPropsTwo);
  }

}
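
A minimal sketch of the save/restore/simulate round trip the tests above exercise. It only reuses calls that appear in the test code; realManager stands for the live cluster's SolrCloudManager, while snapshotDir, collection, replicaName and targetNode are caller-supplied placeholders, and the null and boolean arguments simply mirror the tests without implying their meaning here.

    // capture the state of a live cluster (null second argument, as in the tests above)
    SnapshotCloudManager snapshot = new SnapshotCloudManager(realManager, null);
    // persist the snapshot to a directory so the simulation can be repeated later
    snapshot.saveSnapshot(snapshotDir, true);
    // ... later, possibly elsewhere: restore the saved state ...
    SnapshotCloudManager restored = SnapshotCloudManager.readSnapshot(snapshotDir);
    // start a simulated cluster from the restored state, using an accelerated simulated time source
    try (SimCloudManager sim = SimCloudManager.createCluster(restored, null, TimeSource.get("simTime:50"))) {
      // issue ordinary collection admin requests against the simulated cluster
      sim.simGetSolrClient().request(
          CollectionAdminRequest.moveReplica(collection, replicaName, targetNode));
    }
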
@@ -466,6 +466,10 @@ public class Policy implements MapWriter {
    return params.stream().map(Pair::first).collect(toList());
  }

  public List<String> getPerReplicaAttributes() {
    return Collections.unmodifiableList(perReplicaAttributes);
  }

  /**
   * Compares two {@link Row} loads according to a policy.
   *
@@ -72,7 +72,7 @@ public class ReplicaInfo implements MapWriter {
    this.node = node;
  }

  ReplicaInfo(Map<String, Object> map) {
  public ReplicaInfo(Map<String, Object> map) {
    this.name = map.keySet().iterator().next();
    Map details = (Map) map.get(name);
    details = Utils.getDeepCopy(details, 4);
@@ -171,6 +171,30 @@ public class ReplicaInfo implements MapWriter {
    }
  }

  @Override
  public boolean equals(Object o) {
    if (o == null) {
      return false;
    }
    if (!(o instanceof ReplicaInfo)) {
      return false;
    }
    ReplicaInfo other = (ReplicaInfo)o;
    if (
        name.equals(other.name) &&
        collection.equals(other.collection) &&
        core.equals(other.core) &&
        isLeader == other.isLeader &&
        node.equals(other.node) &&
        shard.equals(other.shard) &&
        type == other.type &&
        variables.equals(other.variables)) {
      return true;
    } else {
      return false;
    }
  }

  @Override
  public String toString() {
    return Utils.toJSONString(this);
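
The hunk above adds equals() but shows no matching hashCode(). If one is not defined elsewhere in the class, a field-consistent sketch over the same fields compared in equals() could look like this (assuming java.util.Objects is available; this is an illustration, not code from the commit):

    @Override
    public int hashCode() {
      // combine the same fields that equals() compares
      return Objects.hash(name, collection, core, isLeader, node, shard, type, variables);
    }
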
@@ -16,12 +16,19 @@
 */
package org.apache.solr.client.solrj.cloud.autoscaling;

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.Base64;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;

/**
 * Immutable representation of binary data with version.
 */
public class VersionedData {
public class VersionedData implements MapWriter {
  private final int version;
  private final byte[] data;
  private final String owner;

@@ -56,4 +63,32 @@ public class VersionedData {
  public String getOwner() {
    return owner;
  }

  @Override
  public void writeMap(EntryWriter ew) throws IOException {
    ew.put("version", version);
    if (owner != null) {
      ew.put("owner", owner);
    }
    ew.put("mode", mode.toString());
    if (data != null) {
      ew.put("data", Base64.byteArrayToBase64(data));
    }
  }

  @Override
  public String toString() {
    return Utils.toJSONString(this);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    VersionedData that = (VersionedData) o;
    return version == that.version &&
        Arrays.equals(data, that.data) &&
        Objects.equals(owner, that.owner) &&
        mode == that.mode;
  }
}
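
With MapWriter implemented as above, Utils.toJSONString(versionedData) serializes a VersionedData into a small JSON object. The shape follows directly from writeMap(); the concrete values below are illustrative only, and the owner and data entries are omitted when the corresponding fields are null:

    {"version":3, "owner":"<owner id>", "mode":"PERSISTENT", "data":"<Base64-encoded bytes>"}
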
@@ -704,6 +704,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>

  }

  public static MoveReplica moveReplica(String collection, String replica, String targetNode) {
    return new MoveReplica(collection, replica, targetNode);
  }

  public static class MoveReplica extends AsyncCollectionAdminRequest {
    protected String collection, replica, targetNode;
    protected String shard, sourceNode;
@@ -179,11 +179,11 @@ public abstract class TimeSource {
  public static TimeSource get(String type) {
    if (type == null) {
      return NANO_TIME;
    } else if (type.equals("currentTime")) {
    } else if (type.equals("currentTime") || type.equals(CurrentTimeSource.class.getSimpleName())) {
      return CURRENT_TIME;
    } else if (type.equals("nanoTime")) {
    } else if (type.equals("nanoTime") || type.equals(NanoTimeSource.class.getSimpleName())) {
      return NANO_TIME;
    } else if (type.startsWith("simTime")) {
    } else if (type.startsWith("simTime") || type.startsWith(SimTimeSource.class.getSimpleName())) {
      return simTimeSources.computeIfAbsent(type, t -> {
        String[] parts = t.split(":");
        double mul = 1.0;
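
A brief sketch of what the TimeSource.get() change above enables: the class simple names are now accepted alongside the existing short names. The calls below are equivalent in pairs; the ":50" suffix follows the existing simTime rate-multiplier syntax visible in the hunk, and this snippet is an illustration rather than code from the commit:

    TimeSource wall = TimeSource.get("CurrentTimeSource"); // same as TimeSource.get("currentTime")
    TimeSource nano = TimeSource.get("NanoTimeSource");    // same as TimeSource.get("nanoTime")
    TimeSource sim  = TimeSource.get("SimTimeSource:50");  // same as TimeSource.get("simTime:50")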