SOLR-11990: Make it possible to co-locate replicas of multiple collections together in a node.

A collection may be co-located with another collection during collection creation time by specifying a
'withCollection' parameter. It can also be co-located afterwards by using the modify collection API.
The co-location guarantee is enforced regardless of future cluster operations whether they are invoked
manually via the Collection API or automatically by the Autoscaling framework.

Squashed commit of the following:

commit 3827703b38c598f1247c90ab57d3d640ab3a9e21
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 11:54:10 2018 +0530

    SOLR-11990: Added change log entry

commit 7977222e07ba47274062cb8d8a69e7956d644000
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 11:52:17 2018 +0530

    SOLR-11990: Added change log entry

commit 1857075fdb9d535b6149ad4369fed8b64b0c01f6
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 11:49:51 2018 +0530

    SOLR-11990: Added note about co-location guarantees being one way only

commit 8557cbc8a511f21d1fcad99e11ea9d2104d0bef4
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 10:43:37 2018 +0530

    SOLR-11990: Remove unused import

commit 864b013fd744edca9b6b84a8a7573fab3c5310d5
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 10:21:59 2018 +0530

    SOLR-11990: Fixing compilation issues after merging master

commit dd840a2f7e765ee96c899d4d9ea89b6b67c5ae62
Merge: bb4ffb3 828d281
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 10:03:50 2018 +0530

    Merge branch 'master' into jira/solr-11990

    # Conflicts:
    #	solr/solr-ref-guide/src/collections-api.adoc
    #	solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
    #	solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java

commit bb4ffb32c4960a2809ac8927e214e1e012204a73
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Fri Jul 27 14:09:44 2018 +0530

    SOLR-11990: Ensure that the suggestion are validated by the policy engine otherwise move to the next candidate replica or the next candidate node

commit a97d45b22f9c232e939f979502c761001be9ae24
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Fri Jul 27 13:22:10 2018 +0530

    SOLR-11990: Autoscaling suggestions for withCollection violations should prefer moving replicas before adding replicas

commit 7b5a84338dfe7335599a5e96aff2d26cb4eeaac6
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Fri Jul 27 12:22:45 2018 +0530

    SOLR-11990: Fix statement about the behavior of the modify collection API when modifying the withCollection parameter

commit 63aec4fe0de7025c16b6ebc47dad1004531ecee1
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Thu Jul 26 07:29:07 2018 +0530

    SOLR-11990: Added new page to the reference guide describing how to colocate collections together including guarantees and limitations

commit 6bfcd0786bb30353de9c26a01ec97ce3191b58f8
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 25 21:42:25 2018 +0530

    SOLR-11990: Added another test which creates two collections which are colocated with two different collections and ensures that create collection and add replica operations work correctly

commit 4cead778f0044b6fb4012b085abf7b60350f495b
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 25 21:07:47 2018 +0530

    SOLR-11990: Stop or start jettys in test setup to ensure that we always have exactly 2 replicas running before a test starts

commit 70dbfd042c2164fcd76d406eeab1518e4d3147fb
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 25 19:19:07 2018 +0530

    SOLR-11990: Added description of the new withCollection parameter in the reference guide

commit 9d8260852b9d667d4d8e026432fd7727b7789393
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 25 19:16:46 2018 +0530

    SOLR-11990: Reset count down latch during test setup

commit ae508165571b1afde54337859b8d5fdbb1d67312
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 25 15:43:54 2018 +0530

    SOLR-11990: Add support for withCollection in simulated create collection API

commit 84f026b8c4cc25edb548430b8f5ad09d2486b3b5
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 17:21:33 2018 +0530

    SOLR-11990: Ported the refactoring made in CreateCollectionCmd to the simulated version so that simulation tests are able to create collections correctly

commit defe111c9d31c8e4f0f00b4f2f3c875f5b2fa602
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 16:17:52 2018 +0530

    SOLR-11990: Add missing javadoc for return statement

commit 8e47d5bc4545548c5441909c3fcc1a7901b38185
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 16:11:45 2018 +0530

    SOLR-11990: Replace usage of forbidden Charsets with StandardCharsets class

commit 2d1b9eb25ea96a3a42c000ae654400ed44c17554
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 16:07:36 2018 +0530

    SOLR-11990: Extract ConditionType to an interface VarType along with a WithCollectionVarType implementation

commit 1de2a4f52a59afca28de75bfa5156a3d6567a4f5
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 12:53:26 2018 +0530

    SOLR-11990: Pass strict-ness parameter to the ConditionType so that WITH_COLLECTION can choose not to project add replica in strict mode.

    This ensures that add replica or move replica suggesters always choose nodes that already have withCollection replicas first unless there are violations in doing so. Only if the first pass fails to find a suitable replica, do we go to the other nodes in the cluster. This also removes the need for the majority of changes in AddReplicaSuggester and so they've been reverted.

commit 0d616ed9e9bad791548c87086cba7760d724350d
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 11:36:34 2018 +0530

    SOLR-11990: Minor changes to formatting and code comments

commit 1228538f934f35f15797d89c2c66f2deb9cddd8c
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Mon Jul 23 14:26:19 2018 +0530

    SOLR-11990: Added a test which simulates a lost node and asserts that move replica suggester moves the replica on the lost node to a node already having the withCollection present

commit 582f1fd98de93ab73c74a1f623749dd031beb381
Author: Noble Paul <noble@apache.org>
Date:   Mon Jul 23 18:35:22 2018 +1000

    SOLR-11990: NPE removing unnecessary System.out.println

commit 501bc6c1d066321b344bbb8b1de3c2ead52f8c49
Author: Noble Paul <noble@apache.org>
Date:   Mon Jul 23 18:31:07 2018 +1000

    SOLR-11990: NPE during class init

commit acbf4a69321e16cff11cc7cf0a1f076fd9ac0037
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Mon Jul 23 13:55:30 2018 +0530

    SOLR-11990: Added asserts on the nodes that should be selected by the add replica suggester

commit 4824933fd6eb7d1773acbff1a1a0c5e670226e0b
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Fri Jul 20 14:30:52 2018 +0530

    SOLR-11990: Added WITH_COLLECTION to global tags. Fixed implementation of addViolatingReplicas and getSuggestions in the clause impl. Added more asserts in testWithCollectionSuggestions.

commit dbadb33211c190026e08d8e3ea587b6f8df8720b
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Fri Jul 20 13:44:36 2018 +0530

    SOLR-11990: Added support for comparing violations, generating suggestions and adding violating replicas

commit ada1f17d5c93a4186260473e4822d2bee1da0e16
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 18 19:14:56 2018 +0530

    SOLR-11990: Fix mock node state provider in TestPolicy to use the right cluster state. Added nocommits to ensure that we return the right suggestions for this feature.

commit ef2d61812e0d96eb2275b3411906d9de57ab835e
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 18 18:39:51 2018 +0530

    SOLR-11990: Add missing node in nodeValues configuration

commit 34841fc01fea4a9f1e6a9f64050e576f2247a72b
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 18 16:32:57 2018 +0530

    SOLR-11990: Make it possible to co-locate replicas of multiple collections together in a node
This commit is contained in:
Shalin Shekhar Mangar 2018-07-29 07:26:13 +05:30
parent 828d2815f1
commit 179c8f9b48
29 changed files with 2134 additions and 164 deletions

View File

@ -123,6 +123,12 @@ New Features
* SOLR-12536: autoscaling policy support to equally distribute replicas on the basis of arbitrary properties (noble)
* SOLR-11990: Make it possible to co-locate replicas of multiple collections together in a node. A collection may be
co-located with another collection during collection creation time by specifying a 'withCollection' parameter. It can
also be co-located afterwards by using the modify collection API. The co-location guarantee is enforced regardless of
future cluster operations whether they are invoked manually via the Collection API or by the Autoscaling framework.
(noble, shalin)
Bug Fixes
----------------------

View File

