diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index c837eb7c976..c68c397bc21 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -123,6 +123,12 @@ New Features * SOLR-12536: autoscaling policy support to equally distribute replicas on the basis of arbitrary properties (noble) +* SOLR-11990: Make it possible to co-locate replicas of multiple collections together in a node. A collection may be + co-located with another collection during collection creation time by specifying a 'withCollection' parameter. It can + also be co-located afterwards by using the modify collection API. The co-location guarantee is enforced regardless of + future cluster operations whether they are invoked manually via the Collection API or by the Autoscaling framework. + (noble, shalin) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java index c07a85adb02..0feeec99d09 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java @@ -23,15 +23,16 @@ import java.lang.invoke.MethodHandles; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang.StringUtils; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.autoscaling.Policy; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; -import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.cloud.ActiveReplicaWatcher; import org.apache.solr.cloud.CloudUtil; import org.apache.solr.cloud.Overseer; @@ -43,6 +44,7 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ShardParams; @@ -52,11 +54,12 @@ import org.apache.solr.handler.component.ShardHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; +import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT; @@ -79,6 +82,10 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) throws IOException, InterruptedException { log.debug("addReplica() : {}", Utils.toJSONString(message)); + + String collectionName = message.getStr(COLLECTION_PROP); + DocCollection coll = clusterState.getCollection(collectionName); + boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false); boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false); final String asyncId = message.getStr(ASYNC); @@ -86,9 +93,6 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { AtomicReference sessionWrapper = new AtomicReference<>(); message = assignReplicaDetails(ocmh.cloudManager, clusterState, message, sessionWrapper); - String collection = message.getStr(COLLECTION_PROP); - DocCollection coll = clusterState.getCollection(collection); - String node = message.getStr(CoreAdminParams.NODE); String shard = message.getStr(SHARD_ID_PROP); String coreName = message.getStr(CoreAdminParams.NAME); @@ -97,6 +101,27 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)); boolean parallel = message.getBool("parallel", false); + if (coll.getStr(WITH_COLLECTION) != null) { + String withCollectionName = coll.getStr(WITH_COLLECTION); + DocCollection withCollection = clusterState.getCollection(withCollectionName); + if (withCollection.getActiveSlices().size() > 1) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + withCollection.getActiveSlices().size()); + } + String withCollectionShard = withCollection.getActiveSlices().iterator().next().getName(); + + List replicas = withCollection.getReplicas(node); + if (replicas == null || replicas.isEmpty()) { + // create a replica of withCollection on the identified node before proceeding further + ZkNodeProps props = new ZkNodeProps( + Overseer.QUEUE_OPERATION, ADDREPLICA.toString(), + ZkStateReader.COLLECTION_PROP, withCollectionName, + ZkStateReader.SHARD_ID_PROP, withCollectionShard, + "node", node, + CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString()); // set to true because we want `withCollection` to be ready after this collection is created + addReplica(clusterState, props, results, null); + } + } + ModifiableSolrParams params = new ModifiableSolrParams(); ZkStateReader zkStateReader = ocmh.zkStateReader; @@ -104,7 +129,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { if (!skipCreateReplicaInClusterState) { ZkNodeProps props = new ZkNodeProps( Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(), - ZkStateReader.COLLECTION_PROP, collection, + ZkStateReader.COLLECTION_PROP, collectionName, ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), @@ -121,10 +146,10 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { } } params.set(CoreAdminParams.CORE_NODE_NAME, - ocmh.waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName()); + ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(coreName)).get(coreName).getName()); } - String configName = zkStateReader.readConfigName(collection); + String configName = zkStateReader.readConfigName(collectionName); String routeKey = message.getStr(ShardParams._ROUTE_); String dataDir = message.getStr(CoreAdminParams.DATA_DIR); String ulogDir = message.getStr(CoreAdminParams.ULOG_DIR); @@ -133,7 +158,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString()); params.set(CoreAdminParams.NAME, coreName); params.set(COLL_CONF, configName); - params.set(CoreAdminParams.COLLECTION, collection); + params.set(CoreAdminParams.COLLECTION, collectionName); params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name()); if (shard != null) { params.set(CoreAdminParams.SHARD, shard); @@ -172,7 +197,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { Runnable runnable = () -> { ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap); - ocmh.waitForCoreNodeName(collection, fnode, fcoreName); + ocmh.waitForCoreNodeName(collectionName, fnode, fcoreName); if (sessionWrapper.get() != null) { sessionWrapper.get().release(); } @@ -182,15 +207,15 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { if (!parallel || waitForFinalState) { if (waitForFinalState) { SolrCloseableLatch latch = new SolrCloseableLatch(1, ocmh); - ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collection, null, Collections.singletonList(coreName), latch); + ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null, Collections.singletonList(coreName), latch); try { - zkStateReader.registerCollectionStateWatcher(collection, watcher); + zkStateReader.registerCollectionStateWatcher(collectionName, watcher); runnable.run(); if (!latch.await(timeout, TimeUnit.SECONDS)) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active."); } } finally { - zkStateReader.removeCollectionStateWatcher(collection, watcher); + zkStateReader.removeCollectionStateWatcher(collectionName, watcher); } } else { runnable.run(); @@ -201,7 +226,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { return new ZkNodeProps( - ZkStateReader.COLLECTION_PROP, collection, + ZkStateReader.COLLECTION_PROP, collectionName, ZkStateReader.SHARD_ID_PROP, shard, ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.NODE_NAME_PROP, node diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java index a776820817f..45ced2b674d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java @@ -32,14 +32,14 @@ import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.solr.client.solrj.cloud.DistribStateManager; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException; -import org.apache.solr.client.solrj.cloud.DistribStateManager; import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException; import org.apache.solr.client.solrj.cloud.autoscaling.Policy; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; -import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData; import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.ZkController; @@ -47,6 +47,7 @@ import org.apache.solr.cloud.overseer.ClusterStateMutator; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.Replica; @@ -73,13 +74,14 @@ import org.apache.zookeeper.KeeperException.NoNodeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE; import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; +import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; import static org.apache.solr.common.params.CommonParams.NAME; @@ -106,26 +108,48 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName); } + String withCollection = message.getStr(CollectionAdminParams.WITH_COLLECTION); + String withCollectionShard = null; + if (withCollection != null) { + if (!clusterState.hasCollection(withCollection)) { + throw new SolrException(ErrorCode.BAD_REQUEST, "The 'withCollection' does not exist: " + withCollection); + } else { + DocCollection collection = clusterState.getCollection(withCollection); + if (collection.getActiveSlices().size() > 1) { + throw new SolrException(ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + collection.getActiveSlices().size()); + } + withCollectionShard = collection.getActiveSlices().iterator().next().getName(); + } + } + String configName = getConfigName(collectionName, message); if (configName == null) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No config set found to associate with the collection."); } ocmh.validateConfigOrThrowSolrException(configName); + + List nodeList = new ArrayList<>(); + String router = message.getStr("router.name", DocRouter.DEFAULT_NAME); + String policy = message.getStr(Policy.POLICY); + AutoScalingConfig autoScalingConfig = ocmh.cloudManager.getDistribStateManager().getAutoScalingConfig(); + boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null; + + // fail fast if parameters are wrong or incomplete + List shardNames = populateShardNames(message, router); + checkMaxShardsPerNode(message, usePolicyFramework); + checkReplicaTypes(message); + AtomicReference sessionWrapper = new AtomicReference<>(); try { final String async = message.getStr(ASYNC); - List nodeList = new ArrayList<>(); - List shardNames = new ArrayList<>(); - List replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, message, - nodeList, shardNames, sessionWrapper); ZkStateReader zkStateReader = ocmh.zkStateReader; boolean isLegacyCloud = Overseer.isLegacy(zkStateReader); - ocmh.createConfNode(stateManager, configName, collectionName, isLegacyCloud); + OverseerCollectionMessageHandler.createConfNode(stateManager, configName, collectionName, isLegacyCloud); Map collectionParams = new HashMap<>(); Map collectionProps = message.getProperties(); @@ -134,12 +158,12 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd collectionParams.put(propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), (String) collectionProps.get(propName)); } } - + createCollectionZkNode(stateManager, collectionName, collectionParams); - + Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message)); - // wait for a while until we don't see the collection + // wait for a while until we see the collection TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource); boolean created = false; while (! waitUntil.hasTimedOut()) { @@ -147,8 +171,12 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName); if(created) break; } - if (!created) + if (!created) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName); + } + + List replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, message, + nodeList, shardNames, sessionWrapper); if (nodeList.isEmpty()) { log.debug("Finished create command for collection: {}", collectionName); @@ -165,6 +193,23 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); for (ReplicaPosition replicaPosition : replicaPositions) { String nodeName = replicaPosition.node; + + if (withCollection != null) { + // check that we have a replica of `withCollection` on this node and if not, create one + DocCollection collection = clusterState.getCollection(withCollection); + List replicas = collection.getReplicas(nodeName); + if (replicas == null || replicas.isEmpty()) { + ZkNodeProps props = new ZkNodeProps( + Overseer.QUEUE_OPERATION, ADDREPLICA.toString(), + ZkStateReader.COLLECTION_PROP, withCollection, + ZkStateReader.SHARD_ID_PROP, withCollectionShard, + "node", nodeName, + CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString()); // set to true because we want `withCollection` to be ready after this collection is created + new AddReplicaCmd(ocmh).call(clusterState, props, results); + clusterState = zkStateReader.getClusterState(); // refresh + } + } + String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(), ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName), replicaPosition.shard, replicaPosition.type, true); @@ -251,6 +296,16 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd + " curl http://{host:port}/solr/" + collectionName + "/config -d '{\"set-user-property\": {\"update.autoCreateFields\":\"false\"}}'"); } } + + // modify the `withCollection` and store this new collection's name with it + if (withCollection != null) { + ZkNodeProps props = new ZkNodeProps( + Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(), + ZkStateReader.COLLECTION_PROP, withCollection, + CollectionAdminParams.COLOCATED_WITH, collectionName); + Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props)); + } + } catch (SolrException ex) { throw ex; } catch (Exception ex) { @@ -274,29 +329,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd String policy = message.getStr(Policy.POLICY); boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null; - Integer numSlices = message.getInt(OverseerCollectionMessageHandler.NUM_SLICES, null); - String router = message.getStr("router.name", DocRouter.DEFAULT_NAME); - if(ImplicitDocRouter.NAME.equals(router)){ - ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null)); - numSlices = shardNames.size(); - } else { - if (numSlices == null ) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " is a required param (when using CompositeId router)."); - } - if (numSlices <= 0) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " must be > 0"); - } - ClusterStateMutator.getShardNames(numSlices, shardNames); - } - - int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1); - if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) { - throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used"); - } - if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE; - if (numNrtReplicas + numTlogReplicas <= 0) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0"); - } + Integer numSlices = shardNames.size(); + int maxShardsPerNode = checkMaxShardsPerNode(message, usePolicyFramework); // we need to look at every node and see how many cores it serves // add our new cores to existing nodes serving the least number of cores @@ -343,6 +377,43 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd return replicaPositions; } + public static int checkMaxShardsPerNode(ZkNodeProps message, boolean usePolicyFramework) { + int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1); + if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) { + throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used"); + } + if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE; + + return maxShardsPerNode; + } + + public static void checkReplicaTypes(ZkNodeProps message) { + int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0); + int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas > 0 ? 0 : 1)); + + if (numNrtReplicas + numTlogReplicas <= 0) { + throw new SolrException(ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0"); + } + } + + public static List populateShardNames(ZkNodeProps message, String router) { + List shardNames = new ArrayList<>(); + Integer numSlices = message.getInt(OverseerCollectionMessageHandler.NUM_SLICES, null); + if (ImplicitDocRouter.NAME.equals(router)) { + ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null)); + numSlices = shardNames.size(); + } else { + if (numSlices == null) { + throw new SolrException(ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " is a required param (when using CompositeId router)."); + } + if (numSlices <= 0) { + throw new SolrException(ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " must be > 0"); + } + ClusterStateMutator.getShardNames(numSlices, shardNames); + } + return shardNames; + } + String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException { String configName = message.getStr(COLL_CONF); @@ -370,7 +441,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd } return "".equals(configName)? null: configName; } - + /** * Copies the _default configset to the specified configset name (overwrites if pre-existing) */ @@ -476,7 +547,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd } } - + private static void getConfName(DistribStateManager stateManager, String collection, String collectionPath, Map collectionProps) throws IOException, KeeperException, InterruptedException { // check for configName diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java index c676cf3a1d8..9a569d13d70 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java @@ -31,6 +31,7 @@ import org.apache.solr.common.NonExistentCoreException; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; @@ -49,6 +50,8 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH; +import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; import static org.apache.solr.common.params.CommonParams.NAME; @@ -69,6 +72,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd ZkStateReader zkStateReader = ocmh.zkStateReader; checkNotReferencedByAlias(zkStateReader, collection); + checkNotColocatedWith(zkStateReader, collection); final boolean deleteHistory = message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true); @@ -181,4 +185,21 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd .map(Map.Entry::getKey) // alias name .findFirst().orElse(null); } + + private void checkNotColocatedWith(ZkStateReader zkStateReader, String collection) throws Exception { + DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection); + if (docCollection != null) { + String colocatedWith = docCollection.getStr(COLOCATED_WITH); + if (colocatedWith != null) { + DocCollection colocatedCollection = zkStateReader.getClusterState().getCollectionOrNull(colocatedWith); + if (colocatedCollection != null && collection.equals(colocatedCollection.getStr(WITH_COLLECTION))) { + // todo how do we clean up if reverse-link is not present? + // can't delete this collection because it is still co-located with another collection + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Collection: " + collection + " is co-located with collection: " + colocatedWith + + " remove the link using modify collection API or delete the co-located collection: " + colocatedWith); + } + } + } + } } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java index f9392b5b259..4a9bd599190 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java @@ -33,6 +33,7 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionAdminParams; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.util.NamedList; @@ -104,19 +105,34 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd { if (shardId == null) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + SHARD_ID_PROP + "' is a required param"); } - Slice slice = clusterState.getCollection(collection).getSlice(shardId); - List sliceReplicas = new ArrayList<>(slice.getReplicas()); - Collections.shuffle(sliceReplicas, OverseerCollectionMessageHandler.RANDOM); - // this picks up a single random replica from the sourceNode - for (Replica r : slice.getReplicas()) { - if (r.getNodeName().equals(sourceNode)) { - replica = r; - } - } - if (replica == null) { + Slice slice = coll.getSlice(shardId); + List sliceReplicas = new ArrayList<>(slice.getReplicas(r -> sourceNode.equals(r.getNodeName()))); + if (sliceReplicas.isEmpty()) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " node: " + sourceNode + " does not have any replica belonging to shard: " + shardId); } + Collections.shuffle(sliceReplicas, OverseerCollectionMessageHandler.RANDOM); + replica = sliceReplicas.iterator().next(); + } + + if (coll.getStr(CollectionAdminParams.COLOCATED_WITH) != null) { + // we must ensure that moving this replica does not cause the co-location to break + String sourceNode = replica.getNodeName(); + String colocatedCollectionName = coll.getStr(CollectionAdminParams.COLOCATED_WITH); + DocCollection colocatedCollection = clusterState.getCollectionOrNull(colocatedCollectionName); + if (colocatedCollection != null) { + if (colocatedCollection.getReplica((s, r) -> sourceNode.equals(r.getNodeName())) != null) { + // check if we have at least two replicas of the collection on the source node + // only then it is okay to move one out to another node + List replicasOnSourceNode = coll.getReplicas(replica.getNodeName()); + if (replicasOnSourceNode == null || replicasOnSourceNode.size() < 2) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Collection: " + collection + " is co-located with collection: " + colocatedCollectionName + + " and has a single replica: " + replica.getName() + " on node: " + replica.getNodeName() + + " so it is not possible to move it to another node"); + } + } + } } log.info("Replica will be moved to node {}: {}", targetNode, replica); @@ -151,7 +167,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd { ); removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false); removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false); - if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async); + if (async != null) removeReplicasProps.getProperties().put(ASYNC, async); NamedList deleteResult = new NamedList(); ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null); if (deleteResult.get("failure") != null) { @@ -256,7 +272,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd { } if (addResult.get("failure") != null) { String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" + - " on node=%s, failure=", coll.getName(), slice.getName(), targetNode, addResult.get("failure")); + " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure")); log.warn(errorString); results.add("failure", errorString); if (watcher != null) { // unregister diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java index d7d555eed8d..e15c3899064 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java @@ -105,6 +105,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION; +import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH; +import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; import static org.apache.solr.common.params.CollectionParams.CollectionAction.*; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; import static org.apache.solr.common.params.CommonParams.NAME; @@ -149,7 +151,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, ZkStateReader.AUTO_ADD_REPLICAS, "false", DocCollection.RULE, null, POLICY, null, - SNITCH, null)); + SNITCH, null, + WITH_COLLECTION, null, + COLOCATED_WITH, null)); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java index 22e3ef5e77e..4cb15ead875 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java @@ -28,11 +28,11 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester; import org.apache.solr.client.solrj.cloud.autoscaling.Policy; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; -import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; import org.apache.solr.client.solrj.cloud.autoscaling.UnsupportedSuggester; import org.apache.solr.common.SolrException; @@ -89,8 +89,8 @@ public class ComputePlanAction extends TriggerActionBase { log.trace("-- state: {}", clusterState); } try { - Suggester intialSuggester = getSuggester(session, event, context, cloudManager); - Suggester suggester = intialSuggester; + Suggester initialSuggester = getSuggester(session, event, context, cloudManager); + Suggester suggester = initialSuggester; int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState); int requestedOperations = getRequestedNumOps(event); if (requestedOperations > maxOperations) { @@ -119,13 +119,13 @@ public class ComputePlanAction extends TriggerActionBase { // unless a specific number of ops was requested // uncomment the following to log too many operations /*if (opCount > 10) { - PolicyHelper.logState(cloudManager, intialSuggester); + PolicyHelper.logState(cloudManager, initialSuggester); }*/ if (operation == null) { if (requestedOperations < 0) { //uncomment the following to log zero operations -// PolicyHelper.logState(cloudManager, intialSuggester); +// PolicyHelper.logState(cloudManager, initialSuggester); break; } else { log.info("Computed plan empty, remained " + (opCount - opLimit) + " requested ops to try."); @@ -225,6 +225,7 @@ public class ComputePlanAction extends TriggerActionBase { for (Map.Entry e : op.getHints().entrySet()) { suggester = suggester.hint(e.getKey(), e.getValue()); } + suggester = suggester.forceOperation(true); start++; event.getProperties().put(START, start); break; diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 1b5edd7a9f8..8d7cdbf43c4 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -104,7 +104,6 @@ import static org.apache.solr.client.solrj.response.RequestStatusState.NOT_FOUND import static org.apache.solr.client.solrj.response.RequestStatusState.RUNNING; import static org.apache.solr.client.solrj.response.RequestStatusState.SUBMITTED; import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION; -import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_PROP_PREFIX; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY; @@ -137,9 +136,11 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION; +import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP; import static org.apache.solr.common.params.CollectionAdminParams.PROPERTY_NAME; import static org.apache.solr.common.params.CollectionAdminParams.PROPERTY_VALUE; +import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; import static org.apache.solr.common.params.CollectionParams.CollectionAction.*; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE; @@ -480,11 +481,10 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission TLOG_REPLICAS, NRT_REPLICAS, POLICY, - WAIT_FOR_FINAL_STATE); + WAIT_FOR_FINAL_STATE, + WITH_COLLECTION); - if (props.get(STATE_FORMAT) == null) { - props.put(STATE_FORMAT, "2"); - } + props.putIfAbsent(STATE_FORMAT, "2"); if (props.get(REPLICATION_FACTOR) != null && props.get(NRT_REPLICAS) != null) { //TODO: Remove this in 8.0 . Keep this for SolrJ client back-compat. See SOLR-11676 for more details diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWithCollection.java b/solr/core/src/test/org/apache/solr/cloud/TestWithCollection.java new file mode 100644 index 00000000000..a4816a01b56 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestWithCollection.java @@ -0,0 +1,611 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.autoscaling.ActionContext; +import org.apache.solr.cloud.autoscaling.ComputePlanAction; +import org.apache.solr.cloud.autoscaling.ExecutePlanAction; +import org.apache.solr.cloud.autoscaling.TriggerActionBase; +import org.apache.solr.cloud.autoscaling.TriggerEvent; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.util.LogLevel; +import org.apache.solr.util.TimeOut; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest; +import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; + +/** + * Tests for co-locating a collection with another collection such that any Collection API + * always ensures that the co-location is never broken. + * + * See SOLR-11990 for more details. + */ +@LogLevel("org.apache.solr.cloud.autoscaling=TRACE;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG") +public class TestWithCollection extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static SolrCloudManager cloudManager; + + private static final int NUM_JETTIES = 2; + + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(NUM_JETTIES) + .addConfig("conf", configset("cloud-minimal")) + .configure(); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + if (zkClient().exists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true)) { + zkClient().setData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, "{}".getBytes(StandardCharsets.UTF_8), true); + } + ClusterState clusterState = cluster.getSolrClient().getZkStateReader().getClusterState(); + for (Map.Entry entry : clusterState.getCollectionStates().entrySet()) { + if (entry.getKey().contains("_xyz")) { + try { + CollectionAdminRequest.deleteCollection(entry.getKey()).process(cluster.getSolrClient()); + } catch (Exception e) { + log.error("Exception while deleting collection: " + entry.getKey()); + } + } + } + cluster.deleteAllCollections(); + cluster.getSolrClient().setDefaultCollection(null); + + cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager(); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH); + LATCH = new CountDownLatch(1); + + int jettys = cluster.getJettySolrRunners().size(); + if (jettys < NUM_JETTIES) { + for (int i = jettys; i < NUM_JETTIES; i++) { + cluster.startJettySolrRunner(); + } + } else { + for (int i = jettys; i > NUM_JETTIES; i--) { + cluster.stopJettySolrRunner(i - 1); + } + } + } + + private void deleteChildrenRecursively(String path) throws Exception { + cloudManager.getDistribStateManager().removeRecursively(path, true, false); + } + + @Test + public void testCreateCollectionNoWithCollection() throws IOException, SolrServerException { + String prefix = "testCreateCollectionNoWithCollection"; + String xyz = prefix + "_xyz"; + String abc = prefix + "_abc"; + + CloudSolrClient solrClient = cluster.getSolrClient(); + try { + + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc).process(solrClient); + } catch (HttpSolrClient.RemoteSolrException e) { + assertTrue(e.getMessage().contains("The 'withCollection' does not exist")); + } + + CollectionAdminRequest.createCollection(abc, 2, 1) + .process(solrClient); + try { + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc).process(solrClient); + } catch (HttpSolrClient.RemoteSolrException e) { + assertTrue(e.getMessage().contains("The `withCollection` must have only one shard, found: 2")); + } + } + + public void testCreateCollection() throws Exception { + String prefix = "testCreateCollection"; + String xyz = prefix + "_xyz"; + String abc = prefix + "_abc"; + + CloudSolrClient solrClient = cluster.getSolrClient(); + + String setClusterPolicyCommand = "{" + + " 'set-cluster-policy': [" + + " {'cores':'<10', 'node':'#ANY'}," + + " ]" + + "}"; + SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand); + solrClient.request(req); + + String chosenNode = cluster.getRandomJetty(random()).getNodeName(); + CollectionAdminRequest.createCollection(abc, 1, 1) + .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always + .process(solrClient); + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc) + .process(solrClient); + + DocCollection c1 = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(xyz); + assertNotNull(c1); + assertEquals(abc, c1.getStr(WITH_COLLECTION)); + Replica replica = c1.getReplicas().get(0); + String nodeName = replica.getNodeName(); + + assertEquals(chosenNode, nodeName); + } + + @Test + public void testDeleteWithCollection() throws IOException, SolrServerException, InterruptedException { + String prefix = "testDeleteWithCollection"; + String xyz = prefix + "_xyz"; + String abc = prefix + "_abc"; + + CloudSolrClient solrClient = cluster.getSolrClient(); + CollectionAdminRequest.createCollection(abc, 1, 1) + .process(solrClient); + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc) + .process(solrClient); + try { + CollectionAdminRequest.deleteCollection(abc).process(solrClient); + } catch (HttpSolrClient.RemoteSolrException e) { + assertTrue(e.getMessage().contains("is co-located with collection")); + } + + // delete the co-located collection first + CollectionAdminRequest.deleteCollection(xyz).process(solrClient); + // deleting the with collection should succeed now + CollectionAdminRequest.deleteCollection(abc).process(solrClient); + + xyz = xyz + "_2"; + abc = abc + "_2"; + CollectionAdminRequest.createCollection(abc, 1, 1) + .process(solrClient); + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc) + .process(solrClient); + // sanity check + try { + CollectionAdminRequest.deleteCollection(abc).process(solrClient); + } catch (HttpSolrClient.RemoteSolrException e) { + assertTrue(e.getMessage().contains("is co-located with collection")); + } + + CollectionAdminRequest.modifyCollection(xyz, null) + .unsetAttribute("withCollection") + .process(solrClient); + TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while (!timeOut.hasTimedOut()) { + DocCollection c1 = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(xyz); + if (c1.getStr("withCollection") == null) break; + Thread.sleep(200); + } + DocCollection c1 = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(xyz); + assertNull(c1.getStr("withCollection")); + CollectionAdminRequest.deleteCollection(abc).process(solrClient); + } + + @Test + public void testAddReplicaSimple() throws Exception { + String prefix = "testAddReplica"; + String xyz = prefix + "_xyz"; + String abc = prefix + "_abc"; + + CloudSolrClient solrClient = cluster.getSolrClient(); + String chosenNode = cluster.getRandomJetty(random()).getNodeName(); + log.info("Chosen node {} for collection {}", chosenNode, abc); + CollectionAdminRequest.createCollection(abc, 1, 1) + .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always + .process(solrClient); + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc) + .process(solrClient); + + String otherNode = null; + for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) { + if (!chosenNode.equals(jettySolrRunner.getNodeName())) { + otherNode = jettySolrRunner.getNodeName(); + } + } + CollectionAdminRequest.addReplicaToShard(xyz, "shard1") + .setNode(otherNode) + .process(solrClient); + DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc); + + assertTrue(collection.getReplicas().stream().noneMatch(replica -> withCollection.getReplicas(replica.getNodeName()).isEmpty())); + } + + public void testAddReplicaWithPolicy() throws Exception { + String prefix = "testAddReplicaWithPolicy"; + String xyz = prefix + "_xyz"; + String abc = prefix + "_abc"; + + CloudSolrClient solrClient = cluster.getSolrClient(); + String setClusterPolicyCommand = "{" + + " 'set-cluster-policy': [" + + " {'cores':'<10', 'node':'#ANY'}," + + " {'replica':'<2', 'node':'#ANY'}," + + " ]" + + "}"; + SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand); + solrClient.request(req); + + String chosenNode = cluster.getRandomJetty(random()).getNodeName(); + log.info("Chosen node {} for collection {}", chosenNode, abc); + CollectionAdminRequest.createCollection(abc, 1, 1) + .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always + .process(solrClient); + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc) + .process(solrClient); + + DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + assertEquals(chosenNode, collection.getReplicas().iterator().next().getNodeName()); + +// zkClient().printLayoutToStdOut(); + + CollectionAdminRequest.addReplicaToShard(xyz, "shard1") + .process(solrClient); + collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc); + + assertTrue(collection.getReplicas().stream().noneMatch( + replica -> withCollection.getReplicas(replica.getNodeName()) == null + || withCollection.getReplicas(replica.getNodeName()).isEmpty())); + } + + @Test + public void testMoveReplicaMainCollection() throws Exception { + String prefix = "testMoveReplicaMainCollection"; + String xyz = prefix + "_xyz"; + String abc = prefix + "_abc"; + + CloudSolrClient solrClient = cluster.getSolrClient(); + + String setClusterPolicyCommand = "{" + + " 'set-cluster-policy': [" + + " {'cores':'<10', 'node':'#ANY'}," + + " {'replica':'<2', 'node':'#ANY'}," + + " ]" + + "}"; + SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand); + solrClient.request(req); + + String chosenNode = cluster.getRandomJetty(random()).getNodeName(); + log.info("Chosen node {} for collection {}", chosenNode, abc); + CollectionAdminRequest.createCollection(abc, 1, 1) + .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always + .process(solrClient); + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc) + .process(solrClient); + + String otherNode = null; + for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) { + if (!chosenNode.equals(jettySolrRunner.getNodeName())) { + otherNode = jettySolrRunner.getNodeName(); + } + } + + DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc); + assertNull(collection.getReplicas(otherNode)); // sanity check + assertNull(withCollection.getReplicas(otherNode)); // sanity check + + new CollectionAdminRequest.MoveReplica(xyz, collection.getReplicas().iterator().next().getName(), otherNode) + .process(solrClient); +// zkClient().printLayoutToStdOut(); + collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); // refresh + DocCollection withCollectionRefreshed = solrClient.getZkStateReader().getClusterState().getCollection(abc); // refresh + assertTrue(collection.getReplicas().stream().noneMatch( + replica -> withCollectionRefreshed.getReplicas(replica.getNodeName()) == null + || withCollectionRefreshed.getReplicas(replica.getNodeName()).isEmpty())); + } + + @Test + public void testMoveReplicaWithCollection() throws Exception { + String prefix = "testMoveReplicaWithCollection"; + String xyz = prefix + "_xyz"; + String abc = prefix + "_abc"; + + CloudSolrClient solrClient = cluster.getSolrClient(); + + String setClusterPolicyCommand = "{" + + " 'set-cluster-policy': [" + + " {'cores':'<10', 'node':'#ANY'}," + + " {'replica':'<2', 'node':'#ANY'}," + + " ]" + + "}"; + SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand); + solrClient.request(req); + + String chosenNode = cluster.getRandomJetty(random()).getNodeName(); + log.info("Chosen node {} for collection {}", chosenNode, abc); + CollectionAdminRequest.createCollection(abc, 1, 1) + .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always + .process(solrClient); + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc) + .process(solrClient); + + DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + assertEquals(chosenNode, collection.getReplicas().iterator().next().getNodeName()); + + String otherNode = null; + for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) { + if (!chosenNode.equals(jettySolrRunner.getNodeName())) { + otherNode = jettySolrRunner.getNodeName(); + } + } + + collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc); + assertNull(collection.getReplicas(otherNode)); // sanity check + assertNull(withCollection.getReplicas(otherNode)); // sanity check + + try { + new CollectionAdminRequest.MoveReplica(abc, collection.getReplicas().iterator().next().getName(), otherNode) + .process(solrClient); + fail("Expected moving a replica of 'withCollection': " + abc + " to fail"); + } catch (HttpSolrClient.RemoteSolrException e) { + assertTrue(e.getMessage().contains("Collection: testMoveReplicaWithCollection_abc is co-located with collection: testMoveReplicaWithCollection_xyz")); + } +// zkClient().printLayoutToStdOut(); + collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); // refresh + DocCollection withCollectionRefreshed = solrClient.getZkStateReader().getClusterState().getCollection(abc); // refresh + + // sanity check that the failed move operation didn't actually change our co-location guarantees + assertTrue(collection.getReplicas().stream().noneMatch( + replica -> withCollectionRefreshed.getReplicas(replica.getNodeName()) == null + || withCollectionRefreshed.getReplicas(replica.getNodeName()).isEmpty())); + } + + /** + * Tests that when a new node is added to the cluster and autoscaling framework + * moves replicas to the new node, we maintain all co-locating guarantees + */ + public void testNodeAdded() throws Exception { + String prefix = "testNodeAdded"; + String xyz = prefix + "_xyz"; + String abc = prefix + "_abc"; + + CloudSolrClient solrClient = cluster.getSolrClient(); + + String setClusterPolicyCommand = "{" + + " 'set-cluster-policy': [" + + " {'cores':'<10', 'node':'#ANY'}," + + " {'replica':'<2', 'node':'#ANY'}," + + " ]" + + "}"; + SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand); + solrClient.request(req); + + String chosenNode = cluster.getRandomJetty(random()).getNodeName(); + log.info("Chosen node {} for collection {}", chosenNode, abc); + CollectionAdminRequest.createCollection(abc, 1, 1) + .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always + .process(solrClient); + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc) + .process(solrClient); + + DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + assertEquals(chosenNode, collection.getReplicas().iterator().next().getNodeName()); + + String setTriggerCommand = "{" + + "'set-trigger' : {" + + "'name' : 'node_added_trigger1'," + + "'event' : 'nodeAdded'," + + "'waitFor' : '0s'," + + "'actions' : [" + + "{'name' : 'compute', 'class' : '" + ComputePlanAction.class.getName() + "'}" + + "{'name' : 'execute', 'class' : '" + ExecutePlanAction.class.getName() + "'}" + + "{'name' : 'compute', 'class' : '" + CapturingAction.class.getName() + "'}" + + "]" + + "}}"; + req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand); + solrClient.request(req); + + Optional other = cluster.getJettySolrRunners() + .stream().filter(j -> !chosenNode.equals(j.getNodeName())).findAny(); + String otherNode = other.orElseThrow(AssertionError::new).getNodeName(); + + // add an extra replica of abc collection on a different node + CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(abc, "shard1") + .setNode(otherNode); + addReplica.setWaitForFinalState(true); + addReplica.process(solrClient); + + // refresh + collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc); + + // sanity check + assertColocated(collection, otherNode, withCollection); + + assertEquals(1, collection.getReplicas().size()); + Replica xyzReplica = collection.getReplicas().get(0); + + // start a new node + JettySolrRunner newNode = cluster.startJettySolrRunner(); + assertTrue("Action was not fired till 30 seconds", LATCH.await(30, TimeUnit.SECONDS)); + // refresh + collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc); + + // sanity check + assertColocated(collection, otherNode, withCollection); + + // assert that the replica of xyz collection was not moved + assertNotNull(collection.getReplica(xyzReplica.getName())); + assertEquals(chosenNode, collection.getReplicas().get(0).getNodeName()); + + // add an extra replica of xyz collection -- this should be placed on the 'otherNode' + addReplica = CollectionAdminRequest.addReplicaToShard(xyz, "shard1"); + addReplica.setWaitForFinalState(true); + addReplica.process(solrClient); + + // refresh + collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc); + + List replicas = collection.getReplicas(otherNode); + assertNotNull(replicas); + assertEquals(1, replicas.size()); + replicas = withCollection.getReplicas(otherNode); + assertNotNull(replicas); + assertEquals(1, replicas.size()); + + // add an extra replica of xyz collection -- this should be placed on the 'newNode' + addReplica = CollectionAdminRequest.addReplicaToShard(xyz, "shard1"); + addReplica.setWaitForFinalState(true); + addReplica.process(solrClient); + + // refresh + collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc); + + assertNotNull(collection.getReplicas(newNode.getNodeName())); + replicas = collection.getReplicas(newNode.getNodeName()); + assertNotNull(replicas); + assertEquals(1, replicas.size()); + replicas = withCollection.getReplicas(newNode.getNodeName()); + assertNotNull(replicas); + assertEquals(1, replicas.size()); + } + + public void testMultipleWithCollections() throws Exception { + String prefix = "testMultipleWithCollections"; + String xyz = prefix + "_xyz"; + String xyz2 = prefix + "_xyz2"; + String abc = prefix + "_abc"; + String abc2 = prefix + "_abc2"; + + // start 2 more nodes so we have 4 in total + cluster.startJettySolrRunner(); + cluster.startJettySolrRunner(); + cluster.waitForAllNodes(30); + + CloudSolrClient solrClient = cluster.getSolrClient(); + + String setClusterPolicyCommand = "{" + + " 'set-cluster-policy': [" + + " {'cores':'<10', 'node':'#ANY'}," + + " {'replica':'<2', 'node':'#ANY'}," + + " ]" + + "}"; + SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand); + solrClient.request(req); + + String chosenNode = cluster.getJettySolrRunner(0).getNodeName(); + log.info("Chosen node {} for collection {}", chosenNode, abc); + + CollectionAdminRequest.createCollection(abc, 1, 1) + .setCreateNodeSet(chosenNode) + .process(solrClient); + CollectionAdminRequest.createCollection(xyz, 1, 1) + .setWithCollection(abc) + .process(solrClient); + + String chosenNode2 = cluster.getJettySolrRunner(1).getNodeName(); + log.info("Chosen node {} for collection {}", chosenNode2, abc2); + CollectionAdminRequest.createCollection(abc2, 1, 1) + .setCreateNodeSet(chosenNode2) + .process(solrClient); + CollectionAdminRequest.createCollection(xyz2, 1, 1) + .setWithCollection(abc2) + .process(solrClient); + + // refresh + DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + DocCollection collection2 = solrClient.getZkStateReader().getClusterState().getCollection(xyz2); + DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc); + DocCollection withCollection2 = solrClient.getZkStateReader().getClusterState().getCollection(abc2); + + // sanity check + assertColocated(collection, chosenNode2, withCollection); // no replica should be on chosenNode2 + assertColocated(collection2, chosenNode, withCollection2); // no replica should be on chosenNode + + String chosenNode3 = cluster.getJettySolrRunner(2).getNodeName(); + CollectionAdminRequest.addReplicaToShard(xyz, "shard1") + .setNode(chosenNode3) + .process(solrClient); + String chosenNode4 = cluster.getJettySolrRunner(2).getNodeName(); + CollectionAdminRequest.addReplicaToShard(xyz2, "shard1") + .setNode(chosenNode4) + .process(solrClient); + + collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); + collection2 = solrClient.getZkStateReader().getClusterState().getCollection(xyz2); + withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc); + withCollection2 = solrClient.getZkStateReader().getClusterState().getCollection(abc2); + + // sanity check + assertColocated(collection, null, withCollection); + assertColocated(collection2, null, withCollection2); + } + + /** + * Asserts that all replicas of collection are colocated with at least one + * replica of the withCollection and none of them should be on the given 'noneOnNode'. + */ + private void assertColocated(DocCollection collection, String noneOnNode, DocCollection withCollection) { + // sanity check + assertTrue(collection.getReplicas().stream().noneMatch( + replica -> withCollection.getReplicas(replica.getNodeName()) == null + || withCollection.getReplicas(replica.getNodeName()).isEmpty())); + + if (noneOnNode != null) { + assertTrue(collection.getReplicas().stream().noneMatch( + replica -> noneOnNode.equals(replica.getNodeName()))); + } + } + + private static CountDownLatch LATCH = new CountDownLatch(1); + public static class CapturingAction extends TriggerActionBase { + @Override + public void process(TriggerEvent event, ActionContext context) throws Exception { + LATCH.countDown(); + } + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java index c4fbf2f5986..6024790a3a1 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java @@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; import org.apache.solr.client.solrj.cloud.DistribStateManager; +import org.apache.solr.client.solrj.cloud.autoscaling.Policy; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion; @@ -92,6 +93,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION; import static org.apache.solr.common.params.CommonParams.NAME; /** @@ -680,11 +682,37 @@ public class SimClusterStateProvider implements ClusterStateProvider { } boolean waitForFinalState = props.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false); List nodeList = new ArrayList<>(); - List shardNames = new ArrayList<>(); final String collectionName = props.getStr(NAME); + + String router = props.getStr("router.name", DocRouter.DEFAULT_NAME); + String policy = props.getStr(Policy.POLICY); + AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig(); + boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null; + + // fail fast if parameters are wrong or incomplete + List shardNames = CreateCollectionCmd.populateShardNames(props, router); + CreateCollectionCmd.checkMaxShardsPerNode(props, usePolicyFramework); + CreateCollectionCmd.checkReplicaTypes(props); + // always force getting fresh state collectionsStatesRef.set(null); - ClusterState clusterState = getClusterState(); + final ClusterState clusterState = getClusterState(); + + String withCollection = props.getStr(CollectionAdminParams.WITH_COLLECTION); + String wcShard = null; + if (withCollection != null) { + if (!clusterState.hasCollection(withCollection)) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The 'withCollection' does not exist: " + withCollection); + } else { + DocCollection collection = clusterState.getCollection(withCollection); + if (collection.getActiveSlices().size() > 1) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + collection.getActiveSlices().size()); + } + wcShard = collection.getActiveSlices().iterator().next().getName(); + } + } + final String withCollectionShard = wcShard; + ZkWriteCommand cmd = new ClusterStateMutator(cloudManager).createCollection(clusterState, props); if (cmd.noop) { LOG.warn("Collection {} already exists. exit", collectionName); @@ -710,6 +738,35 @@ public class SimClusterStateProvider implements ClusterStateProvider { final CountDownLatch finalStateLatch = new CountDownLatch(replicaPositions.size()); AtomicInteger replicaNum = new AtomicInteger(1); replicaPositions.forEach(pos -> { + + if (withCollection != null) { + // check that we have a replica of `withCollection` on this node and if not, create one + DocCollection collection = clusterState.getCollection(withCollection); + List replicas = collection.getReplicas(pos.node); + if (replicas == null || replicas.isEmpty()) { + Map replicaProps = new HashMap<>(); + replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node); + replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString()); + String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", withCollection, withCollectionShard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT), + collection.getReplicas().size() + 1); + try { + replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName); + replicaProps.put("SEARCHER.searcher.deletedDocs", 0); + replicaProps.put("SEARCHER.searcher.numDocs", 0); + replicaProps.put("SEARCHER.searcher.maxDoc", 0); + ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, withCollection, 0), + coreName, withCollection, withCollectionShard, pos.type, pos.node, replicaProps); + cloudManager.submit(() -> { + simAddReplica(pos.node, ri, false); + // do not count down the latch here + return true; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + Map replicaProps = new HashMap<>(); replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node); replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString()); @@ -744,6 +801,16 @@ public class SimClusterStateProvider implements ClusterStateProvider { } }); }); + + // modify the `withCollection` and store this new collection's name with it + if (withCollection != null) { + ZkNodeProps message = new ZkNodeProps( + Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(), + ZkStateReader.COLLECTION_PROP, withCollection, + CollectionAdminParams.COLOCATED_WITH, collectionName); + cmd = new CollectionMutator(cloudManager).modifyCollection(clusterState,message); + } + // force recreation of collection states collectionsStatesRef.set(null); simRunLeaderElection(Collections.singleton(collectionName), true); diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc index c266f964fa2..527f4c7c9c5 100644 --- a/solr/solr-ref-guide/src/collections-api.adoc +++ b/solr/solr-ref-guide/src/collections-api.adoc @@ -112,6 +112,11 @@ Details of the snitch provider. See the section <> for more details. + + === CREATE Response The response will include the status of the request and the new core names. If the status is anything other than "success", an error message will explain why the request failed. @@ -181,6 +186,7 @@ The attributes that can be modified are: * rule * snitch * policy +* withCollection See the <> section above for details on these attributes. diff --git a/solr/solr-ref-guide/src/colocating-collections.adoc b/solr/solr-ref-guide/src/colocating-collections.adoc new file mode 100644 index 00000000000..cc6b5184b6e --- /dev/null +++ b/solr/solr-ref-guide/src/colocating-collections.adoc @@ -0,0 +1,76 @@ += Colocating Collections +:page-toclevels: 1 +:page-tocclass: right +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +Solr provides a way to co-locate a collection with another so that cross-collection joins are always possible. + +The co-location guarantee applies to all future Collection operations made either via Collections API or by Autoscaling +actions. + +A collection may only be colocated with exactly one `withCollection`. However, arbitrarily many collections may be +_linked_ to the same `withCollection`. + +== Create Collection using `withCollection` +The Create Collection API supports a parameter named `withCollection` which can be used to specify a collection +with which the replicas of the newly created collection should be co-located. See <> + +`/admin/collections?action=CREATE&name=techproducts&numShards=1&replicationFactor=2&withCollection=tech_categories` + +In the above example, all replicas of the `techproducts` collection will be co-located on a node with at least one +replica of the `tech_categories` collection. + +== Colocating existing collections +When collections already exist beforehand, the <> can be +used to set the `withCollection` parameter so that the two collections can be linked. This will *not* trigger +changes to the cluster automatically because moving a large number of replicas immediately might de-stabilize the system. +Instead, it is recommended that the Suggestions UI page should be consulted on the operations that can be performed +to change the cluster manually. + +Example: +`/admin/collections?action=MODIFYCOLLECTION&collection=techproducts&withCollection=tech_categories` + +== Deleting colocated collections +Deleting a collection which has been linked to another will fail unless the link itself is deleted first by using the +<> to un-set the `withCollection` attribute. + +Example: +`/admin/collections?action=MODIFYCOLLECTION&collection=techproducts&withCollection=` + +== Limitations and caveats + +The collection being used as the `withCollection` must have one shard only and that shard should be named `shard1`. Note +that when using the default router, the shard name is always set to `shard1` but special care must be taken to name the +shard as `shard1` when using the implicit router. + +In case new replicas of the `withCollection` have to be added to maintain the co-location guarantees then the new replicas +will be of type `NRT` only. Automatically creating replicas of `TLOG` or `PULL` types is not supported. + +In case, replicas have to be moved from one node to another, perhaps in response to a node lost trigger, then the target +nodes will be chosen by preferring nodes that already have a replica of the `withCollection` so that the number of moves +is minimized. However, this also means that unless there are Autoscaling policy violations, Solr will continue to move +such replicas to already loaded nodes instead of preferring empty nodes. Therefore, it is advised to have policy rules +which can prevent such overloading by e.g. setting the maximum number of cores per node to a fixed value. + +Example: +`{'cores' : '<8', 'node' : '#ANY'}` + +The co-location guarantee is one-way only i.e. a collection 'X' co-located with 'Y' will always have one or more +replicas of 'Y' on any node that has a replica of 'X' but the reverse is not true. There may be nodes which have one or +more replicas of 'Y' but no replicas of 'X'. Such replicas of 'Y' will not be considered a violation of co-location +rules and will not be cleaned up automatically. \ No newline at end of file diff --git a/solr/solr-ref-guide/src/solrcloud.adoc b/solr/solr-ref-guide/src/solrcloud.adoc index c0dd1284d50..7e611225434 100644 --- a/solr/solr-ref-guide/src/solrcloud.adoc +++ b/solr/solr-ref-guide/src/solrcloud.adoc @@ -1,5 +1,5 @@ = SolrCloud -:page-children: getting-started-with-solrcloud, how-solrcloud-works, solrcloud-resilience, solrcloud-configuration-and-parameters, rule-based-replica-placement, cross-data-center-replication-cdcr, solrcloud-autoscaling +:page-children: getting-started-with-solrcloud, how-solrcloud-works, solrcloud-resilience, solrcloud-configuration-and-parameters, rule-based-replica-placement, cross-data-center-replication-cdcr, solrcloud-autoscaling, colocating-collections // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -46,3 +46,4 @@ In this section, we'll cover everything you need to know about using Solr in Sol * <> * <> * <> +* <> diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java index 4226d81ba16..2a31e710e73 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java @@ -50,7 +50,7 @@ class AddReplicaSuggester extends Suggester { for (int i = getMatrix().size() - 1; i >= 0; i--) { Row row = getMatrix().get(i); if (!isNodeSuitableForReplicaAddition(row)) continue; - Row tmpRow = row.addReplica(shard.first(), shard.second(), type); + Row tmpRow = row.addReplica(shard.first(), shard.second(), type, strict); List errs = testChangedMatrix(strict, tmpRow.session); if (!containsNewErrors(errs)) { @@ -77,4 +77,4 @@ class AddReplicaSuggester extends Suggester { public CollectionParams.CollectionAction getAction() { return ADDREPLICA; } -} +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java index 87fecda1ea5..8fd815e3ba0 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java @@ -38,7 +38,6 @@ import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.Utils; import static java.util.Collections.singletonMap; -import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.TestStatus.PASS; import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.EQUAL; import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.GREATER_THAN; import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.LESS_THAN; @@ -75,6 +74,17 @@ public class Clause implements MapWriter, Comparable { this.strict = clause.strict; } + // internal use only + Clause(Map original, Condition tag, Condition globalTag, boolean isStrict) { + this.original = original; + this.tag = tag; + this.globalTag = globalTag; + this.globalTag.clause = this; + this.type = null; + this.hasComputedValue = false; + this.strict = isStrict; + } + private Clause(Map m) { this.original = Utils.getDeepCopy(m, 10); String type = (String) m.get("type"); @@ -374,7 +384,7 @@ public class Clause implements MapWriter, Comparable { for (Row r : session.matrix) { SealedClause sealedClause = getSealedClause(computedValueEvaluator); if (!sealedClause.getGlobalTag().isPass(r)) { - ConditionType.CORES.addViolatingReplicas(ctx.reset(null, null, + sealedClause.getGlobalTag().varType.addViolatingReplicas(ctx.reset(null, null, new Violation(sealedClause, null, null, r.node, r.getVal(sealedClause.globalTag.name), sealedClause.globalTag.delta(r.getVal(globalTag.name)), null))); } } @@ -553,21 +563,21 @@ public class Clause implements MapWriter, Comparable { } boolean isPass(Object inputVal) { + return isPass(inputVal, null); + } + + boolean isPass(Object inputVal, Row row) { if (computedType != null) { throw new IllegalStateException("This is supposed to be called only from a Condition with no computed value or a SealedCondition"); } if (inputVal instanceof ReplicaCount) inputVal = ((ReplicaCount) inputVal).getVal(getClause().type); - if (varType == ConditionType.LAZY) { // we don't know the type - return op.match(parseString(val), parseString(inputVal)) == PASS; - } else { - return op.match(val, validate(name, inputVal, false)) == PASS; - } + return varType.match(inputVal, op, val, name, row); } boolean isPass(Row row) { - return isPass(row.getVal(name)); + return isPass(row.getVal(name), row); } @Override diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java index 2843f8ac135..fffaee2c75e 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java @@ -17,6 +17,7 @@ package org.apache.solr.client.solrj.cloud.autoscaling; +import java.lang.invoke.MethodHandles; import java.util.Comparator; import java.util.List; @@ -24,10 +25,13 @@ import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA; public class MoveReplicaSuggester extends Suggester { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @Override SolrRequest init() { @@ -56,20 +60,28 @@ public class MoveReplicaSuggester extends Suggester { targetRow = session.matrix.get(j); if (targetRow.node.equals(fromRow.node)) continue; if (!isNodeSuitableForReplicaAddition(targetRow)) continue; - targetRow = targetRow.addReplica(ri.getCollection(), ri.getShard(), ri.getType());//add replica to target first - Pair pair = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node - if (pair == null) continue;//should not happen - Row srcRowModified = pair.first();//this is the final state of the source row and session + targetRow = targetRow.addReplica(ri.getCollection(), ri.getShard(), ri.getType(), strict); // add replica to target first + Row srcRowModified = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node List errs = testChangedMatrix(strict, srcRowModified.session); - srcRowModified.session.applyRules();// now resort the nodes with the new values + srcRowModified.session.applyRules(); // now resort the nodes with the new values Policy.Session tmpSession = srcRowModified.session; + if (!containsNewErrors(errs) && isLessSerious(errs, leastSeriousViolation) && (force || (tmpSession.indexOf(srcRowModified.node) < tmpSession.indexOf(targetRow.node)))) { - leastSeriousViolation = errs; - bestSrcRow = srcRowModified; - sourceReplicaInfo = ri; - bestTargetRow = targetRow; + + int result = -1; + if (!force && srcRowModified.isLive && targetRow.isLive) { + result = tmpSession.getPolicy().clusterPreferences.get(0).compare(srcRowModified, tmpSession.getNode(targetRow.node), true); + if (result == 0) result = tmpSession.getPolicy().clusterPreferences.get(0).compare(srcRowModified, tmpSession.getNode(targetRow.node), false); + } + + if (result <= 0) { + leastSeriousViolation = errs; + bestSrcRow = srcRowModified; + sourceReplicaInfo = ri; + bestTargetRow = targetRow; + } } } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java index a5f57bb9483..879bb74d50f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java @@ -44,6 +44,7 @@ import org.apache.solr.common.IteratorWriter; import org.apache.solr.common.MapWriter; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.rule.ImplicitSnitch; +import org.apache.solr.common.params.CollectionAdminParams; import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.StrUtils; @@ -55,6 +56,8 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; +import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.NODE; +import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.WITH_COLLECTION; /*The class that reads, parses and applies policies specified in * autoscaling.json @@ -74,7 +77,7 @@ public class Policy implements MapWriter { public static final String POLICIES = "policies"; public static final String CLUSTER_POLICY = "cluster-policy"; public static final String CLUSTER_PREFERENCES = "cluster-preferences"; - public static final Set GLOBAL_ONLY_TAGS = Collections.singleton("cores"); + public static final Set GLOBAL_ONLY_TAGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList("cores", CollectionAdminParams.WITH_COLLECTION))); public static final List DEFAULT_PREFERENCES = Collections.unmodifiableList( Arrays.asList( new Preference((Map) Utils.fromJSONString("{minimize : cores, precision:1}")), @@ -131,9 +134,12 @@ public class Policy implements MapWriter { this.policies = Collections.unmodifiableMap( policiesFromMap((Map>>) jsonMap.getOrDefault(POLICIES, emptyMap()), newParams)); - this.params = Collections.unmodifiableList(newParams.stream() + List> params = newParams.stream() .map(s -> new Pair<>(s, Suggestion.getTagType(s))) - .collect(toList())); + .collect(toList()); + //let this be there always, there is no extra cost + params.add(new Pair<>(WITH_COLLECTION.tagName, WITH_COLLECTION)); + this.params = Collections.unmodifiableList(params); perReplicaAttributes = readPerReplicaAttrs(); } @@ -501,6 +507,21 @@ public class Policy implements MapWriter { .filter(clause -> !clause.isPerCollectiontag()) .collect(Collectors.toList()); + if (nodes.size() > 0) { + //if any collection has 'withCollection' irrespective of the node, the NodeStateProvider returns a map value + Map vals = nodeStateProvider.getNodeValues(nodes.get(0), Collections.singleton("withCollection")); + if (!vals.isEmpty() && vals.get("withCollection") != null) { + Map withCollMap = (Map) vals.get("withCollection"); + if (!withCollMap.isEmpty()) { + Clause withCollClause = new Clause((Map)Utils.fromJSONString("{withCollection:'*' , node: '#ANY'}") , + new Clause.Condition(NODE.tagName, "#ANY", Operand.EQUAL, null, null), + new Clause.Condition(WITH_COLLECTION.tagName,"*" , Operand.EQUAL, null, null), true + ); + expandedClauses.add(withCollClause); + } + } + } + ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider(); for (String c : collections) { addClausesForCollection(stateProvider, c); @@ -594,4 +615,15 @@ public class Policy implements MapWriter { throw new RuntimeException("NO such node found " + node); } } + static final Map validatetypes = new HashMap<>(); + static { + for (Suggestion.ConditionType t : Suggestion.ConditionType.values()) + validatetypes.put(t.tagName, t); + } + public static ConditionType getTagType(String name) { + ConditionType info = validatetypes.get(name); + if (info == null && name.startsWith(ImplicitSnitch.SYSPROP)) info = ConditionType.STRING; + if (info == null && name.startsWith(Clause.METRICS_PREFIX)) info = ConditionType.LAZY; + return info; + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java index a965bf836bf..ed2a3956fc9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java @@ -93,7 +93,7 @@ public class PolicyHelper { if (autoScalingConfig != null) { return new DelegatingDistribStateManager(null) { @Override - public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException { + public AutoScalingConfig getAutoScalingConfig() { return autoScalingConfig; } }; @@ -135,7 +135,7 @@ public class PolicyHelper { } } } catch (IOException e) { - /*ignore*/ + log.warn("Exception while reading disk free metric values for nodes to be used for collection: " + collName, e); } @@ -178,7 +178,7 @@ public class PolicyHelper { } - public static final int SESSION_EXPIRY = 180;//3 seconds + public static final int SESSION_EXPIRY = 180; // 3 minutes public static MapWriter getDiagnostics(Policy policy, SolrCloudManager cloudManager) { Policy.Session session = policy.createSession(cloudManager); @@ -230,7 +230,7 @@ public class PolicyHelper { /**Use this to dump the state of a system and to generate a testcase */ public static void logState(SolrCloudManager cloudManager, Suggester suggester) { - if(log.isTraceEnabled()) { + if (log.isTraceEnabled()) { log.trace("LOGSTATE: {}", Utils.toJSONString((MapWriter) ew -> { ew.put("liveNodes", cloudManager.getClusterStateProvider().getLiveNodes()); @@ -249,9 +249,9 @@ public class PolicyHelper { public enum Status { NULL, - //it is just created and not yet used or all operations on it has been competed fully + //it is just created and not yet used or all operations on it has been completed fully UNUSED, - COMPUTING, EXECUTING; + COMPUTING, EXECUTING } /** @@ -265,7 +265,7 @@ public class PolicyHelper { */ static class SessionRef { private final Object lockObj = new Object(); - private SessionWrapper sessionWrapper = SessionWrapper.DEF_INST; + private SessionWrapper sessionWrapper = SessionWrapper.DEFAULT_INSTANCE; public SessionRef() { @@ -286,7 +286,7 @@ public class PolicyHelper { synchronized (lockObj) { if (sessionWrapper.createTime == this.sessionWrapper.createTime && this.sessionWrapper.refCount.get() <= 0) { log.debug("session set to NULL"); - this.sessionWrapper = SessionWrapper.DEF_INST; + this.sessionWrapper = SessionWrapper.DEFAULT_INSTANCE; } // else somebody created a new session b/c of expiry . So no need to do anything about it } } @@ -311,7 +311,7 @@ public class PolicyHelper { //one thread who is waiting for this need to be notified. lockObj.notify(); } else { - log.info("create time NOT SAME {} ", SessionWrapper.DEF_INST.createTime); + log.info("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime); //else just ignore it } } @@ -343,7 +343,7 @@ public class PolicyHelper { } log.debug("out of waiting curr-time:{} time-elapsed {}", time(timeSource, MILLISECONDS), timeElapsed(timeSource, waitStart, MILLISECONDS)); // now this thread has woken up because it got timed out after 10 seconds or it is notified after - //the session was returned from another COMPUTING operation + // the session was returned from another COMPUTING operation if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) { log.debug("Wait over. reusing the existing session "); this.sessionWrapper.status = Status.COMPUTING; @@ -401,12 +401,12 @@ public class PolicyHelper { public static class SessionWrapper { - public static final SessionWrapper DEF_INST = new SessionWrapper(null, null); + public static final SessionWrapper DEFAULT_INSTANCE = new SessionWrapper(null, null); static { - DEF_INST.status = Status.NULL; - DEF_INST.createTime = -1l; - DEF_INST.lastUpdateTime = -1l; + DEFAULT_INSTANCE.status = Status.NULL; + DEFAULT_INSTANCE.createTime = -1L; + DEFAULT_INSTANCE.lastUpdateTime = -1L; } private long createTime; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java index c00a90ebd32..8a14abd83ba 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java @@ -18,10 +18,12 @@ package org.apache.solr.client.solrj.cloud.autoscaling; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; @@ -33,6 +35,8 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.solr.common.params.CoreAdminParams.NODE; @@ -40,6 +44,8 @@ import static org.apache.solr.common.params.CoreAdminParams.NODE; * Each instance represents a node in the cluster */ public class Row implements MapWriter { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public final String node; final Cell[] cells; //this holds the details of each replica in the node @@ -109,6 +115,14 @@ public class Row implements MapWriter { return jsonStr(); } + public Row addReplica(String coll, String shard, Replica.Type type) { + return addReplica(coll, shard, type, 0, true); + } + + public Row addReplica(String coll, String shard, Replica.Type type, boolean strictMode) { + return addReplica(coll, shard, type, 0, strictMode); + } + /** * this simulates adding a replica of a certain coll+shard to node. as a result of adding a replica , * values of certain attributes will be modified, in this node as well as other nodes. Please note that @@ -117,9 +131,18 @@ public class Row implements MapWriter { * @param coll collection name * @param shard shard name * @param type replica type + * @param recursionCount the number of times we have recursed to add more replicas + * @param strictMode whether suggester is operating in strict mode or not */ - public Row addReplica(String coll, String shard, Replica.Type type) { - Row row = session.copy().getNode(this.node); + Row addReplica(String coll, String shard, Replica.Type type, int recursionCount, boolean strictMode) { + if (recursionCount > 3) { + log.error("more than 3 levels of recursion ", new RuntimeException()); + return this; + } + List furtherOps = new LinkedList<>(); + Consumer opCollector = it -> furtherOps.add(it); + Row row = null; + row = session.copy().getNode(this.node); if (row == null) throw new RuntimeException("couldn't get a row"); Map> c = row.collectionVsShardVsReplicas.computeIfAbsent(coll, k -> new HashMap<>()); List replicas = c.computeIfAbsent(shard, k -> new ArrayList<>()); @@ -128,12 +151,37 @@ public class Row implements MapWriter { Utils.makeMap(ZkStateReader.REPLICA_TYPE, type != null ? type.toString() : Replica.Type.NRT.toString())); replicas.add(ri); for (Cell cell : row.cells) { - cell.type.projectAddReplica(cell, ri); + cell.type.projectAddReplica(cell, ri, opCollector, strictMode); } + for (OperationInfo op : furtherOps) { + if (op.isAdd) { + row = row.session.getNode(op.node).addReplica(op.coll, op.shard, op.type, recursionCount + 1, strictMode); + } else { + row.session.getNode(op.node).removeReplica(op.coll, op.shard, op.type, recursionCount+1); + } + } + return row; } + static class OperationInfo { + final String coll, shard, node, cellName; + final boolean isAdd;// true =addReplica, false=removeReplica + final Replica.Type type; + + + OperationInfo(String coll, String shard, String node, String cellName, boolean isAdd, Replica.Type type) { + this.coll = coll; + this.shard = shard; + this.node = node; + this.cellName = cellName; + this.isAdd = isAdd; + this.type = type; + } + } + + public ReplicaInfo getReplica(String coll, String shard, Replica.Type type) { Map> c = collectionVsShardVsReplicas.get(coll); if (c == null) return null; @@ -150,9 +198,18 @@ public class Row implements MapWriter { if (idx == -1) return null; return r.get(idx); } + public Row removeReplica(String coll, String shard, Replica.Type type) { + return removeReplica(coll,shard, type, 0); + } // this simulates removing a replica from a node - public Pair removeReplica(String coll, String shard, Replica.Type type) { + public Row removeReplica(String coll, String shard, Replica.Type type, int recursionCount) { + if (recursionCount > 3) { + log.error("more than 3 levels of recursion ", new RuntimeException()); + return this; + } + List furtherOps = new LinkedList<>(); + Consumer opCollector = it -> furtherOps.add(it); Row row = session.copy().getNode(this.node); Map> c = row.collectionVsShardVsReplicas.get(coll); if (c == null) return null; @@ -169,9 +226,9 @@ public class Row implements MapWriter { if (idx == -1) return null; ReplicaInfo removed = r.remove(idx); for (Cell cell : row.cells) { - cell.type.projectRemoveReplica(cell, removed); + cell.type.projectRemoveReplica(cell, removed, opCollector); } - return new Pair(row, removed); + return row; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java index c201aa30823..67721ba3618 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java @@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.cloud.autoscaling; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -27,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; @@ -34,13 +36,18 @@ import java.util.function.Predicate; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.impl.ClusterStateProvider; import org.apache.solr.common.MapWriter; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.rule.ImplicitSnitch; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK; +import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; /* A suggester is capable of suggesting a collection operation * given a particular session. Before it suggests a new operation, @@ -50,6 +57,8 @@ import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.Conditio * */ public abstract class Suggester implements MapWriter { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected final EnumMap hints = new EnumMap<>(Hint.class); Policy.Session session; SolrRequest operation; @@ -94,34 +103,40 @@ public abstract class Suggester implements MapWriter { abstract SolrRequest init(); - + @SuppressWarnings("unchecked") public SolrRequest getSuggestion() { if (!isInitialized) { Set collections = (Set) hints.getOrDefault(Hint.COLL, Collections.emptySet()); Set> s = (Set>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet()); if (!collections.isEmpty() || !s.isEmpty()) { - HashSet> shards = new HashSet<>(s); - collections.stream().forEach(c -> shards.add(new Pair<>(c, null))); - ClusterStateProvider stateProvider = session.cloudManager.getClusterStateProvider(); - for (Pair shard : shards) { - // if this is not a known collection from the existing clusterstate, - // then add it - if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first()))) { - session.addClausesForCollection(stateProvider, shard.first()); + HashSet> collectionShardPairs = new HashSet<>(s); + collections.forEach(c -> collectionShardPairs.add(new Pair<>(c, null))); + collections.forEach(c -> { + try { + getWithCollection(c).ifPresent(withCollection -> collectionShardPairs.add(new Pair<>(withCollection, null))); + } catch (IOException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Exception while fetching 'withCollection' attribute for collection: " + c, e); } - for (Row row : session.matrix) { - Map> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(), it -> new HashMap<>()); - if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new ArrayList<>()); + }); + s.forEach(kv -> { + try { + getWithCollection(kv.first()).ifPresent(withCollection -> collectionShardPairs.add(new Pair<>(withCollection, null))); + } catch (IOException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Exception while fetching 'withCollection' attribute for collection: " + kv.first(), e); } - } + }); + setupCollection(collectionShardPairs); Collections.sort(session.expandedClauses); } Set srcNodes = (Set) hints.get(Hint.SRC_NODE); if (srcNodes != null && !srcNodes.isEmpty()) { // the source node is dead so live nodes may not have it for (String srcNode : srcNodes) { - if (session.matrix.stream().noneMatch(row -> row.node.equals(srcNode))) + if (session.matrix.stream().noneMatch(row -> row.node.equals(srcNode))) { session.matrix.add(new Row(srcNode, session.getPolicy().params, session.getPolicy().perReplicaAttributes, session)); + } } } session.applyRules(); @@ -135,6 +150,30 @@ public abstract class Suggester implements MapWriter { return operation; } + protected Optional getWithCollection(String collectionName) throws IOException { + DocCollection collection = session.cloudManager.getClusterStateProvider().getCollection(collectionName); + if (collection != null) { + return Optional.ofNullable(collection.getStr(WITH_COLLECTION)); + } else { + return Optional.empty(); + } + } + + private void setupCollection(HashSet> collectionShardPairs) { + ClusterStateProvider stateProvider = session.cloudManager.getClusterStateProvider(); + for (Pair shard : collectionShardPairs) { + // if this is not a known collection from the existing clusterstate, + // then add it + if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first()))) { + session.addClausesForCollection(stateProvider, shard.first()); + } + for (Row row : session.matrix) { + Map> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(), it -> new HashMap<>()); + if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new ArrayList<>()); + } + } + } + public Policy.Session getSession() { return session; } @@ -355,4 +394,67 @@ public abstract class Suggester implements MapWriter { ew.put("action", String.valueOf(getAction())); ew.put("hints", (MapWriter) ew1 -> hints.forEach((hint, o) -> ew1.putNoEx(hint.toString(), o))); } + + protected Collection setupWithCollectionTargetNodes(Set collections, Set> s, String withCollection) { + Collection originalTargetNodesCopy = null; + if (withCollection != null) { + if (log.isDebugEnabled()) { + HashSet set = new HashSet<>(collections); + s.forEach(kv -> set.add(kv.first())); + log.debug("Identified withCollection = {} for collection: {}", withCollection, set); + } + + originalTargetNodesCopy = Utils.getDeepCopy((Collection) hints.get(Hint.TARGET_NODE), 10, true); + + Set withCollectionNodes = new HashSet<>(); + + for (Row row : getMatrix()) { + row.forEachReplica(r -> { + if (withCollection.equals(r.getCollection()) && + "shard1".equals(r.getShard())) { + withCollectionNodes.add(r.getNode()); + } + }); + } + + if (originalTargetNodesCopy != null && !originalTargetNodesCopy.isEmpty()) { + // find intersection of the set of target nodes with the set of 'withCollection' nodes + Set set = (Set) hints.computeIfAbsent(Hint.TARGET_NODE, h -> new HashSet<>()); + set.retainAll(withCollectionNodes); + if (set.isEmpty()) { + // no nodes common between the sets, we have no choice but to restore the original target node hint + hints.put(Hint.TARGET_NODE, originalTargetNodesCopy); + } + } else if (originalTargetNodesCopy == null) { + hints.put(Hint.TARGET_NODE, withCollectionNodes); + } + } + return originalTargetNodesCopy; + } + + protected String findWithCollection(Set collections, Set> s) { + List withCollections = new ArrayList<>(1); + collections.forEach(c -> { + try { + getWithCollection(c).ifPresent(withCollections::add); + } catch (IOException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Exception while fetching 'withCollection' attribute for collection: " + c, e); + } + }); + s.forEach(kv -> { + try { + getWithCollection(kv.first()).ifPresent(withCollections::add); + } catch (IOException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Exception while fetching 'withCollection' attribute for collection: " + kv.first(), e); + } + }); + + if (withCollections.size() > 1) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "The number of 'withCollection' attributes should be exactly 1 for any policy but found: " + withCollections); + } + return withCollections.isEmpty() ? null : withCollections.get(0); + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java index a481a4070c8..af20fac96b7 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java @@ -25,12 +25,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -44,13 +43,14 @@ import org.apache.solr.common.util.StrUtils; import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableSet; +import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.parseString; import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.ANY; import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA; public class Suggestion { public static final String coreidxsize = "INDEX.sizeInGB"; - static final Map validatetypes = new HashMap<>(); + private static final String NULL = ""; @Target(ElementType.FIELD) @@ -82,14 +82,13 @@ public class Suggestion { String metricsKey() default NULL; + Class implementation() default void.class; + ComputedType[] computedValues() default ComputedType.NULL; } public static ConditionType getTagType(String name) { - ConditionType info = validatetypes.get(name); - if (info == null && name.startsWith(ImplicitSnitch.SYSPROP)) info = ConditionType.STRING; - if (info == null && name.startsWith(Clause.METRICS_PREFIX)) info = ConditionType.LAZY; - return info; + return Policy.getTagType(name); } private static Object getOperandAdjustedValue(Object val, Object original) { @@ -160,7 +159,9 @@ public class Suggestion { /** * Type details of each variable in policies */ - public enum ConditionType { + public enum ConditionType implements VarType { + @Meta(name = "withCollection", type = String.class, isNodeSpecificVal = true, implementation = WithCollectionVarType.class) + WITH_COLLECTION(), @Meta(name = "collection", type = String.class) @@ -375,7 +376,7 @@ public class Suggestion { //When a replica is added, freedisk should be incremented @Override - public void projectAddReplica(Cell cell, ReplicaInfo ri) { + public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer ops, boolean strictMode) { //go through other replicas of this shard and copy the index size value into this for (Row row : cell.getRow().session.matrix) { row.forEachReplica(replicaInfo -> { @@ -395,7 +396,7 @@ public class Suggestion { } @Override - public void projectRemoveReplica(Cell cell, ReplicaInfo ri) { + public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer opCollector) { Double idxSize = (Double) validate(CORE_IDX.tagName, ri.getVariable(CORE_IDX.tagName), false); if (idxSize == null) return; Double currFreeDisk = cell.val == null ? 0.0d : (Double) cell.val; @@ -464,12 +465,12 @@ public class Suggestion { } @Override - public void projectAddReplica(Cell cell, ReplicaInfo ri) { + public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer ops, boolean strictMode) { cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() + 1; } @Override - public void projectRemoveReplica(Cell cell, ReplicaInfo ri) { + public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer opCollector) { cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() - 1; } }, @@ -525,7 +526,12 @@ public class Suggestion { LAZY() { @Override public Object validate(String name, Object val, boolean isRuleVal) { - return Clause.parseString(val); + return parseString(val); + } + + @Override + public boolean match(Object inputVal, Operand op, Object val, String name, Row row) { + return op.match(parseString(val), parseString(inputVal)) == Clause.TestStatus.PASS; } @Override @@ -543,6 +549,8 @@ public class Suggestion { public void getSuggestions(SuggestionCtx ctx) { perNodeSuggestions(ctx); } + + }; public final String tagName; @@ -558,6 +566,7 @@ public class Suggestion { public final Set associatedPerNodeValues; public final String metricsAttribute; public final Set supportedComputedTypes; + private final VarType impl; ConditionType() { @@ -569,6 +578,15 @@ public class Suggestion { } catch (NoSuchFieldException e) { //cannot happen } + if (meta.implementation() != void.class) { + try { + impl = (VarType) meta.implementation().newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate: " + meta.implementation().getName()); + } + } else { + impl = null; + } this.tagName = meta.name(); this.type = meta.type(); @@ -583,6 +601,7 @@ public class Suggestion { emptySet() : unmodifiableSet(new HashSet(Arrays.asList(meta.computedValues()))); this.wildCards = readSet(meta.wildCards()); + } public String getTagName() { @@ -603,11 +622,21 @@ public class Suggestion { return unmodifiableSet(new HashSet<>(Arrays.asList(vals))); } + @Override public void getSuggestions(SuggestionCtx ctx) { + if (impl != null) { + impl.getSuggestions(ctx); + return; + } perNodeSuggestions(ctx); } + @Override public void addViolatingReplicas(ViolationCtx ctx) { + if (impl != null) { + impl.addViolatingReplicas(ctx); + return; + } for (Row row : ctx.allRows) { if (ctx.clause.tag.varType.meta.isNodeSpecificVal() && !row.node.equals(ctx.tagKey)) continue; collectViolatingReplicas(ctx, row); @@ -669,21 +698,35 @@ public class Suggestion { /** * Simulate a replica addition to a node in the cluster */ - public void projectAddReplica(Cell cell, ReplicaInfo ri) { + public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer opCollector, boolean strictMode) { + if (impl != null) impl.projectAddReplica(cell, ri, opCollector, strictMode); } - public void projectRemoveReplica(Cell cell, ReplicaInfo ri) { + public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer opCollector) { + if (impl != null) { + impl.projectRemoveReplica(cell, ri, opCollector); + } } + @Override public int compareViolation(Violation v1, Violation v2) { + if (impl != null) return impl.compareViolation(v1, v2); if (v2.replicaCountDelta == null || v1.replicaCountDelta == null) return 0; if (Math.abs(v1.replicaCountDelta) == Math.abs(v2.replicaCountDelta)) return 0; return Math.abs(v1.replicaCountDelta) < Math.abs(v2.replicaCountDelta) ? -1 : 1; } + @Override public Object computeValue(Policy.Session session, Clause.Condition condition, String collection, String shard, String node) { + if (impl != null) return impl.computeValue(session, condition, collection, shard, node); return condition.val; } + + @Override + public boolean match(Object inputVal, Operand op, Object val, String name, Row row) { + if (impl != null) return impl.match(inputVal, op, val, name, row); + return op.match(val, validate(name, inputVal, false)) == Clause.TestStatus.PASS; + } } private static void collectViolatingReplicas(ViolationCtx ctx, Row row) { @@ -757,9 +800,4 @@ public class Suggestion { } } - static { - for (Suggestion.ConditionType t : Suggestion.ConditionType.values()) Suggestion.validatetypes.put(t.tagName, t); - } - - } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VarType.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VarType.java new file mode 100644 index 00000000000..00224a95192 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VarType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.cloud.autoscaling; + +import java.util.function.Consumer; + +/** + * A Variable Type used in Autoscaling policy rules + */ +public interface VarType { + boolean match(Object inputVal, Operand op, Object val, String name, Row row); + + void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer opCollector, boolean strictMode); + + void addViolatingReplicas(Suggestion.ViolationCtx ctx); + + default void getSuggestions(Suggestion.SuggestionCtx ctx) { + } + + default Object computeValue(Policy.Session session, Clause.Condition condition, String collection, String shard, String node) { + return condition.val; + } + + int compareViolation(Violation v1, Violation v2); + + default void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer opCollector) { + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java index d4ab0e6e321..7b0f0f3a5c9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java @@ -30,7 +30,7 @@ import org.apache.solr.common.util.Utils; public class Violation implements MapWriter { final String shard, coll, node; final Object actualVal; - final Double replicaCountDelta;//how many extra replicas + Double replicaCountDelta;//how many extra replicas final Object tagKey; private final int hash; private final Clause clause; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/WithCollectionVarType.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/WithCollectionVarType.java new file mode 100644 index 00000000000..989a0875c77 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/WithCollectionVarType.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.cloud.autoscaling; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.util.Pair; + +import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA; + +/** + * Implements the 'withCollection' variable type + */ +public class WithCollectionVarType implements VarType { + @Override + public boolean match(Object inputVal, Operand op, Object val, String name, Row row) { + Map withCollectionMap = (Map) inputVal; + if (withCollectionMap == null || withCollectionMap.isEmpty()) return true; + + Set uniqueColls = new HashSet<>(); + row.forEachReplica(replicaInfo -> uniqueColls.add(replicaInfo.getCollection())); + + for (Map.Entry e : withCollectionMap.entrySet()) { + if (uniqueColls.contains(e.getKey()) && !uniqueColls.contains(e.getValue())) return false; + } + + return true; + } + + public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer opCollector, boolean strictMode) { + if (strictMode) { + // we do not want to add a replica of the 'withCollection' in strict mode + return; + } + + Map withCollectionMap = (Map) cell.val; + if (withCollectionMap == null || withCollectionMap.isEmpty()) return; + + Set uniqueColls = new HashSet<>(); + Row row = cell.row; + row.forEachReplica(replicaInfo -> uniqueColls.add(replicaInfo.getCollection())); + + for (Map.Entry e : withCollectionMap.entrySet()) { + if (uniqueColls.contains(e.getKey()) && !uniqueColls.contains(e.getValue())) { + String withCollection = e.getValue(); + + opCollector.accept(new Row.OperationInfo(withCollection, "shard1", row.node, cell.name, true, Replica.Type.NRT)); + } + } + } + + @Override + public int compareViolation(Violation v1, Violation v2) { + return Integer.compare(v1.getViolatingReplicas().size(), v2.getViolatingReplicas().size()); + } + + public void addViolatingReplicas(Suggestion.ViolationCtx ctx) { + String node = ctx.currentViolation.node; + for (Row row : ctx.allRows) { + if (node.equals(row.node)) { + Map withCollectionMap = (Map) row.getVal("withCollection"); + if (withCollectionMap != null) { + row.forEachReplica(r -> { + String withCollection = withCollectionMap.get(r.getCollection()); + if (withCollection != null) { + // test whether this row has at least 1 replica of withCollection, else there is a violation + Set uniqueCollections = new HashSet<>(); + row.forEachReplica(replicaInfo -> uniqueCollections.add(replicaInfo.getCollection())); + if (!uniqueCollections.contains(withCollection)) { + ctx.currentViolation.addReplica(new Violation.ReplicaInfoAndErr(r).withDelta(1.0d)); + } + } + }); + ctx.currentViolation.replicaCountDelta = (double) ctx.currentViolation.getViolatingReplicas().size(); + } + } + } + } + + @Override + public void getSuggestions(Suggestion.SuggestionCtx ctx) { + if (ctx.violation.getViolatingReplicas().isEmpty()) return; + + Map nodeValues = ctx.session.nodeStateProvider.getNodeValues(ctx.violation.node, Collections.singleton("withCollection")); + Map withCollectionsMap = (Map) nodeValues.get("withCollection"); + if (withCollectionsMap == null) return; + + Set uniqueCollections = new HashSet<>(); + for (Violation.ReplicaInfoAndErr replicaInfoAndErr : ctx.violation.getViolatingReplicas()) { + uniqueCollections.add(replicaInfoAndErr.replicaInfo.getCollection()); + } + + collectionLoop: + for (String collection : uniqueCollections) { + String withCollection = withCollectionsMap.get(collection); + if (withCollection == null) continue; + + // can we find a node from which we can move a replica of the `withCollection` + // without creating another violation? + for (Row row : ctx.session.matrix) { + if (ctx.violation.node.equals(row.node)) continue; // filter the violating node + + Set hostedCollections = new HashSet<>(); + row.forEachReplica(replicaInfo -> hostedCollections.add(replicaInfo.getCollection())); + + if (hostedCollections.contains(withCollection) && !hostedCollections.contains(collection)) { + // find the candidate replicas that we can move + List movableReplicas = new ArrayList<>(); + row.forEachReplica(replicaInfo -> { + if (replicaInfo.getCollection().equals(withCollection)) { + movableReplicas.add(replicaInfo); + } + }); + + for (ReplicaInfo toMove : movableReplicas) { + // candidate source node for a move replica operation + Suggester suggester = ctx.session.getSuggester(MOVEREPLICA) + .forceOperation(true) + .hint(Suggester.Hint.COLL_SHARD, new Pair<>(withCollection, "shard1")) + .hint(Suggester.Hint.SRC_NODE, row.node) + .hint(Suggester.Hint.REPLICA, toMove.getName()) + .hint(Suggester.Hint.TARGET_NODE, ctx.violation.node); + if (ctx.addSuggestion(suggester) != null) + continue collectionLoop; // one suggestion is enough for this collection + } + } + } + + // we could not find a valid move, so we suggest adding a replica + Suggester suggester = ctx.session.getSuggester(ADDREPLICA) + .forceOperation(true) + .hint(Suggester.Hint.COLL_SHARD, new Pair<>(withCollection, "shard1")) + .hint(Suggester.Hint.TARGET_NODE, ctx.violation.node); + ctx.addSuggestion(suggester); + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java index a7701029572..2015b52391f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java @@ -45,6 +45,7 @@ import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.rule.ImplicitSnitch; import org.apache.solr.common.cloud.rule.SnitchContext; +import org.apache.solr.common.params.CollectionAdminParams; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; @@ -60,6 +61,7 @@ import static java.util.Collections.emptyMap; import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.METRICS_PREFIX; import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK; import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.TOTALDISK; +import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.WITH_COLLECTION; /** * @@ -75,6 +77,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter protected final Map>>> nodeVsCollectionVsShardVsReplicaInfo = new HashMap<>(); private Map snitchSession = new HashMap<>(); private Map nodeVsTags = new HashMap<>(); + private Map withCollectionsMap = new HashMap<>(); public SolrClientNodeStateProvider(CloudSolrClient solrClient) { this.solrClient = solrClient; @@ -100,6 +103,9 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter all.forEach((collName, ref) -> { DocCollection coll = ref.get(); if (coll == null) return; + if (coll.getProperties().get(CollectionAdminParams.WITH_COLLECTION) != null) { + withCollectionsMap.put(coll.getName(), (String) coll.getProperties().get(CollectionAdminParams.WITH_COLLECTION)); + } coll.forEachReplica((shard, replica) -> { Map>> nodeData = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(replica.getNodeName(), k -> new HashMap<>()); Map> collData = nodeData.computeIfAbsent(collName, k -> new HashMap<>()); @@ -114,13 +120,15 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter // ew.put("liveNodes", liveNodes); ew.put("replicaInfo", Utils.getDeepCopy(nodeVsCollectionVsShardVsReplicaInfo, 5)); ew.put("nodeValues", nodeVsTags); - } @Override public Map getNodeValues(String node, Collection tags) { Map tagVals = fetchTagValues(node, tags); nodeVsTags.put(node, tagVals); + if (tags.contains(WITH_COLLECTION.tagName)) { + tagVals.put(WITH_COLLECTION.tagName, withCollectionsMap); + } return tagVals; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index 301cbc83bf8..8d362960b91 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -59,9 +59,11 @@ import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE; import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; +import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH; import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP; import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_PARAM; import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM; +import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; /** * This class is experimental and subject to change. @@ -80,7 +82,9 @@ public abstract class CollectionAdminRequest MAX_SHARDS_PER_NODE, AUTO_ADD_REPLICAS, POLICY, - COLL_CONF); + COLL_CONF, + WITH_COLLECTION, + COLOCATED_WITH); protected final CollectionAction action; @@ -417,10 +421,11 @@ public abstract class CollectionAdminRequest protected Integer pullReplicas; protected Integer tlogReplicas; - private Properties properties; + protected Properties properties; protected Boolean autoAddReplicas; protected Integer stateFormat; - private String[] rule , snitch; + protected String[] rule , snitch; + protected String withCollection; /** Constructor intended for typical use cases */ protected Create(String collection, String config, Integer numShards, Integer numNrtReplicas, Integer numTlogReplicas, Integer numPullReplicas) { // TODO: maybe add other constructors @@ -557,6 +562,7 @@ public abstract class CollectionAdminRequest if (rule != null) params.set(DocCollection.RULE, rule); if (snitch != null) params.set(DocCollection.SNITCH, snitch); params.setNonNull(POLICY, policy); + params.setNonNull(WITH_COLLECTION, withCollection); return params; } @@ -564,6 +570,15 @@ public abstract class CollectionAdminRequest this.policy = policy; return this; } + + public String getWithCollection() { + return withCollection; + } + + public Create setWithCollection(String withCollection) { + this.withCollection = withCollection; + return this; + } } /** diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index 4c12d9c639b..411fe568205 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; +import java.util.function.BiPredicate; import org.apache.solr.client.solrj.cloud.autoscaling.Policy; import org.apache.solr.common.SolrException; @@ -174,6 +175,9 @@ public class DocCollection extends ZkNodeProps implements Iterable { return slices.get(sliceName); } + /** + * @param consumer consume shardName vs. replica + */ public void forEachReplica(BiConsumer consumer) { slices.forEach((shard, slice) -> slice.getReplicasMap().forEach((s, replica) -> consumer.accept(shard, replica))); } @@ -321,7 +325,22 @@ public class DocCollection extends ZkNodeProps implements Iterable { } return replicas; } - + + /** + * @param predicate test against shardName vs. replica + * @return the first replica that matches the predicate + */ + public Replica getReplica(BiPredicate predicate) { + final Replica[] result = new Replica[1]; + forEachReplica((s, replica) -> { + if (result[0] != null) return; + if (predicate.test(s, replica)) { + result[0] = replica; + } + }); + return result[0]; + } + public List getReplicas(EnumSet s) { List replicas = new ArrayList<>(); for (Slice slice : this) { diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java index 6153eb13617..a0ef11f4d68 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java @@ -79,4 +79,15 @@ public interface CollectionAdminParams { * The name of the config set to be used for a collection */ String COLL_CONF = "collection.configName"; + + /** + * The name of the collection with which a collection is to be co-located + */ + String WITH_COLLECTION = "withCollection"; + + /** + * The reverse-link to WITH_COLLECTION flag. It is stored in the cluster state of the `withCollection` + * and points to the collection on which the `withCollection` was specified. + */ + String COLOCATED_WITH = "COLOCATED_WITH"; } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java index feae38b8403..16addd4dd04 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java @@ -166,6 +166,564 @@ public class TestPolicy extends SolrTestCaseJ4 { return result; } + + public void testWithCollection() { + String clusterStateStr = "{" + + " 'comments_coll':{" + + " 'router': {" + + " 'name': 'compositeId'" + + " }," + + " 'shards':{}," + + " 'withCollection' :'articles_coll'" + + " }," + + " 'articles_coll': {" + + " 'router': {" + + " 'name': 'compositeId'" + + " }," + + " 'shards': {" + + " 'shard1': {" + + " 'range': '80000000-ffffffff'," + + " 'replicas': {" + + " 'r1': {" + + " 'core': 'r1'," + + " 'base_url': 'http://10.0.0.4:8983/solr'," + + " 'node_name': 'node1'," + + " 'state': 'active'," + + " 'leader': 'true'" + + " }," + + " 'r2': {" + + " 'core': 'r2'," + + " 'base_url': 'http://10.0.0.4:7574/solr'," + + " 'node_name': 'node2'," + + " 'state': 'active'" + + " }" + + " }" + + " }" + + " }" + + " }" + + "}"; + ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8), + ImmutableSet.of("node1", "node2", "node3", "node4", "node5")); + DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) { + @Override + public ClusterState getClusterState() throws IOException { + return clusterState; + } + + @Override + public Set getLiveNodes() { + return clusterState.getLiveNodes(); + } + }; + + SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) { + @Override + protected Map fetchTagValues(String node, Collection tags) { + Map result = new HashMap<>(); + AtomicInteger cores = new AtomicInteger(); + forEachReplica(node, replicaInfo -> cores.incrementAndGet()); + if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get()); + if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100); + return result; + } + + @Override + protected Map fetchReplicaMetrics(String solrNode, Map> metricsKeyVsTagReplica) { + //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes + Map result = new HashMap<>(); + metricsKeyVsTagReplica.forEach((k, v) -> { + if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100); + }); + return result; + } + + @Override + protected ClusterStateProvider getClusterStateProvider() { + return clusterStateProvider; + } + }; + Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection")); + assertNotNull(m.get("withCollection")); + + Map policies = (Map) Utils.fromJSONString("{" + + " 'cluster-preferences': [" + + " { 'minimize': 'cores'}," + + " { 'maximize': 'freedisk', 'precision': 50}" + + " ]," + + " 'cluster-policy': [" + + " { 'replica': 0, 'nodeRole': 'overseer'}" + + " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," + + " ]" + + "}"); + AutoScalingConfig config = new AutoScalingConfig(policies); + Policy policy = config.getPolicy(); + Policy.Session session = policy.createSession(new DelegatingCloudManager(null) { + @Override + public ClusterStateProvider getClusterStateProvider() { + return clusterStateProvider; + } + + @Override + public NodeStateProvider getNodeStateProvider() { + return solrClientNodeStateProvider; + } + }); + Suggester suggester = session.getSuggester(CollectionAction.ADDREPLICA); + suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1")); + SolrRequest op = suggester.getSuggestion(); + assertNotNull(op); + Set nodes = new HashSet<>(2); + nodes.add(op.getParams().get("node")); + session = suggester.getSession(); + suggester = session.getSuggester(ADDREPLICA); + suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1")); + op = suggester.getSuggestion(); + assertNotNull(op); + nodes.add(op.getParams().get("node")); + assertEquals(2, nodes.size()); + assertTrue("node1 should have been selected by add replica", nodes.contains("node1")); + assertTrue("node2 should have been selected by add replica", nodes.contains("node2")); + + session = suggester.getSession(); + suggester = session.getSuggester(MOVEREPLICA); + suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1")); + op = suggester.getSuggestion(); + assertNull(op); + } + + public void testWithCollectionSuggestions() { + String clusterStateStr = "{" + + " 'articles_coll':{" + + " 'router': {" + + " 'name': 'compositeId'" + + " }," + + " 'shards':{'shard1':{}}," + + " }," + + " 'comments_coll': {" + + " 'withCollection' :'articles_coll'," + + " 'router': {" + + " 'name': 'compositeId'" + + " }," + + " 'shards': {" + + " 'shard1': {" + + " 'range': '80000000-ffffffff'," + + " 'replicas': {" + + " 'r1': {" + + " 'core': 'r1'," + + " 'base_url': 'http://10.0.0.4:8983/solr'," + + " 'node_name': 'node1'," + + " 'state': 'active'," + + " 'leader': 'true'" + + " }," + + " 'r2': {" + + " 'core': 'r2'," + + " 'base_url': 'http://10.0.0.4:7574/solr'," + + " 'node_name': 'node2'," + + " 'state': 'active'" + + " }" + + " }" + + " }" + + " }" + + " }" + + "}"; + ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8), + ImmutableSet.of("node1", "node2", "node3", "node4", "node5")); + DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) { + @Override + public ClusterState getClusterState() throws IOException { + return clusterState; + } + + @Override + public Set getLiveNodes() { + return clusterState.getLiveNodes(); + } + }; + + SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) { + @Override + protected Map fetchTagValues(String node, Collection tags) { + Map result = new HashMap<>(); + AtomicInteger cores = new AtomicInteger(); + forEachReplica(node, replicaInfo -> cores.incrementAndGet()); + if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get()); + if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100); + return result; + } + + @Override + protected Map fetchReplicaMetrics(String solrNode, Map> metricsKeyVsTagReplica) { + //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes + Map result = new HashMap<>(); + metricsKeyVsTagReplica.forEach((k, v) -> { + if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100); + }); + return result; + } + + @Override + protected ClusterStateProvider getClusterStateProvider() { + return clusterStateProvider; + } + }; + Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection")); + assertNotNull(m.get("withCollection")); + + Map policies = (Map) Utils.fromJSONString("{" + + " 'cluster-preferences': [" + + " { 'maximize': 'freedisk', 'precision': 50}," + + " { 'minimize': 'cores'}" + + " ]," + + " 'cluster-policy': [" + + " { 'replica': 0, 'nodeRole': 'overseer'}" + + " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," + + " ]" + + "}"); + + List l = PolicyHelper.getSuggestions(new AutoScalingConfig(policies), + new DelegatingCloudManager(null) { + @Override + public ClusterStateProvider getClusterStateProvider() { + return clusterStateProvider; + } + + @Override + public NodeStateProvider getNodeStateProvider() { + return solrClientNodeStateProvider; + } + }); + assertNotNull(l); + assertEquals(2, l.size()); + + // collect the set of nodes to which replicas are being added + Set nodes = new HashSet<>(2); + + m = l.get(0).toMap(new LinkedHashMap<>()); + assertEquals(1.0d, Utils.getObjectByPath(m, true, "violation/violation/delta")); + assertEquals("POST", Utils.getObjectByPath(m, true, "operation/method")); + assertEquals("/c/articles_coll/shards", Utils.getObjectByPath(m, true, "operation/path")); + assertNotNull(Utils.getObjectByPath(m, false, "operation/command/add-replica")); + nodes.add((String) Utils.getObjectByPath(m, true, "operation/command/add-replica/node")); + + m = l.get(1).toMap(new LinkedHashMap<>()); + assertEquals(1.0d, Utils.getObjectByPath(m, true, "violation/violation/delta")); + assertEquals("POST", Utils.getObjectByPath(m, true, "operation/method")); + assertEquals("/c/articles_coll/shards", Utils.getObjectByPath(m, true, "operation/path")); + assertNotNull(Utils.getObjectByPath(m, false, "operation/command/add-replica")); + nodes.add((String) Utils.getObjectByPath(m, true, "operation/command/add-replica/node")); + + assertEquals(2, nodes.size()); + assertTrue(nodes.contains("node1")); + assertTrue(nodes.contains("node2")); + } + + public void testWithCollectionMoveVsAddSuggestions() { + String clusterStateStr = "{" + + " 'articles_coll':{" + + " 'router': {" + + " 'name': 'compositeId'" + + " }," + + " 'shards': {" + + " 'shard1': {" + + " 'range': '80000000-ffffffff'," + + " 'replicas': {" + + " 'r1': {" + + " 'core': 'r1'," + + " 'base_url': 'http://10.0.0.4:8983/solr'," + + " 'node_name': 'node1'," + + " 'state': 'active'," + + " 'leader': 'true'" + + " }," + + " 'r2': {" + + " 'core': 'r2'," + + " 'base_url': 'http://10.0.0.4:7574/solr'," + + " 'node_name': 'node2'," + + " 'state': 'active'" + + " }," + + " 'r3': {" + + " 'core': 'r3'," + + " 'base_url': 'http://10.0.0.4:7579/solr'," + + " 'node_name': 'node6'," + + " 'state': 'active'" + + " }" + + " }" + + " }" + + " }" + + " }," + + " 'comments_coll': {" + + " 'withCollection' :'articles_coll'," + + " 'router': {" + + " 'name': 'compositeId'" + + " }," + + " 'shards': {" + + " 'shard1': {" + + " 'range': '80000000-ffffffff'," + + " 'replicas': {" + + " 'r1': {" + + " 'core': 'r1'," + + " 'base_url': 'http://10.0.0.4:7576/solr'," + + " 'node_name': 'node3'," + + " 'state': 'active'," + + " 'leader': 'true'" + + " }," + + " 'r2': {" + + " 'core': 'r2'," + + " 'base_url': 'http://10.0.0.4:7577/solr'," + + " 'node_name': 'node4'," + + " 'state': 'active'" + + " }," + + " 'r3': {" + + " 'core': 'r3'," + + " 'base_url': 'http://10.0.0.4:7578/solr'," + + " 'node_name': 'node5'," + + " 'state': 'active'" + + " }," + + " 'r4': {" + + " 'core': 'r4'," + + " 'base_url': 'http://10.0.0.4:7579/solr'," + + " 'node_name': 'node6'," + + " 'state': 'active'" + + " }" + + " }" + + " }" + + " }" + + " }" + + "}"; + ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8), + ImmutableSet.of("node1", "node2", "node3", "node4", "node5", "node6")); + DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) { + @Override + public ClusterState getClusterState() throws IOException { + return clusterState; + } + + @Override + public Set getLiveNodes() { + return clusterState.getLiveNodes(); + } + }; + + SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) { + @Override + protected Map fetchTagValues(String node, Collection tags) { + Map result = new HashMap<>(); + AtomicInteger cores = new AtomicInteger(); + forEachReplica(node, replicaInfo -> cores.incrementAndGet()); + if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get()); + if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100); + return result; + } + + @Override + protected Map fetchReplicaMetrics(String solrNode, Map> metricsKeyVsTagReplica) { + //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes + Map result = new HashMap<>(); + metricsKeyVsTagReplica.forEach((k, v) -> { + if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100); + }); + return result; + } + + @Override + protected ClusterStateProvider getClusterStateProvider() { + return clusterStateProvider; + } + }; + Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection")); + assertNotNull(m.get("withCollection")); + + Map policies = (Map) Utils.fromJSONString("{" + + " 'cluster-preferences': [" + + " { 'maximize': 'freedisk', 'precision': 50}," + + " { 'minimize': 'cores'}" + + " ]," + + " 'cluster-policy': [" + + " { 'replica': 0, 'nodeRole': 'overseer'}" + + " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," + + " ]" + + "}"); + + List l = PolicyHelper.getSuggestions(new AutoScalingConfig(policies), + new DelegatingCloudManager(null) { + @Override + public ClusterStateProvider getClusterStateProvider() { + return clusterStateProvider; + } + + @Override + public NodeStateProvider getNodeStateProvider() { + return solrClientNodeStateProvider; + } + }); + assertNotNull(l); + assertEquals(3, l.size()); + + // collect the set of nodes to which replicas are being added + Set nodes = new HashSet<>(2); + + int numMoves = 0, numAdds = 0; + Set addNodes = new HashSet<>(); + Set targetNodes = new HashSet<>(); + Set movedReplicas = new HashSet<>(); + for (Suggester.SuggestionInfo suggestionInfo : l) { + Map s = suggestionInfo.toMap(new LinkedHashMap<>()); + assertEquals("POST", Utils.getObjectByPath(s, true, "operation/method")); + if (Utils.getObjectByPath(s, false, "operation/command/add-replica") != null) { + numAdds++; + assertEquals(1.0d, Utils.getObjectByPath(s, true, "violation/violation/delta")); + assertEquals("/c/articles_coll/shards", Utils.getObjectByPath(s, true, "operation/path")); + addNodes.add((String) Utils.getObjectByPath(s, true, "operation/command/add-replica/node")); + } else if (Utils.getObjectByPath(s, false, "operation/command/move-replica") != null) { + numMoves++; + assertEquals("/c/articles_coll", Utils.getObjectByPath(s, true, "operation/path")); + targetNodes.add((String) Utils.getObjectByPath(s, true, "operation/command/move-replica/targetNode")); + movedReplicas.add((String) Utils.getObjectByPath(s, true, "operation/command/move-replica/replica")); + } else { + fail("Unexpected operation type suggested for suggestion: " + suggestionInfo); + } + } + + assertEquals(2, targetNodes.size()); + assertEquals(1, addNodes.size()); + assertEquals(2, movedReplicas.size()); + Set allTargetNodes = new HashSet<>(targetNodes); + allTargetNodes.addAll(addNodes); + assertEquals(3, allTargetNodes.size()); + assertTrue(allTargetNodes.contains("node3")); + assertTrue(allTargetNodes.contains("node4")); + assertTrue(allTargetNodes.contains("node5")); + } + + public void testWithCollectionMoveReplica() { + String clusterStateStr = "{" + + " 'comments_coll':{" + + " 'router': {" + + " 'name': 'compositeId'" + + " }," + + " 'shards':{" + + " 'shard1' : {" + + " 'range': '80000000-ffffffff'," + + " 'replicas': {" + + " 'r1': {" + + " 'core': 'r1'," + + " 'base_url': 'http://10.0.0.4:8983/solr'," + + " 'node_name': 'node1'," + + " 'state': 'active'," + + " 'leader': 'true'" + + " }" + + " }" + + " }" + + " }," + + " 'withCollection' :'articles_coll'" + + " }," + + " 'articles_coll': {" + + " 'router': {" + + " 'name': 'compositeId'" + + " }," + + " 'shards': {" + + " 'shard1': {" + + " 'range': '80000000-ffffffff'," + + " 'replicas': {" + + " 'r1': {" + + " 'core': 'r1'," + + " 'base_url': 'http://10.0.0.4:8983/solr'," + + " 'node_name': 'node1'," + + " 'state': 'active'," + + " 'leader': 'true'" + + " }," + + " 'r2': {" + + " 'core': 'r2'," + + " 'base_url': 'http://10.0.0.4:7574/solr'," + + " 'node_name': 'node2'," + + " 'state': 'active'" + + " }" + + " }" + + " }" + + " }" + + " }" + + "}"; + ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8), + ImmutableSet.of("node2", "node3", "node4", "node5")); + DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) { + @Override + public ClusterState getClusterState() throws IOException { + return clusterState; + } + + @Override + public Set getLiveNodes() { + return clusterState.getLiveNodes(); + } + }; + + SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) { + @Override + protected Map fetchTagValues(String node, Collection tags) { + Map result = new HashMap<>(); + AtomicInteger cores = new AtomicInteger(); + forEachReplica(node, replicaInfo -> cores.incrementAndGet()); + if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get()); + if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100); + return result; + } + + @Override + protected Map fetchReplicaMetrics(String solrNode, Map> metricsKeyVsTagReplica) { + //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes + Map result = new HashMap<>(); + metricsKeyVsTagReplica.forEach((k, v) -> { + if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100); + }); + return result; + } + + @Override + protected ClusterStateProvider getClusterStateProvider() { + return clusterStateProvider; + } + }; + Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection")); + assertNotNull(m.get("withCollection")); + + Map policies = (Map) Utils.fromJSONString("{" + + " 'cluster-preferences': [" + + " { 'minimize': 'cores'}," + + " { 'maximize': 'freedisk', 'precision': 50}" + + " ]," + + " 'cluster-policy': [" + + " { 'replica': 0, 'nodeRole': 'overseer'}" + + " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," + + " ]" + + "}"); + AutoScalingConfig config = new AutoScalingConfig(policies); + Policy policy = config.getPolicy(); + Policy.Session session = policy.createSession(new DelegatingCloudManager(null) { + @Override + public ClusterStateProvider getClusterStateProvider() { + return clusterStateProvider; + } + + @Override + public NodeStateProvider getNodeStateProvider() { + return solrClientNodeStateProvider; + } + }); + Suggester suggester = session.getSuggester(CollectionAction.MOVEREPLICA); + suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1")); + suggester.hint(Hint.SRC_NODE, "node1"); + SolrRequest op = suggester.getSuggestion(); + assertNotNull(op); + assertEquals("node2 should have been selected by move replica","node2", + op.getParams().get("targetNode")); + + session = suggester.getSession(); + suggester = session.getSuggester(MOVEREPLICA); + suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1")); + suggester.hint(Hint.SRC_NODE, "node1"); + op = suggester.getSuggestion(); + assertNull(op); + } + public void testValidate() { expectError("replica", -1, "must be greater than"); expectError("replica", "hello", "not a valid number"); @@ -1228,7 +1786,7 @@ public class TestPolicy extends SolrTestCaseJ4 { assertTrue(session.getPolicy() == config.getPolicy()); assertEquals(sessionWrapper.status, PolicyHelper.Status.EXECUTING); sessionWrapper.release(); - assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST); + assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE); PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager); assertEquals(sessionRef.getSessionWrapper().getCreateTime(), s1.getCreateTime()); PolicyHelper.SessionWrapper[] s2 = new PolicyHelper.SessionWrapper[1]; @@ -1256,9 +1814,9 @@ public class TestPolicy extends SolrTestCaseJ4 { assertEquals(2, s1.getRefCount()); s2[0].release(); - assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST); + assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE); s1.release(); - assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST); + assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE); } @@ -1479,7 +2037,7 @@ public class TestPolicy extends SolrTestCaseJ4 { assertEquals("node2", op.getNode()); } - private SolrCloudManager getSolrCloudManager(final Map nodeValues, String clusterState) { + private SolrCloudManager getSolrCloudManager(final Map nodeValues, String clusterS) { return new SolrCloudManager() { ObjectCache objectCache = new ObjectCache(); @@ -1521,7 +2079,7 @@ public class TestPolicy extends SolrTestCaseJ4 { @Override public Map>> getReplicaInfo(String node, Collection keys) { - return getReplicaDetails(node, clusterState); + return getReplicaDetails(node, clusterS); } }; }