diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 331af7bbf8a..bc9ff2f33a4 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -83,6 +83,8 @@ New Features
 
 * SOLR-13047: Add facet2D Streaming Expression (Nazerke Seidan, Joel Bernstein)
 
+* SOLR-13440: Support saving/restoring autoscaling state for repeatable simulations. (ab)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/NoopDistributedQueueFactory.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/NoopDistributedQueueFactory.java
new file mode 100644
index 00000000000..b04d38ee2c9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/NoopDistributedQueueFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
+import org.apache.solr.common.util.Pair;
+
+/**
+ * A queue factory implementation that does nothing.
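+ * It is useful for read-only use cases (e.g. {@link SnapshotCloudManager}), where
+ * distributed queue operations are never exercised and can be safely ignored.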
+ */
+public class NoopDistributedQueueFactory implements DistributedQueueFactory {
+
+  public static final DistributedQueueFactory INSTANCE = new NoopDistributedQueueFactory();
+
+  @Override
+  public DistributedQueue makeQueue(String path) throws IOException {
+    return NoopDistributedQueue.INSTANCE;
+  }
+
+  @Override
+  public void removeQueue(String path) throws IOException {
+
+  }
+
+  private static final class NoopDistributedQueue implements DistributedQueue {
+    static final DistributedQueue INSTANCE = new NoopDistributedQueue();
+
+    @Override
+    public byte[] peek() throws Exception {
+      return new byte[0];
+    }
+
+    @Override
+    public byte[] peek(boolean block) throws Exception {
+      return new byte[0];
+    }
+
+    @Override
+    public byte[] peek(long wait) throws Exception {
+      return new byte[0];
+    }
+
+    @Override
+    public byte[] poll() throws Exception {
+      return new byte[0];
+    }
+
+    @Override
+    public byte[] remove() throws Exception {
+      return new byte[0];
+    }
+
+    @Override
+    public byte[] take() throws Exception {
+      return new byte[0];
+    }
+
+    @Override
+    public void offer(byte[] data) throws Exception {
+
+    }
+
+    @Override
+    public Map<String, Object> getStats() {
+      return Collections.emptyMap();
+    }
+
+    @Override
+    public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws Exception {
+      return Collections.emptyList();
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index eb5027bee64..01ab2270772 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -22,9 +22,9 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -52,12 +52,12 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
 import org.apache.solr.client.solrj.cloud.NodeStateProvider;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.CollectionApiMapping;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -173,19 +173,27 @@ public class SimCloudManager implements SolrCloudManager {
    * @param timeSource time source to use.
*/ public SimCloudManager(TimeSource timeSource) throws Exception { - this.stateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode()); + this(timeSource, null); + } + + SimCloudManager(TimeSource timeSource, SimDistribStateManager distribStateManager) throws Exception { this.loader = new SolrResourceLoader(); - // init common paths - stateManager.makePath(ZkStateReader.CLUSTER_STATE); - stateManager.makePath(ZkStateReader.CLUSTER_PROPS); - stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH); - stateManager.makePath(ZkStateReader.LIVE_NODES_ZKNODE); - stateManager.makePath(ZkStateReader.ROLES); - stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH); - stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH); - stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH); - stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH); - stateManager.makePath(Overseer.OVERSEER_ELECT); + if (distribStateManager == null) { + this.stateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode()); + // init common paths + stateManager.makePath(ZkStateReader.CLUSTER_STATE); + stateManager.makePath(ZkStateReader.CLUSTER_PROPS); + stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH); + stateManager.makePath(ZkStateReader.LIVE_NODES_ZKNODE); + stateManager.makePath(ZkStateReader.ROLES); + stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH); + stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH); + stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH); + stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH); + stateManager.makePath(Overseer.OVERSEER_ELECT); + } else { + this.stateManager = distribStateManager; + } // register common metrics metricTag = Integer.toHexString(hashCode()); @@ -296,22 +304,27 @@ public class SimCloudManager implements SolrCloudManager { return cloudManager; } - public static SimCloudManager createCluster(SolrCloudManager other, TimeSource timeSource) throws Exception { - SimCloudManager cloudManager = new SimCloudManager(timeSource); + public static SimCloudManager createCluster(SolrCloudManager other, AutoScalingConfig config, TimeSource timeSource) throws Exception { + SimDistribStateManager distribStateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode()); + distribStateManager.copyFrom(other.getDistribStateManager(), false); + SimCloudManager cloudManager = new SimCloudManager(timeSource, distribStateManager); + if (config != null) { + cloudManager.getSimDistribStateManager().simSetAutoScalingConfig(config); + } else { + config = cloudManager.getDistribStateManager().getAutoScalingConfig(); + } + Set nodeTags = new HashSet<>(SimUtils.COMMON_NODE_TAGS); + nodeTags.addAll(config.getPolicy().getParams()); + Set replicaTags = new HashSet<>(SimUtils.COMMON_REPLICA_TAGS); + replicaTags.addAll(config.getPolicy().getPerReplicaAttributes()); cloudManager.getSimClusterStateProvider().copyFrom(other.getClusterStateProvider()); - List replicaTags = Arrays.asList( - Variable.Type.CORE_IDX.metricsAttribute, - "QUERY./select.requests", - "UPDATE./update.requests" - ); - Set nodeTags = createNodeValues("unused:1234_solr").keySet(); for (String node : other.getClusterStateProvider().getLiveNodes()) { SimClusterStateProvider simClusterStateProvider = cloudManager.getSimClusterStateProvider(); cloudManager.getSimNodeStateProvider().simSetNodeValues(node, 
other.getNodeStateProvider().getNodeValues(node, nodeTags)); Map>> infos = other.getNodeStateProvider().getReplicaInfo(node, replicaTags); simClusterStateProvider.simSetReplicaValues(node, infos, true); } - cloudManager.getSimDistribStateManager().copyFrom(other.getDistribStateManager(), false); + SimUtils.checkConsistency(cloudManager, config); return cloudManager; } @@ -721,13 +734,6 @@ public class SimCloudManager implements SolrCloudManager { count.incrementAndGet(); } - private static final Map v2v1Mapping = new HashMap<>(); - static { - for (CollectionApiMapping.Meta meta : CollectionApiMapping.Meta.values()) { - if (meta.action != null) v2v1Mapping.put(meta.commandName, meta.action.toLower()); - } - } - /** * Handler method for autoscaling requests. NOTE: only a specific subset of autoscaling requests is * supported! @@ -835,31 +841,8 @@ public class SimCloudManager implements SolrCloudManager { if (!(req instanceof CollectionAdminRequest)) { // maybe a V2Request? if (req instanceof V2Request) { - Map reqMap = new HashMap<>(); - ((V2Request)req).toMap(reqMap); - String path = (String)reqMap.get("path"); - if (!path.startsWith("/c/") || path.length() < 4) { - throw new UnsupportedOperationException("Unsupported V2 request path: " + reqMap); - } - Map cmd = (Map)reqMap.get("command"); - if (cmd.size() != 1) { - throw new UnsupportedOperationException("Unsupported multi-command V2 request: " + reqMap); - } - a = cmd.keySet().iterator().next(); - params = new ModifiableSolrParams(); - ((ModifiableSolrParams)params).add(CollectionAdminParams.COLLECTION, path.substring(3)); - if (req.getParams() != null) { - ((ModifiableSolrParams)params).add(req.getParams()); - } - Map reqParams = (Map)cmd.get(a); - for (Map.Entry e : reqParams.entrySet()) { - ((ModifiableSolrParams)params).add(e.getKey(), e.getValue().toString()); - } - // re-map from v2 to v1 action - a = v2v1Mapping.get(a); - if (a == null) { - throw new UnsupportedOperationException("Unsupported V2 request: " + reqMap); - } + params = SimUtils.v2AdminRequestToV1Params((V2Request)req); + a = params.get(CoreAdminParams.ACTION); } else { throw new UnsupportedOperationException("Only some CollectionAdminRequest-s are supported: " + req.getClass().getName() + ": " + req.getPath() + " " + req.getParams()); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java index 0b03ecfdd26..a4808727d5b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import com.google.common.util.concurrent.AtomicDouble; import org.apache.commons.math3.stat.descriptive.SummaryStatistics; @@ -199,6 +200,7 @@ public class SimClusterStateProvider implements ClusterStateProvider { lock.lockInterruptibly(); try { collProperties.clear(); + colShardReplicaMap.clear(); sliceProperties.clear(); nodeReplicaMap.clear(); liveNodes.clear(); @@ -209,6 +211,9 @@ public class SimClusterStateProvider implements ClusterStateProvider { if (stateManager.hasData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId)) { 
stateManager.removeData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, -1); } + if (stateManager.hasData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId)) { + stateManager.removeData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId, -1); + } } liveNodes.addAll(initialState.getLiveNodes()); for (String nodeId : liveNodes.get()) { @@ -220,10 +225,22 @@ public class SimClusterStateProvider implements ClusterStateProvider { dc.getSlices().forEach(s -> { sliceProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>()) .computeIfAbsent(s.getName(), Utils.NEW_HASHMAP_FUN).putAll(s.getProperties()); + Replica leader = s.getLeader(); s.getReplicas().forEach(r -> { - ReplicaInfo ri = new ReplicaInfo(r.getName(), r.getCoreName(), dc.getName(), s.getName(), r.getType(), r.getNodeName(), r.getProperties()); + Map props = new HashMap<>(r.getProperties()); + if (leader != null && r.getName().equals(leader.getName())) { + props.put("leader", "true"); + } + ReplicaInfo ri = new ReplicaInfo(r.getName(), r.getCoreName(), dc.getName(), s.getName(), r.getType(), r.getNodeName(), props); + if (leader != null && r.getName().equals(leader.getName())) { + ri.getVariables().put("leader", "true"); + } if (liveNodes.get().contains(r.getNodeName())) { nodeReplicaMap.computeIfAbsent(r.getNodeName(), Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN).add(ri); + colShardReplicaMap.computeIfAbsent(ri.getCollection(), name -> new ConcurrentHashMap<>()) + .computeIfAbsent(ri.getShard(), shard -> new ArrayList<>()).add(ri); + } else { + log.warn("- dropping replica because its node " + r.getNodeName() + " is not live: " + r); } }); }); @@ -274,7 +291,7 @@ public class SimClusterStateProvider implements ClusterStateProvider { (r.getNodeName(), Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN); synchronized (list) { for (ReplicaInfo ri : list) { - if (r.getCoreName().equals(ri.getCore())) { + if (r.getCoreName().equals(ri.getCore()) && r.getName().equals(ri.getName())) { return ri; } } @@ -562,14 +579,15 @@ public class SimClusterStateProvider implements ClusterStateProvider { Map values = cloudManager.getSimNodeStateProvider().simGetAllNodeValues() .computeIfAbsent(nodeId, id -> new ConcurrentHashMap<>(SimCloudManager.createNodeValues(id))); // update the number of cores and freedisk in node values - Integer cores = (Integer)values.get(ImplicitSnitch.CORES); + Number cores = (Number)values.get(ImplicitSnitch.CORES); if (cores == null) { cores = 0; } - cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores + 1); + cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores.intValue() + 1); Number disk = (Number)values.get(ImplicitSnitch.DISK); if (disk == null) { - disk = SimCloudManager.DEFAULT_FREE_DISK; + throw new Exception("Missing '" + ImplicitSnitch.DISK + "' in node metrics for node " + nodeId); + //disk = SimCloudManager.DEFAULT_FREE_DISK; } long replicaSize = ((Number)replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute)).longValue(); Number replicaSizeGB = (Number)Type.CORE_IDX.convertVal(replicaSize); @@ -598,7 +616,7 @@ public class SimClusterStateProvider implements ClusterStateProvider { * @param nodeId node id * @param coreNodeName coreNodeName */ - public void simRemoveReplica(String nodeId, String coreNodeName) throws Exception { + public void simRemoveReplica(String nodeId, String collection, String coreNodeName) throws Exception { ensureNotClosed(); lock.lockInterruptibly(); @@ -607,7 +625,7 @@ public class 
SimClusterStateProvider implements ClusterStateProvider { (nodeId, Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN); synchronized (replicas) { for (int i = 0; i < replicas.size(); i++) { - if (coreNodeName.equals(replicas.get(i).getName())) { + if (collection.equals(replicas.get(i).getCollection()) && coreNodeName.equals(replicas.get(i).getName())) { ReplicaInfo ri = replicas.remove(i); colShardReplicaMap.computeIfAbsent(ri.getCollection(), c -> new ConcurrentHashMap<>()) .computeIfAbsent(ri.getShard(), s -> new ArrayList<>()) @@ -618,11 +636,11 @@ public class SimClusterStateProvider implements ClusterStateProvider { // update the number of cores in node values, if node is live if (liveNodes.contains(nodeId)) { - Integer cores = (Integer)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.CORES); - if (cores == null || cores == 0) { + Number cores = (Number)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.CORES); + if (cores == null || cores.intValue() == 0) { throw new Exception("Unexpected value of 'cores' (" + cores + ") on node: " + nodeId); } - cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores - 1); + cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores.intValue() - 1); Number disk = (Number)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.DISK); if (disk == null || disk.doubleValue() == 0.0) { throw new Exception("Unexpected value of 'freedisk' (" + disk + ") on node: " + nodeId); @@ -1038,13 +1056,13 @@ public class SimClusterStateProvider implements ClusterStateProvider { if (ri.getCollection().equals(collection)) { it.remove(); // update the number of cores in node values - Integer cores = (Integer) cloudManager.getSimNodeStateProvider().simGetNodeValue(n, "cores"); + Number cores = (Number) cloudManager.getSimNodeStateProvider().simGetNodeValue(n, "cores"); if (cores != null) { // node is still up - if (cores == 0) { + if (cores.intValue() == 0) { throw new RuntimeException("Unexpected value of 'cores' (" + cores + ") on node: " + n); } try { - cloudManager.getSimNodeStateProvider().simSetNodeValue(n, "cores", cores - 1); + cloudManager.getSimNodeStateProvider().simSetNodeValue(n, "cores", cores.intValue() - 1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("interrupted"); @@ -1089,6 +1107,13 @@ public class SimClusterStateProvider implements ClusterStateProvider { } } + private static final Set NO_COPY_PROPS = new HashSet<>(Arrays.asList( + ZkStateReader.CORE_NODE_NAME_PROP, + ZkStateReader.NODE_NAME_PROP, + ZkStateReader.BASE_URL_PROP, + ZkStateReader.CORE_NAME_PROP + )); + /** * Move replica. This uses a similar algorithm as {@link org.apache.solr.cloud.api.collections.MoveReplicaCmd} moveNormalReplica(...) method. 
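+   * Note: moving a very large replica incurs an additional simulated delay of 5 sec for each GB
+   * of index size above the configured MOVEREPLICA operation delay.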
* @param message operation details @@ -1123,17 +1148,35 @@ public class SimClusterStateProvider implements ClusterStateProvider { } opDelay(collection, CollectionParams.CollectionAction.MOVEREPLICA.name()); + ReplicaInfo ri = getReplicaInfo(replica); + if (ri != null) { + if (ri.getVariable(Type.CORE_IDX.tagName) != null) { + // simulate very large replicas - add additional delay of 5s / GB + long sizeInGB = ((Number)ri.getVariable(Type.CORE_IDX.tagName)).longValue(); + long opDelay = opDelays.getOrDefault(ri.getCollection(), Collections.emptyMap()) + .getOrDefault(CollectionParams.CollectionAction.MOVEREPLICA.name(), defaultOpDelays.get(CollectionParams.CollectionAction.MOVEREPLICA.name())); + opDelay = TimeUnit.MILLISECONDS.toSeconds(opDelay); + if (sizeInGB > opDelay) { + // add 5s per each GB above the threshold + cloudManager.getTimeSource().sleep(TimeUnit.SECONDS.toMillis(sizeInGB - opDelay) * 5); + } + } + } // TODO: for now simulate moveNormalReplica sequence, where we first add new replica and then delete the old one String newSolrCoreName = Assign.buildSolrCoreName(stateManager, coll, slice.getName(), replica.getType()); String coreNodeName = Assign.assignCoreNodeName(stateManager, coll); - ReplicaInfo newReplica = new ReplicaInfo(coreNodeName, newSolrCoreName, collection, slice.getName(), replica.getType(), targetNode, null); + // copy other properties + Map props = replica.getProperties().entrySet().stream() + .filter(e -> !NO_COPY_PROPS.contains(e.getKey())) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + ReplicaInfo newReplica = new ReplicaInfo(coreNodeName, newSolrCoreName, collection, slice.getName(), replica.getType(), targetNode, props); log.debug("-- new replica: " + newReplica); // xxx should run leader election here already? simAddReplica(targetNode, newReplica, false); // this will trigger leader election - simRemoveReplica(replica.getNodeName(), replica.getName()); + simRemoveReplica(replica.getNodeName(), collection, replica.getName()); results.add("success", ""); } @@ -1996,13 +2039,14 @@ public class SimClusterStateProvider implements ClusterStateProvider { public void simSetReplicaValues(String node, Map>> source, boolean overwrite) { List infos = nodeReplicaMap.get(node); - Map infoMap = new HashMap<>(); - infos.forEach(ri -> infoMap.put(ri.getName(), ri)); if (infos == null) { throw new RuntimeException("Node not present: " + node); } + // core_node_name is not unique across collections + Map> infoMap = new HashMap<>(); + infos.forEach(ri -> infoMap.computeIfAbsent(ri.getCollection(), Utils.NEW_HASHMAP_FUN).put(ri.getName(), ri)); source.forEach((coll, shards) -> shards.forEach((shard, replicas) -> replicas.forEach(r -> { - ReplicaInfo target = infoMap.get(r.getName()); + ReplicaInfo target = infoMap.getOrDefault(coll, Collections.emptyMap()).get(r.getName()); if (target == null) { throw new RuntimeException("Unable to find simulated replica of " + r); } @@ -2041,6 +2085,18 @@ public class SimClusterStateProvider implements ClusterStateProvider { } } + public ReplicaInfo simGetReplicaInfo(String collection, String coreNode) { + Map> shardsReplicas = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>()); + for (List replicas : shardsReplicas.values()) { + for (ReplicaInfo ri : replicas) { + if (ri.getName().equals(coreNode)) { + return ri; + } + } + } + return null; + } + /** * List collections. * @return list of existing collections. 
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java index 1c50007ebc7..1872f92dc0d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java @@ -75,7 +75,8 @@ public class SimDistribStateManager implements DistribStateManager { private int version = 0; private int seq = 0; private final CreateMode mode; - private final String clientId; + // copyFrom needs to modify this + private String owner; private final String path; private final String name; private final Node parent; @@ -84,16 +85,16 @@ public class SimDistribStateManager implements DistribStateManager { Set dataWatches = ConcurrentHashMap.newKeySet(); Set childrenWatches = ConcurrentHashMap.newKeySet(); - Node(Node parent, String name, String path, CreateMode mode, String clientId) { + Node(Node parent, String name, String path, CreateMode mode, String owner) { this.parent = parent; this.name = name; this.path = path; this.mode = mode; - this.clientId = clientId; + this.owner = owner; } - Node(Node parent, String name, String path, byte[] data, CreateMode mode, String clientId) { - this(parent, name, path, mode, clientId); + Node(Node parent, String name, String path, byte[] data, CreateMode mode, String owner) { + this(parent, name, path, mode, owner); this.data = data; } @@ -136,7 +137,7 @@ public class SimDistribStateManager implements DistribStateManager { public VersionedData getData(Watcher w) { dataLock.lock(); try { - VersionedData res = new VersionedData(version, data, mode, clientId); + VersionedData res = new VersionedData(version, data, mode, owner); if (w != null && !dataWatches.contains(w)) { dataWatches.add(w); } @@ -195,7 +196,7 @@ public class SimDistribStateManager implements DistribStateManager { continue; } if ((CreateMode.EPHEMERAL == n.mode || CreateMode.EPHEMERAL_SEQUENTIAL == n.mode) && - id.equals(n.clientId)) { + id.equals(n.owner)) { removeChild(n.name, -1); } else { n.removeEphemeralChildren(id); @@ -208,7 +209,7 @@ public class SimDistribStateManager implements DistribStateManager { private final ReentrantLock multiLock = new ReentrantLock(); public static Node createNewRootNode() { - return new Node(null, "", "/", CreateMode.PERSISTENT, "__root__"); + return new Node(null, "", "/", CreateMode.PERSISTENT, "0"); } private final ExecutorService watchersPool; @@ -243,6 +244,7 @@ public class SimDistribStateManager implements DistribStateManager { */ public void copyFrom(DistribStateManager other, boolean failOnExists) throws InterruptedException, IOException, KeeperException, AlreadyExistsException, BadVersionException { List tree = other.listTree("/"); + log.info("- copying " + tree.size() + " resources..."); // check if any node exists for (String path : tree) { if (hasData(path) && failOnExists) { @@ -256,9 +258,10 @@ public class SimDistribStateManager implements DistribStateManager { } else { makePath(path, data.getData(), data.getMode(), failOnExists); } - // hack: set the version to be the same as the source + // hack: set the version and owner to be the same as the source Node n = traverse(path, false, CreateMode.PERSISTENT); n.version = data.getVersion(); + n.owner = data.getOwner(); } } @@ -328,6 +331,9 @@ public class SimDistribStateManager implements DistribStateManager { return null; } throttleOrError(path); + if (path.equals("/")) { + 
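+      // the root path cannot be traversed segment by segment - return the root node directly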
return root; + } if (path.charAt(0) == '/') { path = path.substring(1); } @@ -365,7 +371,8 @@ public class SimDistribStateManager implements DistribStateManager { } fullChildPath.append(nodeName); - Node child = new Node(parentNode, nodeName, fullChildPath.toString(), data, mode, id); + String owner = mode == CreateMode.EPHEMERAL || mode == CreateMode.EPHEMERAL_SEQUENTIAL ? id : "0"; + Node child = new Node(parentNode, nodeName, fullChildPath.toString(), data, mode, owner); if (attachToParent) { parentNode.setChild(nodeName, child); diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java index 2a87357f57e..9a5656e3541 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java @@ -317,6 +317,7 @@ public class SimNodeStateProvider implements NodeStateProvider { for (ReplicaInfo r : replicas) { Map> perCollection = res.computeIfAbsent(r.getCollection(), Utils.NEW_HASHMAP_FUN); List perShard = perCollection.computeIfAbsent(r.getShard(), Utils.NEW_ARRAYLIST_FUN); + // XXX filter out some properties? perShard.add(r); } return res; diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimUtils.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimUtils.java new file mode 100644 index 00000000000..38181340db8 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimUtils.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.cloud.autoscaling.sim; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; +import org.apache.solr.client.solrj.cloud.autoscaling.Policy; +import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; +import org.apache.solr.client.solrj.cloud.autoscaling.Row; +import org.apache.solr.client.solrj.cloud.autoscaling.Variable; +import org.apache.solr.client.solrj.request.CollectionApiMapping; +import org.apache.solr.client.solrj.request.V2Request; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.params.CollectionAdminParams; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.params.ModifiableSolrParams; + +/** + * Various utility methods useful for autoscaling simulations and snapshots. + */ +public class SimUtils { + + public static final Set COMMON_REPLICA_TAGS = new HashSet<>(Arrays.asList( + Variable.Type.CORE_IDX.metricsAttribute, + Variable.Type.CORE_IDX.tagName, + "QUERY./select.requests", + "UPDATE./update.requests" + )); + + public static final Set COMMON_NODE_TAGS = new HashSet<>(Arrays.asList( + Variable.Type.CORES.tagName, + Variable.Type.FREEDISK.tagName, + Variable.Type.NODE.tagName, + Variable.Type.NODE_ROLE.tagName, + Variable.Type.TOTALDISK.tagName, + Variable.Type.DISKTYPE.tagName, + Variable.Type.HEAPUSAGE.tagName, + Variable.Type.HOST.tagName, + Variable.Type.IP_1.tagName, + Variable.Type.IP_2.tagName, + Variable.Type.IP_3.tagName, + Variable.Type.IP_4.tagName, + Variable.Type.PORT.tagName, + Variable.Type.SYSLOADAVG.tagName, + "withCollection" + )); + + /** + * Check consistency of data in a {@link SolrCloudManager}. This may be needed when constructing a simulated + * instance from potentially inconsistent data (eg. partial snapshots taken at different points in time). + * @param solrCloudManager source manager + * @param config optional {@link AutoScalingConfig} instance used to determine what node and replica metrics to check. 
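+   * @throws Exception if an inconsistency is detected: duplicate core_node names, replicas
+   *                   present in ClusterState but missing from NodeStateProvider (or vice versa),
+   *                   or replicas without index size information.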
+ */ + public static void checkConsistency(SolrCloudManager solrCloudManager, AutoScalingConfig config) throws Exception { + if (config == null) { + config = solrCloudManager.getDistribStateManager().getAutoScalingConfig(); + } + Set replicaTags = new HashSet<>(COMMON_REPLICA_TAGS); + replicaTags.addAll(config.getPolicy().getPerReplicaAttributes()); + + // verify replicas are consistent and data is available + Map> allReplicas = new HashMap<>(); + solrCloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> { + coll.getReplicas().forEach(r -> { + if (allReplicas.containsKey(r.getName())) { + throw new RuntimeException("duplicate core_node name in clusterState: " + allReplicas.get(r.getName()) + " versus " + r); + } else { + allReplicas.computeIfAbsent(coll.getName(), c -> new HashMap<>()).put(r.getName(), r); + } + }); + }); + Map> allReplicaInfos = new HashMap<>(); + solrCloudManager.getClusterStateProvider().getLiveNodes().forEach(n -> { + Map>> infos = solrCloudManager.getNodeStateProvider().getReplicaInfo(n, replicaTags); + infos.forEach((coll, shards) -> shards.forEach((shard, replicas) -> replicas.forEach(r -> { + if (allReplicaInfos.containsKey(r.getName())) { + throw new RuntimeException("duplicate core_node name in NodeStateProvider: " + allReplicaInfos.get(r.getName()) + " versus " + r); + } else { + allReplicaInfos.computeIfAbsent(coll, c -> new HashMap<>()).put(r.getName(), r); + } + }))); + }); + if (!allReplicaInfos.keySet().equals(allReplicas.keySet())) { + Set notInClusterState = allReplicaInfos.keySet().stream() + .filter(k -> !allReplicas.containsKey(k)) + .collect(Collectors.toSet()); + Set notInNodeProvider = allReplicas.keySet().stream() + .filter(k -> !allReplicaInfos.containsKey(k)) + .collect(Collectors.toSet()); + throw new RuntimeException("Mismatched replica data between ClusterState and NodeStateProvider:\n\t" + + "collection not in ClusterState: " + notInClusterState + "\n\t" + + "collection not in NodeStateProvider: " + notInNodeProvider); + } + allReplicaInfos.keySet().forEach(collection -> { + Set infosCores = allReplicaInfos.getOrDefault(collection, Collections.emptyMap()).keySet(); + Set csCores = allReplicas.getOrDefault(collection, Collections.emptyMap()).keySet(); + if (!infosCores.equals(csCores)) { + Set notInClusterState = infosCores.stream() + .filter(k -> !csCores.contains(k)) + .collect(Collectors.toSet()); + Set notInNodeProvider = csCores.stream() + .filter(k -> !infosCores.contains(k)) + .collect(Collectors.toSet()); + throw new RuntimeException("Mismatched replica data between ClusterState and NodeStateProvider:\n\t" + + "replica not in ClusterState: " + notInClusterState + "\n\t" + + "replica not in NodeStateProvider: " + notInNodeProvider); + } + }); + // verify all replicas have size info + allReplicaInfos.forEach((coll, replicas) -> replicas.forEach((core, ri) -> { + Number size = (Number) ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute); + if (size == null) { + size = (Number) ri.getVariable(Variable.Type.CORE_IDX.tagName); + if (size == null) { +// for (String node : solrCloudManager.getClusterStateProvider().getLiveNodes()) { +// log.error("Check for missing values: {}: {}", node, solrCloudManager.getNodeStateProvider().getReplicaInfo(node, SnapshotNodeStateProvider.REPLICA_TAGS)); +// } + throw new RuntimeException("missing replica size information: " + ri); + } + } + } + )); + } + + /** + * Calculate statistics of node / collection and replica layouts for the provided {@link SolrCloudManager}. 
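+   * The returned map contains "coresPerNodes", "sortedNodeStats" and "collectionStats" sections.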
+ * @param cloudManager manager + * @param config autoscaling config, or null if the one from the provided manager should be used + * @param verbose if true then add more details about replicas. + * @return a map containing detailed statistics + */ + public static Map calculateStats(SolrCloudManager cloudManager, AutoScalingConfig config, boolean verbose) throws Exception { + ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState(); + Map> collStats = new TreeMap<>(); + Policy.Session session = config.getPolicy().createSession(cloudManager); + clusterState.forEachCollection(coll -> { + Map perColl = collStats.computeIfAbsent(coll.getName(), n -> new LinkedHashMap<>()); + AtomicInteger numCores = new AtomicInteger(); + HashMap> nodes = new HashMap<>(); + coll.getSlices().forEach(s -> { + numCores.addAndGet(s.getReplicas().size()); + s.getReplicas().forEach(r -> { + nodes.computeIfAbsent(r.getNodeName(), n -> new HashMap<>()) + .computeIfAbsent(s.getName(), slice -> new AtomicInteger()).incrementAndGet(); + }); + }); + int maxCoresPerNode = 0; + int minCoresPerNode = 0; + int maxActualShardsPerNode = 0; + int minActualShardsPerNode = 0; + int maxShardReplicasPerNode = 0; + int minShardReplicasPerNode = 0; + if (!nodes.isEmpty()) { + minCoresPerNode = Integer.MAX_VALUE; + minActualShardsPerNode = Integer.MAX_VALUE; + minShardReplicasPerNode = Integer.MAX_VALUE; + for (Map counts : nodes.values()) { + int total = counts.values().stream().mapToInt(c -> c.get()).sum(); + for (AtomicInteger count : counts.values()) { + if (count.get() > maxShardReplicasPerNode) { + maxShardReplicasPerNode = count.get(); + } + if (count.get() < minShardReplicasPerNode) { + minShardReplicasPerNode = count.get(); + } + } + if (total > maxCoresPerNode) { + maxCoresPerNode = total; + } + if (total < minCoresPerNode) { + minCoresPerNode = total; + } + if (counts.size() > maxActualShardsPerNode) { + maxActualShardsPerNode = counts.size(); + } + if (counts.size() < minActualShardsPerNode) { + minActualShardsPerNode = counts.size(); + } + } + } + perColl.put("activeShards", coll.getActiveSlices().size()); + perColl.put("inactiveShards", coll.getSlices().size() - coll.getActiveSlices().size()); + perColl.put("rf", coll.getReplicationFactor()); + perColl.put("maxShardsPerNode", coll.getMaxShardsPerNode()); + perColl.put("maxActualShardsPerNode", maxActualShardsPerNode); + perColl.put("minActualShardsPerNode", minActualShardsPerNode); + perColl.put("maxShardReplicasPerNode", maxShardReplicasPerNode); + perColl.put("minShardReplicasPerNode", minShardReplicasPerNode); + perColl.put("numCores", numCores.get()); + perColl.put("numNodes", nodes.size()); + perColl.put("maxCoresPerNode", maxCoresPerNode); + perColl.put("minCoresPerNode", minCoresPerNode); + }); + Map> nodeStats = new TreeMap<>(); + Map coreStats = new TreeMap<>(); + List rows = session.getSortedNodes(); + // check consistency + if (rows.size() != clusterState.getLiveNodes().size()) { + throw new Exception("Mismatch between autoscaling matrix size (" + rows.size() + ") and liveNodes size (" + clusterState.getLiveNodes().size() + ")"); + } + for (Row row : rows) { + Map nodeStat = nodeStats.computeIfAbsent(row.node, n -> new LinkedHashMap<>()); + nodeStat.put("isLive", row.isLive()); + nodeStat.put("freedisk", row.getVal("freedisk", 0)); + nodeStat.put("totaldisk", row.getVal("totaldisk", 0)); + int cores = ((Number)row.getVal("cores", 0)).intValue(); + nodeStat.put("cores", cores); + coreStats.computeIfAbsent(cores, num -> new 
AtomicInteger()).incrementAndGet(); + Map>> collReplicas = new TreeMap<>(); + // check consistency + AtomicInteger rowCores = new AtomicInteger(); + row.forEachReplica(ri -> rowCores.incrementAndGet()); + if (cores != rowCores.get()) { + throw new Exception("Mismatch between autoscaling matrix row replicas (" + rowCores.get() + ") and number of cores (" + cores + ")"); + } + row.forEachReplica(ri -> { + Map perReplica = collReplicas.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>()) + .computeIfAbsent(ri.getCore().substring(ri.getCollection().length() + 1), core -> new LinkedHashMap<>()); +// if (ri.getVariable(Variable.Type.CORE_IDX.tagName) != null) { +// perReplica.put(Variable.Type.CORE_IDX.tagName, ri.getVariable(Variable.Type.CORE_IDX.tagName)); +// } + if (ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute) != null) { + perReplica.put(Variable.Type.CORE_IDX.metricsAttribute, ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute)); + if (ri.getVariable(Variable.Type.CORE_IDX.tagName) != null) { + perReplica.put(Variable.Type.CORE_IDX.tagName, ri.getVariable(Variable.Type.CORE_IDX.tagName)); + } else { + perReplica.put(Variable.Type.CORE_IDX.tagName, + Variable.Type.CORE_IDX.convertVal(ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute))); + } + } + perReplica.put("coreNode", ri.getName()); + if (ri.isLeader || ri.getBool("leader", false)) { + perReplica.put("leader", true); + Double totalSize = (Double)collStats.computeIfAbsent(ri.getCollection(), c -> new HashMap<>()) + .computeIfAbsent("avgShardSize", size -> 0.0); + Number riSize = (Number)ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute); + if (riSize != null) { + totalSize += riSize.doubleValue(); + collStats.get(ri.getCollection()).put("avgShardSize", totalSize); + Double max = (Double)collStats.get(ri.getCollection()).get("maxShardSize"); + if (max == null) max = 0.0; + if (riSize.doubleValue() > max) { + collStats.get(ri.getCollection()).put("maxShardSize", riSize.doubleValue()); + } + Double min = (Double)collStats.get(ri.getCollection()).get("minShardSize"); + if (min == null) min = Double.MAX_VALUE; + if (riSize.doubleValue() < min) { + collStats.get(ri.getCollection()).put("minShardSize", riSize.doubleValue()); + } + } else { + throw new RuntimeException("ReplicaInfo without size information: " + ri); + } + } + if (verbose) { + nodeStat.put("replicas", collReplicas); + } + }); + } + + // calculate average per shard and convert the units + for (Map perColl : collStats.values()) { + Number avg = perColl.get("avgShardSize"); + if (avg != null) { + avg = avg.doubleValue() / perColl.get("activeShards").doubleValue(); + perColl.put("avgShardSize", (Number)Variable.Type.CORE_IDX.convertVal(avg)); + } + Number num = perColl.get("maxShardSize"); + if (num != null) { + perColl.put("maxShardSize", (Number)Variable.Type.CORE_IDX.convertVal(num)); + } + num = perColl.get("minShardSize"); + if (num != null) { + perColl.put("minShardSize", (Number)Variable.Type.CORE_IDX.convertVal(num)); + } + } + Map stats = new LinkedHashMap<>(); + stats.put("coresPerNodes", coreStats); + stats.put("sortedNodeStats", nodeStats); + stats.put("collectionStats", collStats); + return stats; + } + + private static final Map v2v1Mapping = new HashMap<>(); + static { + for (CollectionApiMapping.Meta meta : CollectionApiMapping.Meta.values()) { + if (meta.action != null) v2v1Mapping.put(meta.commandName, meta.action.toLower()); + } + } + + /** + * Convert a V2 {@link org.apache.solr.client.solrj.request.CollectionAdminRequest} to regular 
{@link org.apache.solr.common.params.SolrParams} + * @param req request + * @return payload converted to V1 params + */ + public static ModifiableSolrParams v2AdminRequestToV1Params(V2Request req) { + Map reqMap = new HashMap<>(); + ((V2Request)req).toMap(reqMap); + String path = (String)reqMap.get("path"); + if (!path.startsWith("/c/") || path.length() < 4) { + throw new UnsupportedOperationException("Unsupported V2 request path: " + reqMap); + } + Map cmd = (Map)reqMap.get("command"); + if (cmd.size() != 1) { + throw new UnsupportedOperationException("Unsupported multi-command V2 request: " + reqMap); + } + String a = cmd.keySet().iterator().next(); + ModifiableSolrParams params = new ModifiableSolrParams(); + params.add(CollectionAdminParams.COLLECTION, path.substring(3)); + if (req.getParams() != null) { + params.add(req.getParams()); + } + Map reqParams = (Map)cmd.get(a); + for (Map.Entry e : reqParams.entrySet()) { + params.add(e.getKey(), e.getValue().toString()); + } + // re-map from v2 to v1 action + a = v2v1Mapping.get(a); + if (a == null) { + throw new UnsupportedOperationException("Unsupported V2 request: " + reqMap); + } + params.add(CoreAdminParams.ACTION, a); + return params; + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotCloudManager.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotCloudManager.java new file mode 100644 index 00000000000..a0a20fda65b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotCloudManager.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.autoscaling.sim; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.commons.io.IOUtils; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.client.solrj.cloud.DistribStateManager; +import org.apache.solr.client.solrj.cloud.DistributedQueueFactory; +import org.apache.solr.client.solrj.cloud.NodeStateProvider; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; +import org.apache.solr.client.solrj.cloud.autoscaling.Policy; +import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; +import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; +import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; +import org.apache.solr.client.solrj.impl.ClusterStateProvider; +import org.apache.solr.client.solrj.request.V2Request; +import org.apache.solr.common.params.CollectionAdminParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ObjectCache; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.common.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Read-only snapshot of another {@link SolrCloudManager}. + */ +public class SnapshotCloudManager implements SolrCloudManager { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private ObjectCache objectCache = new ObjectCache(); + private SnapshotClusterStateProvider clusterStateProvider; + private SnapshotNodeStateProvider nodeStateProvider; + private SnapshotDistribStateManager distribStateManager; + private TimeSource timeSource; + + public static final String MANAGER_STATE_KEY = "managerState"; + public static final String CLUSTER_STATE_KEY = "clusterState"; + public static final String NODE_STATE_KEY = "nodeState"; + public static final String DISTRIB_STATE_KEY = "distribState"; + public static final String AUTOSCALING_STATE_KEY = "autoscalingState"; + public static final String STATISTICS_STATE_KEY = "statistics"; + + private static final List REQUIRED_KEYS = Arrays.asList( + MANAGER_STATE_KEY, + CLUSTER_STATE_KEY, + NODE_STATE_KEY, + DISTRIB_STATE_KEY + ); + + public SnapshotCloudManager(SolrCloudManager other, AutoScalingConfig config) throws Exception { + this.timeSource = other.getTimeSource(); + this.clusterStateProvider = new SnapshotClusterStateProvider(other.getClusterStateProvider()); + this.nodeStateProvider = new SnapshotNodeStateProvider(other, config); + this.distribStateManager = new SnapshotDistribStateManager(other.getDistribStateManager(), config); + SimUtils.checkConsistency(this, config); + } + + public SnapshotCloudManager(Map snapshot) throws Exception { + Objects.requireNonNull(snapshot); + init( + (Map)snapshot.getOrDefault(MANAGER_STATE_KEY, Collections.emptyMap()), + (Map)snapshot.getOrDefault(CLUSTER_STATE_KEY, Collections.emptyMap()), + (Map)snapshot.getOrDefault(NODE_STATE_KEY, Collections.emptyMap()), + (Map)snapshot.getOrDefault(DISTRIB_STATE_KEY, Collections.emptyMap()) + ); + } + + public 
void saveSnapshot(File targetDir, boolean withAutoscaling) throws Exception { + Map snapshot = getSnapshot(withAutoscaling); + targetDir.mkdirs(); + for (Map.Entry e : snapshot.entrySet()) { + FileOutputStream out = new FileOutputStream(new File(targetDir, e.getKey() + ".json")); + IOUtils.write(Utils.toJSON(e.getValue()), out); + out.flush(); + out.close(); + } + } + + public static SnapshotCloudManager readSnapshot(File sourceDir) throws Exception { + if (!sourceDir.exists()) { + throw new Exception("Source path doesn't exist: " + sourceDir); + } + if (!sourceDir.isDirectory()) { + throw new Exception("Source path is not a directory: " + sourceDir); + } + Map snapshot = new HashMap<>(); + int validData = 0; + for (String key : REQUIRED_KEYS) { + File src = new File(sourceDir, key + ".json"); + if (src.exists()) { + InputStream is = new FileInputStream(src); + Map data = (Map)Utils.fromJSON(is); + is.close(); + snapshot.put(key, data); + validData++; + } + } + if (validData < REQUIRED_KEYS.size()) { + throw new Exception("Some data is missing - expected: " + REQUIRED_KEYS + ", found: " + snapshot.keySet()); + } + return new SnapshotCloudManager(snapshot); + } + + private void init(Map managerState, Map clusterState, Map nodeState, + Map distribState) throws Exception { + Objects.requireNonNull(managerState); + Objects.requireNonNull(clusterState); + Objects.requireNonNull(nodeState); + Objects.requireNonNull(distribState); + this.timeSource = TimeSource.get((String)managerState.getOrDefault("timeSource", "simTime:50")); + this.clusterStateProvider = new SnapshotClusterStateProvider(clusterState); + this.nodeStateProvider = new SnapshotNodeStateProvider(nodeState); + this.distribStateManager = new SnapshotDistribStateManager(distribState); + + SimUtils.checkConsistency(this, null); + } + + public Map getSnapshot(boolean withAutoscaling) throws Exception { + Map snapshot = new LinkedHashMap<>(4); + Map managerState = new HashMap<>(); + managerState.put("timeSource", timeSource.toString()); + snapshot.put(MANAGER_STATE_KEY, managerState); + + snapshot.put(CLUSTER_STATE_KEY, clusterStateProvider.getSnapshot()); + snapshot.put(NODE_STATE_KEY, nodeStateProvider.getSnapshot()); + snapshot.put(DISTRIB_STATE_KEY, distribStateManager.getSnapshot()); + if (withAutoscaling) { + AutoScalingConfig config = distribStateManager.getAutoScalingConfig(); + Policy.Session session = config.getPolicy().createSession(this); + List suggestions = PolicyHelper.getSuggestions(config, this); + Map diagnostics = new LinkedHashMap<>(); + PolicyHelper.getDiagnostics(session).toMap(diagnostics); + List> suggestionDetails = new ArrayList<>(suggestions.size()); + suggestions.forEach(s -> { + Map map = new LinkedHashMap<>(); + map.put("suggestion", s); + if (s.getOperation() != null) { + SolrParams params = s.getOperation().getParams(); + if (s.getOperation() instanceof V2Request) { + params = SimUtils.v2AdminRequestToV1Params((V2Request)s.getOperation()); + } + ReplicaInfo info = nodeStateProvider.getReplicaInfo( + params.get(CollectionAdminParams.COLLECTION), params.get("replica")); + if (info == null) { + log.warn("Can't find ReplicaInfo for suggested operation: " + s); + } else { + map.put("replica", info); + } + } + suggestionDetails.add(map); + }); + Map autoscaling = new LinkedHashMap<>(); + autoscaling.put("suggestions", suggestionDetails); + autoscaling.put("diagnostics", diagnostics); + snapshot.put(AUTOSCALING_STATE_KEY, autoscaling); + } + snapshot.put(STATISTICS_STATE_KEY, SimUtils.calculateStats(this, 
distribStateManager.getAutoScalingConfig(), true)); + return snapshot; + } + + @Override + public ClusterStateProvider getClusterStateProvider() { + return clusterStateProvider; + } + + @Override + public NodeStateProvider getNodeStateProvider() { + return nodeStateProvider; + } + + @Override + public DistribStateManager getDistribStateManager() { + return distribStateManager; + } + + @Override + public DistributedQueueFactory getDistributedQueueFactory() { + return NoopDistributedQueueFactory.INSTANCE; + } + + @Override + public ObjectCache getObjectCache() { + return objectCache; + } + + @Override + public TimeSource getTimeSource() { + return timeSource; + } + + @Override + public SolrResponse request(SolrRequest req) throws IOException { + throw new UnsupportedOperationException("request"); + } + + @Override + public byte[] httpRequest(String url, SolrRequest.METHOD method, Map headers, String payload, int timeout, boolean followRedirects) throws IOException { + throw new UnsupportedOperationException("httpRequest"); + } + + @Override + public void close() throws IOException { + + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java new file mode 100644 index 00000000000..4289b28e722 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.autoscaling.sim; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.solr.client.solrj.impl.ClusterStateProvider; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.Utils; +import org.noggit.CharArr; +import org.noggit.JSONWriter; + +/** + * Read-only snapshot of another {@link ClusterStateProvider}. 
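+ * The snapshot captures the live nodes, cluster properties and the full {@link ClusterState},
+ * and can be serialized to and restored from a JSON-compatible map, e.g.:
+ * <pre>
+ *   SnapshotClusterStateProvider snapshot = new SnapshotClusterStateProvider(realProvider);
+ *   Map&lt;String, Object&gt; data = snapshot.getSnapshot();
+ *   // ... persist the map and later reload it ...
+ *   ClusterStateProvider restored = new SnapshotClusterStateProvider(data);
+ * </pre>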
+ */ +public class SnapshotClusterStateProvider implements ClusterStateProvider { + + final Set liveNodes; + final ClusterState clusterState; + final Map clusterProperties; + + public SnapshotClusterStateProvider(ClusterStateProvider other) throws Exception { + liveNodes = Collections.unmodifiableSet(new HashSet<>(other.getLiveNodes())); + ClusterState otherState = other.getClusterState(); + clusterState = new ClusterState(otherState.getZNodeVersion(), liveNodes, otherState.getCollectionsMap()); + clusterProperties = new HashMap<>(other.getClusterProperties()); + } + + public SnapshotClusterStateProvider(Map snapshot) { + Objects.requireNonNull(snapshot); + liveNodes = Collections.unmodifiableSet(new HashSet<>((Collection)snapshot.getOrDefault("liveNodes", Collections.emptySet()))); + clusterProperties = (Map)snapshot.getOrDefault("clusterProperties", Collections.emptyMap()); + Map stateMap = new HashMap<>((Map)snapshot.getOrDefault("clusterState", Collections.emptyMap())); + Number version = (Number)stateMap.remove("version"); + clusterState = ClusterState.load(version != null ? version.intValue() : null, stateMap, liveNodes, ZkStateReader.CLUSTER_STATE); + } + + public Map getSnapshot() { + Map snapshot = new HashMap<>(); + snapshot.put("liveNodes", liveNodes); + if (clusterProperties != null) { + snapshot.put("clusterProperties", clusterProperties); + } + Map stateMap = new HashMap<>(); + snapshot.put("clusterState", stateMap); + stateMap.put("version", clusterState.getZNodeVersion()); + clusterState.forEachCollection(coll -> { + CharArr out = new CharArr(); + JSONWriter writer = new JSONWriter(out, 2); + coll.write(writer); + String json = out.toString(); + try { + stateMap.put(coll.getName(), Utils.fromJSON(json.getBytes("UTF-8"))); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("should not happen!", e); + } + }); + return snapshot; + } + + @Override + public ClusterState.CollectionRef getState(String collection) { + return clusterState.getCollectionRef(collection); + } + + @Override + public Set getLiveNodes() { + return liveNodes; + } + + @Override + public List resolveAlias(String alias) { + throw new UnsupportedOperationException("resolveAlias"); + } + + @Override + public Map getAliasProperties(String alias) { + throw new UnsupportedOperationException("getAliasProperties"); + } + + @Override + public ClusterState getClusterState() throws IOException { + return clusterState; + } + + @Override + public Map getClusterProperties() { + return clusterProperties; + } + + @Override + public String getPolicyNameByCollection(String coll) { + DocCollection collection = clusterState.getCollectionOrNull(coll); + return collection == null ? null : (String)collection.getProperties().get("policy"); + } + + @Override + public void connect() { + + } + + @Override + public void close() throws IOException { + + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotDistribStateManager.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotDistribStateManager.java new file mode 100644 index 00000000000..62b6936276b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotDistribStateManager.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.autoscaling.sim; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; + +import org.apache.solr.client.solrj.cloud.DistribStateManager; +import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException; +import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; +import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException; +import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException; +import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.AutoScalingParams; +import org.apache.solr.common.util.Base64; +import org.apache.solr.common.util.Utils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Read-only snapshot of another {@link DistribStateManager} + */ +public class SnapshotDistribStateManager implements DistribStateManager { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + LinkedHashMap dataMap = new LinkedHashMap<>(); + + /** + * Populate this instance from another {@link DistribStateManager} instance. + * @param other another instance + * @param config optional {@link AutoScalingConfig}, which will overwrite any existing config. + */ + public SnapshotDistribStateManager(DistribStateManager other, AutoScalingConfig config) throws Exception { + List tree = other.listTree("/"); + log.debug("- copying {} resources from {}", tree.size(), other.getClass().getSimpleName()); + for (String path : tree) { + dataMap.put(path, other.getData(path)); + } + if (config != null) { // overwrite existing + VersionedData vd = new VersionedData(config.getZkVersion(), Utils.toJSON(config), CreateMode.PERSISTENT, "0"); + dataMap.put(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, vd); + } + } + + /** + * Populate this instance from a previously generated snapshot. + * @param snapshot previous snapshot created using this class. 
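+ * <p>For example, given a {@code String json} previously produced by serializing
+ * {@link #getSnapshot()} with {@code Utils.toJSONString(...)}:
+ * <pre>
+ *   Map&lt;String, Object&gt; restored = (Map&lt;String, Object&gt;) Utils.fromJSONString(json);
+ *   DistribStateManager readOnly = new SnapshotDistribStateManager(restored);
+ * </pre>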
+ */ + public SnapshotDistribStateManager(Map snapshot) { + snapshot.forEach((path, value) -> { + Map map = (Map)value; + Number version = (Number)map.getOrDefault("version", 0); + String owner = (String)map.get("owner"); + String modeStr = (String)map.getOrDefault("mode", CreateMode.PERSISTENT.toString()); + CreateMode mode = CreateMode.valueOf(modeStr); + byte[] bytes = null; + if (map.containsKey("data")) { + bytes = Base64.base64ToByteArray((String)map.get("data")); + } + dataMap.put(path, new VersionedData(version.intValue(), bytes, mode, owner)); + }); + log.debug("- loaded snapshot of {} resources", dataMap.size()); + } + + /** + * Create a snapshot of all content in this instance. + */ + public Map getSnapshot() { + Map snapshot = new LinkedHashMap<>(); + dataMap.forEach((path, vd) -> { + Map data = new HashMap<>(); + vd.toMap(data); + snapshot.put(path, data); + }); + return snapshot; + } + + @Override + public boolean hasData(String path) throws IOException, KeeperException, InterruptedException { + return dataMap.containsKey(path); + } + + @Override + public List listData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException { + return listData(path, null); + } + + @Override + public List listTree(String path) { + return dataMap.keySet().stream() + .filter(p -> p.startsWith(path)) + .collect(Collectors.toList()); + } + + @Override + public List listData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException { + final String prefix = path + "/"; + return dataMap.entrySet().stream() + .filter(e -> e.getKey().startsWith(prefix)) + .map(e -> { + String suffix = e.getKey().substring(prefix.length()); + int idx = suffix.indexOf('/'); + if (idx == -1) { + return suffix; + } else { + return suffix.substring(0, idx); + } + }) + .collect(Collectors.toList()); + } + + @Override + public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException { + if (!dataMap.containsKey(path)) { + throw new NoSuchElementException(path); + } + return dataMap.get(path); + } + + @Override + public void makePath(String path) throws AlreadyExistsException, IOException, KeeperException, InterruptedException { + throw new UnsupportedOperationException("makePath"); + } + + @Override + public void makePath(String path, byte[] data, CreateMode createMode, boolean failOnExists) throws AlreadyExistsException, IOException, KeeperException, InterruptedException { + throw new UnsupportedOperationException("makePath"); + } + + @Override + public String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException { + throw new UnsupportedOperationException("createData"); + } + + @Override + public void removeData(String path, int version) throws NoSuchElementException, IOException, NotEmptyException, KeeperException, InterruptedException, BadVersionException { + throw new UnsupportedOperationException("removeData"); + } + + @Override + public void setData(String path, byte[] data, int version) throws BadVersionException, NoSuchElementException, IOException, KeeperException, InterruptedException { + throw new UnsupportedOperationException("setData"); + } + + @Override + public List multi(Iterable ops) throws BadVersionException, NoSuchElementException, AlreadyExistsException, IOException, KeeperException, InterruptedException { + throw new 
UnsupportedOperationException("multi"); + } + + @Override + public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws InterruptedException, IOException { + VersionedData vd = dataMap.get(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH); + Map map = new HashMap<>(); + if (vd != null && vd.getData() != null && vd.getData().length > 0) { + map = (Map) Utils.fromJSON(vd.getData()); + map.put(AutoScalingParams.ZK_VERSION, vd.getVersion()); + } + return new AutoScalingConfig(map); + } + + @Override + public void close() throws IOException { + + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotNodeStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotNodeStateProvider.java new file mode 100644 index 00000000000..8d22dbb4aab --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotNodeStateProvider.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.autoscaling.sim; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.solr.client.solrj.cloud.NodeStateProvider; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; +import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; +import org.apache.solr.client.solrj.cloud.autoscaling.Variable; + +/** + * Read-only snapshot of another {@link NodeStateProvider}. + */ +public class SnapshotNodeStateProvider implements NodeStateProvider { + private Map> nodeValues = new LinkedHashMap<>(); + private Map>>> replicaInfos = new LinkedHashMap<>(); + + private static double GB = 1024.0d * 1024.0d * 1024.0d; + + /** + * Populate this instance from another instance of {@link SolrCloudManager}. + * @param other another instance + * @param config optional {@link AutoScalingConfig}, which will be used to determine what node and + * replica tags to retrieve. If this is null then the other instance's config will be used. 
+ */ + public SnapshotNodeStateProvider(SolrCloudManager other, AutoScalingConfig config) throws Exception { + if (config == null) { + config = other.getDistribStateManager().getAutoScalingConfig(); + } + Set nodeTags = new HashSet<>(SimUtils.COMMON_NODE_TAGS); + nodeTags.addAll(config.getPolicy().getParams()); + Set replicaTags = new HashSet<>(SimUtils.COMMON_REPLICA_TAGS); + replicaTags.addAll(config.getPolicy().getPerReplicaAttributes()); + for (String node : other.getClusterStateProvider().getLiveNodes()) { + nodeValues.put(node, new LinkedHashMap<>(other.getNodeStateProvider().getNodeValues(node, nodeTags))); + Map>> infos = other.getNodeStateProvider().getReplicaInfo(node, replicaTags); + infos.forEach((collection, shards) -> { + shards.forEach((shard, replicas) -> { + replicas.forEach(r -> { + List myReplicas = replicaInfos + .computeIfAbsent(node, n -> new LinkedHashMap<>()) + .computeIfAbsent(collection, c -> new LinkedHashMap<>()) + .computeIfAbsent(shard, s -> new ArrayList<>()); + Map rMap = new LinkedHashMap<>(); + r.toMap(rMap); + if (r.isLeader) { // ReplicaInfo.toMap doesn't write this!!! + ((Map)rMap.values().iterator().next()).put("leader", "true"); + } + ReplicaInfo ri = new ReplicaInfo(rMap); + // put in "leader" again if present + if (r.isLeader) { + ri.getVariables().put("leader", "true"); + } + // externally produced snapshots may not include the right units + if (ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute) == null) { + if (ri.getVariable(Variable.Type.CORE_IDX.tagName) != null) { + Number indexSizeGB = (Number) ri.getVariable(Variable.Type.CORE_IDX.tagName); + ri.getVariables().put(Variable.Type.CORE_IDX.metricsAttribute, indexSizeGB.doubleValue() * GB); + } else { + throw new RuntimeException("Missing size information for replica: " + ri); + } + } + myReplicas.add(ri); + }); + }); + }); + } + } + + /** + * Populate this instance from a previously generated snapshot. + * @param snapshot previous snapshot created using this class. + */ + public SnapshotNodeStateProvider(Map snapshot) { + Objects.requireNonNull(snapshot); + nodeValues = (Map>)snapshot.getOrDefault("nodeValues", Collections.emptyMap()); + ((Map)snapshot.getOrDefault("replicaInfos", Collections.emptyMap())).forEach((node, v) -> { + Map>> perNode = replicaInfos.computeIfAbsent(node, n -> new LinkedHashMap<>()); + ((Map)v).forEach((collection, shards) -> { + Map> perColl = perNode.computeIfAbsent(collection, c -> new LinkedHashMap<>()); + ((Map)shards).forEach((shard, replicas) -> { + List infos = perColl.computeIfAbsent(shard, s -> new ArrayList<>()); + ((List>)replicas).forEach(replicaMap -> { + ReplicaInfo ri = new ReplicaInfo(new LinkedHashMap<>(replicaMap)); // constructor modifies this map + if (ri.isLeader) { + ri.getVariables().put("leader", "true"); + } + // externally produced snapshots may not include the right units + if (ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute) == null) { + if (ri.getVariable(Variable.Type.CORE_IDX.tagName) != null) { + Number indexSizeGB = (Number) ri.getVariable(Variable.Type.CORE_IDX.tagName); + ri.getVariables().put(Variable.Type.CORE_IDX.metricsAttribute, indexSizeGB.doubleValue() * GB); + } else { + throw new RuntimeException("Missing size information for replica: " + ri); + } + } + infos.add(ri); + }); + }); + }); + }); + } + + /** + * Create a snapshot of all node and replica tag values available from the original source, per the original + * autoscaling configuration. 
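+ * <p>The returned map has two top-level sections, {@code nodeValues} and {@code replicaInfos},
+ * and can be fed back into {@link #SnapshotNodeStateProvider(java.util.Map)}; a sketch:
+ * <pre>
+ *   Map&lt;String, Object&gt; snapshot = nodeStateProvider.getSnapshot();
+ *   SnapshotNodeStateProvider restored = new SnapshotNodeStateProvider(snapshot);
+ * </pre>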
+ */
+  public Map<String, Object> getSnapshot() {
+    Map<String, Object> snapshot = new LinkedHashMap<>();
+    snapshot.put("nodeValues", nodeValues);
+    Map<String, Map<String, Map<String, List<Map<String, Object>>>>> replicaInfosMap = new LinkedHashMap<>();
+    snapshot.put("replicaInfos", replicaInfosMap);
+    replicaInfos.forEach((node, perNode) -> {
+      perNode.forEach((collection, shards) -> {
+        shards.forEach((shard, replicas) -> {
+          replicas.forEach(r -> {
+            List<Map<String, Object>> myReplicas = replicaInfosMap
+                .computeIfAbsent(node, n -> new LinkedHashMap<>())
+                .computeIfAbsent(collection, c -> new LinkedHashMap<>())
+                .computeIfAbsent(shard, s -> new ArrayList<>());
+            Map<String, Object> rMap = new LinkedHashMap<>();
+            r.toMap(rMap);
+            if (r.isLeader) { // ReplicaInfo.toMap doesn't write this!!!
+              ((Map<String, Object>)rMap.values().iterator().next()).put("leader", "true");
+            }
+            myReplicas.add(rMap);
+          });
+        });
+      });
+    });
+    return snapshot;
+  }
+
+  @Override
+  public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
+    return nodeValues.getOrDefault(node, Collections.emptyMap());
+  }
+
+  @Override
+  public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
+    return replicaInfos.getOrDefault(node, Collections.emptyMap());
+  }
+
+  public ReplicaInfo getReplicaInfo(String collection, String coreNode) {
+    for (Map<String, Map<String, List<ReplicaInfo>>> perNode : replicaInfos.values()) {
+      for (List<ReplicaInfo> perShard : perNode.getOrDefault(collection, Collections.emptyMap()).values()) {
+        for (ReplicaInfo ri : perShard) {
+          if (ri.getName().equals(coreNode)) {
+            return ri;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/util/SolrCLI.java b/solr/core/src/java/org/apache/solr/util/SolrCLI.java
index db31241c162..6d46db116a6 100755
--- a/solr/core/src/java/org/apache/solr/util/SolrCLI.java
+++ b/solr/core/src/java/org/apache/solr/util/SolrCLI.java
@@ -53,11 +53,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Scanner;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.zip.ZipEntry;
@@ -99,13 +97,11 @@ import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.client.solrj.cloud.autoscaling.Row;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -116,9 +112,13 @@ import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
 import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
+import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.autoscaling.sim.NoopDistributedQueueFactory;
 import org.apache.solr.cloud.autoscaling.sim.SimCloudManager;
+import org.apache.solr.cloud.autoscaling.sim.SimUtils;
+import org.apache.solr.cloud.autoscaling.sim.SnapshotCloudManager;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -132,6 +132,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ContentStreamBase;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
@@ -858,12 +859,10 @@ public class SolrCLI implements CLIO {
   }
 
-  public static class AutoscalingTool extends SolrCloudTool {
+  public static class AutoscalingTool extends ToolBase {
     static final String NODE_REDACTION_PREFIX = "N_";
     static final String COLL_REDACTION_PREFIX = "COLL_";
 
-    private boolean verbose;
-
     public AutoscalingTool() {
       this(CLIO.getOutStream());
     }
@@ -911,6 +910,16 @@
         OptionBuilder
             .withDescription("Show summarized collection & node statistics.")
            .create("stats"),
+        OptionBuilder
+            .withDescription("Store autoscaling snapshot of the current cluster.")
+            .withArgName("DIR")
+            .hasArg()
+            .create("save"),
+        OptionBuilder
+            .withDescription("Load autoscaling snapshot of the cluster instead of using the real one.")
+            .withArgName("DIR")
+            .hasArg()
+            .create("load"),
         OptionBuilder
             .withDescription("Simulate execution of all suggestions.")
            .create("simulate"),
@@ -920,6 +929,12 @@
             .hasArg()
             .withLongOpt("iterations")
             .create("i"),
+        OptionBuilder
+            .withDescription("Save autoscaling snapshots at each step of simulated execution.")
+            .withArgName("DIR")
+            .withLongOpt("saveSimulated")
+            .hasArg()
+            .create("ss"),
         OptionBuilder
             .withDescription("Turn on all options to get all available information.")
            .create("all")
@@ -932,96 +947,109 @@
       return "autoscaling";
     }
 
-    @Override
-    protected void runCloudTool(CloudSolrClient cloudSolrClient, CommandLine cli) throws Exception {
-      DistributedQueueFactory dummmyFactory = new DistributedQueueFactory() {
-        @Override
-        public DistributedQueue makeQueue(String path) throws IOException {
-          throw new UnsupportedOperationException("makeQueue");
-        }
-
-        @Override
-        public void removeQueue(String path) throws IOException {
-          throw new UnsupportedOperationException("removeQueue");
-        }
-      };
-      try (SolrClientCloudManager realCloudManager = new SolrClientCloudManager(dummmyFactory, cloudSolrClient)) {
-        AutoScalingConfig config = null;
-        HashSet<String> liveNodes = new HashSet<>();
-        String configFile = cli.getOptionValue("a");
-        if (configFile != null) {
-          if (verbose) {
-            log.info("- reading autoscaling config from " + configFile);
-          }
-          config = new AutoScalingConfig(IOUtils.toByteArray(new FileInputStream(configFile)));
-        } else {
-          if (verbose) {
-            log.info("- reading autoscaling config from the cluster.");
-          }
-          config = realCloudManager.getDistribStateManager().getAutoScalingConfig();
-        }
-        // freeze the cluster state
-        SimCloudManager cloudManager = SimCloudManager.createCluster(realCloudManager, TimeSource.get("simTime:50"));
-
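Taken together, the new options give the tool a repeatable offline workflow: capture a snapshot of a live cluster once with -save, then rerun analyses and simulations against it with -load, optionally dumping per-step snapshots via -ss. An illustrative invocation (the directory paths are hypothetical, and the launcher syntax assumes the standard bin/solr wrapper):

  bin/solr autoscaling -save /tmp/cluster-snapshot
  bin/solr autoscaling -load /tmp/cluster-snapshot -simulate -i 20 -ss /tmp/sim-steps -all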
liveNodes.addAll(cloudManager.getClusterStateProvider().getLiveNodes()); - boolean withSuggestions = cli.hasOption("s"); - boolean withDiagnostics = cli.hasOption("d") || cli.hasOption("n"); - boolean withSortedNodes = cli.hasOption("n"); - boolean withClusterState = cli.hasOption("c"); - boolean withStats = cli.hasOption("stats"); - boolean redact = cli.hasOption("r"); - if (cli.hasOption("all")) { - withSuggestions = true; - withDiagnostics = true; - withSortedNodes = true; - withClusterState = true; - withStats = true; - } - // prepare to redact also host names / IPs in base_url and other properties - Set redactNames = new HashSet<>(); - for (String nodeName : liveNodes) { - String urlString = Utils.getBaseUrlForNodeName(nodeName, "http"); - try { - URL u = new URL(urlString); - // protocol format - redactNames.add(u.getHost() + ":" + u.getPort()); - // node name format - redactNames.add(u.getHost() + "_" + u.getPort() + "_"); - } catch (MalformedURLException e) { - log.warn("Invalid URL for node name " + nodeName + ", replacing including protocol and path", e); - redactNames.add(urlString); - redactNames.add(Utils.getBaseUrlForNodeName(nodeName, "https")); - } - } - // redact collection names too - Set redactCollections = new HashSet<>(); - ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState(); - clusterState.forEachCollection(coll -> redactCollections.add(coll.getName())); - if (!withSuggestions && !withDiagnostics) { - withSuggestions = true; - } - Map results = prepareResults(cloudManager, config, withClusterState, - withStats, withSuggestions, withSortedNodes, withDiagnostics); - if (cli.hasOption("simulate")) { - String iterStr = cli.getOptionValue("i", "10"); - int iterations; - try { - iterations = Integer.parseInt(iterStr); - } catch (Exception e) { - log.warn("Invalid option 'i' value, using default 10:" + e); - iterations = 10; - } - Map simulationResults = new HashMap<>(); - simulate(cloudManager, config, simulationResults, withClusterState, - withStats, withSuggestions, withSortedNodes, withDiagnostics, iterations); - results.put("simulation", simulationResults); - } - String data = Utils.toJSONString(results); - if (redact) { - data = RedactionUtils.redactNames(redactCollections, COLL_REDACTION_PREFIX, data); - data = RedactionUtils.redactNames(redactNames, NODE_REDACTION_PREFIX, data); - } - stdout.println(data); + protected void runImpl(CommandLine cli) throws Exception { + raiseLogLevelUnlessVerbose(cli); + SnapshotCloudManager cloudManager; + AutoScalingConfig config = null; + String configFile = cli.getOptionValue("a"); + if (configFile != null) { + CLIO.err("- reading autoscaling config from " + configFile); + config = new AutoScalingConfig(IOUtils.toByteArray(new FileInputStream(configFile))); } + if (cli.hasOption("load")) { + File sourceDir = new File(cli.getOptionValue("load")); + CLIO.err("- loading autoscaling snapshot from " + sourceDir.getAbsolutePath()); + cloudManager = SnapshotCloudManager.readSnapshot(sourceDir); + if (config == null) { + CLIO.err("- reading autoscaling config from the snapshot."); + config = cloudManager.getDistribStateManager().getAutoScalingConfig(); + } + } else { + String zkHost = cli.getOptionValue("zkHost", ZK_HOST); + + log.debug("Connecting to Solr cluster: " + zkHost); + try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkHost), Optional.empty()).build()) { + + String collection = cli.getOptionValue("collection"); + if (collection != null) + 
cloudSolrClient.setDefaultCollection(collection); + + cloudSolrClient.connect(); + try (SolrClientCloudManager realCloudManager = new SolrClientCloudManager(NoopDistributedQueueFactory.INSTANCE, cloudSolrClient)) { + if (config == null) { + CLIO.err("- reading autoscaling config from the cluster."); + config = realCloudManager.getDistribStateManager().getAutoScalingConfig(); + } + cloudManager = new SnapshotCloudManager(realCloudManager, config); + } + } + } + if (cli.hasOption("save")) { + File targetDir = new File(cli.getOptionValue("save")); + cloudManager.saveSnapshot(targetDir, true); + CLIO.err("- saved autoscaling snapshot to " + targetDir.getAbsolutePath()); + } + HashSet liveNodes = new HashSet<>(); + liveNodes.addAll(cloudManager.getClusterStateProvider().getLiveNodes()); + boolean withSuggestions = cli.hasOption("s"); + boolean withDiagnostics = cli.hasOption("d") || cli.hasOption("n"); + boolean withSortedNodes = cli.hasOption("n"); + boolean withClusterState = cli.hasOption("c"); + boolean withStats = cli.hasOption("stats"); + boolean redact = cli.hasOption("r"); + if (cli.hasOption("all")) { + withSuggestions = true; + withDiagnostics = true; + withSortedNodes = true; + withClusterState = true; + withStats = true; + } + // prepare to redact also host names / IPs in base_url and other properties + Set redactNames = new HashSet<>(); + for (String nodeName : liveNodes) { + String urlString = Utils.getBaseUrlForNodeName(nodeName, "http"); + try { + URL u = new URL(urlString); + // protocol format + redactNames.add(u.getHost() + ":" + u.getPort()); + // node name format + redactNames.add(u.getHost() + "_" + u.getPort() + "_"); + } catch (MalformedURLException e) { + log.warn("Invalid URL for node name " + nodeName + ", replacing including protocol and path", e); + redactNames.add(urlString); + redactNames.add(Utils.getBaseUrlForNodeName(nodeName, "https")); + } + } + // redact collection names too + Set redactCollections = new HashSet<>(); + ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState(); + clusterState.forEachCollection(coll -> redactCollections.add(coll.getName())); + if (!withSuggestions && !withDiagnostics) { + withSuggestions = true; + } + Map results = prepareResults(cloudManager, config, withClusterState, + withStats, withSuggestions, withSortedNodes, withDiagnostics); + if (cli.hasOption("simulate")) { + String iterStr = cli.getOptionValue("i", "10"); + String saveSimulated = cli.getOptionValue("saveSimulated"); + int iterations; + try { + iterations = Integer.parseInt(iterStr); + } catch (Exception e) { + log.warn("Invalid option 'i' value, using default 10:" + e); + iterations = 10; + } + Map simulationResults = new HashMap<>(); + simulate(cloudManager, config, simulationResults, saveSimulated, withClusterState, + withStats, withSuggestions, withSortedNodes, withDiagnostics, iterations); + results.put("simulation", simulationResults); + } + String data = Utils.toJSONString(results); + if (redact) { + data = RedactionUtils.redactNames(redactCollections, COLL_REDACTION_PREFIX, data); + data = RedactionUtils.redactNames(redactNames, NODE_REDACTION_PREFIX, data); + } + stdout.println(data); } private Map prepareResults(SolrCloudManager clientCloudManager, @@ -1033,23 +1061,24 @@ public class SolrCLI implements CLIO { boolean withDiagnostics) throws Exception { Policy.Session session = config.getPolicy().createSession(clientCloudManager); ClusterState clusterState = clientCloudManager.getClusterStateProvider().getClusterState(); - if 
(verbose) { - log.info("- calculating suggestions..."); + List suggestions = Collections.emptyList(); + long start, end; + if (withSuggestions) { + CLIO.err("- calculating suggestions..."); + start = TimeSource.NANO_TIME.getTimeNs(); + suggestions = PolicyHelper.getSuggestions(config, clientCloudManager); + end = TimeSource.NANO_TIME.getTimeNs(); + CLIO.err(" (took " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms)"); } - long start = TimeSource.NANO_TIME.getTimeNs(); - List suggestions = PolicyHelper.getSuggestions(config, clientCloudManager); - long end = TimeSource.NANO_TIME.getTimeNs(); - if (verbose) { - log.info(" (took " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms)"); - log.info("- calculating diagnostics..."); - } - start = TimeSource.NANO_TIME.getTimeNs(); - MapWriter mw = PolicyHelper.getDiagnostics(session); - Map diagnostics = new LinkedHashMap<>(); - mw.toMap(diagnostics); - end = TimeSource.NANO_TIME.getTimeNs(); - if (verbose) { - log.info(" (took " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms)"); + Map diagnostics = Collections.emptyMap(); + if (withDiagnostics) { + CLIO.err("- calculating diagnostics..."); + start = TimeSource.NANO_TIME.getTimeNs(); + MapWriter mw = PolicyHelper.getDiagnostics(session); + diagnostics = new LinkedHashMap<>(); + mw.toMap(diagnostics); + end = TimeSource.NANO_TIME.getTimeNs(); + CLIO.err(" (took " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms)"); } Map results = new LinkedHashMap<>(); if (withClusterState) { @@ -1060,131 +1089,7 @@ public class SolrCLI implements CLIO { results.put("CLUSTERSTATE", map); } if (withStats) { - Map> collStats = new TreeMap<>(); - clusterState.forEachCollection(coll -> { - Map perColl = collStats.computeIfAbsent(coll.getName(), n -> new LinkedHashMap<>()); - AtomicInteger numCores = new AtomicInteger(); - HashMap> nodes = new HashMap<>(); - coll.getSlices().forEach(s -> { - numCores.addAndGet(s.getReplicas().size()); - s.getReplicas().forEach(r -> { - nodes.computeIfAbsent(r.getNodeName(), n -> new HashMap<>()) - .computeIfAbsent(s.getName(), slice -> new AtomicInteger()).incrementAndGet(); - }); - }); - int maxCoresPerNode = 0; - int minCoresPerNode = 0; - int maxActualShardsPerNode = 0; - int minActualShardsPerNode = 0; - int maxShardReplicasPerNode = 0; - int minShardReplicasPerNode = 0; - if (!nodes.isEmpty()) { - minCoresPerNode = Integer.MAX_VALUE; - minActualShardsPerNode = Integer.MAX_VALUE; - minShardReplicasPerNode = Integer.MAX_VALUE; - for (Map counts : nodes.values()) { - int total = counts.values().stream().mapToInt(c -> c.get()).sum(); - for (AtomicInteger count : counts.values()) { - if (count.get() > maxShardReplicasPerNode) { - maxShardReplicasPerNode = count.get(); - } - if (count.get() < minShardReplicasPerNode) { - minShardReplicasPerNode = count.get(); - } - } - if (total > maxCoresPerNode) { - maxCoresPerNode = total; - } - if (total < minCoresPerNode) { - minCoresPerNode = total; - } - if (counts.size() > maxActualShardsPerNode) { - maxActualShardsPerNode = counts.size(); - } - if (counts.size() < minActualShardsPerNode) { - minActualShardsPerNode = counts.size(); - } - } - } - perColl.put("activeShards", coll.getActiveSlices().size()); - perColl.put("inactiveShards", coll.getSlices().size() - coll.getActiveSlices().size()); - perColl.put("rf", coll.getReplicationFactor()); - perColl.put("maxShardsPerNode", coll.getMaxShardsPerNode()); - perColl.put("maxActualShardsPerNode", maxActualShardsPerNode); - perColl.put("minActualShardsPerNode", minActualShardsPerNode); - 
perColl.put("maxShardReplicasPerNode", maxShardReplicasPerNode); - perColl.put("minShardReplicasPerNode", minShardReplicasPerNode); - perColl.put("numCores", numCores.get()); - perColl.put("numNodes", nodes.size()); - perColl.put("maxCoresPerNode", maxCoresPerNode); - perColl.put("minCoresPerNode", minCoresPerNode); - }); - Map> nodeStats = new TreeMap<>(); - Map coreStats = new TreeMap<>(); - for (Row row : session.getSortedNodes()) { - Map nodeStat = nodeStats.computeIfAbsent(row.node, n -> new LinkedHashMap<>()); - nodeStat.put("isLive", row.isLive()); - nodeStat.put("freedisk", row.getVal("freedisk", 0)); - nodeStat.put("totaldisk", row.getVal("totaldisk", 0)); - int cores = ((Number)row.getVal("cores", 0)).intValue(); - nodeStat.put("cores", cores); - coreStats.computeIfAbsent(cores, num -> new AtomicInteger()).incrementAndGet(); - Map>> collReplicas = new TreeMap<>(); - row.forEachReplica(ri -> { - Map perReplica = collReplicas.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>()) - .computeIfAbsent(ri.getCore().substring(ri.getCollection().length() + 1), core -> new LinkedHashMap<>()); -// if (ri.getVariable(Variable.Type.CORE_IDX.tagName) != null) { -// perReplica.put(Variable.Type.CORE_IDX.tagName, ri.getVariable(Variable.Type.CORE_IDX.tagName)); -// } - if (ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute) != null) { - perReplica.put(Variable.Type.CORE_IDX.metricsAttribute, ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute)); - } - perReplica.put("coreNode", ri.getName()); - if (ri.getBool("leader", false)) { - perReplica.put("leader", true); - Double totalSize = (Double)collStats.computeIfAbsent(ri.getCollection(), c -> new HashMap<>()) - .computeIfAbsent("avgShardSize", size -> 0.0); - Number riSize = (Number)ri.getVariable(Variable.Type.CORE_IDX.metricsAttribute); - if (riSize != null) { - totalSize += riSize.doubleValue(); - collStats.get(ri.getCollection()).put("avgShardSize", totalSize); - Double max = (Double)collStats.get(ri.getCollection()).get("maxShardSize"); - if (max == null) max = 0.0; - if (riSize.doubleValue() > max) { - collStats.get(ri.getCollection()).put("maxShardSize", riSize.doubleValue()); - } - Double min = (Double)collStats.get(ri.getCollection()).get("minShardSize"); - if (min == null) min = Double.MAX_VALUE; - if (riSize.doubleValue() < min) { - collStats.get(ri.getCollection()).put("minShardSize", riSize.doubleValue()); - } - } - } - nodeStat.put("replicas", collReplicas); - }); - } - - // calculate average per shard and convert the units - for (Map perColl : collStats.values()) { - Number avg = perColl.get("avgShardSize"); - if (avg != null) { - avg = avg.doubleValue() / perColl.get("activeShards").doubleValue(); - perColl.put("avgShardSize", (Number)Variable.Type.CORE_IDX.convertVal(avg)); - } - Number num = perColl.get("maxShardSize"); - if (num != null) { - perColl.put("maxShardSize", (Number)Variable.Type.CORE_IDX.convertVal(num)); - } - num = perColl.get("minShardSize"); - if (num != null) { - perColl.put("minShardSize", (Number)Variable.Type.CORE_IDX.convertVal(num)); - } - } - Map stats = new LinkedHashMap<>(); - results.put("STATISTICS", stats); - stats.put("coresPerNodes", coreStats); - stats.put("nodeStats", nodeStats); - stats.put("collectionStats", collStats); + results.put("STATISTICS", SimUtils.calculateStats(clientCloudManager, config, verbose)); } if (withSuggestions) { results.put("SUGGESTIONS", suggestions); @@ -1198,43 +1103,126 @@ public class SolrCLI implements CLIO { return results; } - private void 
simulate(SimCloudManager simCloudManager, + + private void simulate(SolrCloudManager cloudManager, AutoScalingConfig config, Map results, + String saveSimulated, boolean withClusterState, boolean withStats, boolean withSuggestions, boolean withSortedNodes, boolean withDiagnostics, int iterations) throws Exception { - int loop = iterations; + File saveDir = null; + if (saveSimulated != null) { + saveDir = new File(saveSimulated); + if (!saveDir.exists()) { + if (!saveDir.mkdirs()) { + throw new Exception("Unable to create 'saveSimulated' directory: " + saveDir.getAbsolutePath()); + } + } else if (!saveDir.isDirectory()) { + throw new Exception("'saveSimulated' path exists and is not a directory! " + saveDir.getAbsolutePath()); + } + } + int SPEED = 50; + SimCloudManager simCloudManager = SimCloudManager.createCluster(cloudManager, config, TimeSource.get("simTime:" + SPEED)); + int loop = 0; List suggestions = Collections.emptyList(); Map intermediate = new LinkedHashMap<>(); - results.put("intermediateSuggestions", intermediate); - while (loop-- > 0) { + results.put("intermediate", intermediate); + while (loop < iterations) { + LinkedHashMap perStep = new LinkedHashMap<>(); + long start = TimeSource.NANO_TIME.getTimeNs(); suggestions = PolicyHelper.getSuggestions(config, simCloudManager); - log.info("-- step " + (iterations - loop) + ", " + suggestions.size() + " suggestions."); + CLIO.err("-- step " + loop + ", " + suggestions.size() + " suggestions."); + long end = TimeSource.NANO_TIME.getTimeNs(); + CLIO.err(" - calculated in " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms (real time ≈ simulated time)"); if (suggestions.isEmpty()) { break; } - intermediate.put("step" + (iterations - loop), suggestions); + SnapshotCloudManager snapshotCloudManager = new SnapshotCloudManager(simCloudManager, config); + if (saveDir != null) { + File target = new File(saveDir, "step" + loop + "_start"); + snapshotCloudManager.saveSnapshot(target, true); + } + if (verbose) { + Map snapshot = snapshotCloudManager.getSnapshot(false); + snapshot.remove(SnapshotCloudManager.DISTRIB_STATE_KEY); + snapshot.remove(SnapshotCloudManager.MANAGER_STATE_KEY); + perStep.put("snapshotStart", snapshot); + } + intermediate.put("step" + loop, perStep); int unresolvedCount = 0; + start = TimeSource.NANO_TIME.getTimeNs(); + List> perStepOps = new ArrayList<>(suggestions.size()); + if (withSuggestions) { + perStep.put("suggestions", suggestions); + perStep.put("opDetails", perStepOps); + } for (Suggester.SuggestionInfo suggestion : suggestions) { SolrRequest operation = suggestion.getOperation(); if (operation == null) { unresolvedCount++; if (suggestion.getViolation() == null) { - log.info(" - ignoring suggestion without violation and without operation: " + suggestion); + CLIO.err(" - ignoring suggestion without violation and without operation: " + suggestion); } continue; } - simCloudManager.request(operation); + SolrParams params = operation.getParams(); + if (operation instanceof V2Request) { + params = SimUtils.v2AdminRequestToV1Params((V2Request)operation); + } + Map paramsMap = new LinkedHashMap<>(); + params.toMap(paramsMap); + ReplicaInfo info = simCloudManager.getSimClusterStateProvider().simGetReplicaInfo( + params.get(CollectionAdminParams.COLLECTION), params.get("replica")); + if (info == null) { + CLIO.err("Could not find ReplicaInfo for params: " + params); + } else if (verbose) { + paramsMap.put("replicaInfo", info); + } else if (info.getVariable(Variable.Type.CORE_IDX.tagName) != null) { + 
paramsMap.put(Variable.Type.CORE_IDX.tagName, info.getVariable(Variable.Type.CORE_IDX.tagName)); + } + if (withSuggestions) { + perStepOps.add(paramsMap); + } + try { + simCloudManager.request(operation); + } catch (Exception e) { + CLIO.err("Aborting - error executing suggestion " + suggestion + ": " + e); + Map error = new HashMap<>(); + error.put("suggestion", suggestion); + error.put("replicaInfo", info); + error.put("exception", e); + perStep.put("error", error); + break; + } } + end = TimeSource.NANO_TIME.getTimeNs(); + long realTime = TimeUnit.NANOSECONDS.toMillis(end - start); + long simTime = realTime * SPEED; + CLIO.err(" - executed in " + realTime + " ms (real time), " + simTime + " ms (simulated time)"); if (unresolvedCount == suggestions.size()) { - log.info("--- aborting simulation, only unresolved violations remain"); + CLIO.err("--- aborting simulation, only unresolved violations remain"); break; } + if (withStats) { + perStep.put("statsExecutionStop", SimUtils.calculateStats(simCloudManager, config, verbose)); + } + snapshotCloudManager = new SnapshotCloudManager(simCloudManager, config); + if (saveDir != null) { + File target = new File(saveDir, "step" + loop + "_stop"); + snapshotCloudManager.saveSnapshot(target, true); + } + if (verbose) { + Map snapshot = snapshotCloudManager.getSnapshot(false); + snapshot.remove(SnapshotCloudManager.DISTRIB_STATE_KEY); + snapshot.remove(SnapshotCloudManager.MANAGER_STATE_KEY); + perStep.put("snapshotStop", snapshot); + } + loop++; } - if (loop == 0 && !suggestions.isEmpty()) { + if (loop == iterations && !suggestions.isEmpty()) { CLIO.err("### Failed to apply all suggestions in " + iterations + " steps. Remaining suggestions: " + suggestions + "\n"); } results.put("finalState", prepareResults(simCloudManager, config, withClusterState, withStats, diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimClusterStateProvider.java index 2ddc92e06e4..0cb56018403 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimClusterStateProvider.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimClusterStateProvider.java @@ -107,7 +107,7 @@ public class TestSimClusterStateProvider extends SolrCloudTestCase { if (simulated) { // initialize simulated provider - SimCloudManager simCloudManager = SimCloudManager.createCluster(realManager, TimeSource.get("simTime:10")); + SimCloudManager simCloudManager = SimCloudManager.createCluster(realManager, null, TimeSource.get("simTime:10")); // simCloudManager.getSimClusterStateProvider().simSetClusterProperties(clusterProperties); // simCloudManager.getSimDistribStateManager().simSetAutoScalingConfig(autoScalingConfig); // nodeValues.forEach((n, values) -> { diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSnapshotCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSnapshotCloudManager.java new file mode 100644 index 00000000000..11ed89c6170 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSnapshotCloudManager.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.autoscaling.sim; + +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.solr.client.solrj.cloud.DistribStateManager; +import org.apache.solr.client.solrj.cloud.NodeStateProvider; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; +import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; +import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.params.CollectionAdminParams; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.common.util.Utils; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class TestSnapshotCloudManager extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static int NODE_COUNT = 3; + + private static SolrCloudManager realManager; + + // set up a real cluster as the source of test data + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(NODE_COUNT) + .addConfig("conf", configset("cloud-minimal")) + .configure(); + CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL, null, 1, 2, 0, 1) + .process(cluster.getSolrClient()); + realManager = cluster.getJettySolrRunner(cluster.getJettySolrRunners().size() - 1).getCoreContainer() + .getZkController().getSolrCloudManager(); + } + + @Test + public void testSnapshots() throws Exception { + SnapshotCloudManager snapshotCloudManager = new SnapshotCloudManager(realManager, null); + Map snapshot = snapshotCloudManager.getSnapshot(true); + SnapshotCloudManager snapshotCloudManager1 = new SnapshotCloudManager(snapshot); + assertClusterStateEquals(realManager.getClusterStateProvider().getClusterState(), snapshotCloudManager.getClusterStateProvider().getClusterState()); + assertClusterStateEquals(realManager.getClusterStateProvider().getClusterState(), snapshotCloudManager1.getClusterStateProvider().getClusterState()); + // this will always fail because the metrics will be already different + // assertNodeStateProvider(realManager, snapshotCloudManager); + 
+    assertNodeStateProvider(snapshotCloudManager, snapshotCloudManager1);
+    assertDistribStateManager(snapshotCloudManager.getDistribStateManager(), snapshotCloudManager1.getDistribStateManager());
+  }
+
+  @Test
+  public void testPersistence() throws Exception {
+    Path tmpPath = createTempDir();
+    File tmpDir = tmpPath.toFile();
+    SnapshotCloudManager snapshotCloudManager = new SnapshotCloudManager(realManager, null);
+    snapshotCloudManager.saveSnapshot(tmpDir, true);
+    SnapshotCloudManager snapshotCloudManager1 = SnapshotCloudManager.readSnapshot(tmpDir);
+    assertClusterStateEquals(snapshotCloudManager.getClusterStateProvider().getClusterState(), snapshotCloudManager1.getClusterStateProvider().getClusterState());
+    assertNodeStateProvider(snapshotCloudManager, snapshotCloudManager1);
+    assertDistribStateManager(snapshotCloudManager.getDistribStateManager(), snapshotCloudManager1.getDistribStateManager());
+  }
+
+  @Test
+  public void testSimulatorFromSnapshot() throws Exception {
+    Path tmpPath = createTempDir();
+    File tmpDir = tmpPath.toFile();
+    SnapshotCloudManager snapshotCloudManager = new SnapshotCloudManager(realManager, null);
+    snapshotCloudManager.saveSnapshot(tmpDir, true);
+    SnapshotCloudManager snapshotCloudManager1 = SnapshotCloudManager.readSnapshot(tmpDir);
+    try (SimCloudManager simCloudManager = SimCloudManager.createCluster(snapshotCloudManager1, null, TimeSource.get("simTime:50"))) {
+      assertClusterStateEquals(snapshotCloudManager.getClusterStateProvider().getClusterState(), simCloudManager.getClusterStateProvider().getClusterState());
+      assertNodeStateProvider(snapshotCloudManager, simCloudManager);
+      assertDistribStateManager(snapshotCloudManager.getDistribStateManager(), simCloudManager.getDistribStateManager());
+      ClusterState state = simCloudManager.getClusterStateProvider().getClusterState();
+      Replica r = state.getCollection(CollectionAdminParams.SYSTEM_COLL).getReplicas().get(0);
+      // get another node
+      String target = null;
+      for (String node : simCloudManager.getClusterStateProvider().getLiveNodes()) {
+        if (!node.equals(r.getNodeName())) {
+          target = node;
+          break;
+        }
+      }
+      if (target == null) {
+        fail("can't find suitable target node for replica " + r + ", liveNodes=" + simCloudManager.getClusterStateProvider().getLiveNodes());
+      }
+      CollectionAdminRequest.MoveReplica moveReplica = CollectionAdminRequest
+          .moveReplica(CollectionAdminParams.SYSTEM_COLL, r.getName(), target);
+      log.info("-- moving replica {} to node {}", r.getName(), target);
+      simCloudManager.simGetSolrClient().request(moveReplica);
+    }
+  }
+
+  private static void assertNodeStateProvider(SolrCloudManager oneMgr, SolrCloudManager twoMgr) throws Exception {
+    NodeStateProvider one = oneMgr.getNodeStateProvider();
+    NodeStateProvider two = twoMgr.getNodeStateProvider();
+    for (String node : oneMgr.getClusterStateProvider().getLiveNodes()) {
+      Map<String, Object> oneVals = one.getNodeValues(node, SimUtils.COMMON_NODE_TAGS);
+      Map<String, Object> twoVals = two.getNodeValues(node, SimUtils.COMMON_NODE_TAGS);
+      oneVals = Utils.getDeepCopy(oneVals, 10, false, true);
+      twoVals = Utils.getDeepCopy(twoVals, 10, false, true);
+      assertEquals(Utils.toJSONString(oneVals), Utils.toJSONString(twoVals));
+      Map<String, Map<String, List<ReplicaInfo>>> oneInfos = one.getReplicaInfo(node, SimUtils.COMMON_REPLICA_TAGS);
+      Map<String, Map<String, List<ReplicaInfo>>> twoInfos = two.getReplicaInfo(node, SimUtils.COMMON_REPLICA_TAGS);
+      assertEquals(Utils.fromJSON(Utils.toJSON(oneInfos)), Utils.fromJSON(Utils.toJSON(twoInfos)));
+    }
+  }
+
+  // ignore these because SimCloudManager always modifies them
+  private static final Set<Pattern> IGNORE_PATTERNS = new 
HashSet<>(Arrays.asList( + Pattern.compile("/autoscaling/triggerState.*"), + Pattern.compile("/clusterstate\\.json"), // different format in SimClusterStateProvider + Pattern.compile("/collections/[^/]+?/leader_elect/.*"), + Pattern.compile("/collections/[^/]+?/leaders/.*"), + Pattern.compile("/live_nodes/.*") + )); + + private static final Predicate FILTER_FUN = p -> { + for (Pattern pattern : IGNORE_PATTERNS) { + if (pattern.matcher(p).matches()) { + return false; + } + } + return true; + }; + + private static void assertDistribStateManager(DistribStateManager one, DistribStateManager two) throws Exception { + List treeOne = new ArrayList<>(one.listTree("/").stream() + .filter(FILTER_FUN).collect(Collectors.toList())); + List treeTwo = new ArrayList<>(two.listTree("/").stream() + .filter(FILTER_FUN).collect(Collectors.toList())); + Collections.sort(treeOne); + Collections.sort(treeTwo); + assertEquals(treeOne, treeTwo); + for (String path : treeOne) { + VersionedData vd1 = one.getData(path); + VersionedData vd2 = two.getData(path); + assertEquals(path, vd1, vd2); + } + } + + private static void assertClusterStateEquals(ClusterState one, ClusterState two) { + assertEquals(one.getLiveNodes(), two.getLiveNodes()); + assertEquals(one.getCollectionsMap().keySet(), two.getCollectionsMap().keySet()); + one.forEachCollection(oneColl -> { + DocCollection twoColl = two.getCollection(oneColl.getName()); + Map oneSlices = oneColl.getSlicesMap(); + Map twoSlices = twoColl.getSlicesMap(); + assertEquals(oneSlices.keySet(), twoSlices.keySet()); + oneSlices.forEach((s, slice) -> { + Slice sTwo = twoSlices.get(s); + for (Replica oneReplica : slice.getReplicas()) { + Replica twoReplica = sTwo.getReplica(oneReplica.getName()); + assertNotNull(twoReplica); + assertReplicaEquals(oneReplica, twoReplica); + } + }); + }); + } + + private static void assertReplicaEquals(Replica one, Replica two) { + assertEquals(one.getName(), two.getName()); + assertEquals(one.getNodeName(), two.getNodeName()); + assertEquals(one.getState(), two.getState()); + assertEquals(one.getType(), two.getType()); + Map filteredPropsOne = one.getProperties().entrySet().stream() + .filter(e -> !(e.getKey().startsWith("INDEX") || e.getKey().startsWith("QUERY") || e.getKey().startsWith("UPDATE"))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map filteredPropsTwo = two.getProperties().entrySet().stream() + .filter(e -> !(e.getKey().startsWith("INDEX") || e.getKey().startsWith("QUERY") || e.getKey().startsWith("UPDATE"))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + assertEquals(filteredPropsOne, filteredPropsTwo); + } + +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java index 6bc328be73f..7c488316e15 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java @@ -466,6 +466,10 @@ public class Policy implements MapWriter { return params.stream().map(Pair::first).collect(toList()); } + public List getPerReplicaAttributes() { + return Collections.unmodifiableList(perReplicaAttributes); + } + /** * Compares two {@link Row} loads according to a policy. 
* diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java index 9c5cf316a2f..19bd16188ea 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java @@ -72,7 +72,7 @@ public class ReplicaInfo implements MapWriter { this.node = node; } - ReplicaInfo(Map map) { + public ReplicaInfo(Map map) { this.name = map.keySet().iterator().next(); Map details = (Map) map.get(name); details = Utils.getDeepCopy(details, 4); @@ -171,6 +171,30 @@ public class ReplicaInfo implements MapWriter { } } + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + if (!(o instanceof ReplicaInfo)) { + return false; + } + ReplicaInfo other = (ReplicaInfo)o; + if ( + name.equals(other.name) && + collection.equals(other.collection) && + core.equals(other.core) && + isLeader == other.isLeader && + node.equals(other.node) && + shard.equals(other.shard) && + type == other.type && + variables.equals(other.variables)) { + return true; + } else { + return false; + } + } + @Override public String toString() { return Utils.toJSONString(this); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VersionedData.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VersionedData.java index e81d50ba37d..2aa4a9b2f23 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VersionedData.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VersionedData.java @@ -16,12 +16,19 @@ */ package org.apache.solr.client.solrj.cloud.autoscaling; +import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; + +import org.apache.solr.common.MapWriter; +import org.apache.solr.common.util.Base64; +import org.apache.solr.common.util.Utils; import org.apache.zookeeper.CreateMode; /** * Immutable representation of binary data with version. 
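+ * <p>With the {@code MapWriter} support added here, an instance serializes to a small JSON
+ * object; a sketch of the shape (field values are examples only):
+ * <pre>
+ *   {"version":3, "owner":"0", "mode":"PERSISTENT", "data":"&lt;base64 of the raw bytes&gt;"}
+ * </pre>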
*/ -public class VersionedData { +public class VersionedData implements MapWriter { private final int version; private final byte[] data; private final String owner; @@ -56,4 +63,32 @@ public class VersionedData { public String getOwner() { return owner; } + + @Override + public void writeMap(EntryWriter ew) throws IOException { + ew.put("version", version); + if (owner != null) { + ew.put("owner", owner); + } + ew.put("mode", mode.toString()); + if (data != null) { + ew.put("data", Base64.byteArrayToBase64(data)); + } + } + + @Override + public String toString() { + return Utils.toJSONString(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + VersionedData that = (VersionedData) o; + return version == that.version && + Arrays.equals(data, that.data) && + Objects.equals(owner, that.owner) && + mode == that.mode; + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index b747f7e5423..b0e5c948059 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -704,6 +704,10 @@ public abstract class CollectionAdminRequest } + public static MoveReplica moveReplica(String collection, String replica, String targetNode) { + return new MoveReplica(collection, replica, targetNode); + } + public static class MoveReplica extends AsyncCollectionAdminRequest { protected String collection, replica, targetNode; protected String shard, sourceNode; diff --git a/solr/solrj/src/java/org/apache/solr/common/util/TimeSource.java b/solr/solrj/src/java/org/apache/solr/common/util/TimeSource.java index ac8749d79bd..2e5cd2aeb9b 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/TimeSource.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/TimeSource.java @@ -179,11 +179,11 @@ public abstract class TimeSource { public static TimeSource get(String type) { if (type == null) { return NANO_TIME; - } else if (type.equals("currentTime")) { + } else if (type.equals("currentTime") || type.equals(CurrentTimeSource.class.getSimpleName())) { return CURRENT_TIME; - } else if (type.equals("nanoTime")) { + } else if (type.equals("nanoTime") || type.equals(NanoTimeSource.class.getSimpleName())) { return NANO_TIME; - } else if (type.startsWith("simTime")) { + } else if (type.startsWith("simTime") || type.startsWith(SimTimeSource.class.getSimpleName())) { return simTimeSources.computeIfAbsent(type, t -> { String[] parts = t.split(":"); double mul = 1.0;