@ -23,15 +23,16 @@ import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.ActiveReplicaWatcher;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.Overseer;
@ -43,6 +44,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
@ -52,11 +54,12 @@ import org.apache.solr.handler.component.ShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
@ -79,6 +82,10 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws IOException, InterruptedException {
log.debug("addReplica() : {}", Utils.toJSONString(message));
String collectionName = message.getStr(COLLECTION_PROP);
DocCollection coll = clusterState.getCollection(collectionName);
boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
final String asyncId = message.getStr(ASYNC);
@ -86,9 +93,6 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
message = assignReplicaDetails(ocmh.cloudManager, clusterState, message, sessionWrapper);
String collection = message.getStr(COLLECTION_PROP);
DocCollection coll = clusterState.getCollection(collection);
String node = message.getStr(CoreAdminParams.NODE);
String shard = message.getStr(SHARD_ID_PROP);
String coreName = message.getStr(CoreAdminParams.NAME);
@ -97,6 +101,27 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
boolean parallel = message.getBool("parallel", false);
if (coll.getStr(WITH_COLLECTION) != null) {
String withCollectionName = coll.getStr(WITH_COLLECTION);
DocCollection withCollection = clusterState.getCollection(withCollectionName);
if (withCollection.getActiveSlices().size() > 1) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + withCollection.getActiveSlices().size());
}
String withCollectionShard = withCollection.getActiveSlices().iterator().next().getName();
List<Replica> replicas = withCollection.getReplicas(node);
if (replicas == null || replicas.isEmpty()) {
// create a replica of withCollection on the identified node before proceeding further
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
ZkStateReader.COLLECTION_PROP, withCollectionName,
ZkStateReader.SHARD_ID_PROP, withCollectionShard,
"node", node,
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString()); // set to true because we want `withCollection` to be ready after this collection is created
addReplica(clusterState, props, results, null);
}
}
ModifiableSolrParams params = new ModifiableSolrParams();
ZkStateReader zkStateReader = ocmh.zkStateReader;
@ -104,7 +129,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
if (!skipCreateReplicaInClusterState) {
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.SHARD_ID_PROP, shard,
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
@ -121,10 +146,10 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
params.set(CoreAdminParams.CORE_NODE_NAME,
ocmh.waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(coreName)).get(coreName).getName());
}
String configName = zkStateReader.readConfigName(collection);
String configName = zkStateReader.readConfigName(collectionName);
String routeKey = message.getStr(ShardParams._ROUTE_);
String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
String ulogDir = message.getStr(CoreAdminParams.ULOG_DIR);
@ -133,7 +158,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
params.set(CoreAdminParams.NAME, coreName);
params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collection);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name());
if (shard != null) {
params.set(CoreAdminParams.SHARD, shard);
@ -172,7 +197,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
Runnable runnable = () -> {
ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
ocmh.waitForCoreNodeName(collectionName, fnode, fcoreName);
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
@ -182,15 +207,15 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
if (!parallel || waitForFinalState) {
if (waitForFinalState) {
SolrCloseableLatch latch = new SolrCloseableLatch(1, ocmh);
ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collection, null, Collections.singletonList(coreName), latch);
ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null, Collections.singletonList(coreName), latch);
try {
zkStateReader.registerCollectionStateWatcher(collection, watcher);
zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
runnable.run();
if (!latch.await(timeout, TimeUnit.SECONDS)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
}
} finally {
zkStateReader.removeCollectionStateWatcher(collection, watcher);
zkStateReader.removeCollectionStateWatcher(collectionName, watcher);
}
} else {
runnable.run();
@ -201,7 +226,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
return new ZkNodeProps(
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.SHARD_ID_PROP, shard,
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, node

View File

@ -32,14 +32,14 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
@ -47,6 +47,7 @@ import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
@ -73,13 +74,14 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.common.params.CommonParams.NAME;
@ -106,26 +108,48 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
}
String withCollection = message.getStr(CollectionAdminParams.WITH_COLLECTION);
String withCollectionShard = null;
if (withCollection != null) {
if (!clusterState.hasCollection(withCollection)) {
throw new SolrException(ErrorCode.BAD_REQUEST, "The 'withCollection' does not exist: " + withCollection);
} else {
DocCollection collection = clusterState.getCollection(withCollection);
if (collection.getActiveSlices().size() > 1) {
throw new SolrException(ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + collection.getActiveSlices().size());
}
withCollectionShard = collection.getActiveSlices().iterator().next().getName();
}
}
String configName = getConfigName(collectionName, message);
if (configName == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No config set found to associate with the collection.");
}
ocmh.validateConfigOrThrowSolrException(configName);
List<String> nodeList = new ArrayList<>();
String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
String policy = message.getStr(Policy.POLICY);
AutoScalingConfig autoScalingConfig = ocmh.cloudManager.getDistribStateManager().getAutoScalingConfig();
boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null;
// fail fast if parameters are wrong or incomplete
List<String> shardNames = populateShardNames(message, router);
checkMaxShardsPerNode(message, usePolicyFramework);
checkReplicaTypes(message);
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
try {
final String async = message.getStr(ASYNC);
List<String> nodeList = new ArrayList<>();
List<String> shardNames = new ArrayList<>();
List<ReplicaPosition> replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, message,
nodeList, shardNames, sessionWrapper);
ZkStateReader zkStateReader = ocmh.zkStateReader;
boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
ocmh.createConfNode(stateManager, configName, collectionName, isLegacyCloud);
OverseerCollectionMessageHandler.createConfNode(stateManager, configName, collectionName, isLegacyCloud);
Map<String,String> collectionParams = new HashMap<>();
Map<String,Object> collectionProps = message.getProperties();
@ -134,12 +158,12 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
collectionParams.put(propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), (String) collectionProps.get(propName));
}
}
createCollectionZkNode(stateManager, collectionName, collectionParams);
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
// wait for a while until we don't see the collection
// wait for a while until we see the collection
TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
boolean created = false;
while (! waitUntil.hasTimedOut()) {
@ -147,8 +171,12 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
if(created) break;
}
if (!created)
if (!created) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
}
List<ReplicaPosition> replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, message,
nodeList, shardNames, sessionWrapper);
if (nodeList.isEmpty()) {
log.debug("Finished create command for collection: {}", collectionName);
@ -165,6 +193,23 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
for (ReplicaPosition replicaPosition : replicaPositions) {
String nodeName = replicaPosition.node;
if (withCollection != null) {
// check that we have a replica of `withCollection` on this node and if not, create one
DocCollection collection = clusterState.getCollection(withCollection);
List<Replica> replicas = collection.getReplicas(nodeName);
if (replicas == null || replicas.isEmpty()) {
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
ZkStateReader.COLLECTION_PROP, withCollection,
ZkStateReader.SHARD_ID_PROP, withCollectionShard,
"node", nodeName,
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString()); // set to true because we want `withCollection` to be ready after this collection is created
new AddReplicaCmd(ocmh).call(clusterState, props, results);
clusterState = zkStateReader.getClusterState(); // refresh
}
}
String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(),
ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName),
replicaPosition.shard, replicaPosition.type, true);
@ -251,6 +296,16 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
+ " curl http://{host:port}/solr/" + collectionName + "/config -d '{\"set-user-property\": {\"update.autoCreateFields\":\"false\"}}'");
}
}
// modify the `withCollection` and store this new collection's name with it
if (withCollection != null) {
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(),
ZkStateReader.COLLECTION_PROP, withCollection,
CollectionAdminParams.COLOCATED_WITH, collectionName);
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
}
} catch (SolrException ex) {
throw ex;
} catch (Exception ex) {
@ -274,29 +329,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
String policy = message.getStr(Policy.POLICY);
boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null;
Integer numSlices = message.getInt(OverseerCollectionMessageHandler.NUM_SLICES, null);
String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
if(ImplicitDocRouter.NAME.equals(router)){
ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
numSlices = shardNames.size();
} else {
if (numSlices == null ) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " is a required param (when using CompositeId router).");
}
if (numSlices <= 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " must be > 0");
}
ClusterStateMutator.getShardNames(numSlices, shardNames);
}
int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
}
if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
if (numNrtReplicas + numTlogReplicas <= 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
}
Integer numSlices = shardNames.size();
int maxShardsPerNode = checkMaxShardsPerNode(message, usePolicyFramework);
// we need to look at every node and see how many cores it serves
// add our new cores to existing nodes serving the least number of cores
@ -343,6 +377,43 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
return replicaPositions;
}
public static int checkMaxShardsPerNode(ZkNodeProps message, boolean usePolicyFramework) {
int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
}
if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
return maxShardsPerNode;
}
public static void checkReplicaTypes(ZkNodeProps message) {
int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas > 0 ? 0 : 1));
if (numNrtReplicas + numTlogReplicas <= 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
}
}
public static List<String> populateShardNames(ZkNodeProps message, String router) {
List<String> shardNames = new ArrayList<>();
Integer numSlices = message.getInt(OverseerCollectionMessageHandler.NUM_SLICES, null);
if (ImplicitDocRouter.NAME.equals(router)) {
ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
numSlices = shardNames.size();
} else {
if (numSlices == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " is a required param (when using CompositeId router).");
}
if (numSlices <= 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " must be > 0");
}
ClusterStateMutator.getShardNames(numSlices, shardNames);
}
return shardNames;
}
String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
String configName = message.getStr(COLL_CONF);
@ -370,7 +441,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
return "".equals(configName)? null: configName;
}
/**
* Copies the _default configset to the specified configset name (overwrites if pre-existing)
*/
@ -476,7 +547,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
}
private static void getConfName(DistribStateManager stateManager, String collection, String collectionPath, Map<String,Object> collectionProps) throws IOException,
KeeperException, InterruptedException {
// check for configName

View File

@ -31,6 +31,7 @@ import org.apache.solr.common.NonExistentCoreException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@ -49,6 +50,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
@ -69,6 +72,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ZkStateReader zkStateReader = ocmh.zkStateReader;
checkNotReferencedByAlias(zkStateReader, collection);
checkNotColocatedWith(zkStateReader, collection);
final boolean deleteHistory = message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true);
@ -181,4 +185,21 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
.map(Map.Entry::getKey) // alias name
.findFirst().orElse(null);
}
private void checkNotColocatedWith(ZkStateReader zkStateReader, String collection) throws Exception {
DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
if (docCollection != null) {
String colocatedWith = docCollection.getStr(COLOCATED_WITH);
if (colocatedWith != null) {
DocCollection colocatedCollection = zkStateReader.getClusterState().getCollectionOrNull(colocatedWith);
if (colocatedCollection != null && collection.equals(colocatedCollection.getStr(WITH_COLLECTION))) {
// todo how do we clean up if reverse-link is not present?
// can't delete this collection because it is still co-located with another collection
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " is co-located with collection: " + colocatedWith
+ " remove the link using modify collection API or delete the co-located collection: " + colocatedWith);
}
}
}
}
}

View File

@ -33,6 +33,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
@ -104,19 +105,34 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
if (shardId == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + SHARD_ID_PROP + "' is a required param");
}
Slice slice = clusterState.getCollection(collection).getSlice(shardId);
List<Replica> sliceReplicas = new ArrayList<>(slice.getReplicas());
Collections.shuffle(sliceReplicas, OverseerCollectionMessageHandler.RANDOM);
// this picks up a single random replica from the sourceNode
for (Replica r : slice.getReplicas()) {
if (r.getNodeName().equals(sourceNode)) {
replica = r;
}
}
if (replica == null) {
Slice slice = coll.getSlice(shardId);
List<Replica> sliceReplicas = new ArrayList<>(slice.getReplicas(r -> sourceNode.equals(r.getNodeName())));
if (sliceReplicas.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " node: " + sourceNode + " does not have any replica belonging to shard: " + shardId);
}
Collections.shuffle(sliceReplicas, OverseerCollectionMessageHandler.RANDOM);
replica = sliceReplicas.iterator().next();
}
if (coll.getStr(CollectionAdminParams.COLOCATED_WITH) != null) {
// we must ensure that moving this replica does not cause the co-location to break
String sourceNode = replica.getNodeName();
String colocatedCollectionName = coll.getStr(CollectionAdminParams.COLOCATED_WITH);
DocCollection colocatedCollection = clusterState.getCollectionOrNull(colocatedCollectionName);
if (colocatedCollection != null) {
if (colocatedCollection.getReplica((s, r) -> sourceNode.equals(r.getNodeName())) != null) {
// check if we have at least two replicas of the collection on the source node
// only then it is okay to move one out to another node
List<Replica> replicasOnSourceNode = coll.getReplicas(replica.getNodeName());
if (replicasOnSourceNode == null || replicasOnSourceNode.size() < 2) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " is co-located with collection: " + colocatedCollectionName
+ " and has a single replica: " + replica.getName() + " on node: " + replica.getNodeName()
+ " so it is not possible to move it to another node");
}
}
}
}
log.info("Replica will be moved to node {}: {}", targetNode, replica);
@ -151,7 +167,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
);
removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false);
removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false);
if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
NamedList deleteResult = new NamedList();
ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
if (deleteResult.get("failure") != null) {
@ -256,7 +272,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s, failure=", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
" on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
log.warn(errorString);
results.add("failure", errorString);
if (watcher != null) { // unregister

View File

@ -105,6 +105,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
@ -149,7 +151,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
ZkStateReader.AUTO_ADD_REPLICAS, "false",
DocCollection.RULE, null,
POLICY, null,
SNITCH, null));
SNITCH, null,
WITH_COLLECTION, null,
COLOCATED_WITH, null));
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

View File

