diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java index 909b3eddc99..3d95bbebb0d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java @@ -44,6 +44,9 @@ import org.apache.solr.client.solrj.cloud.BadVersionException; import org.apache.solr.client.solrj.cloud.VersionedData; import org.apache.solr.cloud.rule.ReplicaAssigner; import org.apache.solr.cloud.rule.Rule; +import org.apache.solr.cluster.placement.PlacementPlugin; +import org.apache.solr.cluster.placement.impl.PlacementPluginAssignStrategy; +import org.apache.solr.cluster.placement.impl.PlacementPluginConfigImpl; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; @@ -297,8 +300,7 @@ public class Assign { .assignPullReplicas(pullReplicas) .onNodes(createNodeList) .build(); - AssignStrategyFactory assignStrategyFactory = new AssignStrategyFactory(cloudManager); - AssignStrategy assignStrategy = assignStrategyFactory.create(clusterState, coll); + AssignStrategy assignStrategy = createAssignStrategy(cloudManager, clusterState, coll); return assignStrategy.assign(cloudManager, assignRequest); } @@ -387,12 +389,12 @@ public class Assign { } public static class AssignRequest { - public String collectionName; - public List shardNames; - public List nodes; - public int numNrtReplicas; - public int numTlogReplicas; - public int numPullReplicas; + public final String collectionName; + public final List shardNames; + public final List nodes; + public final int numNrtReplicas; + public final int numTlogReplicas; + public final int numPullReplicas; public AssignRequest(String collectionName, List shardNames, List nodes, int numNrtReplicas, int numTlogReplicas, int numPullReplicas) { this.collectionName = collectionName; @@ -543,40 +545,30 @@ public class Assign { } } - public static class AssignStrategyFactory { - public SolrCloudManager solrCloudManager; + /** + * Creates the appropriate instance of {@link AssignStrategy} based on how the cluster and/or individual collections are + * configured. + */ + public static AssignStrategy createAssignStrategy(SolrCloudManager solrCloudManager, ClusterState clusterState, DocCollection collection) { + PlacementPlugin plugin = PlacementPluginConfigImpl.getPlacementPlugin(solrCloudManager); - public AssignStrategyFactory(SolrCloudManager solrCloudManager) { - this.solrCloudManager = solrCloudManager; - } - - public AssignStrategy create(ClusterState clusterState, DocCollection collection) throws IOException, InterruptedException { + if (plugin != null) { + // If a cluster wide placement plugin is configured (and that's the only way to define a placement plugin), it overrides + // per collection configuration (i.e. 
rules are ignored) + return new PlacementPluginAssignStrategy(collection, plugin); + } else { @SuppressWarnings({"unchecked", "rawtypes"}) - List ruleMaps = (List) collection.get("rule"); - @SuppressWarnings({"rawtypes"}) - List snitches = (List) collection.get(SNITCH); + List ruleMaps = (List) collection.get(DocCollection.RULE); - Strategy strategy = null; if (ruleMaps != null && !ruleMaps.isEmpty()) { - strategy = Strategy.RULES; + List rules = new ArrayList<>(); + for (Object map : ruleMaps) rules.add(new Rule((Map) map)); + @SuppressWarnings({"rawtypes"}) + List snitches = (List) collection.get(SNITCH); + return new RulesBasedAssignStrategy(rules, snitches, clusterState); } else { - strategy = Strategy.LEGACY; + return new LegacyAssignStrategy(); } - - switch (strategy) { - case LEGACY: - return new LegacyAssignStrategy(); - case RULES: - List rules = new ArrayList<>(); - for (Object map : ruleMaps) rules.add(new Rule((Map) map)); - return new RulesBasedAssignStrategy(rules, snitches, clusterState); - default: - throw new Assign.AssignmentException("Unknown strategy type: " + strategy); - } - } - - private enum Strategy { - LEGACY, RULES; } } } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java index cfad397fbd5..854ee9ab934 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java @@ -378,8 +378,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd .assignPullReplicas(numPullReplicas) .onNodes(nodeList) .build(); - Assign.AssignStrategyFactory assignStrategyFactory = new Assign.AssignStrategyFactory(cloudManager); - Assign.AssignStrategy assignStrategy = assignStrategyFactory.create(clusterState, docCollection); + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(cloudManager, clusterState, docCollection); replicaPositions = assignStrategy.assign(cloudManager, assignRequest); } return replicaPositions; diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java index aa10bb1e4b7..2267b4ddbf0 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java @@ -120,8 +120,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { .assignPullReplicas(numPullReplicas) .onNodes(new ArrayList<>(ocmh.cloudManager.getClusterStateProvider().getLiveNodes())) .build(); - Assign.AssignStrategyFactory assignStrategyFactory = new Assign.AssignStrategyFactory(ocmh.cloudManager); - Assign.AssignStrategy assignStrategy = assignStrategyFactory.create(clusterState, clusterState.getCollection(sourceCollection)); + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ocmh.cloudManager, clusterState, clusterState.getCollection(sourceCollection)); targetNode = assignStrategy.assign(ocmh.cloudManager, assignRequest).get(0).node; } ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, targetNode); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java index 0aa4389c9bb..db408b4bf62 100644 --- 
a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java @@ -229,8 +229,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd { .assignPullReplicas(numPullReplicas) .onNodes(nodeList) .build(); - Assign.AssignStrategyFactory assignStrategyFactory = new Assign.AssignStrategyFactory(ocmh.cloudManager); - Assign.AssignStrategy assignStrategy = assignStrategyFactory.create(clusterState, restoreCollection); + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ocmh.cloudManager, clusterState, restoreCollection); List replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest); CountDownLatch countDownLatch = new CountDownLatch(restoreCollection.getSlices().size()); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java index 495bf65985e..072c5d69a6f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java @@ -431,8 +431,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd { .assignPullReplicas(numPull.get()) .onNodes(new ArrayList<>(clusterState.getLiveNodes())) .build(); - Assign.AssignStrategyFactory assignStrategyFactory = new Assign.AssignStrategyFactory(ocmh.cloudManager); - Assign.AssignStrategy assignStrategy = assignStrategyFactory.create(clusterState, collection); + Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ocmh.cloudManager, clusterState, collection); List replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest); t.stop(); diff --git a/solr/core/src/java/org/apache/solr/cluster/Cluster.java b/solr/core/src/java/org/apache/solr/cluster/Cluster.java new file mode 100644 index 00000000000..3b7bdd47c8f --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/Cluster.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Set; + +/** + *

A representation of the SolrCloud cluster state, providing information on which nodes and collections are part of + * the cluster and a way to get to more detailed info. + */ +public interface Cluster { + /** + * @return current set of live nodes. + */ + Set getLiveNodes(); + + /** + * Returns info about the given collection if one exists. + * + * @return {@code null} if no collection of the given name exists in the cluster. + */ + SolrCollection getCollection(String collectionName) throws IOException; + + /** + * @return an iterator over all {@link SolrCollection}s in the cluster. + */ + Iterator iterator(); + + /** + * Allow foreach iteration on all collections of the cluster, such as: {@code for (SolrCollection c : cluster.collections()) {...}}. + */ + Iterable collections(); +} diff --git a/solr/core/src/java/org/apache/solr/cluster/Node.java b/solr/core/src/java/org/apache/solr/cluster/Node.java new file mode 100644 index 00000000000..301078e5700 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/Node.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster; + +/** + * Representation of a SolrCloud node or server in the SolrCloud cluster. + */ +public interface Node { + String getName(); +} diff --git a/solr/core/src/java/org/apache/solr/cluster/Replica.java b/solr/core/src/java/org/apache/solr/cluster/Replica.java new file mode 100644 index 00000000000..2c9230ffc09 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/Replica.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster; + +/** + * An instantiation (or one of the copies) of a given {@link Shard} of a given {@link SolrCollection}. + */ +public interface Replica { + Shard getShard(); + + ReplicaType getType(); + ReplicaState getState(); + + String getReplicaName(); + + /** + * The core name on disk + */ + String getCoreName(); + + /** + * {@link Node} on which this {@link Replica} is located. 
+ */ + Node getNode(); + + /** + * The order of this enum is important from the most to least "important" replica type. + */ + enum ReplicaType { + NRT, TLOG, PULL + } + + enum ReplicaState { + ACTIVE, DOWN, RECOVERING, RECOVERY_FAILED + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/Shard.java b/solr/core/src/java/org/apache/solr/cluster/Shard.java new file mode 100644 index 00000000000..b1ffc14c2ee --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/Shard.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster; + +import java.util.Iterator; + +/** + * Shard in a {@link SolrCollection}, i.e. a subset of the data indexed in that collection. + */ +public interface Shard { + String getShardName(); + + /** + * @return the collection this shard is part of + */ + SolrCollection getCollection(); + + /** + * Returns the {@link Replica} of the given name for that shard, if such a replica exists. + * @return {@code null} if the replica does not (or does not yet) exist for the shard. + */ + Replica getReplica(String name); + + /** + * @return an iterator over {@link Replica}s already existing for this {@link Shard}. + */ + Iterator iterator(); + + /** + * Allow foreach iteration on replicas such as: {@code for (Replica r : shard.replicas()) {...}}. + */ + Iterable replicas(); + + /** + * @return the current leader {@link Replica} for this {@link Shard}. Note that by the time this method returns the leader might + * have changed. Also, if there's no leader for any reason (don't shoot the messenger), this method will return {@code null}. + */ + Replica getLeader(); + + ShardState getState(); + + enum ShardState { + ACTIVE, INACTIVE, CONSTRUCTION, RECOVERY, RECOVERY_FAILED + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/SolrCollection.java b/solr/core/src/java/org/apache/solr/cluster/SolrCollection.java new file mode 100644 index 00000000000..23e79a407b3 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/SolrCollection.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster; + +import org.apache.solr.cluster.placement.AttributeFetcher; +import org.apache.solr.cluster.placement.AttributeValues; +import org.apache.solr.cluster.placement.PlacementPlugin; +import org.apache.solr.cluster.placement.PlacementRequest; + +import java.util.Iterator; + +/** + * Represents a Collection in SolrCloud (unrelated to {@link java.util.Collection} that uses the nicer name). + */ +public interface SolrCollection { + /** + * The collection name (value passed to {@link Cluster#getCollection(String)}). + */ + String getName(); + + /** + *

<p>Returns the {@link Shard} of the given name for that collection, if such a shard exists.
+ *
<p>Note that when a request for adding replicas for a collection is received by a {@link PlacementPlugin}, it is + * possible that replicas need to be added to non-existing shards (see {@link PlacementRequest#getShardNames()}). + * Non-existing shards will not be returned by this method; only shards that already exist will be returned.
+ * + * @return {@code null} if the shard does not or does not yet exist for the collection. + */ + Shard getShard(String name); + + /** + * @return an iterator over {@link Shard}s of this {@link SolrCollection}. + */ + Iterator iterator(); + + /** + * Allow foreach iteration on shards such as: {@code for (Shard s : solrCollection.shards()) {...}}. + */ + Iterable shards(); + + /** + *
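// A hedged sketch (not part of this patch) of how plugin code might combine getShard() with the iteration
// methods above: requested shard names that do not resolve to a Shard are about to be created, while existing
// shards expose their current replicas. Class and method names below are invented for illustration.

import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.Shard;
import org.apache.solr.cluster.SolrCollection;

import java.util.HashSet;
import java.util.Set;

final class ShardInspectionSketch {
  /** Returns the nodes that already host a replica of any of the requested shards. */
  static Set<Node> nodesAlreadyHosting(SolrCollection collection, Set<String> requestedShardNames) {
    Set<Node> usedNodes = new HashSet<>();
    for (String shardName : requestedShardNames) {
      Shard shard = collection.getShard(shardName);
      if (shard == null) {
        continue; // Shard does not exist yet: nothing placed for it so far.
      }
      for (Replica replica : shard.replicas()) {
        usedNodes.add(replica.getNode());
      }
    }
    return usedNodes;
  }
}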

<p>Returns the value of a custom property set on the {@link SolrCollection}, or {@code null} when no such + * property was set. Properties are set through the Collection API. See for example {@code COLLECTIONPROP} in the Solr reference guide. + * + *
<p>{@link PlacementPlugin} related note:
+ *
<p>Using custom properties in conjunction with ad hoc {@link PlacementPlugin} code allows customizing placement + * decisions per collection. + * + *
<p>
For example if a collection is to be placed only on nodes using SSD storage and not rotating disks, it can be + * identified as such using some custom property (collection property could for example be called "driveType" and have + * value "ssd" in that case), and the placement plugin (implementing {@link PlacementPlugin}) would then + * {@link AttributeFetcher#requestNodeSystemProperty(String)} for that property from all nodes and only place replicas + * of this collection on {@link Node}'s for which + * {@link AttributeValues#getDiskType(Node)} is non empty and equal to {@link org.apache.solr.cluster.placement.AttributeFetcher.DiskHardwareType#SSD}. + */ + String getCustomProperty(String customPropertyName); + + /* + * There might be missing pieces here (and in other classes in this package) and these would have to be added when + * starting to use these interfaces to code real world placement and balancing code (plugins). + */ +} diff --git a/solr/core/src/java/org/apache/solr/cluster/package-info.java b/solr/core/src/java/org/apache/solr/cluster/package-info.java new file mode 100644 index 00000000000..7571d5a478b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + *
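// A sketch of the "driveType"/"ssd" example described above, as placement-plugin code might write it.
// The property name and value come from the javadoc example; the Optional-based generic signatures of
// AttributeFetcher/AttributeValues are assumed, and the class is purely illustrative.

import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.AttributeFetcher;
import org.apache.solr.cluster.placement.AttributeValues;

import java.util.HashSet;
import java.util.Set;

final class SsdOnlySketch {
  /** Keeps only nodes with SSD storage when the collection is flagged with driveType=ssd. */
  static Set<Node> candidateNodes(SolrCollection collection, Set<Node> liveNodes, AttributeFetcher attributeFetcher) {
    if (!"ssd".equals(collection.getCustomProperty("driveType"))) {
      return liveNodes; // Collection not flagged: any node is acceptable.
    }
    AttributeValues values = attributeFetcher.requestNodeDiskType().fetchFrom(liveNodes).fetchAttributes();
    Set<Node> ssdNodes = new HashSet<>();
    for (Node node : liveNodes) {
      if (values.getDiskType(node).filter(type -> type == AttributeFetcher.DiskHardwareType.SSD).isPresent()) {
        ssdNodes.add(node);
      }
    }
    return ssdNodes;
  }
}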

<p>This package contains the interfaces giving access to cluster state, including nodes, collections and the + * structure of the collections (shards and replicas). These interfaces allow separating external code contribution + * from the internal Solr implementations of these concepts to make usage simpler and to not require changes to + * external contributed code every time the internal abstractions are modified.
+ * + *

<p>The top level abstraction is {@link org.apache.solr.cluster.Cluster}. The cluster is composed of {@link org.apache.solr.cluster.Node}s. + * Indexes are stored in {@link org.apache.solr.cluster.SolrCollection}s, composed of {@link org.apache.solr.cluster.Shard}s + * whose actual copies on {@link org.apache.solr.cluster.Node}s are called {@link org.apache.solr.cluster.Replica}s.
+ */ +package org.apache.solr.cluster; \ No newline at end of file diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/AttributeFetcher.java b/solr/core/src/java/org/apache/solr/cluster/placement/AttributeFetcher.java new file mode 100644 index 00000000000..cb368d73e83 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/AttributeFetcher.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement; + +import org.apache.solr.cluster.Node; + +import java.util.Set; + +/** + *

<p>Instances of this interface are used to fetch various attributes from nodes (and other sources) in the cluster.
+ */ +public interface AttributeFetcher { + /** Request the number of cores on each node. To get the value use {@link AttributeValues#getCoresCount(Node)} */ + AttributeFetcher requestNodeCoreCount(); + + /** Request the disk hardware type on each node. To get the value use {@link AttributeValues#getDiskType(Node)} */ + AttributeFetcher requestNodeDiskType(); + + /** Request the free disk size on each node. To get the value use {@link AttributeValues#getFreeDisk(Node)} */ + AttributeFetcher requestNodeFreeDisk(); + + /** Request the total disk size on each node. To get the value use {@link AttributeValues#getTotalDisk(Node)} */ + AttributeFetcher requestNodeTotalDisk(); + + /** Request the heap usage on each node. To get the value use {@link AttributeValues#getHeapUsage(Node)} */ + AttributeFetcher requestNodeHeapUsage(); + + /** Request the system load average on each node. To get the value use {@link AttributeValues#getSystemLoadAverage(Node)} */ + AttributeFetcher requestNodeSystemLoadAverage(); + + /** Request a given system property on each node. To get the value use {@link AttributeValues#getSystemProperty(Node, String)} */ + AttributeFetcher requestNodeSystemProperty(String name); + + /** Request an environment variable on each node. To get the value use {@link AttributeValues#getEnvironmentVariable(Node, String)} */ + AttributeFetcher requestNodeEnvironmentVariable(String name); + + /** Request a node metric from each node. To get the value use {@link AttributeValues#getMetric(Node, String, NodeMetricRegistry)} */ + AttributeFetcher requestNodeMetric(String metricName, NodeMetricRegistry registry); + + + /** + * The set of nodes from which to fetch all node related attributes. Calling this method is mandatory if any of the {@code requestNode*} + * methods got called. + */ + AttributeFetcher fetchFrom(Set nodes); + + /** Requests a (non node) metric of a given scope and name. To get the value use {@link AttributeValues#getMetric(String, String)} */ + AttributeFetcher requestMetric(String scope, String metricName); + + /** + * Fetches all requested node attributes from all nodes passed to {@link #fetchFrom(Set)} as well as non node attributes + * (those requested for example using {@link #requestMetric(String, String)}. + * @return An instance allowing retrieval of all attributed that could be fetched. + */ + AttributeValues fetchAttributes(); + + /** + * Registry options for {@link Node} metrics. + */ + enum NodeMetricRegistry { + /** corresponds to solr.node */ + SOLR_NODE, + /** corresponds to solr.jvm */ + SOLR_JVM + } + + enum DiskHardwareType { + SSD, ROTATIONAL + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/AttributeValues.java b/solr/core/src/java/org/apache/solr/cluster/placement/AttributeValues.java new file mode 100644 index 00000000000..4519c8ad43d --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/AttributeValues.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement; + +import org.apache.solr.cluster.Node; + +import java.util.Optional; + +public interface AttributeValues { + /** For the given node: number of cores */ + Optional getCoresCount(Node node); + + /** For the given node: Hardware type of the disk partition where cores are stored */ + Optional getDiskType(Node node); + + /** For the given node: Free disk size in Gigabytes of the partition on which cores are stored */ + Optional getFreeDisk(Node node); + + /** For the given node: Total disk size in Gigabytes of the partition on which cores are stored */ + Optional getTotalDisk(Node node); + + /** For the given node: Percentage between 0 and 100 of used heap over max heap */ + Optional getHeapUsage(Node node); + + /** For the given node: matches {@link java.lang.management.OperatingSystemMXBean#getSystemLoadAverage()} */ + Optional getSystemLoadAverage(Node node); + + /** For the given node: system property value (system properties are passed to Java using {@code -Dname=value} */ + Optional getSystemProperty(Node node, String name); + + /** For the given node: environment variable value */ + Optional getEnvironmentVariable(Node node, String name); + + /** For the given node: metric of specific name and registry */ + Optional getMetric(Node node, String metricName, AttributeFetcher.NodeMetricRegistry registry); + + + /** Get a non node related metric of specific scope and name */ + Optional getMetric(String scope, String metricName); +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementException.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementException.java new file mode 100644 index 00000000000..33af5116779 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement; + +/** + * Exception thrown by a {@link PlacementPlugin} when it is unable to compute placement for whatever reason (except an + * {@link InterruptedException} that {@link PlacementPlugin#computePlacement} + * is also allowed to throw). 
+ */ +public class PlacementException extends Exception { + + public PlacementException() { + super(); + } + + public PlacementException(String message) { + super(message); + } + + public PlacementException(String message, Throwable cause) { + super(message, cause); + } + + public PlacementException(Throwable cause) { + super(cause); + } + + protected PlacementException(String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlan.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlan.java new file mode 100644 index 00000000000..c4738a5e99e --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlan.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement; + +import org.apache.solr.cluster.Node; + +import java.util.Set; + +/** + * A fully specified plan or instructions for placement, deletion or move to be applied to the cluster.

+ * Fully specified means the actual {@link Node}'s on which to place replicas have been decided. + * + * Instances are created by plugin code using {@link PlacementPlanFactory}. This interface obviously doesn't expose much but + * the underlying Solr side implementation has all that is needed (and will do at least one cast in order to execute the + * plan, likely then using some type of visitor pattern). + */ +public interface PlacementPlan { + /** + * @return the {@link PlacementRequest} at the origin of this {@link PlacementPlan}, as passed to the {@link PlacementPlanFactory} method + * that created this instance. + */ + PlacementRequest getRequest(); + + /** + * @return the set of {@link ReplicaPlacement}'s computed by the plugin to implement the request + */ + Set getReplicaPlacements(); +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java new file mode 100644 index 00000000000..c602ab0de6e --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlanFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement; + +import org.apache.solr.cluster.Node; +import org.apache.solr.cluster.Replica; +import org.apache.solr.cluster.SolrCollection; + +import java.util.Set; + +/** + * Allows plugins to create {@link PlacementPlan}s telling the Solr layer where to create replicas following the processing of + * a {@link PlacementRequest}. The Solr layer can (and will) check that the {@link PlacementPlan} conforms to the {@link PlacementRequest} (and + * if it does not, the requested operation will fail). + */ +public interface PlacementPlanFactory { + /** + *
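// A sketch of how a computed plan as described above can be inspected (for example in a test): each
// ReplicaPlacement fully identifies the collection, shard name, target node and replica type to create.

import org.apache.solr.cluster.placement.PlacementPlan;
import org.apache.solr.cluster.placement.ReplicaPlacement;

final class PlanDumpSketch {
  static void dump(PlacementPlan plan) {
    for (ReplicaPlacement placement : plan.getReplicaPlacements()) {
      System.out.println(placement.getCollection().getName() + "/" + placement.getShardName()
          + " -> " + placement.getNode().getName() + " (" + placement.getReplicaType() + ")");
    }
  }
}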

<p>Creates a {@link PlacementPlan} for adding replicas to one or more shards of an existing collection. Note this is also + * used for creating new collections, since such a creation first creates the collection and then adds the replicas. + * + *
<p>
This is in support (directly or indirectly) of {@link org.apache.solr.cloud.api.collections.AddReplicaCmd}, + * {@link org.apache.solr.cloud.api.collections.CreateShardCmd}, {@link org.apache.solr.cloud.api.collections.ReplaceNodeCmd}, + * {@link org.apache.solr.cloud.api.collections.MoveReplicaCmd}, {@link org.apache.solr.cloud.api.collections.SplitShardCmd}, + * {@link org.apache.solr.cloud.api.collections.RestoreCmd}, {@link org.apache.solr.cloud.api.collections.MigrateCmd} + * as well as of {@link org.apache.solr.cloud.api.collections.CreateCollectionCmd}. + */ + PlacementPlan createPlacementPlan(PlacementRequest request, Set replicaPlacements); + + /** + *

<p>Creates a {@link ReplicaPlacement} to be passed to {@link PlacementPlan} factory methods. + * + *

Note the plugin can also build its own instances implementing {@link ReplicaPlacement} instead of using this call + * (but using this method makes it easier). + */ + ReplicaPlacement createReplicaPlacement(SolrCollection solrCollection, String shardName, Node node, Replica.ReplicaType replicaType); +} \ No newline at end of file diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java new file mode 100644 index 00000000000..28b64768542 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPlugin.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement; + +import org.apache.solr.cluster.Cluster; + +/** + *
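// As noted above, a plugin may provide its own ReplicaPlacement implementation instead of calling
// createReplicaPlacement(). A minimal sketch of such a value class (illustrative, not part of the patch):

import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.ReplicaPlacement;

final class SimpleReplicaPlacement implements ReplicaPlacement {
  private final SolrCollection collection;
  private final String shardName;
  private final Node node;
  private final Replica.ReplicaType type;

  SimpleReplicaPlacement(SolrCollection collection, String shardName, Node node, Replica.ReplicaType type) {
    this.collection = collection;
    this.shardName = shardName;
    this.node = node;
    this.type = type;
  }

  @Override public SolrCollection getCollection() { return collection; }
  @Override public String getShardName() { return shardName; }
  @Override public Node getNode() { return node; }
  @Override public Replica.ReplicaType getReplicaType() { return type; }
}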

<p>Implemented by external plugins to control replica placement and movement on the search cluster (as well as other things + * such as cluster elasticity?) when cluster changes are required (initiated elsewhere, most likely following a Collection + * API call). + * + *

<p>Instances of classes implementing this interface are created by {@link PlacementPluginFactory}. + * + *

<p>Implementations of this interface must be reentrant. {@link #computePlacement} will be called concurrently + * from many threads. + */ +public interface PlacementPlugin { + /** + *

<p>Request made to the plugin code to compute placement. Note this method must be reentrant, as a plugin instance may (read: + * will) get multiple such calls in parallel. + * + *

<p>Configuration is passed upon creation of a new instance of this class by {@link PlacementPluginFactory#createPluginInstance}. + * + * @param cluster initial state of the cluster. Note there are {@link java.util.Set}'s and {@link java.util.Map}'s + * accessible from the {@link Cluster} and other reachable instances. These collections will not change + * while the plugin is executing and will be thrown away once the plugin is done. The plugin code can + * therefore modify them if needed. + * @param placementRequest request for placing new replicas or moving existing replicas on the cluster. + * @param attributeFetcher Factory used by the plugin to fetch additional attributes from the cluster nodes, such as + * count of cores, system properties, etc. + * @param placementPlanFactory Factory used to create instances of {@link PlacementPlan} to return the computed decision. + * @return plan satisfying the placement request. + */ + PlacementPlan computePlacement(Cluster cluster, PlacementRequest placementRequest, AttributeFetcher attributeFetcher, + PlacementPlanFactory placementPlanFactory) throws PlacementException, InterruptedException; +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPluginConfig.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPluginConfig.java new file mode 100644 index 00000000000..0beb82767dc --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPluginConfig.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement; + +/** + *
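// A minimal end-to-end sketch of the contract described above: a factory creating a stateless, reentrant
// plugin that spreads the requested replicas over the request's target nodes in round-robin fashion. It is
// illustrative only (no free-disk or co-location checks) and the generic signatures of the request methods
// (Set<Node>, Set<String>) are assumed; the class names are invented, not part of this patch.

import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.placement.*;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RoundRobinPlacementFactory implements PlacementPluginFactory {
  @Override
  public PlacementPlugin createPluginInstance(PlacementPluginConfig config) {
    return new RoundRobinPlacementPlugin();
  }

  static class RoundRobinPlacementPlugin implements PlacementPlugin {
    @Override
    public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request,
                                          AttributeFetcher attributeFetcher,
                                          PlacementPlanFactory placementPlanFactory)
        throws PlacementException, InterruptedException {
      List<Node> nodes = new ArrayList<>(request.getTargetNodes());
      if (nodes.isEmpty()) {
        throw new PlacementException("No target nodes to place replicas on");
      }
      Set<ReplicaPlacement> placements = new HashSet<>();
      int nodeIndex = 0;
      // One pass per requested shard and replica type, cycling over the candidate nodes.
      for (String shardName : request.getShardNames()) {
        for (Replica.ReplicaType type : Replica.ReplicaType.values()) {
          for (int i = 0; i < request.getCountReplicasToCreate(type); i++) {
            Node node = nodes.get(nodeIndex++ % nodes.size());
            placements.add(placementPlanFactory.createReplicaPlacement(
                request.getCollection(), shardName, node, type));
          }
        }
      }
      return placementPlanFactory.createPlacementPlan(request, placements);
    }
  }
}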

<p>Configuration passed by Solr to {@link PlacementPluginFactory#createPluginInstance(PlacementPluginConfig)} so that plugin instances + * ({@link PlacementPlugin}) created by the factory can easily retrieve their configuration.
+ * + *

<p>A plugin writer decides the names and the types of the configurable parameters it needs. Available types are + * {@link String}, {@link Long}, {@link Boolean}, {@link Double}. This configuration currently lives in the {@code /clusterprops.json} + * file in Zookeeper (this could change in the future; the plugin code will not change, but the way its configuration is stored + * in the cluster might). {@code clusterprops.json} also contains the name of the plugin factory class implementing + * {@link PlacementPluginFactory}.
+ * + *

<p>In order to configure a plugin to be used for placement decisions, the following {@code curl} command (or something + * equivalent) has to be executed once the cluster is already running, to set the configuration. + * Replace {@code localhost:8983} with the IP address and port of one of your servers.
+ * + * <pre>
+ *
+ * curl -X POST -H 'Content-type:application/json' -d '{
+ *   "set-placement-plugin": {
+ *     "class": "factory.class.name$inner",
+ *     "myfirstString": "a text value",
+ *     "aLong": 50,
+ *     "aDoubleConfig": 3.1415928,
+ *     "shouldIStay": true
+ *   }
+ * }' http://localhost:8983/api/cluster
+ * </pre>
+ * + *

<p>The consequence will be the creation (or replacement if it exists) of an element in the Zookeeper file + * {@code /clusterprops.json} as follows:
+ * + * <pre>
+ *
+ * "placement-plugin":{
+ *     "class":"factory.class.name$inner",
+ *     "myfirstString": "a text value",
+ *     "aLong": 50,
+ *     "aDoubleConfig": 3.1415928,
+ *     "shouldIStay": true
+ * }
+ * </pre>
+ *

<p>In order to delete the placement-plugin section from {@code /clusterprops.json} (and to fall back to either Legacy + * or rule-based placement if so configured for a collection), execute:
+ * + * <pre>
+ *
+ * curl -X POST -H 'Content-type:application/json' -d '{
+ *   "unset-placement-plugin" : null
+ * }' http://localhost:8983/api/cluster
+ * </pre>
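// A sketch of how a factory might read the configuration shown in the example above when creating its plugin
// instance. The property names are the ones from the sample JSON and the getters are those declared below;
// the returned plugin is a deliberate placeholder, since placement logic is out of scope for this sketch.

import org.apache.solr.cluster.placement.*;

public class ConfiguredPlacementFactory implements PlacementPluginFactory {
  @Override
  public PlacementPlugin createPluginInstance(PlacementPluginConfig config) {
    final String aText = config.getStringConfig("myfirstString", "default text");
    final long aLong = config.getLongConfig("aLong", 0L);
    final double aDouble = config.getDoubleConfig("aDoubleConfig", 1.0d);
    final boolean aBool = config.getBooleanConfig("shouldIStay", true);
    // PlacementPlugin has a single abstract method, so a lambda is enough for this placeholder.
    return (cluster, request, attributeFetcher, placementPlanFactory) -> {
      // A real plugin would base its placement decisions on the configured values read above.
      throw new PlacementException("Sketch only, configured with: " + aText + ", " + aLong
          + ", " + aDouble + ", " + aBool);
    };
  }
}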
+ */ +public interface PlacementPluginConfig { + /** + * @return the configured {@link String} value corresponding to {@code configName} if one exists (could be the empty + * string) and {@code null} otherwise. + */ + String getStringConfig(String configName); + + /** + * @return the configured {@link String} value corresponding to {@code configName} if one exists (could be the empty + * string) and {@code defaultValue} otherwise. + */ + String getStringConfig(String configName, String defaultValue); + + /** + * @return the configured {@link Boolean} value corresponding to {@code configName} if one exists, {@code null} otherwise. + */ + Boolean getBooleanConfig(String configName); + + /** + * @return the configured {@link Boolean} value corresponding to {@code configName} if one exists, a boxed {@code defaultValue} + * otherwise (this method never returns {@code null}. + */ + Boolean getBooleanConfig(String configName, boolean defaultValue); + + /** + * @return the configured {@link Long} value corresponding to {@code configName} if one exists, {@code null} otherwise. + */ + Long getLongConfig(String configName); + + /** + * @return the configured {@link Long} value corresponding to {@code configName} if one exists, a boxed {@code defaultValue} + * otherwise (this method never returns {@code null}. + */ + Long getLongConfig(String configName, long defaultValue); + + /** + * @return the configured {@link Double} value corresponding to {@code configName} if one exists, {@code null} otherwise. + */ + Double getDoubleConfig(String configName); + + /** + * @return the configured {@link Double} value corresponding to {@code configName} if one exists, a boxed {@code defaultValue} + * otherwise (this method never returns {@code null}. + */ + Double getDoubleConfig(String configName, double defaultValue); +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPluginFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPluginFactory.java new file mode 100644 index 00000000000..7372003e790 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementPluginFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement; + +/** + * Factory implemented by client code and configured in {@code solr.xml} allowing the creation of instances of + * {@link PlacementPlugin} to be used for replica placement computation. + */ +public interface PlacementPluginFactory { + /** + * Returns an instance of the plugin that will be repeatedly (and concurrently) be called to compute placement. 
Multiple + * instances of a plugin can be used in parallel (for example if configuration has to change, but plugin instances with + * the previous configuration are still being used). + */ + PlacementPlugin createPluginInstance(PlacementPluginConfig config); +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/PlacementRequest.java b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementRequest.java new file mode 100644 index 00000000000..61b49dd523d --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/PlacementRequest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement; + +import org.apache.solr.cluster.*; + +import java.util.Set; + +/** + * A cluster related placement request that Solr asks a {@link PlacementPlugin} plugin to resolve and compute a + * {@link PlacementPlan} placing one or more {@link Replica}'s of one or more {@link Shard}'s of an existing {@link SolrCollection}. + * The shard might or might not already exist, plugin code can easily find out by calling {@link SolrCollection#getShard(String)} + * with the shard name(s) returned by {@link #getShardNames()}. + * + *

<p>The set of {@link Node}s on which the replicas should be placed + * is specified (defaults to being equal to the set returned by {@link Cluster#getLiveNodes()}). + */ +public interface PlacementRequest { + /** + * The {@link SolrCollection} to add {@link Replica}(s) to. + */ + SolrCollection getCollection(); + + /** + *

<p>Shard name(s) for which placement of new replicas should be computed. The shard(s) might exist or not (that's why this + * method returns a {@link Set} of {@link String}'s and not directly a set of {@link Shard} instances). + * + *

<p>Note the Collection API allows specifying the shard name or a {@code _route_} parameter. The Solr implementation will + * convert either specification into the relevant shard name, so the plugin code doesn't have to worry about this. + */ + Set<String> getShardNames(); + + /** + *

<p>Replicas should only be placed on nodes in the set returned by this method. + * + *

When Collection API calls do not specify a specific set of target nodes, replicas can be placed on any live node of + * the cluster. In such cases, this set will be equal to the set of all live nodes. The plugin placement code does not + * need to worry (or care) if a set of nodes was explicitly specified or not. + * + * @return never {@code null} and never empty set (if that set was to be empty for any reason, no placement would be + * possible and the Solr infrastructure driving the plugin code would detect the error itself rather than calling the plugin). + */ + Set getTargetNodes(); + + /** + * Returns the number of replica to create for the given replica type. + */ + int getCountReplicasToCreate(Replica.ReplicaType replicaType); +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/ReplicaPlacement.java b/solr/core/src/java/org/apache/solr/cluster/placement/ReplicaPlacement.java new file mode 100644 index 00000000000..16bdc2c2b34 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/ReplicaPlacement.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement; + +import org.apache.solr.cluster.Node; +import org.apache.solr.cluster.Replica; +import org.apache.solr.cluster.Shard; +import org.apache.solr.cluster.SolrCollection; + +/** + *
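// A small sketch reading the request attributes described above: the total number of replicas to create
// across all requested shards and replica types, e.g. to verify that enough target nodes are available.

import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.placement.PlacementRequest;

final class RequestSizeSketch {
  static int totalReplicasRequested(PlacementRequest request) {
    int perShard = 0;
    for (Replica.ReplicaType type : Replica.ReplicaType.values()) {
      perShard += request.getCountReplicasToCreate(type);
    }
    return perShard * request.getShardNames().size();
  }
}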

<p>Placement decision for a single {@link Replica}. Note this placement decision is used as part of a {@link PlacementPlan}; + * it does not directly lead to the plugin code getting a corresponding {@link Replica} instance, nor does it require the + * plugin to provide a {@link Shard} instance (the plugin code gets such instances for existing replicas and shards in the + * cluster but does not create them directly for adding new replicas for new or existing shards). + * + *

Captures the {@link SolrCollection}, {@link Shard} (via the shard name), {@link Node} and {@link org.apache.solr.cluster.Replica.ReplicaType} + * of a Replica to be created. + */ +public interface ReplicaPlacement { + + /** + * @return the {@link SolrCollection} for which the replica should be created + */ + SolrCollection getCollection(); + + /** + * @return the name of the {@link Shard} for which the replica should be created. Note that only the name of the shard + * is returned and not a {@link Shard} instance because the shard might not yet exist when the placement request is made. + */ + String getShardName(); + + /** + * @return the {@link Node} on which the replica should be created + */ + Node getNode(); + + /** + * @return the type of the replica to be created + */ + Replica.ReplicaType getReplicaType(); +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeFetcherImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeFetcherImpl.java new file mode 100644 index 00000000000..98367d3e0f2 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeFetcherImpl.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cluster.placement.impl; + +import com.google.common.collect.Maps; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider; +import org.apache.solr.cluster.placement.AttributeFetcher; +import org.apache.solr.cluster.placement.AttributeValues; +import org.apache.solr.cluster.Node; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.rule.ImplicitSnitch; +import org.apache.solr.core.SolrInfoBean; +import org.apache.solr.metrics.SolrMetricManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.*; +import java.util.function.BiConsumer; + +public class AttributeFetcherImpl implements AttributeFetcher { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + boolean requestedNodeCoreCount; + boolean requestedNodeDiskType; + boolean requestedNodeFreeDisk; + boolean requestedNodeTotalDisk; + boolean requestedNodeHeapUsage; + boolean requestedNodeSystemLoadAverage; + Set requestedNodeSystemPropertiesSnitchTags = new HashSet<>(); + Set requestedNodeMetricSnitchTags = new HashSet<>(); + + Set nodes = Collections.emptySet(); + + private final SolrCloudManager cloudManager; + + AttributeFetcherImpl(SolrCloudManager cloudManager) { + this.cloudManager = cloudManager; + } + + @Override + public AttributeFetcher requestNodeCoreCount() { + requestedNodeCoreCount = true; + return this; + } + + @Override + public AttributeFetcher requestNodeDiskType() { + requestedNodeDiskType = true; + return this; + } + + @Override + public AttributeFetcher requestNodeFreeDisk() { + requestedNodeFreeDisk = true; + return this; + } + + @Override + public AttributeFetcher requestNodeTotalDisk() { + requestedNodeTotalDisk = true; + return this; + } + + @Override + public AttributeFetcher requestNodeHeapUsage() { + requestedNodeHeapUsage = true; + return this; + } + + @Override + public AttributeFetcher requestNodeSystemLoadAverage() { + requestedNodeSystemLoadAverage = true; + return this; + } + + @Override + public AttributeFetcher requestNodeSystemProperty(String name) { + requestedNodeSystemPropertiesSnitchTags.add(getSystemPropertySnitchTag(name)); + return this; + } + + @Override + public AttributeFetcher requestNodeEnvironmentVariable(String name) { + throw new UnsupportedOperationException("Not yet implemented..."); + } + + @Override + public AttributeFetcher requestNodeMetric(String metricName, NodeMetricRegistry registry) { + requestedNodeMetricSnitchTags.add(getMetricSnitchTag(metricName, registry)); + return this; + } + + @Override + public AttributeFetcher fetchFrom(Set nodes) { + this.nodes = nodes; + return this; + } + + @Override + public AttributeFetcher requestMetric(String scope, String metricName) { + throw new UnsupportedOperationException("Not yet implemented..."); + } + + @Override + public AttributeValues fetchAttributes() { + // TODO Code here only supports node related attributes for now + + // Maps in which attribute values will be added + Map nodeToCoreCount = Maps.newHashMap(); + Map nodeToDiskType = Maps.newHashMap(); + Map nodeToFreeDisk = Maps.newHashMap(); + Map nodeToTotalDisk = Maps.newHashMap(); + Map nodeToHeapUsage = Maps.newHashMap(); + Map nodeToSystemLoadAverage = Maps.newHashMap(); + Map> syspropSnitchToNodeToValue = Maps.newHashMap(); + Map> metricSnitchToNodeToValue = Maps.newHashMap(); + + // In order to match the returned values 
for the various snitches, we need to keep track of where each + // received value goes. Given the target maps are of different types (the maps from Node to whatever defined + // above) we instead pass a function taking two arguments, the node and the (non null) returned value, + // that will cast the value into the appropriate type for the snitch tag and insert it into the appropriate map + // with the node as the key. + Map> allSnitchTagsToInsertion = Maps.newHashMap(); + if (requestedNodeCoreCount) { + allSnitchTagsToInsertion.put(ImplicitSnitch.CORES, (node, value) -> nodeToCoreCount.put(node, ((Number) value).intValue())); + } + if (requestedNodeDiskType) { + allSnitchTagsToInsertion.put(ImplicitSnitch.DISKTYPE, (node, value) -> { + if ("rotational".equals(value)) { + nodeToDiskType.put(node, DiskHardwareType.ROTATIONAL); + } else if ("ssd".equals(value)) { + nodeToDiskType.put(node, DiskHardwareType.SSD); + } + // unknown disk type: insert no value, returned optional will be empty + }); + } + if (requestedNodeFreeDisk) { + allSnitchTagsToInsertion.put(SolrClientNodeStateProvider.Variable.FREEDISK.tagName, + // Convert from bytes to GB + (node, value) -> nodeToFreeDisk.put(node, ((Number) value).longValue() / 1024 / 1024 / 1024)); + } + if (requestedNodeTotalDisk) { + allSnitchTagsToInsertion.put(SolrClientNodeStateProvider.Variable.TOTALDISK.tagName, + // Convert from bytes to GB + (node, value) -> nodeToTotalDisk.put(node, ((Number) value).longValue() / 1024 / 1024 / 1024)); + } + if (requestedNodeHeapUsage) { + allSnitchTagsToInsertion.put(ImplicitSnitch.HEAPUSAGE, + (node, value) -> nodeToHeapUsage.put(node, ((Number) value).doubleValue())); + } + if (requestedNodeSystemLoadAverage) { + allSnitchTagsToInsertion.put(ImplicitSnitch.SYSLOADAVG, + (node, value) -> nodeToSystemLoadAverage.put(node, ((Number) value).doubleValue())); + } + for (String sysPropSnitch : requestedNodeSystemPropertiesSnitchTags) { + final Map sysPropMap = Maps.newHashMap(); + syspropSnitchToNodeToValue.put(sysPropSnitch, sysPropMap); + allSnitchTagsToInsertion.put(sysPropSnitch, (node, value) -> sysPropMap.put(node, (String) value)); + } + for (String metricSnitch : requestedNodeMetricSnitchTags) { + final Map metricMap = Maps.newHashMap(); + metricSnitchToNodeToValue.put(metricSnitch, metricMap); + allSnitchTagsToInsertion.put(metricSnitch, (node, value) -> metricMap.put(node, (Double) value)); + } + + // Now that we know everything we need to fetch (and where to put it), just do it. 
+ for (Node node : nodes) { + Map tagValues = cloudManager.getNodeStateProvider().getNodeValues(node.getName(), allSnitchTagsToInsertion.keySet()); + for (Map.Entry e : tagValues.entrySet()) { + String tag = e.getKey(); + Object value = e.getValue(); // returned value from the node + + BiConsumer inserter = allSnitchTagsToInsertion.get(tag); + // If inserter is null it's a return of a tag that we didn't request + if (inserter != null) { + inserter.accept(node, value); + } else { + log.error("Received unsolicited snitch tag {} from node {}", tag, node); + } + } + } + + return new AttributeValuesImpl(nodeToCoreCount, + nodeToDiskType, + nodeToFreeDisk, + nodeToTotalDisk, + nodeToHeapUsage, + nodeToSystemLoadAverage, + syspropSnitchToNodeToValue, + metricSnitchToNodeToValue); + } + + private static SolrInfoBean.Group getGroupFromMetricRegistry(NodeMetricRegistry registry) { + switch (registry) { + case SOLR_JVM: + return SolrInfoBean.Group.jvm; + case SOLR_NODE: + return SolrInfoBean.Group.node; + default: + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unsupported registry value " + registry); + } + } + + static String getMetricSnitchTag(String metricName, NodeMetricRegistry registry) { + return SolrClientNodeStateProvider.METRICS_PREFIX + SolrMetricManager.getRegistryName(getGroupFromMetricRegistry(registry), metricName); + } + + static String getSystemPropertySnitchTag(String name) { + return ImplicitSnitch.SYSPROP + name; + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeValuesImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeValuesImpl.java new file mode 100644 index 00000000000..78c214332f6 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/AttributeValuesImpl.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
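Side note for reviewers: below is a minimal sketch (not part of this patch) of how plugin code is expected to drive the fetcher implemented above, mirroring the sample plugin later in this change. The printed output and the system property name are purely illustrative.

import java.util.Set;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.placement.AttributeFetcher;
import org.apache.solr.cluster.placement.AttributeValues;

class AttributeFetcherUsageSketch {
  // Hypothetical helper showing the request/fetch/read cycle.
  void logNodeAttributes(AttributeFetcher attributeFetcher, Set<Node> nodes) {
    // 1. Declare what should be fetched (fluent calls accumulate requests).
    attributeFetcher.requestNodeCoreCount().requestNodeFreeDisk();
    attributeFetcher.requestNodeSystemProperty("availability_zone"); // sys prop name is illustrative
    // 2. Fetch once, for all target nodes.
    attributeFetcher.fetchFrom(nodes);
    AttributeValues attrValues = attributeFetcher.fetchAttributes();
    // 3. Read back: values are Optional and empty when a node did not report the attribute.
    for (Node node : nodes) {
      System.out.println(node.getName()
          + " cores=" + attrValues.getCoresCount(node).orElse(-1)
          + " freeDiskGB=" + attrValues.getFreeDisk(node).orElse(-1L)
          + " az=" + attrValues.getSystemProperty(node, "availability_zone").orElse("undefined"));
    }
  }
}

The point is that attributes are requested up front, fetched once for all nodes, and then read back as Optionals that are empty when a node did not report a value.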
+ */ + +package org.apache.solr.cluster.placement.impl; + +import org.apache.solr.cluster.placement.AttributeFetcher; +import org.apache.solr.cluster.placement.AttributeValues; +import org.apache.solr.cluster.Node; + +import java.util.Map; +import java.util.Optional; + +public class AttributeValuesImpl implements AttributeValues { + final Map nodeToCoreCount; + final Map nodeToDiskType; + final Map nodeToFreeDisk; + final Map nodeToTotalDisk; + final Map nodeToHeapUsage; + final Map nodeToSystemLoadAverage; + final Map> syspropSnitchToNodeToValue; + final Map> metricSnitchToNodeToValue; + + AttributeValuesImpl(Map nodeToCoreCount, + Map nodeToDiskType, + Map nodeToFreeDisk, + Map nodeToTotalDisk, + Map nodeToHeapUsage, + Map nodeToSystemLoadAverage, + Map> syspropSnitchToNodeToValue, + Map> metricSnitchToNodeToValue) { + this.nodeToCoreCount = nodeToCoreCount; + this.nodeToDiskType = nodeToDiskType; + this.nodeToFreeDisk = nodeToFreeDisk; + this.nodeToTotalDisk = nodeToTotalDisk; + this.nodeToHeapUsage = nodeToHeapUsage; + this.nodeToSystemLoadAverage = nodeToSystemLoadAverage; + this.syspropSnitchToNodeToValue = syspropSnitchToNodeToValue; + this.metricSnitchToNodeToValue = metricSnitchToNodeToValue; + } + + @Override + public Optional getCoresCount(Node node) { + return Optional.ofNullable(nodeToCoreCount.get(node)); + } + + @Override + public Optional getDiskType(Node node) { + return Optional.ofNullable(nodeToDiskType.get(node)); + } + + @Override + public Optional getFreeDisk(Node node) { + return Optional.ofNullable(nodeToFreeDisk.get(node)); + } + + @Override + public Optional getTotalDisk(Node node) { + return Optional.ofNullable(nodeToTotalDisk.get(node)); + } + + @Override + public Optional getHeapUsage(Node node) { + return Optional.ofNullable(nodeToHeapUsage.get(node)); + } + + @Override + public Optional getSystemLoadAverage(Node node) { + return Optional.ofNullable(nodeToSystemLoadAverage.get(node)); + } + + @Override + public Optional getSystemProperty(Node node, String name) { + Map nodeToValue = syspropSnitchToNodeToValue.get(AttributeFetcherImpl.getSystemPropertySnitchTag(name)); + if (nodeToValue == null) { + return Optional.empty(); + } + return Optional.ofNullable(nodeToValue.get(node)); + } + + @Override + public Optional getEnvironmentVariable(Node node, String name) { + // TODO implement + return Optional.empty(); + } + + @Override + public Optional getMetric(Node node, String metricName, AttributeFetcher.NodeMetricRegistry registry) { + Map nodeToValue = metricSnitchToNodeToValue.get(AttributeFetcherImpl.getMetricSnitchTag(metricName, registry)); + if (nodeToValue == null) { + return Optional.empty(); + } + return Optional.ofNullable(nodeToValue.get(node)); + } + + @Override + public Optional getMetric(String scope, String metricName) { + // TODO implement + return Optional.empty(); + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanFactoryImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanFactoryImpl.java new file mode 100644 index 00000000000..38293722763 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanFactoryImpl.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement.impl; + +import org.apache.solr.cluster.Node; +import org.apache.solr.cluster.Replica; +import org.apache.solr.cluster.SolrCollection; +import org.apache.solr.cluster.placement.*; + +import java.util.Set; + +class PlacementPlanFactoryImpl implements PlacementPlanFactory { + @Override + public PlacementPlan createPlacementPlan(PlacementRequest request, Set replicaPlacements) { + return new PlacementPlanImpl(request, replicaPlacements); + } + + @Override + public ReplicaPlacement createReplicaPlacement(SolrCollection solrCollection, String shardName, Node node, Replica.ReplicaType replicaType) { + return new ReplicaPlacementImpl(solrCollection, shardName, node, replicaType); + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanImpl.java new file mode 100644 index 00000000000..2dde07bae5b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPlanImpl.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
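As a hedged illustration of how the two factory methods above are meant to be combined by a plugin (the "one NRT replica per shard on a given node" logic is invented for the example, not taken from this patch):

import java.util.HashSet;
import java.util.Set;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.PlacementPlan;
import org.apache.solr.cluster.placement.PlacementPlanFactory;
import org.apache.solr.cluster.placement.PlacementRequest;
import org.apache.solr.cluster.placement.ReplicaPlacement;

class PlacementPlanSketch {
  // Hypothetical: place one NRT replica of every requested shard on the given node.
  PlacementPlan oneNrtPerShard(PlacementRequest request, PlacementPlanFactory factory, Node node) {
    SolrCollection collection = request.getCollection();
    Set<ReplicaPlacement> placements = new HashSet<>();
    for (String shardName : request.getShardNames()) {
      placements.add(factory.createReplicaPlacement(collection, shardName, node, Replica.ReplicaType.NRT));
    }
    // A PlacementPlan ties the individual placements back to the request they answer.
    return factory.createPlacementPlan(request, placements);
  }
}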
+ */ + +package org.apache.solr.cluster.placement.impl; + +import java.util.Set; + +import org.apache.solr.cluster.placement.PlacementPlan; +import org.apache.solr.cluster.placement.PlacementRequest; +import org.apache.solr.cluster.placement.ReplicaPlacement; + +class PlacementPlanImpl implements PlacementPlan { + + final PlacementRequest request; + final Set replicaPlacements; + + PlacementPlanImpl(PlacementRequest request, Set replicaPlacements) { + this.request = request; + this.replicaPlacements = replicaPlacements; + } + + @Override + public PlacementRequest getRequest() { + return request; + } + + @Override + public Set getReplicaPlacements() { + return replicaPlacements; + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java new file mode 100644 index 00000000000..0bbf4e040fb --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginAssignStrategy.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement.impl; + +import java.io.IOException; +import java.util.List; + +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.cloud.api.collections.Assign; +import org.apache.solr.cluster.Cluster; +import org.apache.solr.cluster.placement.PlacementException; +import org.apache.solr.cluster.placement.PlacementPlugin; +import org.apache.solr.cluster.placement.PlacementPlan; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.ReplicaPosition; + +/** + * This assign strategy delegates placement computation to "plugin" code. + */ +public class PlacementPluginAssignStrategy implements Assign.AssignStrategy { + + private static final PlacementPlanFactoryImpl PLACEMENT_PLAN_FACTORY = new PlacementPlanFactoryImpl(); + + private final PlacementPlugin plugin; + private final DocCollection collection; + + /** + * @param collection the collection for which this assign request is done. In theory would be better to pass it into the + * {@link #assign} call below (which would allow reusing instances of {@link PlacementPluginAssignStrategy}, + * but for now doing it here in order not to change the other Assign.AssignStrategy implementations. 
+ */ + public PlacementPluginAssignStrategy(DocCollection collection, PlacementPlugin plugin) { + this.collection = collection; + this.plugin = plugin; + } + + public List assign(SolrCloudManager solrCloudManager, Assign.AssignRequest assignRequest) + throws Assign.AssignmentException, IOException, InterruptedException { + + Cluster cluster = new SimpleClusterAbstractionsImpl.ClusterImpl(solrCloudManager); + + PlacementRequestImpl placementRequest = PlacementRequestImpl.toPlacementRequest(cluster, collection, assignRequest); + + final PlacementPlan placementPlan; + try { + placementPlan = plugin.computePlacement(cluster, placementRequest, new AttributeFetcherImpl(solrCloudManager), PLACEMENT_PLAN_FACTORY); + } catch (PlacementException pe) { + throw new Assign.AssignmentException(pe); + } + + return ReplicaPlacementImpl.toReplicaPositions(placementPlan.getReplicaPlacements()); + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginConfigImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginConfigImpl.java new file mode 100644 index 00000000000..e6130a30342 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementPluginConfigImpl.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement.impl; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.cluster.placement.PlacementPlugin; +import org.apache.solr.cluster.placement.PlacementPluginConfig; +import org.apache.solr.cluster.placement.PlacementPluginFactory; +import org.apache.solr.cluster.placement.plugins.SamplePluginAffinityReplicaPlacement; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.Utils; + +/** + *

+ * This concrete class implements the config as seen by the placement plugins and contains the code transforming the
+ * plugin configuration (currently stored in {@code clusterprops.json}) into a strongly typed abstraction (that will not
+ * change if the plugin configuration is internally moved to some other place).
+ *
+ * This class also contains the (static) code dealing with instantiating the plugin factory config (it is config, even if
+ * of a slightly different type). This code is not accessed by the plugin code but is used from the
+ * {@link org.apache.solr.cloud.api.collections.Assign} class.
+ */ +public class PlacementPluginConfigImpl implements PlacementPluginConfig { + /** + * The key in {@code clusterprops.json} under which the plugin factory and the plugin configuration are defined. + */ + final public static String PLACEMENT_PLUGIN_CONFIG_KEY = "placement-plugin"; + /** Name of the property containing the factory class */ + final public static String CONFIG_CLASS = "class"; + + // Separating configs into typed maps based on the element names in solr.xml + private final Map stringConfigs; + private final Map longConfigs; + private final Map boolConfigs; + private final Map doubleConfigs; + + + private PlacementPluginConfigImpl(Map stringConfigs, + Map longConfigs, + Map boolConfigs, + Map doubleConfigs) { + this.stringConfigs = stringConfigs; + this.longConfigs = longConfigs; + this.boolConfigs = boolConfigs; + this.doubleConfigs = doubleConfigs; + } + + @Override + public String getStringConfig(String configName) { + return stringConfigs.get(configName); + } + + @Override + public String getStringConfig(String configName, String defaultValue) { + String retval = stringConfigs.get(configName); + return retval != null ? retval : defaultValue; + } + + @Override + public Boolean getBooleanConfig(String configName) { + return boolConfigs.get(configName); + } + + @Override + public Boolean getBooleanConfig(String configName, boolean defaultValue) { + Boolean retval = boolConfigs.get(configName); + return retval != null ? retval : defaultValue; + } + + @Override + public Long getLongConfig(String configName) { + return longConfigs.get(configName); + } + + @Override + public Long getLongConfig(String configName, long defaultValue) { + Long retval = longConfigs.get(configName); + return retval != null ? retval : defaultValue; + } + + @Override + public Double getDoubleConfig(String configName) { + return doubleConfigs.get(configName); + } + + @Override + public Double getDoubleConfig(String configName, double defaultValue) { + Double retval = doubleConfigs.get(configName); + return retval != null ? retval : defaultValue; + } + + /** + *

+ * Parses the {@link Map} obtained as the value for key {@link #PLACEMENT_PLUGIN_CONFIG_KEY} from
+ * the {@code clusterprops.json} configuration {@link Map} (obtained by calling
+ * {@link org.apache.solr.client.solrj.impl.ClusterStateProvider#getClusterProperties()}) and translates it into a
+ * configuration consumable by the plugin (and that will not change as Solr changes internally how and where it stores
+ * configuration).
+ *
+ * Configuration properties {@code class} and {@code name} are reserved: for defining the plugin factory class and
+ * a human readable plugin name. All other properties are plugin specific.
+ *
+ * See configuration example and how-to in {@link SamplePluginAffinityReplicaPlacement}.
+ */ + static PlacementPluginConfig createConfigFromProperties(Map pluginConfig) { + final Map stringConfigs = new HashMap<>(); + final Map longConfigs = new HashMap<>(); + final Map boolConfigs = new HashMap<>(); + final Map doubleConfigs = new HashMap<>(); + + for (Map.Entry e : pluginConfig.entrySet()) { + String key = e.getKey(); + if (CONFIG_CLASS.equals(key)) { + continue; + } + + if (key == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Missing config name attribute in parameter of " + PLACEMENT_PLUGIN_CONFIG_KEY); + } + + Object value = e.getValue(); + + if (value == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Missing config value for parameter " + key + " of " + PLACEMENT_PLUGIN_CONFIG_KEY); + } + + if (value instanceof String) { + stringConfigs.put(key, (String) value); + } else if (value instanceof Long) { + longConfigs.put(key, (Long) value); + } else if (value instanceof Boolean) { + boolConfigs.put(key, (Boolean) value); + } else if (value instanceof Double) { + doubleConfigs.put(key, (Double) value); + } else { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unsupported config type " + value.getClass().getName() + + " for parameter " + key + " of " + PLACEMENT_PLUGIN_CONFIG_KEY); + } + } + + return new PlacementPluginConfigImpl(stringConfigs, longConfigs, boolConfigs, doubleConfigs); + } + + /** + *

+ * This is where the plugin configuration is read (from wherever in Solr it lives, and this will likely change with time),
+ * a {@link org.apache.solr.cluster.placement.PlacementPluginFactory} (as configured) is instantiated and a plugin instance
+ * created from this factory.
+ *
+ * The initial implementation you see here is crude: the configuration is read anew each time, and the factory class
+ * as well as the plugin class are instantiated each time.
+ * This has to be changed once the code is accepted overall, to register a listener that is notified when the configuration
+ * changes (see {@link org.apache.solr.common.cloud.ZkStateReader#registerClusterPropertiesListener})
+ * and that will either create a new instance of the plugin with the new configuration using the existing factory (if the factory
+ * class has not changed - we need to keep track of this one) or create a new factory altogether (then a new plugin instance).
+ */ + @SuppressWarnings({"unchecked"}) + public static PlacementPlugin getPlacementPlugin(SolrCloudManager solrCloudManager) { + Map props = solrCloudManager.getClusterStateProvider().getClusterProperties(); + Map pluginConfigMap = (Map) props.get(PLACEMENT_PLUGIN_CONFIG_KEY); + + if (pluginConfigMap == null) { + return null; + } + + String pluginFactoryClassName = (String) pluginConfigMap.get(CONFIG_CLASS); + + // Get the configured plugin factory class. Is there a way to load a resource in Solr without being in the context of + // CoreContainer? Here the placement code is unrelated to the presence of cores (and one can imagine it running on + // specialized nodes not having a CoreContainer). I guess the loading code below is not totally satisfying (although + // it's not the only place in Solr doing it that way), but I didn't find more satisfying alternatives. Open to suggestions. + PlacementPluginFactory placementPluginFactory; + try { + Class factoryClazz = + Class.forName(pluginFactoryClassName, true, PlacementPluginConfigImpl.class.getClassLoader()) + .asSubclass(PlacementPluginFactory.class); + + placementPluginFactory = factoryClazz.getConstructor().newInstance(); // no args constructor - that's why we introduced a factory... + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to instantiate placement-plugin factory: " + + Utils.toJSONString(pluginConfigMap) + " please review /clusterprops.json config for " + PLACEMENT_PLUGIN_CONFIG_KEY, e); + } + + // Translate the config from the properties where they are defined into the abstraction seen by the plugin + PlacementPluginConfig pluginConfig = createConfigFromProperties(pluginConfigMap); + + return placementPluginFactory.createPluginInstance(pluginConfig); + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java new file mode 100644 index 00000000000..80cf6c5b680 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
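To make the flow above concrete, here is a sketch (not part of this patch) of a factory and plugin pair that would be referenced from the "class" property of the "placement-plugin" element in /clusterprops.json; the "myIntParam" and "myStringParam" property names are made up for illustration.

import java.util.Set;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.placement.AttributeFetcher;
import org.apache.solr.cluster.placement.PlacementPlan;
import org.apache.solr.cluster.placement.PlacementPlanFactory;
import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementPluginConfig;
import org.apache.solr.cluster.placement.PlacementPluginFactory;
import org.apache.solr.cluster.placement.PlacementRequest;

// Hypothetical plugin wiring, for illustration only.
public class MyPlacementPluginFactory implements PlacementPluginFactory {
  @Override
  public PlacementPlugin createPluginInstance(PlacementPluginConfig config) {
    // Invented property names; they would sit next to "class" under "placement-plugin"
    // in /clusterprops.json and are delivered here as typed values with defaults.
    long myIntParam = config.getLongConfig("myIntParam", 10L);
    String myStringParam = config.getStringConfig("myStringParam", "default");
    return new MyPlacementPlugin(myIntParam, myStringParam);
  }

  private static class MyPlacementPlugin implements PlacementPlugin {
    private final long myIntParam;
    private final String myStringParam;

    MyPlacementPlugin(long myIntParam, String myStringParam) {
      this.myIntParam = myIntParam;
      this.myStringParam = myStringParam;
    }

    @Override
    public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request,
                                          AttributeFetcher attributeFetcher,
                                          PlacementPlanFactory placementPlanFactory) {
      // A real plugin computes placements here (see the sample plugin further down in this patch);
      // this sketch just returns an empty plan.
      return placementPlanFactory.createPlacementPlan(request, Set.of());
    }
  }
}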
+ */ + +package org.apache.solr.cluster.placement.impl; + +import java.util.EnumMap; +import java.util.HashSet; +import java.util.Set; + +import org.apache.solr.cloud.api.collections.Assign; +import org.apache.solr.cluster.Cluster; +import org.apache.solr.cluster.Node; +import org.apache.solr.cluster.Replica; +import org.apache.solr.cluster.SolrCollection; +import org.apache.solr.cluster.placement.*; +import org.apache.solr.common.cloud.DocCollection; + +public class PlacementRequestImpl implements PlacementRequest { + private final SolrCollection solrCollection; + private final Set shardNames; + private final Set targetNodes; + private final EnumMap countReplicas = new EnumMap<>(Replica.ReplicaType.class); + + private PlacementRequestImpl(SolrCollection solrCollection, + Set shardNames, Set targetNodes, + int countNrtReplicas, int countTlogReplicas, int countPullReplicas) { + this.solrCollection = solrCollection; + this.shardNames = shardNames; + this.targetNodes = targetNodes; + // Initializing map for all values of enum, so unboxing always possible later without checking for null + countReplicas.put(Replica.ReplicaType.NRT, countNrtReplicas); + countReplicas.put(Replica.ReplicaType.TLOG, countTlogReplicas); + countReplicas.put(Replica.ReplicaType.PULL, countPullReplicas); + } + + @Override + public SolrCollection getCollection() { + return solrCollection; + } + + @Override + public Set getShardNames() { + return shardNames; + } + + @Override + public Set getTargetNodes() { + return targetNodes; + } + + @Override + public int getCountReplicasToCreate(Replica.ReplicaType replicaType) { + return countReplicas.get(replicaType); + + } + + /** + * Returns a {@link PlacementRequest} that can be consumed by a plugin based on an internal Assign.AssignRequest + * for adding replicas + additional info (upon creation of a new collection or adding replicas to an existing one). + */ + static PlacementRequestImpl toPlacementRequest(Cluster cluster, DocCollection docCollection, + Assign.AssignRequest assignRequest) throws Assign.AssignmentException { + SolrCollection solrCollection = new SimpleClusterAbstractionsImpl.SolrCollectionImpl(docCollection); + Set shardNames = new HashSet<>(assignRequest.shardNames); + if (shardNames.size() < 1) { + throw new Assign.AssignmentException("Bad assign request: no shards specified for collection " + docCollection.getName()); + } + + final Set nodes; + // If no nodes specified, use all live nodes. If nodes are specified, use specified list. 
+ if (assignRequest.nodes != null) { + nodes = SimpleClusterAbstractionsImpl.NodeImpl.getNodes(assignRequest.nodes); + if (nodes.isEmpty()) { + throw new Assign.AssignmentException("Bad assign request: empty list of nodes for collection " + docCollection.getName()); + } + } else { + nodes = cluster.getLiveNodes(); + if (nodes.isEmpty()) { + throw new Assign.AssignmentException("Impossible assign request: no live nodes for collection " + docCollection.getName()); + } + } + + return new PlacementRequestImpl(solrCollection, shardNames, nodes, + assignRequest.numNrtReplicas, assignRequest.numTlogReplicas, assignRequest.numPullReplicas); + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java new file mode 100644 index 00000000000..0bf75642540 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/ReplicaPlacementImpl.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.solr.cluster.Node; +import org.apache.solr.cluster.Replica; +import org.apache.solr.cluster.SolrCollection; +import org.apache.solr.cluster.placement.ReplicaPlacement; +import org.apache.solr.common.cloud.ReplicaPosition; + +class ReplicaPlacementImpl implements ReplicaPlacement { + private final SolrCollection solrCollection; + private final String shardName; + private final Node node; + private final Replica.ReplicaType replicaType; + + ReplicaPlacementImpl(SolrCollection solrCollection, String shardName, Node node, Replica.ReplicaType replicaType) { + this.solrCollection = solrCollection; + this.shardName = shardName; + this.node = node; + this.replicaType = replicaType; + } + + @Override + public SolrCollection getCollection() { + return solrCollection; + } + + @Override + public String getShardName() { + return shardName; + } + + @Override + public Node getNode() { + return node; + } + + @Override + public Replica.ReplicaType getReplicaType() { + return replicaType; + } + + /** + * Translates a set of {@link ReplicaPlacement} returned by a plugin into a list of {@link ReplicaPosition} expected + * by {@link org.apache.solr.cloud.api.collections.Assign.AssignStrategy} + */ + static List toReplicaPositions(Set replicaPlacementSet) { + // The replica index in ReplicaPosition is not as strict a concept as it might seem. It is used in rules + // based placement (for sorting replicas) but its presence in ReplicaPosition is not justified (and when the code + // is executing here, it means rules based placement is not used). 
+ // Looking at ReplicaAssigner.tryAllPermutations, it is well possible to create replicas with same index + // living on a given node for the same shard. This likely never happens because of the way replicas are + // placed on nodes (never two on the same node for same shard). Adopting the same shortcut/bad design here, + // but index should be removed at some point from ReplicaPosition. + List replicaPositions = new ArrayList<>(replicaPlacementSet.size()); + int index = 0; // This really an arbitrary value when adding replicas and a possible source of core name collisions + for (ReplicaPlacement placement : replicaPlacementSet) { + replicaPositions.add(new ReplicaPosition(placement.getShardName(), index++, SimpleClusterAbstractionsImpl.ReplicaImpl.toCloudReplicaType(placement.getReplicaType()), placement.getNode().getName())); + } + + return replicaPositions; + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java new file mode 100644 index 00000000000..6ea2d24396d --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement.impl; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +import com.google.common.collect.Maps; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.cluster.*; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.util.Pair; + +import javax.annotation.Nonnull; + +/** + *

+ * Implementing the cluster abstractions from {@link org.apache.solr.cluster} as static inner classes of this
+ * one is a very straightforward approach for an initial implementation of the placement plugins, but these are
+ * likely not the right implementations for the long term.
+ *
+ * Indeed there's a delay between the moment the Collection API computes a placement for a given command and when
+ * this placement decision is actually executed and Zookeeper, for example, updated with the new state (and that state visible
+ * to the node or nodes). Under high load when a large number of placement requests are computed, the naive implementation
+ * presented here could in some cases provide the same cluster state view to all placement requests over a period of time
+ * that can extend to over a minute and have the resulting placement decisions all place replicas on the same nodes,
+ * eventually leading to severe imbalance of the cluster.
+ *
+ * By modifying the cluster abstractions implementations (without changing the API seen by placement plugins) to provide
+ * a view of the cluster that anticipates the way the cluster will be after in-flight placement decisions are taken
+ * into account, the underlying Solr side framework supporting placement plugins can compensate to some extent for the delay
+ * between a placement decision and that decision being observable.
+ */ +class SimpleClusterAbstractionsImpl { + + static class ClusterImpl implements Cluster { + private final Set liveNodes; + private final ClusterState clusterState; + + ClusterImpl(SolrCloudManager solrCloudManager) throws IOException { + liveNodes = NodeImpl.getNodes(solrCloudManager.getClusterStateProvider().getLiveNodes()); + clusterState = solrCloudManager.getClusterStateProvider().getClusterState(); + } + + @Override + public Set getLiveNodes() { + return liveNodes; + } + + @Override + public SolrCollection getCollection(String collectionName) { + return SolrCollectionImpl.createCollectionFacade(clusterState, collectionName); + } + + @Override + @Nonnull + public Iterator iterator() { + return clusterState.getCollectionsMap().values().stream().map(SolrCollectionImpl::fromDocCollection).collect(Collectors.toSet()).iterator(); + } + + @Override + public Iterable collections() { + return ClusterImpl.this::iterator; + } + } + + + static class NodeImpl implements Node { + public final String nodeName; + + /** + * Transforms a collection of node names into a set of {@link Node} instances. + */ + static Set getNodes(Collection nodeNames) { + return nodeNames.stream().map(NodeImpl::new).collect(Collectors.toSet()); + } + + NodeImpl(String nodeName) { + this.nodeName = nodeName; + } + + @Override + public String getName() { + return nodeName; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + getName() + ")"; + } + + /** + * This class ends up as a key in Maps in {@link org.apache.solr.cluster.placement.AttributeValues}. + * It is important to implement this method comparing node names given that new instances of {@link Node} are created + * with names equal to existing instances (See {@link ReplicaImpl} constructor). + */ + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + NodeImpl other = (NodeImpl) obj; + return Objects.equals(this.nodeName, other.nodeName); + } + + public int hashCode() { + return Objects.hashCode(nodeName); + } + } + + + static class SolrCollectionImpl implements SolrCollection { + private final String collectionName; + /** Map from {@link Shard#getShardName()} to {@link Shard} */ + private final Map shards; + private final DocCollection docCollection; + + static SolrCollection createCollectionFacade(ClusterState clusterState, String collectionName) { + return fromDocCollection(clusterState.getCollectionOrNull(collectionName)); + } + + static SolrCollection fromDocCollection(DocCollection docCollection) { + return docCollection == null ? 
null : new SolrCollectionImpl(docCollection); + } + + SolrCollectionImpl(DocCollection docCollection) { + this.collectionName = docCollection.getName(); + this.shards = ShardImpl.getShards(this, docCollection.getSlices()); + this.docCollection = docCollection; + } + + @Override + public String getName() { + return collectionName; + } + + @Override + public Shard getShard(String name) { + return shards.get(name); + } + + @Override + @Nonnull + public Iterator iterator() { + return shards.values().iterator(); + } + + @Override + public Iterable shards() { + return SolrCollectionImpl.this::iterator; + } + + @Override + public String getCustomProperty(String customPropertyName) { + return docCollection.getStr(customPropertyName); + } + } + + + static class ShardImpl implements Shard { + private final String shardName; + private final SolrCollection collection; + private final ShardState shardState; + private final Map replicas; + private final Replica leader; + + /** + * Transforms {@link Slice}'s of a {@link org.apache.solr.common.cloud.DocCollection} into a map of {@link Shard}'s, + * keyed by shard name ({@link Shard#getShardName()}). + */ + static Map getShards(SolrCollection solrCollection, Collection slices) { + Map shards = Maps.newHashMap(); + + for (Slice slice : slices) { + String shardName = slice.getName(); + shards.put(shardName, new ShardImpl(shardName, solrCollection, slice)); + } + + return shards; + } + + private ShardImpl(String shardName, SolrCollection collection, Slice slice) { + this.shardName = shardName; + this.collection = collection; + this.shardState = translateState(slice.getState()); + + Pair, Replica> pair = ReplicaImpl.getReplicas(slice.getReplicas(), this); + replicas = pair.first(); + leader = pair.second(); + } + + private ShardState translateState(Slice.State state) { + switch (state) { + case ACTIVE: return ShardState.ACTIVE; + case INACTIVE: return ShardState.INACTIVE; + case CONSTRUCTION: return ShardState.CONSTRUCTION; + case RECOVERY: return ShardState.RECOVERY; + case RECOVERY_FAILED: return ShardState.RECOVERY_FAILED; + default: throw new RuntimeException("Unexpected " + state); + } + } + + @Override + public String getShardName() { + return shardName; + } + + @Override + public SolrCollection getCollection() { + return collection; + } + + @Override + public Replica getReplica(String name) { + return replicas.get(name); + } + + @Override + @Nonnull + public Iterator iterator() { + return replicas.values().iterator(); + } + + @Override + public Iterable replicas() { + return ShardImpl.this::iterator; + } + + @Override + public Replica getLeader() { + return leader; + } + + @Override + public ShardState getState() { + return shardState; + } + + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + ShardImpl other = (ShardImpl) obj; + return Objects.equals(this.shardName, other.shardName) + && Objects.equals(this.collection, other.collection) + && Objects.equals(this.shardState, other.shardState) + && Objects.equals(this.replicas, other.replicas) + && Objects.equals(this.leader, other.leader); + } + + public int hashCode() { + return Objects.hash(shardName, collection, shardState); + } + } + + + static class ReplicaImpl implements Replica { + private final String replicaName; + private final String coreName; + private final Shard shard; + private final ReplicaType replicaType; + private final ReplicaState replicaState; + private final Node node; 
+ + /** + * Transforms {@link org.apache.solr.common.cloud.Replica}'s of a {@link Slice} into a map of {@link Replica}'s, + * keyed by replica name ({@link Replica#getReplicaName()}). Also returns in the + */ + static Pair, Replica> getReplicas(Collection sliceReplicas, Shard shard) { + Map replicas = Maps.newHashMap(); + Replica leader = null; + + for (org.apache.solr.common.cloud.Replica sliceReplica : sliceReplicas) { + String replicaName = sliceReplica.getName(); + Replica replica = new ReplicaImpl(replicaName, shard, sliceReplica); + replicas.put(replicaName, replica); + + if (sliceReplica.isLeader()) { + leader = replica; + } + } + + return new Pair<>(replicas, leader); + } + + private ReplicaImpl(String replicaName, Shard shard, org.apache.solr.common.cloud.Replica sliceReplica) { + this.replicaName = replicaName; + this.coreName = sliceReplica.getCoreName(); + this.shard = shard; + this.replicaType = translateType(sliceReplica.getType()); + this.replicaState = translateState(sliceReplica.getState()); + // Note this node might not be live, and if it is it is a different instance from the Nodes in Cluster, but that's ok. + this.node = new NodeImpl(sliceReplica.getNodeName()); + } + + private Replica.ReplicaType translateType(org.apache.solr.common.cloud.Replica.Type type) { + switch (type) { + case NRT: return Replica.ReplicaType.NRT; + case TLOG: return Replica.ReplicaType.TLOG; + case PULL: return Replica.ReplicaType.PULL; + default: throw new RuntimeException("Unexpected " + type); + } + } + + private Replica.ReplicaState translateState(org.apache.solr.common.cloud.Replica.State state) { + switch (state) { + case ACTIVE: return Replica.ReplicaState.ACTIVE; + case DOWN: return Replica.ReplicaState.DOWN; + case RECOVERING: return Replica.ReplicaState.RECOVERING; + case RECOVERY_FAILED: return Replica.ReplicaState.RECOVERY_FAILED; + default: throw new RuntimeException("Unexpected " + state); + } + } + + @Override + public Shard getShard() { + return shard; + } + + @Override + public ReplicaType getType() { + return replicaType; + } + + @Override + public ReplicaState getState() { + return replicaState; + } + + @Override + public String getReplicaName() { + return replicaName; + } + + @Override + public String getCoreName() { + return coreName; + } + + @Override + public Node getNode() { + return node; + } + + /** + * Translating a plugin visible ReplicaType to the internal Solr enum {@link org.apache.solr.common.cloud.Replica.Type}. + * The obvious approach would have been to add the internal Solr enum value as a parameter in the ReplicaType enum, + * but that would have leaked an internal SolrCloud implementation class to the plugin API. 
+ */ + static org.apache.solr.common.cloud.Replica.Type toCloudReplicaType(ReplicaType type) { + switch (type) { + case NRT: return org.apache.solr.common.cloud.Replica.Type.NRT; + case TLOG: return org.apache.solr.common.cloud.Replica.Type.TLOG; + case PULL: return org.apache.solr.common.cloud.Replica.Type.PULL; + default: throw new IllegalArgumentException("Unknown " + type); + } + } + + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + ReplicaImpl other = (ReplicaImpl) obj; + return Objects.equals(this.replicaName, other.replicaName) + && Objects.equals(this.coreName, other.coreName) + && Objects.equals(this.shard, other.shard) + && Objects.equals(this.replicaType, other.replicaType) + && Objects.equals(this.replicaState, other.replicaState) + && Objects.equals(this.node, other.node); + } + + public int hashCode() { + return Objects.hash(replicaName, coreName, shard, replicaType, replicaState, node); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/package-info.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/package-info.java new file mode 100644 index 00000000000..c80cda384cc --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation classes (not visible to plugins, subject to change at any time) for the interfaces in {@link org.apache.solr.cluster.placement} + * and to make them work with the rest of Solr. + */ +package org.apache.solr.cluster.placement.impl; diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/package-info.java b/solr/core/src/java/org/apache/solr/cluster/placement/package-info.java new file mode 100644 index 00000000000..76a1dae59dc --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/package-info.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
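Since these classes back the only topology view a plugin gets, here is a brief hedged sketch of navigating that view; the per-node counting is invented for illustration and is not part of this patch.

import java.util.HashMap;
import java.util.Map;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.Shard;
import org.apache.solr.cluster.SolrCollection;

class ClusterNavigationSketch {
  // Hypothetical: count how many replicas of the given collection live on each node.
  static Map<String, Integer> replicasPerNodeName(SolrCollection collection) {
    Map<String, Integer> counts = new HashMap<>();
    for (Shard shard : collection.shards()) {
      for (Replica replica : shard.replicas()) {
        // Replica -> Node -> node name; Node instances compare by name (see NodeImpl.equals above).
        counts.merge(replica.getNode().getName(), 1, Integer::sum);
      }
    }
    return counts;
  }
}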
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + *

+ * This package contains interfaces visible to plugins (i.e. contributed code) implementing cluster elasticity,
+ * placement and scalability, as well as a few examples on how plugins can be implemented.
+ *
+ * Initially, only placement related plugins are supported.
+ *
+ * The entry point is the {@link org.apache.solr.cluster.placement.PlacementPluginFactory} building instances
+ * of the {@link org.apache.solr.cluster.placement.PlacementPlugin} interface where the placement computation is implemented.
+ *
+ * From there, one will access the interfaces that allow navigating the cluster topology, see {@link org.apache.solr.cluster}.
+ *
+ * Plugin code (a minimal end-to-end sketch appears below, after this package description):
+ * <ul>
+ * <li>Gets work to be done by receiving a {@link org.apache.solr.cluster.placement.PlacementRequest},</li>
+ * <li>Can obtain more info using {@link org.apache.solr.cluster.placement.AttributeFetcher} and building an
+ * {@link org.apache.solr.cluster.placement.AttributeValues},</li>
+ * <li>Uses the values from {@link org.apache.solr.cluster.placement.AttributeValues} as well as cluster state and
+ * {@link org.apache.solr.cluster.SolrCollection#getCustomProperty} and other data to compute placement,</li>
+ * <li>Placement decisions are returned to Solr using an instance of {@link org.apache.solr.cluster.placement.PlacementPlan}
+ * built using the {@link org.apache.solr.cluster.placement.PlacementPlanFactory}.</li>
+ * </ul>
+ */ +package org.apache.solr.cluster.placement; diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginAffinityReplicaPlacement.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginAffinityReplicaPlacement.java new file mode 100644 index 00000000000..2d18b3eb874 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginAffinityReplicaPlacement.java @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement.plugins; + +import com.google.common.collect.*; +import org.apache.solr.cluster.*; +import org.apache.solr.cluster.placement.*; +import org.apache.solr.common.util.Pair; +import org.apache.solr.common.util.SuppressForbidden; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.*; +import java.util.stream.Collectors; + +/** + *
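A minimal end-to-end sketch of the plugin workflow listed in the package description above: it does naive round-robin placement, ignores attributes entirely, and is not part of this patch. It assumes PlacementException offers a message constructor.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.placement.AttributeFetcher;
import org.apache.solr.cluster.placement.PlacementException;
import org.apache.solr.cluster.placement.PlacementPlan;
import org.apache.solr.cluster.placement.PlacementPlanFactory;
import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementRequest;
import org.apache.solr.cluster.placement.ReplicaPlacement;

// Hypothetical round-robin placement, for illustration only.
public class RoundRobinPlacementSketch implements PlacementPlugin {
  @Override
  public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request,
                                        AttributeFetcher attributeFetcher,
                                        PlacementPlanFactory placementPlanFactory) throws PlacementException {
    List<Node> nodes = new ArrayList<>(request.getTargetNodes());
    if (nodes.isEmpty()) {
      throw new PlacementException("No target nodes to place replicas on");
    }
    Set<ReplicaPlacement> placements = new HashSet<>();
    int cursor = 0;
    // Walk the requested shards and replica counts, cycling over the candidate nodes.
    // Naive: with few nodes this can put several replicas of the same shard on one node.
    for (String shardName : request.getShardNames()) {
      for (Replica.ReplicaType type : Replica.ReplicaType.values()) {
        for (int i = 0; i < request.getCountReplicasToCreate(type); i++) {
          Node node = nodes.get(cursor++ % nodes.size());
          placements.add(placementPlanFactory.createReplicaPlacement(request.getCollection(), shardName, node, type));
        }
      }
    }
    return placementPlanFactory.createPlacementPlan(request, placements);
  }
}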

+ * Implements placing replicas in a way that replicates the past Autoscaling config defined
+ * here.
+ *
+ * This specification does the following:
+ * Spread replicas per shard as evenly as possible across multiple availability zones (given by a sys prop),
+ * assign replicas based on replica type to specific kinds of nodes (another sys prop), and avoid having more than
+ * one replica per shard on the same node.
+ * Only after these constraints are satisfied does it minimize cores per node or disk usage.
+ * + *

+ * Overall strategy of this plugin:
+ * <ul>
+ * <li>The set of nodes in the cluster is obtained and transformed into 3 independent sets (that can overlap) of nodes
+ * accepting each of the three replica types.</li>
+ * <li>For each shard on which placing replicas is required and then for each replica type to place (starting with NRT,
+ * then TLOG then PULL):
+ * <ul>
+ * <li>The set of candidate nodes corresponding to the replica type is used and from that set are removed nodes
+ * that already have a replica (of any type) for that shard.</li>
+ * <li>If there are not enough nodes, an error is thrown (this is checked further down during processing).</li>
+ * <li>The number of (already existing) replicas of the current type on each Availability Zone is collected.</li>
+ * <li>Separate the set of available nodes into as many subsets (possibly some are empty) as there are Availability Zones
+ * defined for the candidate nodes.</li>
+ * <li>In each AZ nodes subset, sort the nodes by increasing total core count, with possibly a condition
+ * that pushes nodes with low disk space to the end of the list? Or a weighted combination of the relative
+ * importance of these two factors? Some randomization? Marking nodes without enough disk space as not available?
+ * These and others are likely aspects to be played with once the plugin is tested or observed to be running in prod,
+ * don't expect the initial code drop(s) to do all of that.</li>
+ * <li>Iterate over the number of replicas to place (for the current replica type for the current shard):
+ * <ul>
+ * <li>Based on the number of replicas per AZ collected previously, pick the non-empty set of nodes having the
+ * lowest number of replicas. Then pick the first node in that set. That's the node the replica is placed on.
+ * Remove the node from the set of available nodes for the given AZ and increase the number of replicas placed
+ * on that AZ.</li>
+ * </ul></li>
+ * <li>During this process, the number of cores on the nodes in general is tracked to take into account placement
+ * decisions so that not all shards decide to put their replicas on the same nodes (they might though if these are
+ * the least loaded nodes).</li>
+ * </ul></li>
+ * </ul>
+ * This code is a realistic placement computation, based on a few assumptions. The code is written in such a way as to
+ * make it relatively easy to adapt it to (somewhat) different assumptions. Configuration options could be introduced
+ * to allow configuration-based option selection as well...
+ *
In order to configure this plugin to be used for placement decisions, the following {@code curl} command (or something + * equivalent) has to be executed once the cluster is already running in order to set + * the appropriate Zookeeper stored configuration. Replace {@code localhost:8983} by one of your servers' IP address and port.

+ * + *
+ *
+  curl -X POST -H 'Content-type:application/json' -d '{
+    "set-placement-plugin": {
+      "class": "org.apache.solr.cluster.placement.plugins.SamplePluginAffinityReplicaPlacement$Factory",
+      "minimalFreeDiskGB": 10,
+      "deprioritizedFreeDiskGB": 50
+    }
+  }' http://localhost:8983/api/cluster
+ * 
+ * + *

The consequence will be the creation of an element in the Zookeeper file {@code /clusterprops.json} as follows:

+ * + *
+ *
+ * "placement-plugin":{
+ *     "class":"org.apache.solr.cluster.placement.plugins.SamplePluginAffinityReplicaPlacement$Factory",
+ *     "minimalFreeDiskGB":10,
+ *     "deprioritizedFreeDiskGB":50}
+ * 
+ * + *

In order to delete the placement-plugin section from {@code /clusterprops.json} (and to fallback to either Legacy + * or rule based placement if configured for a collection), execute:

+ * + *
+ *
+  curl -X POST -H 'Content-type:application/json' -d '{
+    "unset-placement-plugin" : null
+  }' http://localhost:8983/api/cluster
+ * 
+ */ +public class SamplePluginAffinityReplicaPlacement implements PlacementPlugin { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * This factory is instantiated by config from its class name. Using it is the only way to create instances of + * {@link SamplePluginAffinityReplicaPlacement}. + */ + static public class Factory implements PlacementPluginFactory { + + /** + * Empty public constructor is used to instantiate this factory. Using a factory pattern to allow the factory to do one + * time costly operations if needed, and to only have to instantiate a default constructor class by name, rather than + * having to call a constructor with more parameters (if we were to instantiate the plugin class directly without going + * through a factory). + */ + public Factory() { + } + + @Override + public PlacementPlugin createPluginInstance(PlacementPluginConfig config) { + final long minimalFreeDiskGB = config.getLongConfig("minimalFreeDiskGB", 20L); + final long deprioritizedFreeDiskGB = config.getLongConfig("deprioritizedFreeDiskGB", 100L); + return new SamplePluginAffinityReplicaPlacement(minimalFreeDiskGB, deprioritizedFreeDiskGB); + } + } + + + /** + *

Name of the system property on a node indicating which (public cloud) Availability Zone that node is in. The value + * is any string, different strings denote different availability zones. + * + *

Nodes on which this system property is not defined are considered being in the same Availability Zone + * {@link #UNDEFINED_AVAILABILITY_ZONE} (hopefully the value of this constant is not the name of a real Availability Zone :). + */ + public static final String AVAILABILITY_ZONE_SYSPROP = "availability_zone"; + /** This is the "AZ" name for nodes that do not define an AZ. Should not match a real AZ name (I think we're safe) */ + public static final String UNDEFINED_AVAILABILITY_ZONE = "uNd3f1NeD"; + + /** + *

Name of the system property on a node indicating the type of replicas allowed on that node. + * The value of that system property is a comma separated list or a single string of value names of + * {@link org.apache.solr.cluster.Replica.ReplicaType} (case insensitive). If that property is not defined, that node is + * considered accepting all replica types (i.e. undefined is equivalent to {@code "NRT,Pull,tlog"}). + * + *

See {@link #getNodesPerReplicaType}. + */ + public static final String REPLICA_TYPE_SYSPROP = "replica_type"; + + /** + * If a node has strictly less GB of free disk than this value, the node is excluded from assignment decisions. + * Set to 0 or less to disable. + */ + private final long minimalFreeDiskGB; + + /** + * Replica allocation will assign replicas to nodes with at least this number of GB of free disk space regardless + * of the number of cores on these nodes rather than assigning replicas to nodes with less than this amount of free + * disk space if that's an option (if that's not an option, replicas can still be assigned to nodes with less than this + * amount of free space). + */ + private final long deprioritizedFreeDiskGB; + + /** + * The factory has decoded the configuration for the plugin instance and passes it the parameters it needs. + */ + private SamplePluginAffinityReplicaPlacement(long minimalFreeDiskGB, long deprioritizedFreeDiskGB) { + this.minimalFreeDiskGB = minimalFreeDiskGB; + this.deprioritizedFreeDiskGB = deprioritizedFreeDiskGB; + } + + @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.") + public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher, + PlacementPlanFactory placementPlanFactory) throws PlacementException { + Set nodes = request.getTargetNodes(); + SolrCollection solrCollection = request.getCollection(); + + // Request all needed attributes + attributeFetcher.requestNodeSystemProperty(AVAILABILITY_ZONE_SYSPROP).requestNodeSystemProperty(REPLICA_TYPE_SYSPROP); + attributeFetcher.requestNodeCoreCount().requestNodeFreeDisk(); + attributeFetcher.fetchFrom(nodes); + final AttributeValues attrValues = attributeFetcher.fetchAttributes(); + + // Split the set of nodes into 3 sets of nodes accepting each replica type (sets can overlap if nodes accept multiple replica types) + // These subsets sets are actually maps, because we capture the number of cores (of any replica type) present on each node. + // Also get the number of currently existing cores per node, so we can keep update as we place new cores to not end up + // always selecting the same node(s). + Pair>, Map> p = getNodesPerReplicaType(nodes, attrValues); + + EnumMap> replicaTypeToNodes = p.first(); + Map coresOnNodes = p.second(); + + // All available zones of live nodes. Due to some nodes not being candidates for placement, and some existing replicas + // being one availability zones that might be offline (i.e. their nodes are not live), this set might contain zones + // on which it is impossible to place replicas. That's ok. + ImmutableSet availabilityZones = getZonesFromNodes(nodes, attrValues); + + // Build the replica placement decisions here + Set replicaPlacements = new HashSet<>(); + + // Let's now iterate on all shards to create replicas for and start finding home sweet homes for the replicas + for (String shardName : request.getShardNames()) { + // Iterate on the replica types in the enum order. We place more strategic replicas first + // (NRT is more strategic than TLOG more strategic than PULL). This is in case we eventually decide that less + // strategic replica placement impossibility is not a problem that should lead to replica placement computation + // failure. Current code does fail if placement is impossible (constraint is at most one replica of a shard on any node). 
+ for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { + makePlacementDecisions(solrCollection, shardName, availabilityZones, replicaType, request.getCountReplicasToCreate(replicaType), + attrValues, replicaTypeToNodes, coresOnNodes, placementPlanFactory, replicaPlacements); + } + } + + return placementPlanFactory.createPlacementPlan(request, replicaPlacements); + } + + private ImmutableSet getZonesFromNodes(Set nodes, final AttributeValues attrValues) { + Set azs = new HashSet<>(); + + for (Node n : nodes) { + azs.add(getNodeAZ(n, attrValues)); + } + + return ImmutableSet.copyOf(azs); + } + + /** + * Resolves the AZ of a node and takes care of nodes that have no defined AZ in system property {@link #AVAILABILITY_ZONE_SYSPROP} + * to then return {@link #UNDEFINED_AVAILABILITY_ZONE} as the AZ name. + */ + private String getNodeAZ(Node n, final AttributeValues attrValues) { + Optional nodeAz = attrValues.getSystemProperty(n, AVAILABILITY_ZONE_SYSPROP); + // All nodes with undefined AZ will be considered part of the same AZ. This also works for deployments that do not care about AZ's + return nodeAz.orElse(UNDEFINED_AVAILABILITY_ZONE); + } + + /** + * This class captures an availability zone and the nodes that are legitimate targets for replica placement in that + * Availability Zone. Instances are used as values in a {@link TreeMap} in which the total number of already + * existing replicas in the AZ is the key. This allows easily picking the set of nodes from which to select a node for + * placement in order to balance the number of replicas per AZ. Picking one of the nodes from the set is done using + * different criteria unrelated to the Availability Zone (picking the node is based on the {@link CoresAndDiskComparator} + * ordering). + */ + private static class AzWithNodes { + final String azName; + List availableNodesForPlacement; + boolean hasBeenSorted; + + AzWithNodes(String azName, List availableNodesForPlacement) { + this.azName = azName; + this.availableNodesForPlacement = availableNodesForPlacement; + // Once the list is sorted to an order we're happy with, this flag is set to true to avoid sorting multiple times + // unnecessarily. + this.hasBeenSorted = false; + } + } + + /** + * Given the set of all nodes on which to do placement and fetched attributes, builds the sets representing + * candidate nodes for placement of replicas of each replica type. + * These sets are packaged and returned in an EnumMap keyed by replica type (1st member of the Pair). + * Also builds the number of existing cores on each node present in the returned EnumMap (2nd member of the returned Pair). + * Nodes for which the number of cores is not available for whatever reason are excluded from acceptable candidate nodes + * as it would not be possible to make any meaningful placement decisions. + * @param nodes all nodes on which this plugin should compute placement + * @param attrValues attributes fetched for the nodes. This method uses system property {@link #REPLICA_TYPE_SYSPROP} as + * well as the number of cores on each node. 
+ */ + private Pair>, Map> getNodesPerReplicaType(Set nodes, final AttributeValues attrValues) { + EnumMap> replicaTypeToNodes = new EnumMap<>(Replica.ReplicaType.class); + Map coresOnNodes = Maps.newHashMap(); + + for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { + replicaTypeToNodes.put(replicaType, new HashSet<>()); + } + + for (Node node : nodes) { + // Exclude nodes with unknown or too small disk free space + if (attrValues.getFreeDisk(node).isEmpty()) { + if (log.isWarnEnabled()) { + log.warn("Unknown free disk on node {}, excluding it from placement decisions.", node.getName()); + } + // We rely later on the fact that the free disk optional is present (see CoresAndDiskComparator), be careful if you change anything here. + continue; + } if (attrValues.getFreeDisk(node).get() < minimalFreeDiskGB) { + if (log.isWarnEnabled()) { + log.warn("Node {} free disk ({}GB) lower than configured minimum {}GB, excluding it from placement decisions.", node.getName(), attrValues.getFreeDisk(node).get(), minimalFreeDiskGB); + } + continue; + } + + if (attrValues.getCoresCount(node).isEmpty()) { + if (log.isWarnEnabled()) { + log.warn("Unknown number of cores on node {}, excluding it from placement decisions.", node.getName()); + } + // We rely later on the fact that the number of cores optional is present (see CoresAndDiskComparator), be careful if you change anything here. + continue; + } + + Integer coresCount = attrValues.getCoresCount(node).get(); + coresOnNodes.put(node, coresCount); + + String supportedReplicaTypes = attrValues.getSystemProperty(node, REPLICA_TYPE_SYSPROP).isPresent() ? attrValues.getSystemProperty(node, REPLICA_TYPE_SYSPROP).get() : null; + // If the property is not defined on a node or contains only whitespace, assume the node can take any replica type + if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) { + for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { + replicaTypeToNodes.get(rt).add(node); + } + } else { + Set acceptedTypes = Arrays.stream(supportedReplicaTypes.split(",")).map(String::trim).map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.toSet()); + for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { + if (acceptedTypes.contains(rt.name().toLowerCase(Locale.ROOT))) { + replicaTypeToNodes.get(rt).add(node); + } + } + } + } + return new Pair<>(replicaTypeToNodes, coresOnNodes); + } + + /**
+ * <p>Picks nodes from {@code targetNodes} for placing {@code numReplicas} replicas.
+ *
+ * <p>The criteria used in this method are, in this order:
+ * <ol>
+ * <li>No more than one replica of a given shard on a given node (strictly enforced)</li>
+ * <li>Balance as much as possible the number of replicas of the given {@link org.apache.solr.cluster.Replica.ReplicaType} over available AZ's.
+ * This balancing takes into account existing replicas of the corresponding replica type, if any.</li>
+ * <li>Place replicas if possible on nodes having more than a certain amount of free disk space (note that nodes with a too small
+ * amount of free disk space were eliminated as placement targets earlier, in {@link #getNodesPerReplicaType}). There's
+ * a threshold here rather than sorting on the amount of free disk space, because sorting on that value would in
+ * practice lead to never considering the number of cores on a node.</li>
+ * <li>Place replicas on nodes having a smaller number of cores (the number of cores considered
+ * for this decision includes decisions made during the processing of the placement request)</li>
+ * </ol>
+ */ + @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.") + private void makePlacementDecisions(SolrCollection solrCollection, String shardName, ImmutableSet availabilityZones, + Replica.ReplicaType replicaType, int numReplicas, final AttributeValues attrValues, + EnumMap> replicaTypeToNodes, Map coresOnNodes, + PlacementPlanFactory placementPlanFactory, Set replicaPlacements) throws PlacementException { + // Build the set of candidate nodes, i.e. nodes not having (yet) a replica of the given shard + Set candidateNodes = new HashSet<>(replicaTypeToNodes.get(replicaType)); + + // Count existing replicas per AZ. We count only instances the type of replica for which we need to do placement. This + // can be changed in the loop below if we want to count all replicas for the shard. + Map azToNumReplicas = Maps.newHashMap(); + // Add all "interesting" AZ's, i.e. AZ's for which there's a chance we can do placement. + for (String az : availabilityZones) { + azToNumReplicas.put(az, 0); + } + + Shard shard = solrCollection.getShard(shardName); + if (shard != null) { + // shard is non null if we're adding replicas to an already existing collection. + // If we're creating the collection, the shards do not exist yet. + for (Replica replica : shard.replicas()) { + // Nodes already having any type of replica for the shard can't get another replica. + candidateNodes.remove(replica.getNode()); + // The node's AZ has to be counted as having a replica if it has a replica of the same type as the one we need + // to place here (remove the "if" below to balance the number of replicas per AZ across all replica types rather + // than within each replica type, but then there's a risk that all NRT replicas for example end up on the same AZ). + // Note that if in the cluster nodes are configured to accept a single replica type and not multiple ones, the + // two options are equivalent (governed by system property AVAILABILITY_ZONE_SYSPROP on each node) + if (replica.getType() == replicaType) { + final String az = getNodeAZ(replica.getNode(), attrValues); + if (azToNumReplicas.containsKey(az)) { + // We do not count replicas on AZ's for which we don't have any node to place on because it would not help + // the placement decision. If we did want to do that, note the dereferencing below can't be assumed as the + // entry will not exist in the map. + azToNumReplicas.put(az, azToNumReplicas.get(az) + 1); + } + } + } + } + + // We now have the set of real candidate nodes, we've enforced "No more than one replica of a given shard on a given node". + // We also counted for the shard and replica type under consideration how many replicas were per AZ, so we can place + // (or try to place) replicas on AZ's that have fewer replicas + + // Get the candidate nodes per AZ in order to build (further down) a mapping of AZ to placement candidates. + Map> nodesPerAz = Maps.newHashMap(); + for (Node node : candidateNodes) { + String nodeAz = getNodeAZ(node, attrValues); + List nodesForAz = nodesPerAz.computeIfAbsent(nodeAz, k -> new ArrayList<>()); + nodesForAz.add(node); + } + + // Build a treeMap sorted by the number of replicas per AZ and including candidates nodes suitable for placement on the + // AZ, so we can easily select the next AZ to get a replica assignment and quickly (constant time) decide if placement + // on this AZ is possible or not. 
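[Review note, not part of the patch] The TreeMultimap built just below is the heart of the AZ balancing: keys are the number of replicas the shard already has in an AZ, values are the AZ with its remaining candidate nodes, and each placement takes the lowest entry that still has nodes and re-inserts it under key + 1. A minimal, self-contained sketch of that pattern, with the class and node names invented for the illustration:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    import com.google.common.collect.Ordering;
    import com.google.common.collect.TreeMultimap;

    // Illustration only: mirrors how the plugin balances placements across availability zones.
    class AzSelectionSketch {
      static class AzBucket {
        final String az;
        final List<String> candidateNodes; // nodes still free to take a replica of this shard
        AzBucket(String az, List<String> candidateNodes) {
          this.az = az;
          this.candidateNodes = candidateNodes;
        }
      }

      public static void main(String[] args) {
        // Key = replicas this shard already has in the AZ, value = the AZ and its candidate nodes.
        TreeMultimap<Integer, AzBucket> azByExistingReplicas =
            TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary());
        azByExistingReplicas.put(1, new AzBucket("us-east-1a", new ArrayList<>(List.of("node1", "node2"))));
        azByExistingReplicas.put(0, new AzBucket("us-east-1b", new ArrayList<>(List.of("node3"))));

        for (int i = 0; i < 2; i++) { // place two replicas
          Map.Entry<Integer, AzBucket> picked = null;
          for (Iterator<Map.Entry<Integer, AzBucket>> it = azByExistingReplicas.entries().iterator(); it.hasNext(); ) {
            Map.Entry<Integer, AzBucket> e = it.next();
            it.remove(); // AZ's with no candidate nodes left are dropped for good
            if (!e.getValue().candidateNodes.isEmpty()) {
              picked = e;
              break;
            }
          }
          if (picked == null) {
            throw new IllegalStateException("not enough nodes"); // the plugin throws PlacementException here
          }
          String node = picked.getValue().candidateNodes.remove(0);
          System.out.println("replica " + i + " -> " + node + " (AZ " + picked.getValue().az + ")");
          // Re-insert the AZ with one more replica counted so the next iteration prefers other AZ's.
          azByExistingReplicas.put(picked.getKey() + 1, picked.getValue());
        }
      }
    }

Running the sketch puts the first replica in the AZ that starts with zero replicas and the second one in the other AZ, which is the balancing behavior the comments above describe.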
+ TreeMultimap azByExistingReplicas = TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary()); + for (Map.Entry> e : nodesPerAz.entrySet()) { + azByExistingReplicas.put(azToNumReplicas.get(e.getKey()), new AzWithNodes(e.getKey(), e.getValue())); + } + + CoresAndDiskComparator coresAndDiskComparator = new CoresAndDiskComparator(attrValues, coresOnNodes, deprioritizedFreeDiskGB); + + // Now we have for each AZ on which we might have a chance of placing a replica, the list of candidate nodes for replicas + // (candidate: does not already have a replica of this shard and is in the corresponding AZ). + // We must now select those of the nodes on which we actually place the replicas, and will do that based on the total + // number of cores already present on these nodes as well as the free disk space. + // We sort once by the order related to number of cores and disk space each list of nodes on an AZ. We do not sort all + // of them ahead of time because we might be placing a small number of replicas and it might be wasted work. + for (int i = 0; i < numReplicas; i++) { + // Pick the AZ having the lowest number of replicas for this shard, and if that AZ has available nodes, pick the + // most appropriate one (based on number of cores and disk space constraints). In the process, remove entries (AZ's) + // that do not have nodes to place replicas on because these are useless to us. + Map.Entry azWithNodesEntry = null; + for (Iterator> it = azByExistingReplicas.entries().iterator(); it.hasNext(); ) { + Map.Entry entry = it.next(); + if (!entry.getValue().availableNodesForPlacement.isEmpty()) { + azWithNodesEntry = entry; + // Remove this entry. Will add it back after a node has been removed from the list of available nodes and the number + // of replicas on the AZ has been increased by one (search for "azByExistingReplicas.put" below). + it.remove(); + break; + } else { + it.remove(); + } + } + + if (azWithNodesEntry == null) { + // This can happen because not enough nodes for the placement request or already too many nodes with replicas of + // the shard that can't accept new replicas or not enough nodes with enough free disk space. + throw new PlacementException("Not enough nodes to place " + numReplicas + " replica(s) of type " + replicaType + + " for shard " + shardName + " of collection " + solrCollection.getName()); + } + + AzWithNodes azWithNodes = azWithNodesEntry.getValue(); + List nodes = azWithNodes.availableNodesForPlacement; + + if (!azWithNodes.hasBeenSorted) { + // Make sure we do not tend to use always the same nodes (within an AZ) if all conditions are identical (well, this + // likely is not the case since after having added a replica to a node its number of cores increases for the next + // placement decision, but let's be defensive here, given that multiple concurrent placement decisions might see + // the same initial cluster state, and we want placement to be reasonable even in that case without creating an + // unnecessary imbalance). + // For example, if all nodes have 0 cores and same amount of free disk space, ideally we want to pick a random node + // for placement, not always the same one due to some internal ordering. 
+ Collections.shuffle(nodes, new Random()); + + // Sort by increasing number of cores but pushing nodes with low free disk space to the end of the list + nodes.sort(coresAndDiskComparator); + + azWithNodes.hasBeenSorted = true; + } + + Node assignTarget = nodes.remove(0); + + // Insert back a corrected entry for the AZ: one more replica living there and one less node that can accept new replicas + // (the remaining candidate node list might be empty, in which case it will be cleaned up on the next iteration). + azByExistingReplicas.put(azWithNodesEntry.getKey() + 1, azWithNodes); + + // Track that the node has one more core. These values are only used during the current run of the plugin. + coresOnNodes.merge(assignTarget, 1, Integer::sum); + + // Register the replica assignment just decided + replicaPlacements.add(placementPlanFactory.createReplicaPlacement(solrCollection, shardName, assignTarget, replicaType)); + } + } + + /** + * Comparator implementing the placement strategy based on free space and number of cores: we want to place new replicas + * on nodes with the lowest number of cores, but only if they do have enough disk space (expressed as a threshold value). + */ + static class CoresAndDiskComparator implements Comparator { + private final AttributeValues attrValues; + private final Map coresOnNodes; + private final long deprioritizedFreeDiskGB; + + + /** + * The data we sort on is not part of the {@link Node} instances but has to be retrieved from the attributes and configuration. + * The number of cores per node is passed in a map whereas the free disk is fetched from the attributes due to the + * fact that we update the number of cores per node as we do allocations, but we do not update the free disk. The + * attrValues corresponding to the number of cores per node are the initial values, but we want to compare the actual + * value taking into account placement decisions already made during the current execution of the placement plugin. + */ + CoresAndDiskComparator(AttributeValues attrValues, Map coresOnNodes, long deprioritizedFreeDiskGB) { + this.attrValues = attrValues; + this.coresOnNodes = coresOnNodes; + this.deprioritizedFreeDiskGB = deprioritizedFreeDiskGB; + } + + @Override + public int compare(Node a, Node b) { + // Note all nodes do have free disk defined. This has been verified earlier. + boolean aHasLowFreeSpace = attrValues.getFreeDisk(a).get() < deprioritizedFreeDiskGB; + boolean bHasLowFreeSpace = attrValues.getFreeDisk(b).get() < deprioritizedFreeDiskGB; + if (aHasLowFreeSpace != bHasLowFreeSpace) { + // A node with low free space should be considered > node with high free space since it needs to come later in sort order + return Boolean.compare(aHasLowFreeSpace, bHasLowFreeSpace); + } + // The ordering on the number of cores is the natural order. + return Integer.compare(coresOnNodes.get(a), coresOnNodes.get(b)); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginMinimizeCores.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginMinimizeCores.java new file mode 100644 index 00000000000..54520fc281e --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginMinimizeCores.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement.plugins; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.Map; + +import com.google.common.collect.Ordering; +import com.google.common.collect.TreeMultimap; +import org.apache.solr.cluster.Cluster; +import org.apache.solr.cluster.Node; +import org.apache.solr.cluster.Replica; +import org.apache.solr.cluster.SolrCollection; +import org.apache.solr.cluster.placement.*; +import org.apache.solr.common.util.SuppressForbidden; + +/** + *

+ * <p>Implements placing replicas to minimize number of cores per {@link Node}, while not placing two replicas of the same
+ * shard on the same node.
+ *
+ * <p>Warning: not really tested. See {@link SamplePluginAffinityReplicaPlacement} for a more realistic example.
+ */ +public class SamplePluginMinimizeCores implements PlacementPlugin { + + private final PlacementPluginConfig config; + + private SamplePluginMinimizeCores(PlacementPluginConfig config) { + this.config = config; + } + + static public class Factory implements PlacementPluginFactory { + + /** + * Empty public constructor is used to instantiate this factory based on configuration in solr.xml, element + * {@code } in element {@code }. + */ + public Factory() { + } + + @Override + public PlacementPlugin createPluginInstance(PlacementPluginConfig config) { + return new SamplePluginMinimizeCores(config); + } + } + + @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.") + public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher, + PlacementPlanFactory placementPlanFactory) throws PlacementException { + int totalReplicasPerShard = 0; + for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { + totalReplicasPerShard += request.getCountReplicasToCreate(rt); + } + + if (cluster.getLiveNodes().size() < totalReplicasPerShard) { + throw new PlacementException("Cluster size too small for number of replicas per shard"); + } + + // Get number of cores on each Node + TreeMultimap nodesByCores = TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary()); + + Set nodes = request.getTargetNodes(); + + attributeFetcher.requestNodeCoreCount(); + attributeFetcher.fetchFrom(nodes); + AttributeValues attrValues = attributeFetcher.fetchAttributes(); + + + // Get the number of cores on each node and sort the nodes by increasing number of cores + for (Node node : nodes) { + if (attrValues.getCoresCount(node).isEmpty()) { + throw new PlacementException("Can't get number of cores in " + node); + } + nodesByCores.put(attrValues.getCoresCount(node).get(), node); + } + + Set replicaPlacements = new HashSet<>(totalReplicasPerShard * request.getShardNames().size()); + + // Now place all replicas of all shards on nodes, by placing on nodes with the smallest number of cores and taking + // into account replicas placed during this computation. Note that for each shard we must place replicas on different + // nodes, when moving to the next shard we use the nodes sorted by their updated number of cores (due to replica + // placements for previous shards). + for (String shardName : request.getShardNames()) { + // Assign replicas based on the sort order of the nodesByCores tree multimap to put replicas on nodes with less + // cores first. We only need totalReplicasPerShard nodes given that's the number of replicas to place. + // We assign based on the passed nodeEntriesToAssign list so the right nodes get replicas. + ArrayList> nodeEntriesToAssign = new ArrayList<>(totalReplicasPerShard); + Iterator> treeIterator = nodesByCores.entries().iterator(); + for (int i = 0; i < totalReplicasPerShard; i++) { + nodeEntriesToAssign.add(treeIterator.next()); + } + + // Update the number of cores each node will have once the assignments below got executed so the next shard picks the + // lowest loaded nodes for its replicas. 
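[Review note, not part of the patch] Since Guava's TreeMultimap offers no way to change the key of an existing entry, the loop that follows bumps a node's core count by removing the entry and re-inserting it under count + 1, which keeps the multimap sorted by up-to-date core counts. A tiny sketch of that idiom, with invented node names:

    import java.util.Comparator;

    import com.google.common.collect.Ordering;
    import com.google.common.collect.TreeMultimap;

    // Illustration only: "re-key" a node after deciding to place one more core on it.
    class CoreCountRekeySketch {
      public static void main(String[] args) {
        TreeMultimap<Integer, String> nodesByCores =
            TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary());
        nodesByCores.put(0, "nodeA");
        nodesByCores.put(3, "nodeB");

        // nodeA receives a replica: remove it under its old count, re-insert under count + 1.
        nodesByCores.remove(0, "nodeA");
        nodesByCores.put(1, "nodeA");

        System.out.println(nodesByCores); // prints {1=[nodeA], 3=[nodeB]}: still sorted by core count
      }
    }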
+ for (Map.Entry e : nodeEntriesToAssign) { + int coreCount = e.getKey(); + Node node = e.getValue(); + nodesByCores.remove(coreCount, node); + nodesByCores.put(coreCount + 1, node); + } + + for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { + placeReplicas(request.getCollection(), nodeEntriesToAssign, placementPlanFactory, replicaPlacements, shardName, request, replicaType); + } + } + + return placementPlanFactory.createPlacementPlan(request, replicaPlacements); + } + + private void placeReplicas(SolrCollection solrCollection, ArrayList> nodeEntriesToAssign, + PlacementPlanFactory placementPlanFactory, Set replicaPlacements, + String shardName, PlacementRequest request, Replica.ReplicaType replicaType) { + for (int replica = 0; replica < request.getCountReplicasToCreate(replicaType); replica++) { + final Map.Entry entry = nodeEntriesToAssign.remove(0); + final Node node = entry.getValue(); + + replicaPlacements.add(placementPlanFactory.createReplicaPlacement(solrCollection, shardName, node, replicaType)); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginRandomPlacement.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginRandomPlacement.java new file mode 100644 index 00000000000..eecb57f902c --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SamplePluginRandomPlacement.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cluster.placement.plugins; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import org.apache.solr.cluster.Cluster; +import org.apache.solr.cluster.Node; +import org.apache.solr.cluster.Replica; +import org.apache.solr.cluster.SolrCollection; +import org.apache.solr.cluster.placement.*; + +/** + * Implements random placement for new collection creation while preventing two replicas of same shard from being placed on same node. + * + *

+ * <p>Warning: not really tested. See {@link SamplePluginAffinityReplicaPlacement} for a more realistic example.
+ */ +public class SamplePluginRandomPlacement implements PlacementPlugin { + + private final PlacementPluginConfig config; + + private SamplePluginRandomPlacement(PlacementPluginConfig config) { + this.config = config; + } + + static public class Factory implements PlacementPluginFactory { + @Override + public PlacementPlugin createPluginInstance(PlacementPluginConfig config) { + return new SamplePluginRandomPlacement(config); + } + } + + public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher, + PlacementPlanFactory placementPlanFactory) throws PlacementException { + int totalReplicasPerShard = 0; + for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { + totalReplicasPerShard += request.getCountReplicasToCreate(rt); + } + + if (cluster.getLiveNodes().size() < totalReplicasPerShard) { + throw new PlacementException("Cluster size too small for number of replicas per shard"); + } + + Set replicaPlacements = new HashSet<>(totalReplicasPerShard * request.getShardNames().size()); + + // Now place randomly all replicas of all shards on available nodes + for (String shardName : request.getShardNames()) { + // Shuffle the nodes for each shard so that replicas for a shard are placed on distinct yet random nodes + ArrayList nodesToAssign = new ArrayList<>(cluster.getLiveNodes()); + Collections.shuffle(nodesToAssign, new Random()); + + for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { + placeForReplicaType(request.getCollection(), nodesToAssign, placementPlanFactory, replicaPlacements, shardName, request, rt); + } + } + + return placementPlanFactory.createPlacementPlan(request, replicaPlacements); + } + + private void placeForReplicaType(SolrCollection solrCollection, ArrayList nodesToAssign, PlacementPlanFactory placementPlanFactory, + Set replicaPlacements, + String shardName, PlacementRequest request, Replica.ReplicaType replicaType) { + for (int replica = 0; replica < request.getCountReplicasToCreate(replicaType); replica++) { + Node node = nodesToAssign.remove(0); + + replicaPlacements.add(placementPlanFactory.createReplicaPlacement(solrCollection, shardName, node, replicaType)); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/package-info.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/package-info.java new file mode 100644 index 00000000000..159567912d5 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Sample plugin implementations. 
+ */ +package org.apache.solr.cluster.placement.plugins; \ No newline at end of file diff --git a/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java b/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java index 8d1fcae333f..9b4fa748669 100644 --- a/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java +++ b/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java @@ -17,6 +17,7 @@ package org.apache.solr.handler; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -24,6 +25,7 @@ import org.apache.solr.api.Command; import org.apache.solr.api.EndPoint; import org.apache.solr.api.PayloadObj; import org.apache.solr.client.solrj.request.beans.ClusterPropInfo; +import org.apache.solr.cluster.placement.impl.PlacementPluginConfigImpl; import org.apache.solr.common.SolrException; import org.apache.solr.common.annotation.JsonProperty; import org.apache.solr.common.cloud.ClusterProperties; @@ -81,9 +83,8 @@ public class ClusterAPI { CollectionsHandler.CollectionOperation.DELETESTATUS_OP.execute(req, rsp, coreContainer.getCollectionsHandler()); } - @SuppressWarnings({"rawtypes"}) public static SolrQueryRequest wrapParams(SolrQueryRequest req, Object... def) { - Map m = Utils.makeMap(def); + Map m = Utils.makeMap(def); return wrapParams(req, m); } @@ -117,8 +118,6 @@ public class ClusterAPI { path = "/cluster", permission = COLL_EDIT_PERM) public class Commands { - - @Command(name = "add-role") @SuppressWarnings({"rawtypes", "unchecked"}) public void addRole(PayloadObj obj) throws Exception { @@ -134,8 +133,7 @@ public class ClusterAPI { RoleInfo info = obj.get(); Map m = info.toMap(new HashMap<>()); m.put("action", REMOVEROLE.toString()); - collectionsHandler.handleRequestBody(wrapParams(obj.getRequest(),m), obj.getResponse()); - + collectionsHandler.handleRequestBody(wrapParams(obj.getRequest(), m), obj.getResponse()); } @Command(name = "set-obj-property") @@ -149,7 +147,6 @@ public class ClusterAPI { } catch (Exception e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error in API", e); } - } @Command(name = "set-property") @@ -158,9 +155,38 @@ public class ClusterAPI { Map m = obj.get(); m.put("action", CLUSTERPROP.toString()); collectionsHandler.handleRequestBody(wrapParams(obj.getRequest(),m ), obj.getResponse()); - } + @Command(name = "set-placement-plugin") + public void setPlacementPlugin(PayloadObj> obj) { + Map placementPluginConfig = obj.getDataMap(); + ClusterProperties clusterProperties = new ClusterProperties(coreContainer.getZkController().getZkClient()); + // Very basic sanity check (not checking class actually exists) + if (!placementPluginConfig.containsKey(PlacementPluginConfigImpl.CONFIG_CLASS)) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Must contain " + PlacementPluginConfigImpl.CONFIG_CLASS + " attribute"); + } + try { + // Need to reset to null first otherwise the mappings in placementPluginConfig are added to existing ones + // in /clusterprops.json rather than replace them + clusterProperties.setClusterProperties( + Collections.singletonMap(PlacementPluginConfigImpl.PLACEMENT_PLUGIN_CONFIG_KEY, null)); + clusterProperties.setClusterProperties( + Collections.singletonMap(PlacementPluginConfigImpl.PLACEMENT_PLUGIN_CONFIG_KEY, placementPluginConfig)); + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error in API", e); + } + } + + @Command(name = "unset-placement-plugin") + public void unsetPlacementPlugin(PayloadObj obj) { + ClusterProperties 
clusterProperties = new ClusterProperties(coreContainer.getZkController().getZkClient()); + try { + clusterProperties.setClusterProperties( + Collections.singletonMap(PlacementPluginConfigImpl.PLACEMENT_PLUGIN_CONFIG_KEY, null)); + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error in API", e); + } + } } public static class RoleInfo implements ReflectMapWriter { @@ -168,8 +194,5 @@ public class ClusterAPI { public String node; @JsonProperty(required = true) public String role; - } - - }
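[Review note, not part of the patch] For completeness, this is roughly how the two new /cluster commands could be driven from SolrJ. The V2Request usage is standard, but the configuration keys are assumptions: "class" is assumed to be what PlacementPluginConfigImpl.CONFIG_CLASS resolves to, and the two *FreeDiskGB keys plus the factory class name are plausible values for the affinity sample rather than verified ones.

    import java.util.Map;
    import java.util.List;
    import java.util.Optional;

    import org.apache.solr.client.solrj.SolrRequest;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.V2Request;

    // Sketch only: sets, then unsets, the cluster-wide placement plugin through the new /cluster commands.
    public class PlacementPluginConfigSketch {
      public static void main(String[] args) throws Exception {
        // ZooKeeper address is a placeholder.
        try (CloudSolrClient client = new CloudSolrClient.Builder(
            List.of("localhost:2181"), Optional.empty()).build()) {

          // "class" and the two *FreeDiskGB keys are assumed configuration names for the affinity sample factory.
          new V2Request.Builder("/cluster")
              .withMethod(SolrRequest.METHOD.POST)
              .withPayload(Map.of("set-placement-plugin", Map.of(
                  "class", "org.apache.solr.cluster.placement.plugins.SamplePluginAffinityReplicaPlacement$Factory",
                  "minimalFreeDiskGB", 10,
                  "deprioritizedFreeDiskGB", 50)))
              .build()
              .process(client);

          // Reverts to the legacy or rules-based assignment strategies; the empty body is an assumption.
          new V2Request.Builder("/cluster")
              .withMethod(SolrRequest.METHOD.POST)
              .withPayload(Map.of("unset-placement-plugin", Map.of()))
              .build()
              .process(client);
        }
      }
    }

Because setPlacementPlugin() first resets the cluster property to null, re-running the set command fully replaces the previous configuration instead of merging new keys into the existing ones, as the comments in the handler above explain.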