@ -28,11 +28,11 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.UnsupportedSuggester;
import org.apache.solr.common.SolrException;
@ -89,8 +89,8 @@ public class ComputePlanAction extends TriggerActionBase {
log.trace("-- state: {}", clusterState);
}
try {
Suggester intialSuggester = getSuggester(session, event, context, cloudManager);
Suggester suggester = intialSuggester;
Suggester initialSuggester = getSuggester(session, event, context, cloudManager);
Suggester suggester = initialSuggester;
int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState);
int requestedOperations = getRequestedNumOps(event);
if (requestedOperations > maxOperations) {
@ -119,13 +119,13 @@ public class ComputePlanAction extends TriggerActionBase {
// unless a specific number of ops was requested
// uncomment the following to log too many operations
/*if (opCount > 10) {
PolicyHelper.logState(cloudManager, intialSuggester);
PolicyHelper.logState(cloudManager, initialSuggester);
}*/
if (operation == null) {
if (requestedOperations < 0) {
//uncomment the following to log zero operations
// PolicyHelper.logState(cloudManager, intialSuggester);
// PolicyHelper.logState(cloudManager, initialSuggester);
break;
} else {
log.info("Computed plan empty, remained " + (opCount - opLimit) + " requested ops to try.");
@ -225,6 +225,7 @@ public class ComputePlanAction extends TriggerActionBase {
for (Map.Entry<Suggester.Hint, Object> e : op.getHints().entrySet()) {
suggester = suggester.hint(e.getKey(), e.getValue());
}
suggester = suggester.forceOperation(true);
start++;
event.getProperties().put(START, start);
break;

View File

@ -104,7 +104,6 @@ import static org.apache.solr.client.solrj.response.RequestStatusState.NOT_FOUND
import static org.apache.solr.client.solrj.response.RequestStatusState.RUNNING;
import static org.apache.solr.client.solrj.response.RequestStatusState.SUBMITTED;
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY;
@ -137,9 +136,11 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.PROPERTY_NAME;
import static org.apache.solr.common.params.CollectionAdminParams.PROPERTY_VALUE;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
@ -480,11 +481,10 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
TLOG_REPLICAS,
NRT_REPLICAS,
POLICY,
WAIT_FOR_FINAL_STATE);
WAIT_FOR_FINAL_STATE,
WITH_COLLECTION);
if (props.get(STATE_FORMAT) == null) {
props.put(STATE_FORMAT, "2");
}
props.putIfAbsent(STATE_FORMAT, "2");
if (props.get(REPLICATION_FACTOR) != null && props.get(NRT_REPLICAS) != null) {
//TODO: Remove this in 8.0 . Keep this for SolrJ client back-compat. See SOLR-11676 for more details

View File

@ -0,0 +1,611 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.autoscaling.ActionContext;
import org.apache.solr.cloud.autoscaling.ComputePlanAction;
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
/**
* Tests for co-locating a collection with another collection such that any Collection API
* always ensures that the co-location is never broken.
*
* See SOLR-11990 for more details.
*/
@LogLevel("org.apache.solr.cloud.autoscaling=TRACE;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
public class TestWithCollection extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static SolrCloudManager cloudManager;
private static final int NUM_JETTIES = 2;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(NUM_JETTIES)
.addConfig("conf", configset("cloud-minimal"))
.configure();
}
@Override
public void setUp() throws Exception {
super.setUp();
if (zkClient().exists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true)) {
zkClient().setData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, "{}".getBytes(StandardCharsets.UTF_8), true);
}
ClusterState clusterState = cluster.getSolrClient().getZkStateReader().getClusterState();
for (Map.Entry<String, ClusterState.CollectionRef> entry : clusterState.getCollectionStates().entrySet()) {
if (entry.getKey().contains("_xyz")) {
try {
CollectionAdminRequest.deleteCollection(entry.getKey()).process(cluster.getSolrClient());
} catch (Exception e) {
log.error("Exception while deleting collection: " + entry.getKey());
}
}
}
cluster.deleteAllCollections();
cluster.getSolrClient().setDefaultCollection(null);
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
LATCH = new CountDownLatch(1);
int jettys = cluster.getJettySolrRunners().size();
if (jettys < NUM_JETTIES) {
for (int i = jettys; i < NUM_JETTIES; i++) {
cluster.startJettySolrRunner();
}
} else {
for (int i = jettys; i > NUM_JETTIES; i--) {
cluster.stopJettySolrRunner(i - 1);
}
}
}
private void deleteChildrenRecursively(String path) throws Exception {
cloudManager.getDistribStateManager().removeRecursively(path, true, false);
}
@Test
public void testCreateCollectionNoWithCollection() throws IOException, SolrServerException {
String prefix = "testCreateCollectionNoWithCollection";
String xyz = prefix + "_xyz";
String abc = prefix + "_abc";
CloudSolrClient solrClient = cluster.getSolrClient();
try {
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc).process(solrClient);
} catch (HttpSolrClient.RemoteSolrException e) {
assertTrue(e.getMessage().contains("The 'withCollection' does not exist"));
}
CollectionAdminRequest.createCollection(abc, 2, 1)
.process(solrClient);
try {
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc).process(solrClient);
} catch (HttpSolrClient.RemoteSolrException e) {
assertTrue(e.getMessage().contains("The `withCollection` must have only one shard, found: 2"));
}
}
public void testCreateCollection() throws Exception {
String prefix = "testCreateCollection";
String xyz = prefix + "_xyz";
String abc = prefix + "_abc";
CloudSolrClient solrClient = cluster.getSolrClient();
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<10', 'node':'#ANY'}," +
" ]" +
"}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
solrClient.request(req);
String chosenNode = cluster.getRandomJetty(random()).getNodeName();
CollectionAdminRequest.createCollection(abc, 1, 1)
.setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
.process(solrClient);
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc)
.process(solrClient);
DocCollection c1 = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(xyz);
assertNotNull(c1);
assertEquals(abc, c1.getStr(WITH_COLLECTION));
Replica replica = c1.getReplicas().get(0);
String nodeName = replica.getNodeName();
assertEquals(chosenNode, nodeName);
}
@Test
public void testDeleteWithCollection() throws IOException, SolrServerException, InterruptedException {
String prefix = "testDeleteWithCollection";
String xyz = prefix + "_xyz";
String abc = prefix + "_abc";
CloudSolrClient solrClient = cluster.getSolrClient();
CollectionAdminRequest.createCollection(abc, 1, 1)
.process(solrClient);
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc)
.process(solrClient);
try {
CollectionAdminRequest.deleteCollection(abc).process(solrClient);
} catch (HttpSolrClient.RemoteSolrException e) {
assertTrue(e.getMessage().contains("is co-located with collection"));
}
// delete the co-located collection first
CollectionAdminRequest.deleteCollection(xyz).process(solrClient);
// deleting the with collection should succeed now
CollectionAdminRequest.deleteCollection(abc).process(solrClient);
xyz = xyz + "_2";
abc = abc + "_2";
CollectionAdminRequest.createCollection(abc, 1, 1)
.process(solrClient);
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc)
.process(solrClient);
// sanity check
try {
CollectionAdminRequest.deleteCollection(abc).process(solrClient);
} catch (HttpSolrClient.RemoteSolrException e) {
assertTrue(e.getMessage().contains("is co-located with collection"));
}
CollectionAdminRequest.modifyCollection(xyz, null)
.unsetAttribute("withCollection")
.process(solrClient);
TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
DocCollection c1 = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(xyz);
if (c1.getStr("withCollection") == null) break;
Thread.sleep(200);
}
DocCollection c1 = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(xyz);
assertNull(c1.getStr("withCollection"));
CollectionAdminRequest.deleteCollection(abc).process(solrClient);
}
@Test
public void testAddReplicaSimple() throws Exception {
String prefix = "testAddReplica";
String xyz = prefix + "_xyz";
String abc = prefix + "_abc";
CloudSolrClient solrClient = cluster.getSolrClient();
String chosenNode = cluster.getRandomJetty(random()).getNodeName();
log.info("Chosen node {} for collection {}", chosenNode, abc);
CollectionAdminRequest.createCollection(abc, 1, 1)
.setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
.process(solrClient);
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc)
.process(solrClient);
String otherNode = null;
for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
if (!chosenNode.equals(jettySolrRunner.getNodeName())) {
otherNode = jettySolrRunner.getNodeName();
}
}
CollectionAdminRequest.addReplicaToShard(xyz, "shard1")
.setNode(otherNode)
.process(solrClient);
DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
assertTrue(collection.getReplicas().stream().noneMatch(replica -> withCollection.getReplicas(replica.getNodeName()).isEmpty()));
}
public void testAddReplicaWithPolicy() throws Exception {
String prefix = "testAddReplicaWithPolicy";
String xyz = prefix + "_xyz";
String abc = prefix + "_abc";
CloudSolrClient solrClient = cluster.getSolrClient();
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<10', 'node':'#ANY'}," +
" {'replica':'<2', 'node':'#ANY'}," +
" ]" +
"}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
solrClient.request(req);
String chosenNode = cluster.getRandomJetty(random()).getNodeName();
log.info("Chosen node {} for collection {}", chosenNode, abc);
CollectionAdminRequest.createCollection(abc, 1, 1)
.setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
.process(solrClient);
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc)
.process(solrClient);
DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
assertEquals(chosenNode, collection.getReplicas().iterator().next().getNodeName());
// zkClient().printLayoutToStdOut();
CollectionAdminRequest.addReplicaToShard(xyz, "shard1")
.process(solrClient);
collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
assertTrue(collection.getReplicas().stream().noneMatch(
replica -> withCollection.getReplicas(replica.getNodeName()) == null
|| withCollection.getReplicas(replica.getNodeName()).isEmpty()));
}
@Test
public void testMoveReplicaMainCollection() throws Exception {
String prefix = "testMoveReplicaMainCollection";
String xyz = prefix + "_xyz";
String abc = prefix + "_abc";
CloudSolrClient solrClient = cluster.getSolrClient();
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<10', 'node':'#ANY'}," +
" {'replica':'<2', 'node':'#ANY'}," +
" ]" +
"}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
solrClient.request(req);
String chosenNode = cluster.getRandomJetty(random()).getNodeName();
log.info("Chosen node {} for collection {}", chosenNode, abc);
CollectionAdminRequest.createCollection(abc, 1, 1)
.setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
.process(solrClient);
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc)
.process(solrClient);
String otherNode = null;
for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
if (!chosenNode.equals(jettySolrRunner.getNodeName())) {
otherNode = jettySolrRunner.getNodeName();
}
}
DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
assertNull(collection.getReplicas(otherNode)); // sanity check
assertNull(withCollection.getReplicas(otherNode)); // sanity check
new CollectionAdminRequest.MoveReplica(xyz, collection.getReplicas().iterator().next().getName(), otherNode)
.process(solrClient);
// zkClient().printLayoutToStdOut();
collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); // refresh
DocCollection withCollectionRefreshed = solrClient.getZkStateReader().getClusterState().getCollection(abc); // refresh
assertTrue(collection.getReplicas().stream().noneMatch(
replica -> withCollectionRefreshed.getReplicas(replica.getNodeName()) == null
|| withCollectionRefreshed.getReplicas(replica.getNodeName()).isEmpty()));
}
@Test
public void testMoveReplicaWithCollection() throws Exception {
String prefix = "testMoveReplicaWithCollection";
String xyz = prefix + "_xyz";
String abc = prefix + "_abc";
CloudSolrClient solrClient = cluster.getSolrClient();
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<10', 'node':'#ANY'}," +
" {'replica':'<2', 'node':'#ANY'}," +
" ]" +
"}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
solrClient.request(req);
String chosenNode = cluster.getRandomJetty(random()).getNodeName();
log.info("Chosen node {} for collection {}", chosenNode, abc);
CollectionAdminRequest.createCollection(abc, 1, 1)
.setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
.process(solrClient);
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc)
.process(solrClient);
DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
assertEquals(chosenNode, collection.getReplicas().iterator().next().getNodeName());
String otherNode = null;
for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
if (!chosenNode.equals(jettySolrRunner.getNodeName())) {
otherNode = jettySolrRunner.getNodeName();
}
}
collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
assertNull(collection.getReplicas(otherNode)); // sanity check
assertNull(withCollection.getReplicas(otherNode)); // sanity check
try {
new CollectionAdminRequest.MoveReplica(abc, collection.getReplicas().iterator().next().getName(), otherNode)
.process(solrClient);
fail("Expected moving a replica of 'withCollection': " + abc + " to fail");
} catch (HttpSolrClient.RemoteSolrException e) {
assertTrue(e.getMessage().contains("Collection: testMoveReplicaWithCollection_abc is co-located with collection: testMoveReplicaWithCollection_xyz"));
}
// zkClient().printLayoutToStdOut();
collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); // refresh
DocCollection withCollectionRefreshed = solrClient.getZkStateReader().getClusterState().getCollection(abc); // refresh
// sanity check that the failed move operation didn't actually change our co-location guarantees
assertTrue(collection.getReplicas().stream().noneMatch(
replica -> withCollectionRefreshed.getReplicas(replica.getNodeName()) == null
|| withCollectionRefreshed.getReplicas(replica.getNodeName()).isEmpty()));
}
/**
* Tests that when a new node is added to the cluster and autoscaling framework
* moves replicas to the new node, we maintain all co-locating guarantees
*/
public void testNodeAdded() throws Exception {
String prefix = "testNodeAdded";
String xyz = prefix + "_xyz";
String abc = prefix + "_abc";
CloudSolrClient solrClient = cluster.getSolrClient();
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<10', 'node':'#ANY'}," +
" {'replica':'<2', 'node':'#ANY'}," +
" ]" +
"}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
solrClient.request(req);
String chosenNode = cluster.getRandomJetty(random()).getNodeName();
log.info("Chosen node {} for collection {}", chosenNode, abc);
CollectionAdminRequest.createCollection(abc, 1, 1)
.setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
.process(solrClient);
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc)
.process(solrClient);
DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
assertEquals(chosenNode, collection.getReplicas().iterator().next().getNodeName());
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger1'," +
"'event' : 'nodeAdded'," +
"'waitFor' : '0s'," +
"'actions' : [" +
"{'name' : 'compute', 'class' : '" + ComputePlanAction.class.getName() + "'}" +
"{'name' : 'execute', 'class' : '" + ExecutePlanAction.class.getName() + "'}" +
"{'name' : 'compute', 'class' : '" + CapturingAction.class.getName() + "'}" +
"]" +
"}}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
solrClient.request(req);
Optional<JettySolrRunner> other = cluster.getJettySolrRunners()
.stream().filter(j -> !chosenNode.equals(j.getNodeName())).findAny();
String otherNode = other.orElseThrow(AssertionError::new).getNodeName();
// add an extra replica of abc collection on a different node
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(abc, "shard1")
.setNode(otherNode);
addReplica.setWaitForFinalState(true);
addReplica.process(solrClient);
// refresh
collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
// sanity check
assertColocated(collection, otherNode, withCollection);
assertEquals(1, collection.getReplicas().size());
Replica xyzReplica = collection.getReplicas().get(0);
// start a new node
JettySolrRunner newNode = cluster.startJettySolrRunner();
assertTrue("Action was not fired till 30 seconds", LATCH.await(30, TimeUnit.SECONDS));
// refresh
collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
// sanity check
assertColocated(collection, otherNode, withCollection);
// assert that the replica of xyz collection was not moved
assertNotNull(collection.getReplica(xyzReplica.getName()));
assertEquals(chosenNode, collection.getReplicas().get(0).getNodeName());
// add an extra replica of xyz collection -- this should be placed on the 'otherNode'
addReplica = CollectionAdminRequest.addReplicaToShard(xyz, "shard1");
addReplica.setWaitForFinalState(true);
addReplica.process(solrClient);
// refresh
collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
List<Replica> replicas = collection.getReplicas(otherNode);
assertNotNull(replicas);
assertEquals(1, replicas.size());
replicas = withCollection.getReplicas(otherNode);
assertNotNull(replicas);
assertEquals(1, replicas.size());
// add an extra replica of xyz collection -- this should be placed on the 'newNode'
addReplica = CollectionAdminRequest.addReplicaToShard(xyz, "shard1");
addReplica.setWaitForFinalState(true);
addReplica.process(solrClient);
// refresh
collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
assertNotNull(collection.getReplicas(newNode.getNodeName()));
replicas = collection.getReplicas(newNode.getNodeName());
assertNotNull(replicas);
assertEquals(1, replicas.size());
replicas = withCollection.getReplicas(newNode.getNodeName());
assertNotNull(replicas);
assertEquals(1, replicas.size());
}
public void testMultipleWithCollections() throws Exception {
String prefix = "testMultipleWithCollections";
String xyz = prefix + "_xyz";
String xyz2 = prefix + "_xyz2";
String abc = prefix + "_abc";
String abc2 = prefix + "_abc2";
// start 2 more nodes so we have 4 in total
cluster.startJettySolrRunner();
cluster.startJettySolrRunner();
cluster.waitForAllNodes(30);
CloudSolrClient solrClient = cluster.getSolrClient();
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<10', 'node':'#ANY'}," +
" {'replica':'<2', 'node':'#ANY'}," +
" ]" +
"}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
solrClient.request(req);
String chosenNode = cluster.getJettySolrRunner(0).getNodeName();
log.info("Chosen node {} for collection {}", chosenNode, abc);
CollectionAdminRequest.createCollection(abc, 1, 1)
.setCreateNodeSet(chosenNode)
.process(solrClient);
CollectionAdminRequest.createCollection(xyz, 1, 1)
.setWithCollection(abc)
.process(solrClient);
String chosenNode2 = cluster.getJettySolrRunner(1).getNodeName();
log.info("Chosen node {} for collection {}", chosenNode2, abc2);
CollectionAdminRequest.createCollection(abc2, 1, 1)
.setCreateNodeSet(chosenNode2)
.process(solrClient);
CollectionAdminRequest.createCollection(xyz2, 1, 1)
.setWithCollection(abc2)
.process(solrClient);
// refresh
DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
DocCollection collection2 = solrClient.getZkStateReader().getClusterState().getCollection(xyz2);
DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
DocCollection withCollection2 = solrClient.getZkStateReader().getClusterState().getCollection(abc2);
// sanity check
assertColocated(collection, chosenNode2, withCollection); // no replica should be on chosenNode2
assertColocated(collection2, chosenNode, withCollection2); // no replica should be on chosenNode
String chosenNode3 = cluster.getJettySolrRunner(2).getNodeName();
CollectionAdminRequest.addReplicaToShard(xyz, "shard1")
.setNode(chosenNode3)
.process(solrClient);
String chosenNode4 = cluster.getJettySolrRunner(2).getNodeName();
CollectionAdminRequest.addReplicaToShard(xyz2, "shard1")
.setNode(chosenNode4)
.process(solrClient);
collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
collection2 = solrClient.getZkStateReader().getClusterState().getCollection(xyz2);
withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
withCollection2 = solrClient.getZkStateReader().getClusterState().getCollection(abc2);
// sanity check
assertColocated(collection, null, withCollection);
assertColocated(collection2, null, withCollection2);
}
/**
* Asserts that all replicas of collection are colocated with at least one
* replica of the withCollection and none of them should be on the given 'noneOnNode'.
*/
private void assertColocated(DocCollection collection, String noneOnNode, DocCollection withCollection) {
// sanity check
assertTrue(collection.getReplicas().stream().noneMatch(
replica -> withCollection.getReplicas(replica.getNodeName()) == null
|| withCollection.getReplicas(replica.getNodeName()).isEmpty()));
if (noneOnNode != null) {
assertTrue(collection.getReplicas().stream().noneMatch(
replica -> noneOnNode.equals(replica.getNodeName())));
}
}
private static CountDownLatch LATCH = new CountDownLatch(1);
public static class CapturingAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
LATCH.countDown();
}
}
}

View File

@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
@ -92,6 +93,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
import static org.apache.solr.common.params.CommonParams.NAME;
/**
@ -680,11 +682,37 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
boolean waitForFinalState = props.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
List<String> nodeList = new ArrayList<>();
List<String> shardNames = new ArrayList<>();
final String collectionName = props.getStr(NAME);
String router = props.getStr("router.name", DocRouter.DEFAULT_NAME);
String policy = props.getStr(Policy.POLICY);
AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null;
// fail fast if parameters are wrong or incomplete
List<String> shardNames = CreateCollectionCmd.populateShardNames(props, router);
CreateCollectionCmd.checkMaxShardsPerNode(props, usePolicyFramework);
CreateCollectionCmd.checkReplicaTypes(props);
// always force getting fresh state
collectionsStatesRef.set(null);
ClusterState clusterState = getClusterState();
final ClusterState clusterState = getClusterState();
String withCollection = props.getStr(CollectionAdminParams.WITH_COLLECTION);
String wcShard = null;
if (withCollection != null) {
if (!clusterState.hasCollection(withCollection)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The 'withCollection' does not exist: " + withCollection);
} else {
DocCollection collection = clusterState.getCollection(withCollection);
if (collection.getActiveSlices().size() > 1) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + collection.getActiveSlices().size());
}
wcShard = collection.getActiveSlices().iterator().next().getName();
}
}
final String withCollectionShard = wcShard;
ZkWriteCommand cmd = new ClusterStateMutator(cloudManager).createCollection(clusterState, props);
if (cmd.noop) {
LOG.warn("Collection {} already exists. exit", collectionName);
@ -710,6 +738,35 @@ public class SimClusterStateProvider implements ClusterStateProvider {
final CountDownLatch finalStateLatch = new CountDownLatch(replicaPositions.size());
AtomicInteger replicaNum = new AtomicInteger(1);
replicaPositions.forEach(pos -> {
if (withCollection != null) {
// check that we have a replica of `withCollection` on this node and if not, create one
DocCollection collection = clusterState.getCollection(withCollection);
List<Replica> replicas = collection.getReplicas(pos.node);
if (replicas == null || replicas.isEmpty()) {
Map<String, Object> replicaProps = new HashMap<>();
replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", withCollection, withCollectionShard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT),
collection.getReplicas().size() + 1);
try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
replicaProps.put("SEARCHER.searcher.numDocs", 0);
replicaProps.put("SEARCHER.searcher.maxDoc", 0);
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, withCollection, 0),
coreName, withCollection, withCollectionShard, pos.type, pos.node, replicaProps);
cloudManager.submit(() -> {
simAddReplica(pos.node, ri, false);
// do not count down the latch here
return true;
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Map<String, Object> replicaProps = new HashMap<>();
replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
@ -744,6 +801,16 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
});
});
// modify the `withCollection` and store this new collection's name with it
if (withCollection != null) {
ZkNodeProps message = new ZkNodeProps(
Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(),
ZkStateReader.COLLECTION_PROP, withCollection,
CollectionAdminParams.COLOCATED_WITH, collectionName);
cmd = new CollectionMutator(cloudManager).modifyCollection(clusterState,message);
}
// force recreation of collection states
collectionsStatesRef.set(null);
simRunLeaderElection(Collections.singleton(collectionName), true);

View File

@ -112,6 +112,11 @@ Details of the snitch provider. See the section <<rule-based-replica-placement.a
`waitForFinalState`::
If `true`, the request will complete only when all affected replicas become active. The default is `false`, which means that the API will return the status of the single action, which may be before the new replica is online and active.
`withCollection`::
The name of the collection with which all replicas of this collection must be co-located. The collection must already exist and must have a single shard named `shard1`.
See <<colocating-collections.adoc#colocating-collections, Colocating collections>> for more details.
=== CREATE Response
The response will include the status of the request and the new core names. If the status is anything other than "success", an error message will explain why the request failed.
@ -181,6 +186,7 @@ The attributes that can be modified are:
* rule
* snitch
* policy
* withCollection
See the <<create,CREATE action>> section above for details on these attributes.

View File

@ -0,0 +1,76 @@
= Colocating Collections
:page-toclevels: 1
:page-tocclass: right
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
Solr provides a way to co-locate a collection with another so that cross-collection joins are always possible.
The co-location guarantee applies to all future Collection operations made either via Collections API or by Autoscaling
actions.
A collection may only be colocated with exactly one `withCollection`. However, arbitrarily many collections may be
_linked_ to the same `withCollection`.
== Create Collection using `withCollection`
The Create Collection API supports a parameter named `withCollection` which can be used to specify a collection
with which the replicas of the newly created collection should be co-located. See <<collections-api.adoc#create, Create Collection API>>
`/admin/collections?action=CREATE&name=techproducts&numShards=1&replicationFactor=2&withCollection=tech_categories`
In the above example, all replicas of the `techproducts` collection will be co-located on a node with at least one
replica of the `tech_categories` collection.
== Colocating existing collections
When collections already exist beforehand, the <<collections-api.adoc#modifycollection, Modify Collection API>> can be
used to set the `withCollection` parameter so that the two collections can be linked. This will *not* trigger
changes to the cluster automatically because moving a large number of replicas immediately might de-stabilize the system.
Instead, it is recommended that the Suggestions UI page should be consulted on the operations that can be performed
to change the cluster manually.
Example:
`/admin/collections?action=MODIFYCOLLECTION&collection=techproducts&withCollection=tech_categories`
== Deleting colocated collections
Deleting a collection which has been linked to another will fail unless the link itself is deleted first by using the
<<collections-api.adoc#modifycollection, Modify Collection API>> to un-set the `withCollection` attribute.
Example:
`/admin/collections?action=MODIFYCOLLECTION&collection=techproducts&withCollection=`
== Limitations and caveats
The collection being used as the `withCollection` must have one shard only and that shard should be named `shard1`. Note
that when using the default router, the shard name is always set to `shard1` but special care must be taken to name the
shard as `shard1` when using the implicit router.
In case new replicas of the `withCollection` have to be added to maintain the co-location guarantees then the new replicas
will be of type `NRT` only. Automatically creating replicas of `TLOG` or `PULL` types is not supported.
In case, replicas have to be moved from one node to another, perhaps in response to a node lost trigger, then the target
nodes will be chosen by preferring nodes that already have a replica of the `withCollection` so that the number of moves
is minimized. However, this also means that unless there are Autoscaling policy violations, Solr will continue to move
such replicas to already loaded nodes instead of preferring empty nodes. Therefore, it is advised to have policy rules
which can prevent such overloading by e.g. setting the maximum number of cores per node to a fixed value.
Example:
`{'cores' : '<8', 'node' : '#ANY'}`
The co-location guarantee is one-way only i.e. a collection 'X' co-located with 'Y' will always have one or more
replicas of 'Y' on any node that has a replica of 'X' but the reverse is not true. There may be nodes which have one or
more replicas of 'Y' but no replicas of 'X'. Such replicas of 'Y' will not be considered a violation of co-location
rules and will not be cleaned up automatically.

View File

@ -1,5 +1,5 @@
= SolrCloud
:page-children: getting-started-with-solrcloud, how-solrcloud-works, solrcloud-resilience, solrcloud-configuration-and-parameters, rule-based-replica-placement, cross-data-center-replication-cdcr, solrcloud-autoscaling
:page-children: getting-started-with-solrcloud, how-solrcloud-works, solrcloud-resilience, solrcloud-configuration-and-parameters, rule-based-replica-placement, cross-data-center-replication-cdcr, solrcloud-autoscaling, colocating-collections
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@ -46,3 +46,4 @@ In this section, we'll cover everything you need to know about using Solr in Sol
* <<rule-based-replica-placement.adoc#rule-based-replica-placement,Rule-based Replica Placement>>
* <<cross-data-center-replication-cdcr.adoc#cross-data-center-replication-cdcr,Cross Data Center Replication (CDCR)>>
* <<solrcloud-autoscaling.adoc#solrcloud-autoscaling,SolrCloud Autoscaling>>
* <<colocating-collections.adoc#colocating-collections,Colocating collections together>>

View File

@ -50,7 +50,7 @@ class AddReplicaSuggester extends Suggester {
for (int i = getMatrix().size() - 1; i >= 0; i--) {
Row row = getMatrix().get(i);
if (!isNodeSuitableForReplicaAddition(row)) continue;
Row tmpRow = row.addReplica(shard.first(), shard.second(), type);
Row tmpRow = row.addReplica(shard.first(), shard.second(), type, strict);
List<Violation> errs = testChangedMatrix(strict, tmpRow.session);
if (!containsNewErrors(errs)) {
@ -77,4 +77,4 @@ class AddReplicaSuggester extends Suggester {
public CollectionParams.CollectionAction getAction() {
return ADDREPLICA;
}
}
}

View File

@ -38,7 +38,6 @@ import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import static java.util.Collections.singletonMap;
import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.TestStatus.PASS;
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.EQUAL;
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.GREATER_THAN;
import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.LESS_THAN;
@ -75,6 +74,17 @@ public class Clause implements MapWriter, Comparable<Clause> {
this.strict = clause.strict;
}
// internal use only
Clause(Map<String, Object> original, Condition tag, Condition globalTag, boolean isStrict) {
this.original = original;
this.tag = tag;
this.globalTag = globalTag;
this.globalTag.clause = this;
this.type = null;
this.hasComputedValue = false;
this.strict = isStrict;
}
private Clause(Map<String, Object> m) {
this.original = Utils.getDeepCopy(m, 10);
String type = (String) m.get("type");
@ -374,7 +384,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
for (Row r : session.matrix) {
SealedClause sealedClause = getSealedClause(computedValueEvaluator);
if (!sealedClause.getGlobalTag().isPass(r)) {
ConditionType.CORES.addViolatingReplicas(ctx.reset(null, null,
sealedClause.getGlobalTag().varType.addViolatingReplicas(ctx.reset(null, null,
new Violation(sealedClause, null, null, r.node, r.getVal(sealedClause.globalTag.name), sealedClause.globalTag.delta(r.getVal(globalTag.name)), null)));
}
}
@ -553,21 +563,21 @@ public class Clause implements MapWriter, Comparable<Clause> {
}
boolean isPass(Object inputVal) {
return isPass(inputVal, null);
}
boolean isPass(Object inputVal, Row row) {
if (computedType != null) {
throw new IllegalStateException("This is supposed to be called only from a Condition with no computed value or a SealedCondition");
}
if (inputVal instanceof ReplicaCount) inputVal = ((ReplicaCount) inputVal).getVal(getClause().type);
if (varType == ConditionType.LAZY) { // we don't know the type
return op.match(parseString(val), parseString(inputVal)) == PASS;
} else {
return op.match(val, validate(name, inputVal, false)) == PASS;
}
return varType.match(inputVal, op, val, name, row);
}
boolean isPass(Row row) {
return isPass(row.getVal(name));
return isPass(row.getVal(name), row);
}
@Override

View File

@ -17,6 +17,7 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.Comparator;
import java.util.List;
@ -24,10 +25,13 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
public class MoveReplicaSuggester extends Suggester {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
SolrRequest init() {
@ -56,20 +60,28 @@ public class MoveReplicaSuggester extends Suggester {
targetRow = session.matrix.get(j);
if (targetRow.node.equals(fromRow.node)) continue;
if (!isNodeSuitableForReplicaAddition(targetRow)) continue;
targetRow = targetRow.addReplica(ri.getCollection(), ri.getShard(), ri.getType());//add replica to target first
Pair<Row, ReplicaInfo> pair = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node
if (pair == null) continue;//should not happen
Row srcRowModified = pair.first();//this is the final state of the source row and session
targetRow = targetRow.addReplica(ri.getCollection(), ri.getShard(), ri.getType(), strict); // add replica to target first
Row srcRowModified = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node
List<Violation> errs = testChangedMatrix(strict, srcRowModified.session);
srcRowModified.session.applyRules();// now resort the nodes with the new values
srcRowModified.session.applyRules(); // now resort the nodes with the new values
Policy.Session tmpSession = srcRowModified.session;
if (!containsNewErrors(errs) &&
isLessSerious(errs, leastSeriousViolation) &&
(force || (tmpSession.indexOf(srcRowModified.node) < tmpSession.indexOf(targetRow.node)))) {
leastSeriousViolation = errs;
bestSrcRow = srcRowModified;
sourceReplicaInfo = ri;
bestTargetRow = targetRow;
int result = -1;
if (!force && srcRowModified.isLive && targetRow.isLive) {
result = tmpSession.getPolicy().clusterPreferences.get(0).compare(srcRowModified, tmpSession.getNode(targetRow.node), true);
if (result == 0) result = tmpSession.getPolicy().clusterPreferences.get(0).compare(srcRowModified, tmpSession.getNode(targetRow.node), false);
}
if (result <= 0) {
leastSeriousViolation = errs;
bestSrcRow = srcRowModified;
sourceReplicaInfo = ri;
bestTargetRow = targetRow;
}
}
}
}

View File

@ -44,6 +44,7 @@ import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.StrUtils;
@ -55,6 +56,8 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.NODE;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.WITH_COLLECTION;
/*The class that reads, parses and applies policies specified in
* autoscaling.json
@ -74,7 +77,7 @@ public class Policy implements MapWriter {
public static final String POLICIES = "policies";
public static final String CLUSTER_POLICY = "cluster-policy";
public static final String CLUSTER_PREFERENCES = "cluster-preferences";
public static final Set<String> GLOBAL_ONLY_TAGS = Collections.singleton("cores");
public static final Set<String> GLOBAL_ONLY_TAGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList("cores", CollectionAdminParams.WITH_COLLECTION)));
public static final List<Preference> DEFAULT_PREFERENCES = Collections.unmodifiableList(
Arrays.asList(
new Preference((Map<String, Object>) Utils.fromJSONString("{minimize : cores, precision:1}")),
@ -131,9 +134,12 @@ public class Policy implements MapWriter {
this.policies = Collections.unmodifiableMap(
policiesFromMap((Map<String, List<Map<String, Object>>>) jsonMap.getOrDefault(POLICIES, emptyMap()), newParams));
this.params = Collections.unmodifiableList(newParams.stream()
List<Pair<String, Suggestion.ConditionType>> params = newParams.stream()
.map(s -> new Pair<>(s, Suggestion.getTagType(s)))
.collect(toList()));
.collect(toList());
//let this be there always, there is no extra cost
params.add(new Pair<>(WITH_COLLECTION.tagName, WITH_COLLECTION));
this.params = Collections.unmodifiableList(params);
perReplicaAttributes = readPerReplicaAttrs();
}
@ -501,6 +507,21 @@ public class Policy implements MapWriter {
.filter(clause -> !clause.isPerCollectiontag())
.collect(Collectors.toList());
if (nodes.size() > 0) {
//if any collection has 'withCollection' irrespective of the node, the NodeStateProvider returns a map value
Map<String, Object> vals = nodeStateProvider.getNodeValues(nodes.get(0), Collections.singleton("withCollection"));
if (!vals.isEmpty() && vals.get("withCollection") != null) {
Map<String, String> withCollMap = (Map<String, String>) vals.get("withCollection");
if (!withCollMap.isEmpty()) {
Clause withCollClause = new Clause((Map<String,Object>)Utils.fromJSONString("{withCollection:'*' , node: '#ANY'}") ,
new Clause.Condition(NODE.tagName, "#ANY", Operand.EQUAL, null, null),
new Clause.Condition(WITH_COLLECTION.tagName,"*" , Operand.EQUAL, null, null), true
);
expandedClauses.add(withCollClause);
}
}
}
ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider();
for (String c : collections) {
addClausesForCollection(stateProvider, c);
@ -594,4 +615,15 @@ public class Policy implements MapWriter {
throw new RuntimeException("NO such node found " + node);
}
}
static final Map<String, Suggestion.ConditionType> validatetypes = new HashMap<>();
static {
for (Suggestion.ConditionType t : Suggestion.ConditionType.values())
validatetypes.put(t.tagName, t);
}
public static ConditionType getTagType(String name) {
ConditionType info = validatetypes.get(name);
if (info == null && name.startsWith(ImplicitSnitch.SYSPROP)) info = ConditionType.STRING;
if (info == null && name.startsWith(Clause.METRICS_PREFIX)) info = ConditionType.LAZY;
return info;
}
}

View File

@ -93,7 +93,7 @@ public class PolicyHelper {
if (autoScalingConfig != null) {
return new DelegatingDistribStateManager(null) {
@Override
public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
public AutoScalingConfig getAutoScalingConfig() {
return autoScalingConfig;
}
};
@ -135,7 +135,7 @@ public class PolicyHelper {
}
}
} catch (IOException e) {
/*ignore*/
log.warn("Exception while reading disk free metric values for nodes to be used for collection: " + collName, e);
}
@ -178,7 +178,7 @@ public class PolicyHelper {
}
public static final int SESSION_EXPIRY = 180;//3 seconds
public static final int SESSION_EXPIRY = 180; // 3 minutes
public static MapWriter getDiagnostics(Policy policy, SolrCloudManager cloudManager) {
Policy.Session session = policy.createSession(cloudManager);
@ -230,7 +230,7 @@ public class PolicyHelper {
/**Use this to dump the state of a system and to generate a testcase
*/
public static void logState(SolrCloudManager cloudManager, Suggester suggester) {
if(log.isTraceEnabled()) {
if (log.isTraceEnabled()) {
log.trace("LOGSTATE: {}",
Utils.toJSONString((MapWriter) ew -> {
ew.put("liveNodes", cloudManager.getClusterStateProvider().getLiveNodes());
@ -249,9 +249,9 @@ public class PolicyHelper {
public enum Status {
NULL,
//it is just created and not yet used or all operations on it has been competed fully
//it is just created and not yet used or all operations on it has been completed fully
UNUSED,
COMPUTING, EXECUTING;
COMPUTING, EXECUTING
}
/**
@ -265,7 +265,7 @@ public class PolicyHelper {
*/
static class SessionRef {
private final Object lockObj = new Object();
private SessionWrapper sessionWrapper = SessionWrapper.DEF_INST;
private SessionWrapper sessionWrapper = SessionWrapper.DEFAULT_INSTANCE;
public SessionRef() {
@ -286,7 +286,7 @@ public class PolicyHelper {
synchronized (lockObj) {
if (sessionWrapper.createTime == this.sessionWrapper.createTime && this.sessionWrapper.refCount.get() <= 0) {
log.debug("session set to NULL");
this.sessionWrapper = SessionWrapper.DEF_INST;
this.sessionWrapper = SessionWrapper.DEFAULT_INSTANCE;
} // else somebody created a new session b/c of expiry . So no need to do anything about it
}
}
@ -311,7 +311,7 @@ public class PolicyHelper {
//one thread who is waiting for this need to be notified.
lockObj.notify();
} else {
log.info("create time NOT SAME {} ", SessionWrapper.DEF_INST.createTime);
log.info("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
//else just ignore it
}
}
@ -343,7 +343,7 @@ public class PolicyHelper {
}
log.debug("out of waiting curr-time:{} time-elapsed {}", time(timeSource, MILLISECONDS), timeElapsed(timeSource, waitStart, MILLISECONDS));
// now this thread has woken up because it got timed out after 10 seconds or it is notified after
//the session was returned from another COMPUTING operation
// the session was returned from another COMPUTING operation
if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
log.debug("Wait over. reusing the existing session ");
this.sessionWrapper.status = Status.COMPUTING;
@ -401,12 +401,12 @@ public class PolicyHelper {
public static class SessionWrapper {
public static final SessionWrapper DEF_INST = new SessionWrapper(null, null);
public static final SessionWrapper DEFAULT_INSTANCE = new SessionWrapper(null, null);
static {
DEF_INST.status = Status.NULL;
DEF_INST.createTime = -1l;
DEF_INST.lastUpdateTime = -1l;
DEFAULT_INSTANCE.status = Status.NULL;
DEFAULT_INSTANCE.createTime = -1L;
DEFAULT_INSTANCE.lastUpdateTime = -1L;
}
private long createTime;

View File

@ -18,10 +18,12 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -33,6 +35,8 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CoreAdminParams.NODE;
@ -40,6 +44,8 @@ import static org.apache.solr.common.params.CoreAdminParams.NODE;
* Each instance represents a node in the cluster
*/
public class Row implements MapWriter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public final String node;
final Cell[] cells;
//this holds the details of each replica in the node
@ -109,6 +115,14 @@ public class Row implements MapWriter {
return jsonStr();
}
public Row addReplica(String coll, String shard, Replica.Type type) {
return addReplica(coll, shard, type, 0, true);
}
public Row addReplica(String coll, String shard, Replica.Type type, boolean strictMode) {
return addReplica(coll, shard, type, 0, strictMode);
}
/**
* this simulates adding a replica of a certain coll+shard to node. as a result of adding a replica ,
* values of certain attributes will be modified, in this node as well as other nodes. Please note that
@ -117,9 +131,18 @@ public class Row implements MapWriter {
* @param coll collection name
* @param shard shard name
* @param type replica type
* @param recursionCount the number of times we have recursed to add more replicas
* @param strictMode whether suggester is operating in strict mode or not
*/
public Row addReplica(String coll, String shard, Replica.Type type) {
Row row = session.copy().getNode(this.node);
Row addReplica(String coll, String shard, Replica.Type type, int recursionCount, boolean strictMode) {
if (recursionCount > 3) {
log.error("more than 3 levels of recursion ", new RuntimeException());
return this;
}
List<OperationInfo> furtherOps = new LinkedList<>();
Consumer<OperationInfo> opCollector = it -> furtherOps.add(it);
Row row = null;
row = session.copy().getNode(this.node);
if (row == null) throw new RuntimeException("couldn't get a row");
Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.computeIfAbsent(coll, k -> new HashMap<>());
List<ReplicaInfo> replicas = c.computeIfAbsent(shard, k -> new ArrayList<>());
@ -128,12 +151,37 @@ public class Row implements MapWriter {
Utils.makeMap(ZkStateReader.REPLICA_TYPE, type != null ? type.toString() : Replica.Type.NRT.toString()));
replicas.add(ri);
for (Cell cell : row.cells) {
cell.type.projectAddReplica(cell, ri);
cell.type.projectAddReplica(cell, ri, opCollector, strictMode);
}
for (OperationInfo op : furtherOps) {
if (op.isAdd) {
row = row.session.getNode(op.node).addReplica(op.coll, op.shard, op.type, recursionCount + 1, strictMode);
} else {
row.session.getNode(op.node).removeReplica(op.coll, op.shard, op.type, recursionCount+1);
}
}
return row;
}
static class OperationInfo {
final String coll, shard, node, cellName;
final boolean isAdd;// true =addReplica, false=removeReplica
final Replica.Type type;
OperationInfo(String coll, String shard, String node, String cellName, boolean isAdd, Replica.Type type) {
this.coll = coll;
this.shard = shard;
this.node = node;
this.cellName = cellName;
this.isAdd = isAdd;
this.type = type;
}
}
public ReplicaInfo getReplica(String coll, String shard, Replica.Type type) {
Map<String, List<ReplicaInfo>> c = collectionVsShardVsReplicas.get(coll);
if (c == null) return null;
@ -150,9 +198,18 @@ public class Row implements MapWriter {
if (idx == -1) return null;
return r.get(idx);
}
public Row removeReplica(String coll, String shard, Replica.Type type) {
return removeReplica(coll,shard, type, 0);
}
// this simulates removing a replica from a node
public Pair<Row, ReplicaInfo> removeReplica(String coll, String shard, Replica.Type type) {
public Row removeReplica(String coll, String shard, Replica.Type type, int recursionCount) {
if (recursionCount > 3) {
log.error("more than 3 levels of recursion ", new RuntimeException());
return this;
}
List<OperationInfo> furtherOps = new LinkedList<>();
Consumer<OperationInfo> opCollector = it -> furtherOps.add(it);
Row row = session.copy().getNode(this.node);
Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.get(coll);
if (c == null) return null;
@ -169,9 +226,9 @@ public class Row implements MapWriter {
if (idx == -1) return null;
ReplicaInfo removed = r.remove(idx);
for (Cell cell : row.cells) {
cell.type.projectRemoveReplica(cell, removed);
cell.type.projectRemoveReplica(cell, removed, opCollector);
}
return new Pair(row, removed);
return row;
}

View File

@ -18,6 +18,7 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -27,6 +28,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -34,13 +36,18 @@ import java.util.function.Predicate;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
/* A suggester is capable of suggesting a collection operation
* given a particular session. Before it suggests a new operation,
@ -50,6 +57,8 @@ import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.Conditio
*
*/
public abstract class Suggester implements MapWriter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class);
Policy.Session session;
SolrRequest operation;
@ -94,34 +103,40 @@ public abstract class Suggester implements MapWriter {
abstract SolrRequest init();
@SuppressWarnings("unchecked")
public SolrRequest getSuggestion() {
if (!isInitialized) {
Set<String> collections = (Set<String>) hints.getOrDefault(Hint.COLL, Collections.emptySet());
Set<Pair<String, String>> s = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
if (!collections.isEmpty() || !s.isEmpty()) {
HashSet<Pair<String, String>> shards = new HashSet<>(s);
collections.stream().forEach(c -> shards.add(new Pair<>(c, null)));
ClusterStateProvider stateProvider = session.cloudManager.getClusterStateProvider();
for (Pair<String, String> shard : shards) {
// if this is not a known collection from the existing clusterstate,
// then add it
if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first()))) {
session.addClausesForCollection(stateProvider, shard.first());
HashSet<Pair<String, String>> collectionShardPairs = new HashSet<>(s);
collections.forEach(c -> collectionShardPairs.add(new Pair<>(c, null)));
collections.forEach(c -> {
try {
getWithCollection(c).ifPresent(withCollection -> collectionShardPairs.add(new Pair<>(withCollection, null)));
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Exception while fetching 'withCollection' attribute for collection: " + c, e);
}
for (Row row : session.matrix) {
Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(), it -> new HashMap<>());
if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new ArrayList<>());
});
s.forEach(kv -> {
try {
getWithCollection(kv.first()).ifPresent(withCollection -> collectionShardPairs.add(new Pair<>(withCollection, null)));
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Exception while fetching 'withCollection' attribute for collection: " + kv.first(), e);
}
}
});
setupCollection(collectionShardPairs);
Collections.sort(session.expandedClauses);
}
Set<String> srcNodes = (Set<String>) hints.get(Hint.SRC_NODE);
if (srcNodes != null && !srcNodes.isEmpty()) {
// the source node is dead so live nodes may not have it
for (String srcNode : srcNodes) {
if (session.matrix.stream().noneMatch(row -> row.node.equals(srcNode)))
if (session.matrix.stream().noneMatch(row -> row.node.equals(srcNode))) {
session.matrix.add(new Row(srcNode, session.getPolicy().params, session.getPolicy().perReplicaAttributes, session));
}
}
}
session.applyRules();
@ -135,6 +150,30 @@ public abstract class Suggester implements MapWriter {
return operation;
}
protected Optional<String> getWithCollection(String collectionName) throws IOException {
DocCollection collection = session.cloudManager.getClusterStateProvider().getCollection(collectionName);
if (collection != null) {
return Optional.ofNullable(collection.getStr(WITH_COLLECTION));
} else {
return Optional.empty();
}
}
private void setupCollection(HashSet<Pair<String, String>> collectionShardPairs) {
ClusterStateProvider stateProvider = session.cloudManager.getClusterStateProvider();
for (Pair<String, String> shard : collectionShardPairs) {
// if this is not a known collection from the existing clusterstate,
// then add it
if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first()))) {
session.addClausesForCollection(stateProvider, shard.first());
}
for (Row row : session.matrix) {
Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(), it -> new HashMap<>());
if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new ArrayList<>());
}
}
}
public Policy.Session getSession() {
return session;
}
@ -355,4 +394,67 @@ public abstract class Suggester implements MapWriter {
ew.put("action", String.valueOf(getAction()));
ew.put("hints", (MapWriter) ew1 -> hints.forEach((hint, o) -> ew1.putNoEx(hint.toString(), o)));
}
protected Collection setupWithCollectionTargetNodes(Set<String> collections, Set<Pair<String, String>> s, String withCollection) {
Collection originalTargetNodesCopy = null;
if (withCollection != null) {
if (log.isDebugEnabled()) {
HashSet<String> set = new HashSet<>(collections);
s.forEach(kv -> set.add(kv.first()));
log.debug("Identified withCollection = {} for collection: {}", withCollection, set);
}
originalTargetNodesCopy = Utils.getDeepCopy((Collection) hints.get(Hint.TARGET_NODE), 10, true);
Set<String> withCollectionNodes = new HashSet<>();
for (Row row : getMatrix()) {
row.forEachReplica(r -> {
if (withCollection.equals(r.getCollection()) &&
"shard1".equals(r.getShard())) {
withCollectionNodes.add(r.getNode());
}
});
}
if (originalTargetNodesCopy != null && !originalTargetNodesCopy.isEmpty()) {
// find intersection of the set of target nodes with the set of 'withCollection' nodes
Set<String> set = (Set<String>) hints.computeIfAbsent(Hint.TARGET_NODE, h -> new HashSet<>());
set.retainAll(withCollectionNodes);
if (set.isEmpty()) {
// no nodes common between the sets, we have no choice but to restore the original target node hint
hints.put(Hint.TARGET_NODE, originalTargetNodesCopy);
}
} else if (originalTargetNodesCopy == null) {
hints.put(Hint.TARGET_NODE, withCollectionNodes);
}
}
return originalTargetNodesCopy;
}
protected String findWithCollection(Set<String> collections, Set<Pair<String, String>> s) {
List<String> withCollections = new ArrayList<>(1);
collections.forEach(c -> {
try {
getWithCollection(c).ifPresent(withCollections::add);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Exception while fetching 'withCollection' attribute for collection: " + c, e);
}
});
s.forEach(kv -> {
try {
getWithCollection(kv.first()).ifPresent(withCollections::add);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Exception while fetching 'withCollection' attribute for collection: " + kv.first(), e);
}
});
if (withCollections.size() > 1) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"The number of 'withCollection' attributes should be exactly 1 for any policy but found: " + withCollections);
}
return withCollections.isEmpty() ? null : withCollections.get(0);
}
}

View File

@ -25,12 +25,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -44,13 +43,14 @@ import org.apache.solr.common.util.StrUtils;
import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableSet;
import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.parseString;
import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.ANY;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
public class Suggestion {
public static final String coreidxsize = "INDEX.sizeInGB";
static final Map<String, ConditionType> validatetypes = new HashMap<>();
private static final String NULL = "";
@Target(ElementType.FIELD)
@ -82,14 +82,13 @@ public class Suggestion {
String metricsKey() default NULL;
Class implementation() default void.class;
ComputedType[] computedValues() default ComputedType.NULL;
}
public static ConditionType getTagType(String name) {
ConditionType info = validatetypes.get(name);
if (info == null && name.startsWith(ImplicitSnitch.SYSPROP)) info = ConditionType.STRING;
if (info == null && name.startsWith(Clause.METRICS_PREFIX)) info = ConditionType.LAZY;
return info;
return Policy.getTagType(name);
}
private static Object getOperandAdjustedValue(Object val, Object original) {
@ -160,7 +159,9 @@ public class Suggestion {
/**
* Type details of each variable in policies
*/
public enum ConditionType {
public enum ConditionType implements VarType {
@Meta(name = "withCollection", type = String.class, isNodeSpecificVal = true, implementation = WithCollectionVarType.class)
WITH_COLLECTION(),
@Meta(name = "collection",
type = String.class)
@ -375,7 +376,7 @@ public class Suggestion {
//When a replica is added, freedisk should be incremented
@Override
public void projectAddReplica(Cell cell, ReplicaInfo ri) {
public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> ops, boolean strictMode) {
//go through other replicas of this shard and copy the index size value into this
for (Row row : cell.getRow().session.matrix) {
row.forEachReplica(replicaInfo -> {
@ -395,7 +396,7 @@ public class Suggestion {
}
@Override
public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector) {
Double idxSize = (Double) validate(CORE_IDX.tagName, ri.getVariable(CORE_IDX.tagName), false);
if (idxSize == null) return;
Double currFreeDisk = cell.val == null ? 0.0d : (Double) cell.val;
@ -464,12 +465,12 @@ public class Suggestion {
}
@Override
public void projectAddReplica(Cell cell, ReplicaInfo ri) {
public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> ops, boolean strictMode) {
cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() + 1;
}
@Override
public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector) {
cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() - 1;
}
},
@ -525,7 +526,12 @@ public class Suggestion {
LAZY() {
@Override
public Object validate(String name, Object val, boolean isRuleVal) {
return Clause.parseString(val);
return parseString(val);
}
@Override
public boolean match(Object inputVal, Operand op, Object val, String name, Row row) {
return op.match(parseString(val), parseString(inputVal)) == Clause.TestStatus.PASS;
}
@Override
@ -543,6 +549,8 @@ public class Suggestion {
public void getSuggestions(SuggestionCtx ctx) {
perNodeSuggestions(ctx);
}
};
public final String tagName;
@ -558,6 +566,7 @@ public class Suggestion {
public final Set<String> associatedPerNodeValues;
public final String metricsAttribute;
public final Set<ComputedType> supportedComputedTypes;
private final VarType impl;
ConditionType() {
@ -569,6 +578,15 @@ public class Suggestion {
} catch (NoSuchFieldException e) {
//cannot happen
}
if (meta.implementation() != void.class) {
try {
impl = (VarType) meta.implementation().newInstance();
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate: " + meta.implementation().getName());
}
} else {
impl = null;
}
this.tagName = meta.name();
this.type = meta.type();
@ -583,6 +601,7 @@ public class Suggestion {
emptySet() :
unmodifiableSet(new HashSet(Arrays.asList(meta.computedValues())));
this.wildCards = readSet(meta.wildCards());
}
public String getTagName() {
@ -603,11 +622,21 @@ public class Suggestion {
return unmodifiableSet(new HashSet<>(Arrays.asList(vals)));
}
@Override
public void getSuggestions(SuggestionCtx ctx) {
if (impl != null) {
impl.getSuggestions(ctx);
return;
}
perNodeSuggestions(ctx);
}
@Override
public void addViolatingReplicas(ViolationCtx ctx) {
if (impl != null) {
impl.addViolatingReplicas(ctx);
return;
}
for (Row row : ctx.allRows) {
if (ctx.clause.tag.varType.meta.isNodeSpecificVal() && !row.node.equals(ctx.tagKey)) continue;
collectViolatingReplicas(ctx, row);
@ -669,21 +698,35 @@ public class Suggestion {
/**
* Simulate a replica addition to a node in the cluster
*/
public void projectAddReplica(Cell cell, ReplicaInfo ri) {
public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector, boolean strictMode) {
if (impl != null) impl.projectAddReplica(cell, ri, opCollector, strictMode);
}
public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector) {
if (impl != null) {
impl.projectRemoveReplica(cell, ri, opCollector);
}
}
@Override
public int compareViolation(Violation v1, Violation v2) {
if (impl != null) return impl.compareViolation(v1, v2);
if (v2.replicaCountDelta == null || v1.replicaCountDelta == null) return 0;
if (Math.abs(v1.replicaCountDelta) == Math.abs(v2.replicaCountDelta)) return 0;
return Math.abs(v1.replicaCountDelta) < Math.abs(v2.replicaCountDelta) ? -1 : 1;
}
@Override
public Object computeValue(Policy.Session session, Clause.Condition condition, String collection, String shard, String node) {
if (impl != null) return impl.computeValue(session, condition, collection, shard, node);
return condition.val;
}
@Override
public boolean match(Object inputVal, Operand op, Object val, String name, Row row) {
if (impl != null) return impl.match(inputVal, op, val, name, row);
return op.match(val, validate(name, inputVal, false)) == Clause.TestStatus.PASS;
}
}
private static void collectViolatingReplicas(ViolationCtx ctx, Row row) {
@ -757,9 +800,4 @@ public class Suggestion {
}
}
static {
for (Suggestion.ConditionType t : Suggestion.ConditionType.values()) Suggestion.validatetypes.put(t.tagName, t);
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.util.function.Consumer;
/**
* A Variable Type used in Autoscaling policy rules
*/
public interface VarType {
boolean match(Object inputVal, Operand op, Object val, String name, Row row);
void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector, boolean strictMode);
void addViolatingReplicas(Suggestion.ViolationCtx ctx);
default void getSuggestions(Suggestion.SuggestionCtx ctx) {
}
default Object computeValue(Policy.Session session, Clause.Condition condition, String collection, String shard, String node) {
return condition.val;
}
int compareViolation(Violation v1, Violation v2);
default void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector) {
}
}

View File

@ -30,7 +30,7 @@ import org.apache.solr.common.util.Utils;
public class Violation implements MapWriter {
final String shard, coll, node;
final Object actualVal;
final Double replicaCountDelta;//how many extra replicas
Double replicaCountDelta;//how many extra replicas
final Object tagKey;
private final int hash;
private final Clause clause;

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.util.Pair;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
/**
* Implements the 'withCollection' variable type
*/
public class WithCollectionVarType implements VarType {
@Override
public boolean match(Object inputVal, Operand op, Object val, String name, Row row) {
Map<String, String> withCollectionMap = (Map<String, String>) inputVal;
if (withCollectionMap == null || withCollectionMap.isEmpty()) return true;
Set<String> uniqueColls = new HashSet<>();
row.forEachReplica(replicaInfo -> uniqueColls.add(replicaInfo.getCollection()));
for (Map.Entry<String, String> e : withCollectionMap.entrySet()) {
if (uniqueColls.contains(e.getKey()) && !uniqueColls.contains(e.getValue())) return false;
}
return true;
}
public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector, boolean strictMode) {
if (strictMode) {
// we do not want to add a replica of the 'withCollection' in strict mode
return;
}
Map<String, String> withCollectionMap = (Map<String, String>) cell.val;
if (withCollectionMap == null || withCollectionMap.isEmpty()) return;
Set<String> uniqueColls = new HashSet<>();
Row row = cell.row;
row.forEachReplica(replicaInfo -> uniqueColls.add(replicaInfo.getCollection()));
for (Map.Entry<String, String> e : withCollectionMap.entrySet()) {
if (uniqueColls.contains(e.getKey()) && !uniqueColls.contains(e.getValue())) {
String withCollection = e.getValue();
opCollector.accept(new Row.OperationInfo(withCollection, "shard1", row.node, cell.name, true, Replica.Type.NRT));
}
}
}
@Override
public int compareViolation(Violation v1, Violation v2) {
return Integer.compare(v1.getViolatingReplicas().size(), v2.getViolatingReplicas().size());
}
public void addViolatingReplicas(Suggestion.ViolationCtx ctx) {
String node = ctx.currentViolation.node;
for (Row row : ctx.allRows) {
if (node.equals(row.node)) {
Map<String, String> withCollectionMap = (Map<String, String>) row.getVal("withCollection");
if (withCollectionMap != null) {
row.forEachReplica(r -> {
String withCollection = withCollectionMap.get(r.getCollection());
if (withCollection != null) {
// test whether this row has at least 1 replica of withCollection, else there is a violation
Set<String> uniqueCollections = new HashSet<>();
row.forEachReplica(replicaInfo -> uniqueCollections.add(replicaInfo.getCollection()));
if (!uniqueCollections.contains(withCollection)) {
ctx.currentViolation.addReplica(new Violation.ReplicaInfoAndErr(r).withDelta(1.0d));
}
}
});
ctx.currentViolation.replicaCountDelta = (double) ctx.currentViolation.getViolatingReplicas().size();
}
}
}
}
@Override
public void getSuggestions(Suggestion.SuggestionCtx ctx) {
if (ctx.violation.getViolatingReplicas().isEmpty()) return;
Map<String, Object> nodeValues = ctx.session.nodeStateProvider.getNodeValues(ctx.violation.node, Collections.singleton("withCollection"));
Map<String, String> withCollectionsMap = (Map<String, String>) nodeValues.get("withCollection");
if (withCollectionsMap == null) return;
Set<String> uniqueCollections = new HashSet<>();
for (Violation.ReplicaInfoAndErr replicaInfoAndErr : ctx.violation.getViolatingReplicas()) {
uniqueCollections.add(replicaInfoAndErr.replicaInfo.getCollection());
}
collectionLoop:
for (String collection : uniqueCollections) {
String withCollection = withCollectionsMap.get(collection);
if (withCollection == null) continue;
// can we find a node from which we can move a replica of the `withCollection`
// without creating another violation?
for (Row row : ctx.session.matrix) {
if (ctx.violation.node.equals(row.node)) continue; // filter the violating node
Set<String> hostedCollections = new HashSet<>();
row.forEachReplica(replicaInfo -> hostedCollections.add(replicaInfo.getCollection()));
if (hostedCollections.contains(withCollection) && !hostedCollections.contains(collection)) {
// find the candidate replicas that we can move
List<ReplicaInfo> movableReplicas = new ArrayList<>();
row.forEachReplica(replicaInfo -> {
if (replicaInfo.getCollection().equals(withCollection)) {
movableReplicas.add(replicaInfo);
}
});
for (ReplicaInfo toMove : movableReplicas) {
// candidate source node for a move replica operation
Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
.forceOperation(true)
.hint(Suggester.Hint.COLL_SHARD, new Pair<>(withCollection, "shard1"))
.hint(Suggester.Hint.SRC_NODE, row.node)
.hint(Suggester.Hint.REPLICA, toMove.getName())
.hint(Suggester.Hint.TARGET_NODE, ctx.violation.node);
if (ctx.addSuggestion(suggester) != null)
continue collectionLoop; // one suggestion is enough for this collection
}
}
}
// we could not find a valid move, so we suggest adding a replica
Suggester suggester = ctx.session.getSuggester(ADDREPLICA)
.forceOperation(true)
.hint(Suggester.Hint.COLL_SHARD, new Pair<>(withCollection, "shard1"))
.hint(Suggester.Hint.TARGET_NODE, ctx.violation.node);
ctx.addSuggestion(suggester);
}
}
}

View File

@ -45,6 +45,7 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.cloud.rule.SnitchContext;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@ -60,6 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.METRICS_PREFIX;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.TOTALDISK;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.WITH_COLLECTION;
/**
*
@ -75,6 +77,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
protected final Map<String, Map<String, Map<String, List<ReplicaInfo>>>> nodeVsCollectionVsShardVsReplicaInfo = new HashMap<>();
private Map<String, Object> snitchSession = new HashMap<>();
private Map<String, Map> nodeVsTags = new HashMap<>();
private Map<String, String> withCollectionsMap = new HashMap<>();
public SolrClientNodeStateProvider(CloudSolrClient solrClient) {
this.solrClient = solrClient;
@ -100,6 +103,9 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
all.forEach((collName, ref) -> {
DocCollection coll = ref.get();
if (coll == null) return;
if (coll.getProperties().get(CollectionAdminParams.WITH_COLLECTION) != null) {
withCollectionsMap.put(coll.getName(), (String) coll.getProperties().get(CollectionAdminParams.WITH_COLLECTION));
}
coll.forEachReplica((shard, replica) -> {
Map<String, Map<String, List<ReplicaInfo>>> nodeData = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(replica.getNodeName(), k -> new HashMap<>());
Map<String, List<ReplicaInfo>> collData = nodeData.computeIfAbsent(collName, k -> new HashMap<>());
@ -114,13 +120,15 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
// ew.put("liveNodes", liveNodes);
ew.put("replicaInfo", Utils.getDeepCopy(nodeVsCollectionVsShardVsReplicaInfo, 5));
ew.put("nodeValues", nodeVsTags);
}
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
Map<String, Object> tagVals = fetchTagValues(node, tags);
nodeVsTags.put(node, tagVals);
if (tags.contains(WITH_COLLECTION.tagName)) {
tagVals.put(WITH_COLLECTION.tagName, withCollectionsMap);
}
return tagVals;
}

View File

@ -59,9 +59,11 @@ import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_PARAM;
import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
/**
* This class is experimental and subject to change.
@ -80,7 +82,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
MAX_SHARDS_PER_NODE,
AUTO_ADD_REPLICAS,
POLICY,
COLL_CONF);
COLL_CONF,
WITH_COLLECTION,
COLOCATED_WITH);
protected final CollectionAction action;
@ -417,10 +421,11 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
protected Integer pullReplicas;
protected Integer tlogReplicas;
private Properties properties;
protected Properties properties;
protected Boolean autoAddReplicas;
protected Integer stateFormat;
private String[] rule , snitch;
protected String[] rule , snitch;
protected String withCollection;
/** Constructor intended for typical use cases */
protected Create(String collection, String config, Integer numShards, Integer numNrtReplicas, Integer numTlogReplicas, Integer numPullReplicas) { // TODO: maybe add other constructors
@ -557,6 +562,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
if (rule != null) params.set(DocCollection.RULE, rule);
if (snitch != null) params.set(DocCollection.SNITCH, snitch);
params.setNonNull(POLICY, policy);
params.setNonNull(WITH_COLLECTION, withCollection);
return params;
}
@ -564,6 +570,15 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
this.policy = policy;
return this;
}
public String getWithCollection() {
return withCollection;
}
public Create setWithCollection(String withCollection) {
this.withCollection = withCollection;
return this;
}
}
/**

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.common.SolrException;
@ -174,6 +175,9 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
return slices.get(sliceName);
}
/**
* @param consumer consume shardName vs. replica
*/
public void forEachReplica(BiConsumer<String, Replica> consumer) {
slices.forEach((shard, slice) -> slice.getReplicasMap().forEach((s, replica) -> consumer.accept(shard, replica)));
}
@ -321,7 +325,22 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
}
return replicas;
}
/**
* @param predicate test against shardName vs. replica
* @return the first replica that matches the predicate
*/
public Replica getReplica(BiPredicate<String, Replica> predicate) {
final Replica[] result = new Replica[1];
forEachReplica((s, replica) -> {
if (result[0] != null) return;
if (predicate.test(s, replica)) {
result[0] = replica;
}
});
return result[0];
}
public List<Replica> getReplicas(EnumSet<Replica.Type> s) {
List<Replica> replicas = new ArrayList<>();
for (Slice slice : this) {

View File

@ -79,4 +79,15 @@ public interface CollectionAdminParams {
* The name of the config set to be used for a collection
*/
String COLL_CONF = "collection.configName";
/**
* The name of the collection with which a collection is to be co-located
*/
String WITH_COLLECTION = "withCollection";
/**
* The reverse-link to WITH_COLLECTION flag. It is stored in the cluster state of the `withCollection`
* and points to the collection on which the `withCollection` was specified.
*/
String COLOCATED_WITH = "COLOCATED_WITH";
}

View File

@ -166,6 +166,564 @@ public class TestPolicy extends SolrTestCaseJ4 {
return result;
}
public void testWithCollection() {
String clusterStateStr = "{" +
" 'comments_coll':{" +
" 'router': {" +
" 'name': 'compositeId'" +
" }," +
" 'shards':{}," +
" 'withCollection' :'articles_coll'" +
" }," +
" 'articles_coll': {" +
" 'router': {" +
" 'name': 'compositeId'" +
" }," +
" 'shards': {" +
" 'shard1': {" +
" 'range': '80000000-ffffffff'," +
" 'replicas': {" +
" 'r1': {" +
" 'core': 'r1'," +
" 'base_url': 'http://10.0.0.4:8983/solr'," +
" 'node_name': 'node1'," +
" 'state': 'active'," +
" 'leader': 'true'" +
" }," +
" 'r2': {" +
" 'core': 'r2'," +
" 'base_url': 'http://10.0.0.4:7574/solr'," +
" 'node_name': 'node2'," +
" 'state': 'active'" +
" }" +
" }" +
" }" +
" }" +
" }" +
"}";
ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
@Override
public ClusterState getClusterState() throws IOException {
return clusterState;
}
@Override
public Set<String> getLiveNodes() {
return clusterState.getLiveNodes();
}
};
SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
@Override
protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
Map<String, Object> result = new HashMap<>();
AtomicInteger cores = new AtomicInteger();
forEachReplica(node, replicaInfo -> cores.incrementAndGet());
if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
return result;
}
@Override
protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
//e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
Map<String, Object> result = new HashMap<>();
metricsKeyVsTagReplica.forEach((k, v) -> {
if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
});
return result;
}
@Override
protected ClusterStateProvider getClusterStateProvider() {
return clusterStateProvider;
}
};
Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection"));
assertNotNull(m.get("withCollection"));
Map policies = (Map) Utils.fromJSONString("{" +
" 'cluster-preferences': [" +
" { 'minimize': 'cores'}," +
" { 'maximize': 'freedisk', 'precision': 50}" +
" ]," +
" 'cluster-policy': [" +
" { 'replica': 0, 'nodeRole': 'overseer'}" +
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
" ]" +
"}");
AutoScalingConfig config = new AutoScalingConfig(policies);
Policy policy = config.getPolicy();
Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
@Override
public ClusterStateProvider getClusterStateProvider() {
return clusterStateProvider;
}
@Override
public NodeStateProvider getNodeStateProvider() {
return solrClientNodeStateProvider;
}
});
Suggester suggester = session.getSuggester(CollectionAction.ADDREPLICA);
suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
SolrRequest op = suggester.getSuggestion();
assertNotNull(op);
Set<String> nodes = new HashSet<>(2);
nodes.add(op.getParams().get("node"));
session = suggester.getSession();
suggester = session.getSuggester(ADDREPLICA);
suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
op = suggester.getSuggestion();
assertNotNull(op);
nodes.add(op.getParams().get("node"));
assertEquals(2, nodes.size());
assertTrue("node1 should have been selected by add replica", nodes.contains("node1"));
assertTrue("node2 should have been selected by add replica", nodes.contains("node2"));
session = suggester.getSession();
suggester = session.getSuggester(MOVEREPLICA);
suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
op = suggester.getSuggestion();
assertNull(op);
}
public void testWithCollectionSuggestions() {
String clusterStateStr = "{" +
" 'articles_coll':{" +
" 'router': {" +
" 'name': 'compositeId'" +
" }," +
" 'shards':{'shard1':{}}," +
" }," +
" 'comments_coll': {" +
" 'withCollection' :'articles_coll'," +
" 'router': {" +
" 'name': 'compositeId'" +
" }," +
" 'shards': {" +
" 'shard1': {" +
" 'range': '80000000-ffffffff'," +
" 'replicas': {" +
" 'r1': {" +
" 'core': 'r1'," +
" 'base_url': 'http://10.0.0.4:8983/solr'," +
" 'node_name': 'node1'," +
" 'state': 'active'," +
" 'leader': 'true'" +
" }," +
" 'r2': {" +
" 'core': 'r2'," +
" 'base_url': 'http://10.0.0.4:7574/solr'," +
" 'node_name': 'node2'," +
" 'state': 'active'" +
" }" +
" }" +
" }" +
" }" +
" }" +
"}";
ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
@Override
public ClusterState getClusterState() throws IOException {
return clusterState;
}
@Override
public Set<String> getLiveNodes() {
return clusterState.getLiveNodes();
}
};
SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
@Override
protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
Map<String, Object> result = new HashMap<>();
AtomicInteger cores = new AtomicInteger();
forEachReplica(node, replicaInfo -> cores.incrementAndGet());
if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
return result;
}
@Override
protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
//e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
Map<String, Object> result = new HashMap<>();
metricsKeyVsTagReplica.forEach((k, v) -> {
if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
});
return result;
}
@Override
protected ClusterStateProvider getClusterStateProvider() {
return clusterStateProvider;
}
};
Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection"));
assertNotNull(m.get("withCollection"));
Map policies = (Map) Utils.fromJSONString("{" +
" 'cluster-preferences': [" +
" { 'maximize': 'freedisk', 'precision': 50}," +
" { 'minimize': 'cores'}" +
" ]," +
" 'cluster-policy': [" +
" { 'replica': 0, 'nodeRole': 'overseer'}" +
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
" ]" +
"}");
List<Suggester.SuggestionInfo> l = PolicyHelper.getSuggestions(new AutoScalingConfig(policies),
new DelegatingCloudManager(null) {
@Override
public ClusterStateProvider getClusterStateProvider() {
return clusterStateProvider;
}
@Override
public NodeStateProvider getNodeStateProvider() {
return solrClientNodeStateProvider;
}
});
assertNotNull(l);
assertEquals(2, l.size());
// collect the set of nodes to which replicas are being added
Set<String> nodes = new HashSet<>(2);
m = l.get(0).toMap(new LinkedHashMap<>());
assertEquals(1.0d, Utils.getObjectByPath(m, true, "violation/violation/delta"));
assertEquals("POST", Utils.getObjectByPath(m, true, "operation/method"));
assertEquals("/c/articles_coll/shards", Utils.getObjectByPath(m, true, "operation/path"));
assertNotNull(Utils.getObjectByPath(m, false, "operation/command/add-replica"));
nodes.add((String) Utils.getObjectByPath(m, true, "operation/command/add-replica/node"));
m = l.get(1).toMap(new LinkedHashMap<>());
assertEquals(1.0d, Utils.getObjectByPath(m, true, "violation/violation/delta"));
assertEquals("POST", Utils.getObjectByPath(m, true, "operation/method"));
assertEquals("/c/articles_coll/shards", Utils.getObjectByPath(m, true, "operation/path"));
assertNotNull(Utils.getObjectByPath(m, false, "operation/command/add-replica"));
nodes.add((String) Utils.getObjectByPath(m, true, "operation/command/add-replica/node"));
assertEquals(2, nodes.size());
assertTrue(nodes.contains("node1"));
assertTrue(nodes.contains("node2"));
}
public void testWithCollectionMoveVsAddSuggestions() {
String clusterStateStr = "{" +
" 'articles_coll':{" +
" 'router': {" +
" 'name': 'compositeId'" +
" }," +
" 'shards': {" +
" 'shard1': {" +
" 'range': '80000000-ffffffff'," +
" 'replicas': {" +
" 'r1': {" +
" 'core': 'r1'," +
" 'base_url': 'http://10.0.0.4:8983/solr'," +
" 'node_name': 'node1'," +
" 'state': 'active'," +
" 'leader': 'true'" +
" }," +
" 'r2': {" +
" 'core': 'r2'," +
" 'base_url': 'http://10.0.0.4:7574/solr'," +
" 'node_name': 'node2'," +
" 'state': 'active'" +
" }," +
" 'r3': {" +
" 'core': 'r3'," +
" 'base_url': 'http://10.0.0.4:7579/solr'," +
" 'node_name': 'node6'," +
" 'state': 'active'" +
" }" +
" }" +
" }" +
" }" +
" }," +
" 'comments_coll': {" +
" 'withCollection' :'articles_coll'," +
" 'router': {" +
" 'name': 'compositeId'" +
" }," +
" 'shards': {" +
" 'shard1': {" +
" 'range': '80000000-ffffffff'," +
" 'replicas': {" +
" 'r1': {" +
" 'core': 'r1'," +
" 'base_url': 'http://10.0.0.4:7576/solr'," +
" 'node_name': 'node3'," +
" 'state': 'active'," +
" 'leader': 'true'" +
" }," +
" 'r2': {" +
" 'core': 'r2'," +
" 'base_url': 'http://10.0.0.4:7577/solr'," +
" 'node_name': 'node4'," +
" 'state': 'active'" +
" }," +
" 'r3': {" +
" 'core': 'r3'," +
" 'base_url': 'http://10.0.0.4:7578/solr'," +
" 'node_name': 'node5'," +
" 'state': 'active'" +
" }," +
" 'r4': {" +
" 'core': 'r4'," +
" 'base_url': 'http://10.0.0.4:7579/solr'," +
" 'node_name': 'node6'," +
" 'state': 'active'" +
" }" +
" }" +
" }" +
" }" +
" }" +
"}";
ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
ImmutableSet.of("node1", "node2", "node3", "node4", "node5", "node6"));
DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
@Override
public ClusterState getClusterState() throws IOException {
return clusterState;
}
@Override
public Set<String> getLiveNodes() {
return clusterState.getLiveNodes();
}
};
SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
@Override
protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
Map<String, Object> result = new HashMap<>();
AtomicInteger cores = new AtomicInteger();
forEachReplica(node, replicaInfo -> cores.incrementAndGet());
if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
return result;
}
@Override
protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
//e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
Map<String, Object> result = new HashMap<>();
metricsKeyVsTagReplica.forEach((k, v) -> {
if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
});
return result;
}
@Override
protected ClusterStateProvider getClusterStateProvider() {
return clusterStateProvider;
}
};
Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection"));
assertNotNull(m.get("withCollection"));
Map policies = (Map) Utils.fromJSONString("{" +
" 'cluster-preferences': [" +
" { 'maximize': 'freedisk', 'precision': 50}," +
" { 'minimize': 'cores'}" +
" ]," +
" 'cluster-policy': [" +
" { 'replica': 0, 'nodeRole': 'overseer'}" +
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
" ]" +
"}");
List<Suggester.SuggestionInfo> l = PolicyHelper.getSuggestions(new AutoScalingConfig(policies),
new DelegatingCloudManager(null) {
@Override
public ClusterStateProvider getClusterStateProvider() {
return clusterStateProvider;
}
@Override
public NodeStateProvider getNodeStateProvider() {
return solrClientNodeStateProvider;
}
});
assertNotNull(l);
assertEquals(3, l.size());
// collect the set of nodes to which replicas are being added
Set<String> nodes = new HashSet<>(2);
int numMoves = 0, numAdds = 0;
Set<String> addNodes = new HashSet<>();
Set<String> targetNodes = new HashSet<>();
Set<String> movedReplicas = new HashSet<>();
for (Suggester.SuggestionInfo suggestionInfo : l) {
Map s = suggestionInfo.toMap(new LinkedHashMap<>());
assertEquals("POST", Utils.getObjectByPath(s, true, "operation/method"));
if (Utils.getObjectByPath(s, false, "operation/command/add-replica") != null) {
numAdds++;
assertEquals(1.0d, Utils.getObjectByPath(s, true, "violation/violation/delta"));
assertEquals("/c/articles_coll/shards", Utils.getObjectByPath(s, true, "operation/path"));
addNodes.add((String) Utils.getObjectByPath(s, true, "operation/command/add-replica/node"));
} else if (Utils.getObjectByPath(s, false, "operation/command/move-replica") != null) {
numMoves++;
assertEquals("/c/articles_coll", Utils.getObjectByPath(s, true, "operation/path"));
targetNodes.add((String) Utils.getObjectByPath(s, true, "operation/command/move-replica/targetNode"));
movedReplicas.add((String) Utils.getObjectByPath(s, true, "operation/command/move-replica/replica"));
} else {
fail("Unexpected operation type suggested for suggestion: " + suggestionInfo);
}
}
assertEquals(2, targetNodes.size());
assertEquals(1, addNodes.size());
assertEquals(2, movedReplicas.size());
Set<String> allTargetNodes = new HashSet<>(targetNodes);
allTargetNodes.addAll(addNodes);
assertEquals(3, allTargetNodes.size());
assertTrue(allTargetNodes.contains("node3"));
assertTrue(allTargetNodes.contains("node4"));
assertTrue(allTargetNodes.contains("node5"));
}
public void testWithCollectionMoveReplica() {
String clusterStateStr = "{" +
" 'comments_coll':{" +
" 'router': {" +
" 'name': 'compositeId'" +
" }," +
" 'shards':{" +
" 'shard1' : {" +
" 'range': '80000000-ffffffff'," +
" 'replicas': {" +
" 'r1': {" +
" 'core': 'r1'," +
" 'base_url': 'http://10.0.0.4:8983/solr'," +
" 'node_name': 'node1'," +
" 'state': 'active'," +
" 'leader': 'true'" +
" }" +
" }" +
" }" +
" }," +
" 'withCollection' :'articles_coll'" +
" }," +
" 'articles_coll': {" +
" 'router': {" +
" 'name': 'compositeId'" +
" }," +
" 'shards': {" +
" 'shard1': {" +
" 'range': '80000000-ffffffff'," +
" 'replicas': {" +
" 'r1': {" +
" 'core': 'r1'," +
" 'base_url': 'http://10.0.0.4:8983/solr'," +
" 'node_name': 'node1'," +
" 'state': 'active'," +
" 'leader': 'true'" +
" }," +
" 'r2': {" +
" 'core': 'r2'," +
" 'base_url': 'http://10.0.0.4:7574/solr'," +
" 'node_name': 'node2'," +
" 'state': 'active'" +
" }" +
" }" +
" }" +
" }" +
" }" +
"}";
ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
ImmutableSet.of("node2", "node3", "node4", "node5"));
DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
@Override
public ClusterState getClusterState() throws IOException {
return clusterState;
}
@Override
public Set<String> getLiveNodes() {
return clusterState.getLiveNodes();
}
};
SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
@Override
protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
Map<String, Object> result = new HashMap<>();
AtomicInteger cores = new AtomicInteger();
forEachReplica(node, replicaInfo -> cores.incrementAndGet());
if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
return result;
}
@Override
protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
//e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
Map<String, Object> result = new HashMap<>();
metricsKeyVsTagReplica.forEach((k, v) -> {
if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
});
return result;
}
@Override
protected ClusterStateProvider getClusterStateProvider() {
return clusterStateProvider;
}
};
Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection"));
assertNotNull(m.get("withCollection"));
Map policies = (Map) Utils.fromJSONString("{" +
" 'cluster-preferences': [" +
" { 'minimize': 'cores'}," +
" { 'maximize': 'freedisk', 'precision': 50}" +
" ]," +
" 'cluster-policy': [" +
" { 'replica': 0, 'nodeRole': 'overseer'}" +
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
" ]" +
"}");
AutoScalingConfig config = new AutoScalingConfig(policies);
Policy policy = config.getPolicy();
Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
@Override
public ClusterStateProvider getClusterStateProvider() {
return clusterStateProvider;
}
@Override
public NodeStateProvider getNodeStateProvider() {
return solrClientNodeStateProvider;
}
});
Suggester suggester = session.getSuggester(CollectionAction.MOVEREPLICA);
suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
suggester.hint(Hint.SRC_NODE, "node1");
SolrRequest op = suggester.getSuggestion();
assertNotNull(op);
assertEquals("node2 should have been selected by move replica","node2",
op.getParams().get("targetNode"));
session = suggester.getSession();
suggester = session.getSuggester(MOVEREPLICA);
suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
suggester.hint(Hint.SRC_NODE, "node1");
op = suggester.getSuggestion();
assertNull(op);
}
public void testValidate() {
expectError("replica", -1, "must be greater than");
expectError("replica", "hello", "not a valid number");
@ -1228,7 +1786,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
assertTrue(session.getPolicy() == config.getPolicy());
assertEquals(sessionWrapper.status, PolicyHelper.Status.EXECUTING);
sessionWrapper.release();
assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
assertEquals(sessionRef.getSessionWrapper().getCreateTime(), s1.getCreateTime());
PolicyHelper.SessionWrapper[] s2 = new PolicyHelper.SessionWrapper[1];
@ -1256,9 +1814,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
assertEquals(2, s1.getRefCount());
s2[0].release();
assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
s1.release();
assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
}
@ -1479,7 +2037,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
assertEquals("node2", op.getNode());
}
private SolrCloudManager getSolrCloudManager(final Map<String, Map> nodeValues, String clusterState) {
private SolrCloudManager getSolrCloudManager(final Map<String, Map> nodeValues, String clusterS) {
return new SolrCloudManager() {
ObjectCache objectCache = new ObjectCache();
@ -1521,7 +2079,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
@Override
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return getReplicaDetails(node, clusterState);
return getReplicaDetails(node, clusterS);
}
};
}