SOLR-15055: Re-implement 'withCollection'.

This commit is contained in:
Andrzej Bialecki 2021-01-26 15:16:04 +01:00
parent 695e789891
commit 6e9185a33a
37 changed files with 970 additions and 145 deletions

View File

@ -25,6 +25,9 @@ New Features
* SOLR-15019: Replica placement API needs a way to fetch existing replica metrics. (ab, ilan) * SOLR-15019: Replica placement API needs a way to fetch existing replica metrics. (ab, ilan)
* SOLR-15055: Re-implement 'withCollection'. This also adds the placement plugin support
for rejecting replica / collection deletions that would violate placement constraints. (ab, ilan)
Improvements Improvements
---------------------- ----------------------
* LUCENE-8984: MoreLikeThis MLT is biased for uncommon fields (Andy Hind via Anshum Gupta) * LUCENE-8984: MoreLikeThis MLT is biased for uncommon fields (Andy Hind via Anshum Gupta)

View File

@ -29,7 +29,6 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.overseer.ClusterStateMutator; import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator; import org.apache.solr.cloud.overseer.CollectionMutator;
import org.apache.solr.cloud.overseer.SliceMutator; import org.apache.solr.cloud.overseer.SliceMutator;
@ -40,6 +39,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -74,8 +74,8 @@ class ExclusiveSliceProperty {
ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) { ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
this.clusterState = clusterState; this.clusterState = clusterState;
String tmp = message.getStr(ZkStateReader.PROPERTY_PROP); String tmp = message.getStr(ZkStateReader.PROPERTY_PROP);
if (StringUtils.startsWith(tmp, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) { if (!StringUtils.startsWith(tmp, CollectionAdminParams.PROPERTY_PREFIX)) {
tmp = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + tmp; tmp = CollectionAdminParams.PROPERTY_PREFIX + tmp;
} }
this.property = tmp.toLowerCase(Locale.ROOT); this.property = tmp.toLowerCase(Locale.ROOT);
collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);

View File

@ -45,7 +45,6 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.ActiveReplicaWatcher; import org.apache.solr.cloud.ActiveReplicaWatcher;
import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.common.SolrCloseableLatch; import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
@ -61,6 +60,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardHandler;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -141,7 +141,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
} }
List<CreateReplica> createReplicas = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, replicaTypesVsCount, List<CreateReplica> createReplicas = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, replicaTypesVsCount,
ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance()) ocmh.overseer.getCoreContainer())
.stream() .stream()
.map(replicaPosition -> assignReplicaDetails(ocmh.cloudManager, clusterState, message, replicaPosition)) .map(replicaPosition -> assignReplicaDetails(ocmh.cloudManager, clusterState, message, replicaPosition))
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -302,7 +302,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState, public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
String collectionName, ZkNodeProps message, String collectionName, ZkNodeProps message,
EnumMap<Replica.Type, Integer> replicaTypeVsCount, EnumMap<Replica.Type, Integer> replicaTypeVsCount,
PlacementPlugin placementPlugin) throws IOException, InterruptedException { CoreContainer coreContainer) throws IOException, InterruptedException {
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false); boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
boolean skipNodeAssignment = message.getBool(CollectionAdminParams.SKIP_NODE_ASSIGNMENT, false); boolean skipNodeAssignment = message.getBool(CollectionAdminParams.SKIP_NODE_ASSIGNMENT, false);
String sliceName = message.getStr(SHARD_ID_PROP); String sliceName = message.getStr(SHARD_ID_PROP);
@ -326,7 +326,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
if (!skipCreateReplicaInClusterState && !skipNodeAssignment) { if (!skipCreateReplicaInClusterState && !skipNodeAssignment) {
positions = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, numNrtReplicas, positions = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, numNrtReplicas,
numTlogReplicas, numPullReplicas, createNodeSetStr, cloudManager, placementPlugin); numTlogReplicas, numPullReplicas, createNodeSetStr, cloudManager, coreContainer);
} }
if (positions == null) { if (positions == null) {

View File

@ -51,6 +51,7 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.NumberUtils; import org.apache.solr.util.NumberUtils;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -270,7 +271,7 @@ public class Assign {
public static List<ReplicaPosition> getNodesForNewReplicas(ClusterState clusterState, String collectionName, public static List<ReplicaPosition> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
String shard, int nrtReplicas, int tlogReplicas, int pullReplicas, String shard, int nrtReplicas, int tlogReplicas, int pullReplicas,
Object createNodeSet, SolrCloudManager cloudManager, Object createNodeSet, SolrCloudManager cloudManager,
PlacementPlugin placementPlugin) throws IOException, InterruptedException, AssignmentException { CoreContainer coreContainer) throws IOException, InterruptedException, AssignmentException {
log.debug("getNodesForNewReplicas() shard: {} , nrtReplicas : {} , tlogReplicas: {} , pullReplicas: {} , createNodeSet {}" log.debug("getNodesForNewReplicas() shard: {} , nrtReplicas : {} , tlogReplicas: {} , pullReplicas: {} , createNodeSet {}"
, shard, nrtReplicas, tlogReplicas, pullReplicas, createNodeSet); , shard, nrtReplicas, tlogReplicas, pullReplicas, createNodeSet);
DocCollection coll = clusterState.getCollection(collectionName); DocCollection coll = clusterState.getCollection(collectionName);
@ -296,7 +297,7 @@ public class Assign {
.assignPullReplicas(pullReplicas) .assignPullReplicas(pullReplicas)
.onNodes(createNodeList) .onNodes(createNodeList)
.build(); .build();
AssignStrategy assignStrategy = createAssignStrategy(placementPlugin, clusterState, coll); AssignStrategy assignStrategy = createAssignStrategy(coreContainer, clusterState, coll);
return assignStrategy.assign(cloudManager, assignRequest); return assignStrategy.assign(cloudManager, assignRequest);
} }
@ -379,9 +380,46 @@ public class Assign {
} }
} }
/**
* Strategy for assigning replicas to nodes.
*/
public interface AssignStrategy { public interface AssignStrategy {
/**
* Assign new replicas to nodes.
* @param solrCloudManager current instance of {@link SolrCloudManager}.
* @param assignRequest assign request.
* @return list of {@link ReplicaPosition}-s for new replicas.
* @throws AssignmentException when assignment request cannot produce any valid assignments.
*/
List<ReplicaPosition> assign(SolrCloudManager solrCloudManager, AssignRequest assignRequest) List<ReplicaPosition> assign(SolrCloudManager solrCloudManager, AssignRequest assignRequest)
throws Assign.AssignmentException, IOException, InterruptedException; throws AssignmentException, IOException, InterruptedException;
/**
* Verify that deleting a collection doesn't violate the replica assignment constraints.
* @param solrCloudManager current instance of {@link SolrCloudManager}.
* @param collection collection to delete.
* @throws AssignmentException when deleting the collection would violate replica assignment constraints.
* @throws IOException on general errors.
*/
default void verifyDeleteCollection(SolrCloudManager solrCloudManager, DocCollection collection)
throws AssignmentException, IOException, InterruptedException {
}
/**
* Verify that deleting these replicas doesn't violate the replica assignment constraints.
* @param solrCloudManager current instance of {@link SolrCloudManager}.
* @param collection collection to delete replicas from.
* @param shardName shard name.
* @param replicas replicas to delete.
* @throws AssignmentException when deleting the replicas would violate replica assignment constraints.
* @throws IOException on general errors.
*/
default void verifyDeleteReplicas(SolrCloudManager solrCloudManager, DocCollection collection, String shardName, Set<Replica> replicas)
throws AssignmentException, IOException, InterruptedException {
}
} }
public static class AssignRequest { public static class AssignRequest {
@ -495,7 +533,8 @@ public class Assign {
* <p>If {@link PlacementPlugin} instance is null this call will return {@link LegacyAssignStrategy}, otherwise * <p>If {@link PlacementPlugin} instance is null this call will return {@link LegacyAssignStrategy}, otherwise
* {@link PlacementPluginAssignStrategy} will be used.</p> * {@link PlacementPluginAssignStrategy} will be used.</p>
*/ */
public static AssignStrategy createAssignStrategy(PlacementPlugin placementPlugin, ClusterState clusterState, DocCollection collection) { public static AssignStrategy createAssignStrategy(CoreContainer coreContainer, ClusterState clusterState, DocCollection collection) {
PlacementPlugin placementPlugin = coreContainer.getPlacementPluginFactory().createPluginInstance();
if (placementPlugin != null) { if (placementPlugin != null) {
// If a cluster wide placement plugin is configured (and that's the only way to define a placement plugin) // If a cluster wide placement plugin is configured (and that's the only way to define a placement plugin)
return new PlacementPluginAssignStrategy(collection, placementPlugin); return new PlacementPluginAssignStrategy(collection, placementPlugin);

View File

@ -41,7 +41,6 @@ import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController; import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.ClusterStateMutator; import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.Aliases;
@ -63,6 +62,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.TimeSource; import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.util.TimeOut; import org.apache.solr.util.TimeOut;
@ -169,8 +169,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
List<ReplicaPosition> replicaPositions = null; List<ReplicaPosition> replicaPositions = null;
try { try {
replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName), replicaPositions = buildReplicaPositions(ocmh.overseer.getCoreContainer(), ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName),
message, shardNames, ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance()); message, shardNames);
} catch (Assign.AssignmentException e) { } catch (Assign.AssignmentException e) {
ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName); ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
new DeleteCollectionCmd(ocmh).call(clusterState, deleteMessage, results); new DeleteCollectionCmd(ocmh).call(clusterState, deleteMessage, results);
@ -288,10 +288,10 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
} }
} }
private static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState, private static List<ReplicaPosition> buildReplicaPositions(CoreContainer coreContainer, SolrCloudManager cloudManager, ClusterState clusterState,
DocCollection docCollection, DocCollection docCollection,
ZkNodeProps message, ZkNodeProps message,
List<String> shardNames, PlacementPlugin placementPlugin) throws IOException, InterruptedException, Assign.AssignmentException { List<String> shardNames) throws IOException, InterruptedException, Assign.AssignmentException {
final String collectionName = message.getStr(NAME); final String collectionName = message.getStr(NAME);
// look at the replication factor and see if it matches reality // look at the replication factor and see if it matches reality
// if it does not, find best nodes to create more cores // if it does not, find best nodes to create more cores
@ -330,7 +330,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
.assignPullReplicas(numPullReplicas) .assignPullReplicas(numPullReplicas)
.onNodes(nodeList) .onNodes(nodeList)
.build(); .build();
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(placementPlugin, clusterState, docCollection); Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(coreContainer, clusterState, docCollection);
replicaPositions = assignStrategy.assign(cloudManager, assignRequest); replicaPositions = assignStrategy.assign(cloudManager, assignRequest);
} }
return replicaPositions; return replicaPositions;

View File

@ -92,6 +92,13 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
collection = extCollection; collection = extCollection;
} }
// verify the placement modifications caused by the deletion are allowed
DocCollection coll = state.getCollectionOrNull(collection);
if (coll != null) {
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ocmh.overseer.getCoreContainer(), state, coll);
assignStrategy.verifyDeleteCollection(ocmh.cloudManager, coll);
}
final boolean deleteHistory = message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true); final boolean deleteHistory = message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true);
boolean removeCounterNode = true; boolean removeCounterNode = true;

View File

@ -18,6 +18,7 @@
package org.apache.solr.cloud.api.collections; package org.apache.solr.cloud.api.collections;
import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -98,7 +99,7 @@ public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
List<ZkNodeProps> sourceReplicas, List<ZkNodeProps> sourceReplicas,
OverseerCollectionMessageHandler ocmh, OverseerCollectionMessageHandler ocmh,
String node, String node,
String async) throws InterruptedException { String async) throws IOException, InterruptedException {
CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size()); CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());
for (ZkNodeProps sourceReplica : sourceReplicas) { for (ZkNodeProps sourceReplica : sourceReplicas) {
String coll = sourceReplica.getStr(COLLECTION_PROP); String coll = sourceReplica.getStr(COLLECTION_PROP);

View File

@ -23,6 +23,7 @@ import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES; import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC; import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -31,6 +32,7 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd; import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
@ -69,7 +71,7 @@ public class DeleteReplicaCmd implements Cmd {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
void deleteReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete) void deleteReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete)
throws KeeperException, InterruptedException { throws KeeperException, IOException, InterruptedException {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("deleteReplica() : {}", Utils.toJSONString(message)); log.debug("deleteReplica() : {}", Utils.toJSONString(message));
} }
@ -101,9 +103,7 @@ public class DeleteReplicaCmd implements Cmd {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Invalid shard name : " + shard + " in collection : " + collectionName); "Invalid shard name : " + shard + " in collection : " + collectionName);
} }
deleteCore(clusterState, coll, shard, replicaName, message, results, onComplete, parallel, true);
deleteCore(slice, collectionName, replicaName, message, shard, results, onComplete, parallel);
} }
@ -117,7 +117,7 @@ public class DeleteReplicaCmd implements Cmd {
@SuppressWarnings({"rawtypes"})NamedList results, @SuppressWarnings({"rawtypes"})NamedList results,
Runnable onComplete, Runnable onComplete,
boolean parallel) boolean parallel)
throws KeeperException, InterruptedException { throws KeeperException, IOException, InterruptedException {
ocmh.checkRequired(message, COLLECTION_PROP, COUNT_PROP); ocmh.checkRequired(message, COLLECTION_PROP, COUNT_PROP);
int count = Integer.parseInt(message.getStr(COUNT_PROP)); int count = Integer.parseInt(message.getStr(COUNT_PROP));
String collectionName = message.getStr(COLLECTION_PROP); String collectionName = message.getStr(COLLECTION_PROP);
@ -147,6 +147,17 @@ public class DeleteReplicaCmd implements Cmd {
} }
} }
// verify that all replicas can be deleted
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ocmh.overseer.getCoreContainer(), clusterState, coll);
for (Map.Entry<Slice, Set<String>> entry : shardToReplicasMapping.entrySet()) {
Slice shardSlice = entry.getKey();
String shardId = shardSlice.getName();
Set<String> replicaNames = entry.getValue();
Set<Replica> replicas = replicaNames.stream()
.map(name -> shardSlice.getReplica(name)).collect(Collectors.toSet());
assignStrategy.verifyDeleteReplicas(ocmh.cloudManager, coll, shardId, replicas);
}
for (Map.Entry<Slice, Set<String>> entry : shardToReplicasMapping.entrySet()) { for (Map.Entry<Slice, Set<String>> entry : shardToReplicasMapping.entrySet()) {
Slice shardSlice = entry.getKey(); Slice shardSlice = entry.getKey();
String shardId = shardSlice.getName(); String shardId = shardSlice.getName();
@ -154,7 +165,8 @@ public class DeleteReplicaCmd implements Cmd {
//callDeleteReplica on all replicas //callDeleteReplica on all replicas
for (String replica: replicas) { for (String replica: replicas) {
log.debug("Deleting replica {} for shard {} based on count {}", replica, shardId, count); log.debug("Deleting replica {} for shard {} based on count {}", replica, shardId, count);
deleteCore(shardSlice, collectionName, replica, message, shard, results, onComplete, parallel); // don't verify with the placement plugin - we already did it
deleteCore(clusterState, coll, shardId, replica, message, results, onComplete, parallel, false);
} }
results.add("shard_id", shardId); results.add("shard_id", shardId);
results.add("replicas_deleted", replicas); results.add("replicas_deleted", replicas);
@ -212,25 +224,39 @@ public class DeleteReplicaCmd implements Cmd {
} }
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
void deleteCore(Slice slice, String collectionName, String replicaName,ZkNodeProps message, String shard, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete, boolean parallel) throws KeeperException, InterruptedException { void deleteCore(ClusterState clusterState, DocCollection coll,
String shardId,
String replicaName,
ZkNodeProps message,
@SuppressWarnings({"rawtypes"})NamedList results,
Runnable onComplete,
boolean parallel,
boolean verifyPlacement) throws KeeperException, IOException, InterruptedException {
Slice slice = coll.getSlice(shardId);
Replica replica = slice.getReplica(replicaName); Replica replica = slice.getReplica(replicaName);
if (replica == null) { if (replica == null) {
ArrayList<String> l = new ArrayList<>(); ArrayList<String> l = new ArrayList<>();
for (Replica r : slice.getReplicas()) for (Replica r : slice.getReplicas())
l.add(r.getName()); l.add(r.getName());
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : " + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : " +
shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ',')); shardId + "/" + coll.getName() + " available replicas are " + StrUtils.join(l, ','));
} }
// If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
// on the command. // on the command.
if (Boolean.parseBoolean(message.getStr(OverseerCollectionMessageHandler.ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) { if (Boolean.parseBoolean(message.getStr(OverseerCollectionMessageHandler.ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName + "Attempted to remove replica : " + coll.getName() + "/" + shardId + "/" + replicaName +
" with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'"); " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
} }
// verify that we are allowed to delete this replica
if (verifyPlacement) {
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ocmh.overseer.getCoreContainer(), clusterState, coll);
assignStrategy.verifyDeleteReplicas(ocmh.cloudManager, coll, shardId, Set.of(replica));
}
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
String asyncId = message.getStr(ASYNC); String asyncId = message.getStr(ASYNC);
@ -256,12 +282,12 @@ public class DeleteReplicaCmd implements Cmd {
shardRequestTracker.processResponses(results, shardHandler, false, null); shardRequestTracker.processResponses(results, shardHandler, false, null);
//check if the core unload removed the corenode zk entry //check if the core unload removed the corenode zk entry
if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE; if (ocmh.waitForCoreNodeGone(coll.getName(), shardId, replicaName, 30000)) return Boolean.TRUE;
} }
// try and ensure core info is removed from cluster state // try and ensure core info is removed from cluster state
ocmh.deleteCoreNode(collectionName, replicaName, replica, core); ocmh.deleteCoreNode(coll.getName(), replicaName, replica, core);
if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE; if (ocmh.waitForCoreNodeGone(coll.getName(), shardId, replicaName, 30000)) return Boolean.TRUE;
return Boolean.FALSE; return Boolean.FALSE;
} catch (Exception e) { } catch (Exception e) {
results.add("failure", "Could not complete delete " + e.getMessage()); results.add("failure", "Could not complete delete " + e.getMessage());
@ -275,7 +301,7 @@ public class DeleteReplicaCmd implements Cmd {
try { try {
if (!callable.call()) if (!callable.call())
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName); "Could not remove replica : " + coll.getName() + "/" + shardId + "/" + replicaName);
} catch (InterruptedException | KeeperException e) { } catch (InterruptedException | KeeperException e) {
throw e; throw e;
} catch (Exception ex) { } catch (Exception ex) {

View File

@ -304,7 +304,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
props.put(CoreAdminParams.NAME, tempCollectionReplica2); props.put(CoreAdminParams.NAME, tempCollectionReplica2);
// copy over property params: // copy over property params:
for (String key : message.keySet()) { for (String key : message.keySet()) {
if (key.startsWith(OverseerCollectionMessageHandler.COLL_PROP_PREFIX)) { if (key.startsWith(CollectionAdminParams.PROPERTY_PREFIX)) {
props.put(key, message.getStr(key)); props.put(key, message.getStr(key));
} }
} }

View File

@ -127,8 +127,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
public static final String REQUESTID = "requestid"; public static final String REQUESTID = "requestid";
public static final String COLL_PROP_PREFIX = "property.";
public static final String ONLY_IF_DOWN = "onlyIfDown"; public static final String ONLY_IF_DOWN = "onlyIfDown";
public static final String SHARD_UNIQUE = "shardUnique"; public static final String SHARD_UNIQUE = "shardUnique";
@ -561,7 +559,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) { void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
// Now add the property.key=value pairs // Now add the property.key=value pairs
for (String key : message.keySet()) { for (String key : message.keySet()) {
if (key.startsWith(COLL_PROP_PREFIX)) { if (key.startsWith(CollectionAdminParams.PROPERTY_PREFIX)) {
params.set(key, message.getStr(key)); params.set(key, message.getStr(key));
} }
} }
@ -570,7 +568,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
void addPropertyParams(ZkNodeProps message, Map<String, Object> map) { void addPropertyParams(ZkNodeProps message, Map<String, Object> map) {
// Now add the property.key=value pairs // Now add the property.key=value pairs
for (String key : message.keySet()) { for (String key : message.keySet()) {
if (key.startsWith(COLL_PROP_PREFIX)) { if (key.startsWith(CollectionAdminParams.PROPERTY_PREFIX)) {
map.put(key, message.getStr(key)); map.put(key, message.getStr(key));
} }
} }

View File

@ -121,7 +121,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
.onNodes(new ArrayList<>(ocmh.cloudManager.getClusterStateProvider().getLiveNodes())) .onNodes(new ArrayList<>(ocmh.cloudManager.getClusterStateProvider().getLiveNodes()))
.build(); .build();
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy( Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance(), ocmh.overseer.getCoreContainer(),
clusterState, clusterState.getCollection(sourceCollection)); clusterState, clusterState.getCollection(sourceCollection));
targetNode = assignStrategy.assign(ocmh.cloudManager, assignRequest).get(0).node; targetNode = assignStrategy.assign(ocmh.cloudManager, assignRequest).get(0).node;
} }

View File

@ -230,7 +230,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
.onNodes(nodeList) .onNodes(nodeList)
.build(); .build();
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy( Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance(), ocmh.overseer.getCoreContainer(),
clusterState, restoreCollection); clusterState, restoreCollection);
List<ReplicaPosition> replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest); List<ReplicaPosition> replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest);

View File

@ -30,6 +30,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.*; import org.apache.solr.common.cloud.*;
import org.apache.solr.common.cloud.rule.ImplicitSnitch; import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.CoreAdminParams;
@ -304,7 +305,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState)); propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
// copy over property params: // copy over property params:
for (String key : message.keySet()) { for (String key : message.keySet()) {
if (key.startsWith(OverseerCollectionMessageHandler.COLL_PROP_PREFIX)) { if (key.startsWith(CollectionAdminParams.PROPERTY_PREFIX)) {
propMap.put(key, message.getStr(key)); propMap.put(key, message.getStr(key));
} }
} }
@ -435,7 +436,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
.onNodes(new ArrayList<>(clusterState.getLiveNodes())) .onNodes(new ArrayList<>(clusterState.getLiveNodes()))
.build(); .build();
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy( Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance(), ocmh.overseer.getCoreContainer(),
clusterState, collection); clusterState, collection);
List<ReplicaPosition> replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest); List<ReplicaPosition> replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest);
t.stop(); t.stop();
@ -472,7 +473,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
propMap.put(CoreAdminParams.NAME, solrCoreName); propMap.put(CoreAdminParams.NAME, solrCoreName);
// copy over property params: // copy over property params:
for (String key : message.keySet()) { for (String key : message.keySet()) {
if (key.startsWith(OverseerCollectionMessageHandler.COLL_PROP_PREFIX)) { if (key.startsWith(CollectionAdminParams.PROPERTY_PREFIX)) {
propMap.put(key, message.getStr(key)); propMap.put(key, message.getStr(key));
} }
} }

View File

@ -46,6 +46,7 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TestInjection; import org.apache.solr.util.TestInjection;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -121,8 +122,8 @@ public class ReplicaMutator {
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
String replicaName = message.getStr(ZkStateReader.REPLICA_PROP); String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT); String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
if (StringUtils.startsWith(property, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) { if (!StringUtils.startsWith(property, CollectionAdminParams.PROPERTY_PREFIX)) {
property = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + property; property = CollectionAdminParams.PROPERTY_PREFIX + property;
} }
property = property.toLowerCase(Locale.ROOT); property = property.toLowerCase(Locale.ROOT);
String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP); String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP);
@ -186,8 +187,8 @@ public class ReplicaMutator {
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP); String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
String replicaName = message.getStr(ZkStateReader.REPLICA_PROP); String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT); String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
if (StringUtils.startsWith(property, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) { if (StringUtils.startsWith(property, CollectionAdminParams.PROPERTY_PREFIX) == false) {
property = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + property; property = CollectionAdminParams.PROPERTY_PREFIX + property;
} }
DocCollection collection = clusterState.getCollection(collectionName); DocCollection collection = clusterState.getCollection(collectionName);
@ -319,7 +320,7 @@ public class ReplicaMutator {
replicaProps.put(ZkStateReader.REPLICA_TYPE, oldReplica.getType().toString()); replicaProps.put(ZkStateReader.REPLICA_TYPE, oldReplica.getType().toString());
// Move custom props over. // Move custom props over.
for (Map.Entry<String, Object> ent : oldReplica.getProperties().entrySet()) { for (Map.Entry<String, Object> ent : oldReplica.getProperties().entrySet()) {
if (ent.getKey().startsWith(OverseerCollectionMessageHandler.COLL_PROP_PREFIX)) { if (ent.getKey().startsWith(CollectionAdminParams.PROPERTY_PREFIX)) {
replicaProps.put(ent.getKey(), ent.getValue()); replicaProps.put(ent.getKey(), ent.getValue());
} }
} }

View File

@ -28,7 +28,6 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager; import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.Assign; import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.PerReplicaStatesOps; import org.apache.solr.common.cloud.PerReplicaStatesOps;
@ -40,6 +39,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -49,7 +49,7 @@ import static org.apache.solr.common.util.Utils.makeMap;
public class SliceMutator { public class SliceMutator {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String PREFERRED_LEADER_PROP = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + "preferredleader"; public static final String PREFERRED_LEADER_PROP = CollectionAdminParams.PROPERTY_PREFIX + "preferredleader";
public static final Set<String> SLICE_UNIQUE_BOOLEAN_PROPERTIES = ImmutableSet.of(PREFERRED_LEADER_PROP); public static final Set<String> SLICE_UNIQUE_BOOLEAN_PROPERTIES = ImmutableSet.of(PREFERRED_LEADER_PROP);

View File

@ -41,8 +41,6 @@ import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cluster.events.ClusterEvent; import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener; import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.NodesDownEvent; import org.apache.solr.cluster.events.NodesDownEvent;
import org.apache.solr.cluster.placement.PlacementPluginConfig;
import org.apache.solr.cluster.placement.PlacementPluginFactory;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.cloud.ReplicaPosition;
@ -72,18 +70,18 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
private final SolrClient solrClient; private final SolrClient solrClient;
private final SolrCloudManager solrCloudManager; private final SolrCloudManager solrCloudManager;
private final CoreContainer cc;
private State state = State.STOPPED; private State state = State.STOPPED;
private int waitForSecond = DEFAULT_WAIT_FOR_SEC; private int waitForSecond = DEFAULT_WAIT_FOR_SEC;
private ScheduledThreadPoolExecutor waitForExecutor; private ScheduledThreadPoolExecutor waitForExecutor;
private final PlacementPluginFactory<? extends PlacementPluginConfig> placementPluginFactory;
public CollectionsRepairEventListener(CoreContainer cc) { public CollectionsRepairEventListener(CoreContainer cc) {
this.cc = cc;
this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress()); this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
this.solrCloudManager = cc.getZkController().getSolrCloudManager(); this.solrCloudManager = cc.getZkController().getSolrCloudManager();
this.placementPluginFactory = cc.getPlacementPluginFactory();
} }
@VisibleForTesting @VisibleForTesting
@ -169,7 +167,7 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
.incrementAndGet(); .incrementAndGet();
} }
}); });
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(placementPluginFactory.createPluginInstance(), clusterState, coll); Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(cc, clusterState, coll);
lostReplicas.forEach((shard, types) -> { lostReplicas.forEach((shard, types) -> {
Assign.AssignRequestBuilder assignRequestBuilder = new Assign.AssignRequestBuilder() Assign.AssignRequestBuilder assignRequestBuilder = new Assign.AssignRequestBuilder()
.forCollection(coll.getName()) .forCollection(coll.getName())

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.placement;
/**
* Delete collection request.
*/
public interface DeleteCollectionRequest extends ModificationRequest {
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.placement;
import org.apache.solr.cluster.Replica;
import java.util.Set;
/**
* Delete replicas request.
*/
public interface DeleteReplicasRequest extends ModificationRequest {
Set<Replica> getReplicas();
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.placement;
import java.util.Set;
/**
* Delete shards request.
*/
public interface DeleteShardsRequest extends ModificationRequest {
Set<String> getShardNames();
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.placement;
import org.apache.solr.cluster.SolrCollection;
/**
* Collection modification request.
*/
public interface ModificationRequest {
/**
* The {@link SolrCollection} to modify.
*/
SolrCollection getCollection();
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.placement;
import org.apache.solr.cluster.Cluster;
/**
* Placement context makes it easier to pass around and access main placement-related components.
*/
public interface PlacementContext {
/**
* Initial state of the cluster. Note there are {@link java.util.Set}'s and {@link java.util.Map}'s
* accessible from the {@link Cluster} and other reachable instances. These collection will not change
* while the plugin is executing and will be thrown away once the plugin is done. The plugin code can
* therefore modify them if needed.
*/
Cluster getCluster();
/**
* Factory used by the plugin to fetch additional attributes from the cluster nodes, such as
* count of cores, system properties etc..
*/
AttributeFetcher getAttributeFetcher();
/**
* Factory used to create instances of {@link PlacementPlan} to return computed decision.
*/
PlacementPlanFactory getPlacementPlanFactory();
}

View File

@ -0,0 +1,62 @@
package org.apache.solr.cluster.placement;
import java.util.HashMap;
import java.util.Map;
/**
* Exception thrown when a placement modification is rejected by the placement plugin.
* Additional details about the reasons are provided if available
* in {@link #getRejectedModifications()} or in the {@link #toString()} methods.
*/
public class PlacementModificationException extends PlacementException {
private final Map<String, String> rejectedModifications = new HashMap<>();
public PlacementModificationException() {
super();
}
public PlacementModificationException(String message) {
super(message);
}
public PlacementModificationException(String message, Throwable cause) {
super(message, cause);
}
public PlacementModificationException(Throwable cause) {
super(cause);
}
/**
* Add information about the modification that cause this exception.
* @param modification requested modification details
* @param reason reason for rejection
*/
public void addRejectedModification(String modification, String reason) {
rejectedModifications.put(modification, reason);
}
/**
* Return rejected modifications and reasons for rejections.
*/
public Map<String, String> getRejectedModifications() {
return rejectedModifications;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(super.toString());
if (!rejectedModifications.isEmpty()) {
sb.append(": ")
.append(rejectedModifications.size())
.append(" rejections:");
rejectedModifications.forEach((modification, reason) ->
sb.append("\n")
.append(modification)
.append("\t")
.append(reason));
}
return sb.toString();
}
}

View File

@ -17,8 +17,6 @@
package org.apache.solr.cluster.placement; package org.apache.solr.cluster.placement;
import org.apache.solr.cluster.Cluster;
/** /**
* <p>Implemented by external plugins to control replica placement and movement on the search cluster (as well as other things * <p>Implemented by external plugins to control replica placement and movement on the search cluster (as well as other things
* such as cluster elasticity?) when cluster changes are required (initiated elsewhere, most likely following a Collection * such as cluster elasticity?) when cluster changes are required (initiated elsewhere, most likely following a Collection
@ -36,16 +34,21 @@ public interface PlacementPlugin {
* *
* <p>Configuration is passed upon creation of a new instance of this class by {@link PlacementPluginFactory#createPluginInstance}. * <p>Configuration is passed upon creation of a new instance of this class by {@link PlacementPluginFactory#createPluginInstance}.
* *
* @param cluster initial state of the cluster. Note there are {@link java.util.Set}'s and {@link java.util.Map}'s
* accessible from the {@link Cluster} and other reachable instances. These collection will not change
* while the plugin is executing and will be thrown away once the plugin is done. The plugin code can
* therefore modify them if needed.
* @param placementRequest request for placing new replicas or moving existing replicas on the cluster. * @param placementRequest request for placing new replicas or moving existing replicas on the cluster.
* @param attributeFetcher Factory used by the plugin to fetch additional attributes from the cluster nodes, such as
* count of coresm ssytem properties etc..
* @param placementPlanFactory Factory used to create instances of {@link PlacementPlan} to return computed decision.
* @return plan satisfying the placement request. * @return plan satisfying the placement request.
*/ */
PlacementPlan computePlacement(Cluster cluster, PlacementRequest placementRequest, AttributeFetcher attributeFetcher, PlacementPlan computePlacement(PlacementRequest placementRequest, PlacementContext placementContext) throws PlacementException, InterruptedException;
PlacementPlanFactory placementPlanFactory) throws PlacementException, InterruptedException;
/**
* Verify that a collection layout modification doesn't violate constraints on replica placements
* required by this plugin. Default implementation is a no-op (any modifications are allowed).
* @param modificationRequest modification request.
* @param placementContext placement context.
* @throws PlacementModificationException if the requested modification would violate replica
* placement constraints.
*/
default void verifyAllowedModification(ModificationRequest modificationRequest, PlacementContext placementContext)
throws PlacementModificationException, InterruptedException {
}
} }

View File

@ -30,12 +30,7 @@ import java.util.Set;
* <p>The set of {@link Node}s on which the replicas should be placed * <p>The set of {@link Node}s on which the replicas should be placed
* is specified (defaults to being equal to the set returned by {@link Cluster#getLiveNodes()}). * is specified (defaults to being equal to the set returned by {@link Cluster#getLiveNodes()}).
*/ */
public interface PlacementRequest { public interface PlacementRequest extends ModificationRequest {
/**
* The {@link SolrCollection} to add {@link Replica}(s) to.
*/
SolrCollection getCollection();
/** /**
* <p>Shard name(s) for which new replicas placement should be computed. The shard(s) might exist or not (that's why this * <p>Shard name(s) for which new replicas placement should be computed. The shard(s) might exist or not (that's why this
* method returns a {@link Set} of {@link String}'s and not directly a set of {@link Shard} instances). * method returns a {@link Set} of {@link String}'s and not directly a set of {@link Shard} instances).

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.placement.impl;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.Shard;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.DeleteCollectionRequest;
import org.apache.solr.cluster.placement.DeleteReplicasRequest;
import org.apache.solr.cluster.placement.DeleteShardsRequest;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Slice;
import java.util.HashSet;
import java.util.Set;
/**
* Helper class to create modification request instances.
*/
public class ModificationRequestImpl {
public static DeleteCollectionRequest createDeleteCollectionRequest(DocCollection docCollection) {
SolrCollection solrCollection = SimpleClusterAbstractionsImpl.SolrCollectionImpl.fromDocCollection(docCollection);
return () -> solrCollection;
}
/**
* Create a delete replicas request.
* @param collection collection to delete replicas from
* @param replicas replicas to delete
*/
public static DeleteReplicasRequest createDeleteReplicasRequest(SolrCollection collection, Set<Replica> replicas) {
return new DeleteReplicasRequest() {
@Override
public Set<Replica> getReplicas() {
return replicas;
}
@Override
public SolrCollection getCollection() {
return collection;
}
@Override
public String toString() {
return "DeleteReplicasRequest{collection=" + collection.getName() +
",replicas=" + replicas;
}
};
}
/**
* Create a delete replicas request using the internal Solr API.
* @param docCollection Solr collection.
* @param shardName shard name.
* @param replicas Solr replicas (belonging to the shard).
*/
public static DeleteReplicasRequest createDeleteReplicasRequest(DocCollection docCollection, String shardName, Set<org.apache.solr.common.cloud.Replica> replicas) {
SolrCollection solrCollection = SimpleClusterAbstractionsImpl.SolrCollectionImpl.fromDocCollection(docCollection);
Shard shard = solrCollection.getShard(shardName);
Slice slice = docCollection.getSlice(shardName);
Set<Replica> solrReplicas = new HashSet<>();
replicas.forEach(replica -> {
solrReplicas.add(shard.getReplica(replica.getName()));
});
return createDeleteReplicasRequest(solrCollection, solrReplicas);
}
public static DeleteShardsRequest createDeleteShardsRequest(SolrCollection collection, Set<String> shardNames) {
return new DeleteShardsRequest() {
@Override
public Set<String> getShardNames() {
return shardNames;
}
@Override
public SolrCollection getCollection() {
return collection;
}
@Override
public String toString() {
return "DeleteShardsRequest{collection=" + collection.getName() +
",shards=" + shardNames;
}
};
}
public static DeleteShardsRequest createDeleteShardsRequest(DocCollection docCollection, Set<String> shardNames) {
SolrCollection solrCollection = SimpleClusterAbstractionsImpl.SolrCollectionImpl.fromDocCollection(docCollection);
return createDeleteShardsRequest(solrCollection, shardNames);
}
}

View File

@ -19,15 +19,19 @@ package org.apache.solr.cluster.placement.impl;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Set;
import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.api.collections.Assign; import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.SolrCollection; import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.DeleteCollectionRequest;
import org.apache.solr.cluster.placement.DeleteReplicasRequest;
import org.apache.solr.cluster.placement.PlacementContext;
import org.apache.solr.cluster.placement.PlacementException; import org.apache.solr.cluster.placement.PlacementException;
import org.apache.solr.cluster.placement.PlacementPlugin; import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementPlan; import org.apache.solr.cluster.placement.PlacementPlan;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.cloud.ReplicaPosition;
/** /**
@ -35,8 +39,6 @@ import org.apache.solr.common.cloud.ReplicaPosition;
*/ */
public class PlacementPluginAssignStrategy implements Assign.AssignStrategy { public class PlacementPluginAssignStrategy implements Assign.AssignStrategy {
private static final PlacementPlanFactoryImpl PLACEMENT_PLAN_FACTORY = new PlacementPlanFactoryImpl();
private final PlacementPlugin plugin; private final PlacementPlugin plugin;
private final DocCollection collection; private final DocCollection collection;
@ -53,18 +55,40 @@ public class PlacementPluginAssignStrategy implements Assign.AssignStrategy {
public List<ReplicaPosition> assign(SolrCloudManager solrCloudManager, Assign.AssignRequest assignRequest) public List<ReplicaPosition> assign(SolrCloudManager solrCloudManager, Assign.AssignRequest assignRequest)
throws Assign.AssignmentException, IOException, InterruptedException { throws Assign.AssignmentException, IOException, InterruptedException {
Cluster cluster = new SimpleClusterAbstractionsImpl.ClusterImpl(solrCloudManager); PlacementContext placementContext = new SimplePlacementContextImpl(solrCloudManager);
SolrCollection solrCollection = new SimpleClusterAbstractionsImpl.SolrCollectionImpl(collection); SolrCollection solrCollection = placementContext.getCluster().getCollection(collection.getName());
PlacementRequestImpl placementRequest = PlacementRequestImpl.toPlacementRequest(cluster, solrCollection, assignRequest); PlacementRequestImpl placementRequest = PlacementRequestImpl.toPlacementRequest(placementContext.getCluster(), solrCollection, assignRequest);
final PlacementPlan placementPlan; final PlacementPlan placementPlan;
try { try {
placementPlan = plugin.computePlacement(cluster, placementRequest, new AttributeFetcherImpl(solrCloudManager), PLACEMENT_PLAN_FACTORY); placementPlan = plugin.computePlacement(placementRequest, placementContext);
} catch (PlacementException pe) { } catch (PlacementException pe) {
throw new Assign.AssignmentException(pe); throw new Assign.AssignmentException(pe);
} }
return ReplicaPlacementImpl.toReplicaPositions(placementPlan.getReplicaPlacements()); return ReplicaPlacementImpl.toReplicaPositions(placementPlan.getReplicaPlacements());
} }
@Override
public void verifyDeleteCollection(SolrCloudManager solrCloudManager, DocCollection collection) throws Assign.AssignmentException, IOException, InterruptedException {
PlacementContext placementContext = new SimplePlacementContextImpl(solrCloudManager);
DeleteCollectionRequest modificationRequest = ModificationRequestImpl.createDeleteCollectionRequest(collection);
try {
plugin.verifyAllowedModification(modificationRequest, placementContext);
} catch (PlacementException pe) {
throw new Assign.AssignmentException(pe);
}
}
@Override
public void verifyDeleteReplicas(SolrCloudManager solrCloudManager, DocCollection collection, String shardId, Set<Replica> replicas) throws Assign.AssignmentException, IOException, InterruptedException {
PlacementContext placementContext = new SimplePlacementContextImpl(solrCloudManager);
DeleteReplicasRequest modificationRequest = ModificationRequestImpl.createDeleteReplicasRequest(collection, shardId, replicas);
try {
plugin.verifyAllowedModification(modificationRequest, placementContext);
} catch (PlacementException pe) {
throw new Assign.AssignmentException(pe);
}
}
} }

View File

@ -26,9 +26,12 @@ import java.util.function.Function;
*/ */
public class ReplicaMetricImpl<T> extends MetricImpl<T> implements ReplicaMetric<T> { public class ReplicaMetricImpl<T> extends MetricImpl<T> implements ReplicaMetric<T> {
/** Replica index size in GB. */
public static final ReplicaMetricImpl<Double> INDEX_SIZE_GB = new ReplicaMetricImpl<>("sizeGB", "INDEX.sizeInBytes", BYTES_TO_GB_CONVERTER); public static final ReplicaMetricImpl<Double> INDEX_SIZE_GB = new ReplicaMetricImpl<>("sizeGB", "INDEX.sizeInBytes", BYTES_TO_GB_CONVERTER);
/** 1-min query rate of the /select handler. */
public static final ReplicaMetricImpl<Double> QUERY_RATE_1MIN = new ReplicaMetricImpl<>("queryRate", "QUERY./select.requestTimes:1minRate"); public static final ReplicaMetricImpl<Double> QUERY_RATE_1MIN = new ReplicaMetricImpl<>("queryRate", "QUERY./select.requestTimes:1minRate");
/** 1-min update rate of the /update handler. */
public static final ReplicaMetricImpl<Double> UPDATE_RATE_1MIN = new ReplicaMetricImpl<>("updateRate", "UPDATE./update.requestTimes:1minRate"); public static final ReplicaMetricImpl<Double> UPDATE_RATE_1MIN = new ReplicaMetricImpl<>("updateRate", "UPDATE./update.requestTimes:1minRate");
public ReplicaMetricImpl(String name, String internalName) { public ReplicaMetricImpl(String name, String internalName) {

View File

@ -0,0 +1,40 @@
package org.apache.solr.cluster.placement.impl;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.placement.AttributeFetcher;
import org.apache.solr.cluster.placement.PlacementContext;
import org.apache.solr.cluster.placement.PlacementPlanFactory;
import java.io.IOException;
/**
* Implementation of {@link PlacementContext} that uses {@link SimpleClusterAbstractionsImpl}
* to create components necessary for the placement plugins to use.
*/
public class SimplePlacementContextImpl implements PlacementContext {
private final Cluster cluster;
private final AttributeFetcher attributeFetcher;
private final PlacementPlanFactory placementPlanFactory = new PlacementPlanFactoryImpl();
public SimplePlacementContextImpl(SolrCloudManager solrCloudManager) throws IOException {
cluster = new SimpleClusterAbstractionsImpl.ClusterImpl(solrCloudManager);
attributeFetcher = new AttributeFetcherImpl(solrCloudManager);
}
@Override
public Cluster getCluster() {
return cluster;
}
@Override
public AttributeFetcher getAttributeFetcher() {
return attributeFetcher;
}
@Override
public PlacementPlanFactory getPlacementPlanFactory() {
return placementPlanFactory;
}
}

View File

@ -20,12 +20,15 @@ package org.apache.solr.cluster.placement.plugins;
import org.apache.solr.cluster.placement.PlacementPluginConfig; import org.apache.solr.cluster.placement.PlacementPluginConfig;
import org.apache.solr.common.annotation.JsonProperty; import org.apache.solr.common.annotation.JsonProperty;
import java.util.Map;
import java.util.Objects;
/** /**
* Configuration bean for {@link AffinityPlacementFactory}. * Configuration bean for {@link AffinityPlacementFactory}.
*/ */
public class AffinityPlacementConfig implements PlacementPluginConfig { public class AffinityPlacementConfig implements PlacementPluginConfig {
public static final AffinityPlacementConfig DEFAULT = new AffinityPlacementConfig(); public static final AffinityPlacementConfig DEFAULT = new AffinityPlacementConfig(20L, 100L);
/** /**
* If a node has strictly less GB of free disk than this value, the node is excluded from assignment decisions. * If a node has strictly less GB of free disk than this value, the node is excluded from assignment decisions.
@ -43,14 +46,43 @@ public class AffinityPlacementConfig implements PlacementPluginConfig {
@JsonProperty @JsonProperty
public long prioritizedFreeDiskGB; public long prioritizedFreeDiskGB;
// no-arg public constructor required for deserialization /**
* This property defines an additional constraint that primary collections (keys) should be
* located on the same nodes as the secondary collections (values). The plugin will assume
* that the secondary collection replicas are already in place and ignore candidate nodes where
* they are not already present.
*/
@JsonProperty
public Map<String, String> withCollections;
/**
* Zero-arguments public constructor required for deserialization - don't use.
*/
public AffinityPlacementConfig() { public AffinityPlacementConfig() {
minimalFreeDiskGB = 20L; this(0L, 0L);
prioritizedFreeDiskGB = 100L;
} }
/**
* Configuration for the {@link AffinityPlacementFactory}.
* @param minimalFreeDiskGB minimal free disk GB.
* @param prioritizedFreeDiskGB prioritized free disk GB.
*/
public AffinityPlacementConfig(long minimalFreeDiskGB, long prioritizedFreeDiskGB) { public AffinityPlacementConfig(long minimalFreeDiskGB, long prioritizedFreeDiskGB) {
this(minimalFreeDiskGB, prioritizedFreeDiskGB, Map.of());
}
/**
* Configuration for the {@link AffinityPlacementFactory}.
* @param minimalFreeDiskGB minimal free disk GB.
* @param prioritizedFreeDiskGB prioritized free disk GB.
* @param withCollections configuration of co-located collections: keys are
* primary collection names and values are secondary
* collection names.
*/
public AffinityPlacementConfig(long minimalFreeDiskGB, long prioritizedFreeDiskGB, Map<String, String> withCollections) {
this.minimalFreeDiskGB = minimalFreeDiskGB; this.minimalFreeDiskGB = minimalFreeDiskGB;
this.prioritizedFreeDiskGB = prioritizedFreeDiskGB; this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
Objects.requireNonNull(withCollections);
this.withCollections = withCollections;
} }
} }

View File

@ -27,8 +27,10 @@ import org.apache.solr.common.util.SuppressForbidden;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -147,7 +149,7 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
@Override @Override
public PlacementPlugin createPluginInstance() { public PlacementPlugin createPluginInstance() {
return new AffinityPlacementPlugin(config.minimalFreeDiskGB, config.prioritizedFreeDiskGB); return new AffinityPlacementPlugin(config.minimalFreeDiskGB, config.prioritizedFreeDiskGB, config.withCollections);
} }
@Override @Override
@ -171,14 +173,29 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
private final long prioritizedFreeDiskGB; private final long prioritizedFreeDiskGB;
// primary to secondary (1:1)
private final Map<String, String> withCollections;
// secondary to primary (1:N)
private final Map<String, Set<String>> colocatedWith;
private final Random replicaPlacementRandom = new Random(); // ok even if random sequence is predictable. private final Random replicaPlacementRandom = new Random(); // ok even if random sequence is predictable.
/** /**
* The factory has decoded the configuration for the plugin instance and passes it the parameters it needs. * The factory has decoded the configuration for the plugin instance and passes it the parameters it needs.
*/ */
private AffinityPlacementPlugin(long minimalFreeDiskGB, long prioritizedFreeDiskGB) { private AffinityPlacementPlugin(long minimalFreeDiskGB, long prioritizedFreeDiskGB, Map<String, String> withCollections) {
this.minimalFreeDiskGB = minimalFreeDiskGB; this.minimalFreeDiskGB = minimalFreeDiskGB;
this.prioritizedFreeDiskGB = prioritizedFreeDiskGB; this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
Objects.requireNonNull(withCollections, "withCollections must not be null");
this.withCollections = withCollections;
if (withCollections.isEmpty()) {
colocatedWith = Map.of();
} else {
colocatedWith = new HashMap<>();
withCollections.forEach((primary, secondary) ->
colocatedWith.computeIfAbsent(secondary, s -> new HashSet<>())
.add(primary));
}
// We make things reproducible in tests by using test seed if any // We make things reproducible in tests by using test seed if any
String seed = System.getProperty("tests.seed"); String seed = System.getProperty("tests.seed");
@ -187,13 +204,16 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
} }
} }
@Override
@SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.") @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher, public PlacementPlan computePlacement(PlacementRequest request, PlacementContext placementContext) throws PlacementException {
PlacementPlanFactory placementPlanFactory) throws PlacementException {
Set<Node> nodes = request.getTargetNodes(); Set<Node> nodes = request.getTargetNodes();
SolrCollection solrCollection = request.getCollection(); SolrCollection solrCollection = request.getCollection();
nodes = filterNodesWithCollection(placementContext.getCluster(), request, nodes);
// Request all needed attributes // Request all needed attributes
AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher();
attributeFetcher.requestNodeSystemProperty(AVAILABILITY_ZONE_SYSPROP).requestNodeSystemProperty(REPLICA_TYPE_SYSPROP); attributeFetcher.requestNodeSystemProperty(AVAILABILITY_ZONE_SYSPROP).requestNodeSystemProperty(REPLICA_TYPE_SYSPROP);
attributeFetcher attributeFetcher
.requestNodeMetric(NodeMetricImpl.NUM_CORES) .requestNodeMetric(NodeMetricImpl.NUM_CORES)
@ -238,11 +258,94 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
// failure. Current code does fail if placement is impossible (constraint is at most one replica of a shard on any node). // failure. Current code does fail if placement is impossible (constraint is at most one replica of a shard on any node).
for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
makePlacementDecisions(solrCollection, shardName, availabilityZones, replicaType, request.getCountReplicasToCreate(replicaType), makePlacementDecisions(solrCollection, shardName, availabilityZones, replicaType, request.getCountReplicasToCreate(replicaType),
attrValues, replicaTypeToNodes, nodesWithReplicas, coresOnNodes, placementPlanFactory, replicaPlacements); attrValues, replicaTypeToNodes, nodesWithReplicas, coresOnNodes, placementContext.getPlacementPlanFactory(), replicaPlacements);
} }
} }
return placementPlanFactory.createPlacementPlan(request, replicaPlacements); return placementContext.getPlacementPlanFactory().createPlacementPlan(request, replicaPlacements);
}
@Override
public void verifyAllowedModification(ModificationRequest modificationRequest, PlacementContext placementContext) throws PlacementModificationException, InterruptedException {
if (modificationRequest instanceof DeleteShardsRequest) {
log.warn("DeleteShardsRequest not implemented yet, skipping: {}", modificationRequest);
} else if (modificationRequest instanceof DeleteCollectionRequest) {
verifyDeleteCollection((DeleteCollectionRequest) modificationRequest, placementContext);
} else if (modificationRequest instanceof DeleteReplicasRequest) {
verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest, placementContext);
} else {
log.warn("unsupported request type, skipping: {}", modificationRequest);
}
}
private void verifyDeleteCollection(DeleteCollectionRequest deleteCollectionRequest, PlacementContext placementContext) throws PlacementModificationException, InterruptedException {
Cluster cluster = placementContext.getCluster();
Set<String> colocatedCollections = colocatedWith.getOrDefault(deleteCollectionRequest.getCollection().getName(), Set.of());
for (String primaryName : colocatedCollections) {
try {
if (cluster.getCollection(primaryName) != null) {
// still exists
throw new PlacementModificationException("colocated collection " + primaryName +
" of " + deleteCollectionRequest.getCollection().getName() + " still present");
}
} catch (IOException e) {
throw new PlacementModificationException("failed to retrieve colocated collection information", e);
}
}
}
private void verifyDeleteReplicas(DeleteReplicasRequest deleteReplicasRequest, PlacementContext placementContext) throws PlacementModificationException, InterruptedException {
Cluster cluster = placementContext.getCluster();
SolrCollection secondaryCollection = deleteReplicasRequest.getCollection();
Set<String> colocatedCollections = colocatedWith.get(secondaryCollection.getName());
if (colocatedCollections == null) {
return;
}
Map<Node, Map<String, AtomicInteger>> secondaryNodeShardReplicas = new HashMap<>();
secondaryCollection.shards().forEach(shard ->
shard.replicas().forEach(replica -> {
secondaryNodeShardReplicas.computeIfAbsent(replica.getNode(), n -> new HashMap<>())
.computeIfAbsent(replica.getShard().getShardName(), s -> new AtomicInteger())
.incrementAndGet();
}));
// find the colocated-with collections
Map<Node, Set<String>> colocatingNodes = new HashMap<>();
try {
for (String colocatedCollection : colocatedCollections) {
SolrCollection coll = cluster.getCollection(colocatedCollection);
coll.shards().forEach(shard ->
shard.replicas().forEach(replica -> {
colocatingNodes.computeIfAbsent(replica.getNode(), n -> new HashSet<>())
.add(coll.getName());
}));
}
} catch (IOException ioe) {
throw new PlacementModificationException("failed to retrieve colocated collection information", ioe);
}
PlacementModificationException exception = null;
for (Replica replica : deleteReplicasRequest.getReplicas()) {
if (!colocatingNodes.containsKey(replica.getNode())) {
continue;
}
// check that there will be at least one replica remaining
AtomicInteger secondaryCount = secondaryNodeShardReplicas
.getOrDefault(replica.getNode(), Map.of())
.getOrDefault(replica.getShard().getShardName(), new AtomicInteger());
if (secondaryCount.get() > 1) {
// we can delete it - record the deletion
secondaryCount.decrementAndGet();
continue;
}
// fail - this replica cannot be removed
if (exception == null) {
exception = new PlacementModificationException("delete replica(s) rejected");
}
exception.addRejectedModification(replica.toString(), "co-located with replicas of " + colocatingNodes.get(replica.getNode()));
}
if (exception != null) {
throw exception;
}
} }
private Set<String> getZonesFromNodes(Set<Node> nodes, final AttributeValues attrValues) { private Set<String> getZonesFromNodes(Set<Node> nodes, final AttributeValues attrValues) {
@ -467,7 +570,7 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
if (candidateAzEntries == null) { if (candidateAzEntries == null) {
// This can happen because not enough nodes for the placement request or already too many nodes with replicas of // This can happen because not enough nodes for the placement request or already too many nodes with replicas of
// the shard that can't accept new replicas or not enough nodes with enough free disk space. // the shard that can't accept new replicas or not enough nodes with enough free disk space.
throw new PlacementException("Not enough nodes to place " + numReplicas + " replica(s) of type " + replicaType + throw new PlacementException("Not enough eligible nodes to place " + numReplicas + " replica(s) of type " + replicaType +
" for shard " + shardName + " of collection " + solrCollection.getName()); " for shard " + shardName + " of collection " + solrCollection.getName());
} }
@ -529,6 +632,32 @@ public class AffinityPlacementFactory implements PlacementPluginFactory<Affinity
} }
} }
private Set<Node> filterNodesWithCollection(Cluster cluster, PlacementRequest request, Set<Node> initialNodes) throws PlacementException {
// if there's a `withCollection` constraint for this collection then remove nodes
// that are not eligible
String withCollectionName = withCollections.get(request.getCollection().getName());
if (withCollectionName == null) {
return initialNodes;
}
SolrCollection withCollection;
try {
withCollection = cluster.getCollection(withCollectionName);
} catch (Exception e) {
throw new PlacementException("Error getting info of withCollection=" + withCollectionName, e);
}
Set<Node> withCollectionNodes = new HashSet<>();
withCollection.shards().forEach(s -> s.replicas().forEach(r -> withCollectionNodes.add(r.getNode())));
if (withCollectionNodes.isEmpty()) {
throw new PlacementException("Collection " + withCollection + " defined in `withCollection` has no replicas on eligible nodes.");
}
HashSet<Node> filteredNodes = new HashSet<>(initialNodes);
filteredNodes.retainAll(withCollectionNodes);
if (filteredNodes.isEmpty()) {
throw new PlacementException("Collection " + withCollection + " defined in `withCollection` has no replicas on eligible nodes.");
}
return filteredNodes;
}
/** /**
* Comparator implementing the placement strategy based on free space and number of cores: we want to place new replicas * Comparator implementing the placement strategy based on free space and number of cores: we want to place new replicas
* on nodes with the less number of cores, but only if they do have enough disk space (expressed as a threshold value). * on nodes with the less number of cores, but only if they do have enough disk space (expressed as a threshold value).

View File

@ -26,7 +26,6 @@ import java.util.Map;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.collect.TreeMultimap; import com.google.common.collect.TreeMultimap;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.Node; import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica; import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.SolrCollection; import org.apache.solr.cluster.SolrCollection;
@ -50,15 +49,15 @@ public class MinimizeCoresPlacementFactory implements PlacementPluginFactory<Pla
static private class MinimizeCoresPlacementPlugin implements PlacementPlugin { static private class MinimizeCoresPlacementPlugin implements PlacementPlugin {
@Override
@SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.") @SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher, public PlacementPlan computePlacement(PlacementRequest request, PlacementContext placementContext) throws PlacementException {
PlacementPlanFactory placementPlanFactory) throws PlacementException {
int totalReplicasPerShard = 0; int totalReplicasPerShard = 0;
for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
totalReplicasPerShard += request.getCountReplicasToCreate(rt); totalReplicasPerShard += request.getCountReplicasToCreate(rt);
} }
if (cluster.getLiveNodes().size() < totalReplicasPerShard) { if (placementContext.getCluster().getLiveNodes().size() < totalReplicasPerShard) {
throw new PlacementException("Cluster size too small for number of replicas per shard"); throw new PlacementException("Cluster size too small for number of replicas per shard");
} }
@ -67,6 +66,7 @@ public class MinimizeCoresPlacementFactory implements PlacementPluginFactory<Pla
Set<Node> nodes = request.getTargetNodes(); Set<Node> nodes = request.getTargetNodes();
AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher();
attributeFetcher.requestNodeMetric(NodeMetricImpl.NUM_CORES); attributeFetcher.requestNodeMetric(NodeMetricImpl.NUM_CORES);
attributeFetcher.fetchFrom(nodes); attributeFetcher.fetchFrom(nodes);
AttributeValues attrValues = attributeFetcher.fetchAttributes(); AttributeValues attrValues = attributeFetcher.fetchAttributes();
@ -106,11 +106,11 @@ public class MinimizeCoresPlacementFactory implements PlacementPluginFactory<Pla
} }
for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) { for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
placeReplicas(request.getCollection(), nodeEntriesToAssign, placementPlanFactory, replicaPlacements, shardName, request, replicaType); placeReplicas(request.getCollection(), nodeEntriesToAssign, placementContext.getPlacementPlanFactory(), replicaPlacements, shardName, request, replicaType);
} }
} }
return placementPlanFactory.createPlacementPlan(request, replicaPlacements); return placementContext.getPlacementPlanFactory().createPlacementPlan(request, replicaPlacements);
} }
private void placeReplicas(SolrCollection solrCollection, ArrayList<Map.Entry<Integer, Node>> nodeEntriesToAssign, private void placeReplicas(SolrCollection solrCollection, ArrayList<Map.Entry<Integer, Node>> nodeEntriesToAssign,

View File

@ -23,7 +23,6 @@ import java.util.HashSet;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.Node; import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica; import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.SolrCollection; import org.apache.solr.cluster.SolrCollection;
@ -53,14 +52,14 @@ public class RandomPlacementFactory implements PlacementPluginFactory<PlacementP
} }
} }
public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher, @Override
PlacementPlanFactory placementPlanFactory) throws PlacementException { public PlacementPlan computePlacement(PlacementRequest request, PlacementContext placementContext) throws PlacementException {
int totalReplicasPerShard = 0; int totalReplicasPerShard = 0;
for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
totalReplicasPerShard += request.getCountReplicasToCreate(rt); totalReplicasPerShard += request.getCountReplicasToCreate(rt);
} }
if (cluster.getLiveNodes().size() < totalReplicasPerShard) { if (placementContext.getCluster().getLiveNodes().size() < totalReplicasPerShard) {
throw new PlacementException("Cluster size too small for number of replicas per shard"); throw new PlacementException("Cluster size too small for number of replicas per shard");
} }
@ -69,15 +68,15 @@ public class RandomPlacementFactory implements PlacementPluginFactory<PlacementP
// Now place randomly all replicas of all shards on available nodes // Now place randomly all replicas of all shards on available nodes
for (String shardName : request.getShardNames()) { for (String shardName : request.getShardNames()) {
// Shuffle the nodes for each shard so that replicas for a shard are placed on distinct yet random nodes // Shuffle the nodes for each shard so that replicas for a shard are placed on distinct yet random nodes
ArrayList<Node> nodesToAssign = new ArrayList<>(cluster.getLiveNodes()); ArrayList<Node> nodesToAssign = new ArrayList<>(placementContext.getCluster().getLiveNodes());
Collections.shuffle(nodesToAssign, replicaPlacementRandom); Collections.shuffle(nodesToAssign, replicaPlacementRandom);
for (Replica.ReplicaType rt : Replica.ReplicaType.values()) { for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
placeForReplicaType(request.getCollection(), nodesToAssign, placementPlanFactory, replicaPlacements, shardName, request, rt); placeForReplicaType(request.getCollection(), nodesToAssign, placementContext.getPlacementPlanFactory(), replicaPlacements, shardName, request, rt);
} }
} }
return placementPlanFactory.createPlacementPlan(request, replicaPlacements); return placementContext.getPlacementPlanFactory().createPlacementPlan(request, replicaPlacements);
} }
private void placeForReplicaType(SolrCollection solrCollection, ArrayList<Node> nodesToAssign, PlacementPlanFactory placementPlanFactory, private void placeForReplicaType(SolrCollection solrCollection, ArrayList<Node> nodesToAssign, PlacementPlanFactory placementPlanFactory,

View File

@ -95,7 +95,7 @@ import static org.apache.solr.client.solrj.response.RequestStatusState.NOT_FOUND
import static org.apache.solr.client.solrj.response.RequestStatusState.RUNNING; import static org.apache.solr.client.solrj.response.RequestStatusState.RUNNING;
import static org.apache.solr.client.solrj.response.RequestStatusState.SUBMITTED; import static org.apache.solr.client.solrj.response.RequestStatusState.SUBMITTED;
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION; import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_PROP_PREFIX; import static org.apache.solr.common.params.CollectionAdminParams.PROPERTY_PREFIX;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE; import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE;
@ -486,7 +486,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
if (shardsParam == null) h.copyFromClusterProp(props, NUM_SLICES); if (shardsParam == null) h.copyFromClusterProp(props, NUM_SLICES);
for (String prop : ImmutableSet.of(NRT_REPLICAS, PULL_REPLICAS, TLOG_REPLICAS)) for (String prop : ImmutableSet.of(NRT_REPLICAS, PULL_REPLICAS, TLOG_REPLICAS))
h.copyFromClusterProp(props, prop); h.copyFromClusterProp(props, prop);
copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX); copyPropertiesWithPrefix(req.getParams(), props, PROPERTY_PREFIX);
return copyPropertiesWithPrefix(req.getParams(), props, "router."); return copyPropertiesWithPrefix(req.getParams(), props, "router.");
}), }),
@ -737,7 +737,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
SPLIT_FUZZ, SPLIT_FUZZ,
SPLIT_BY_PREFIX, SPLIT_BY_PREFIX,
FOLLOW_ALIASES); FOLLOW_ALIASES);
return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX); return copyPropertiesWithPrefix(req.getParams(), map, PROPERTY_PREFIX);
}), }),
DELETESHARD_OP(DELETESHARD, (req, rsp, h) -> { DELETESHARD_OP(DELETESHARD, (req, rsp, h) -> {
Map<String, Object> map = copy(req.getParams().required(), null, Map<String, Object> map = copy(req.getParams().required(), null,
@ -775,7 +775,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
CREATE_NODE_SET, CREATE_NODE_SET,
WAIT_FOR_FINAL_STATE, WAIT_FOR_FINAL_STATE,
FOLLOW_ALIASES); FOLLOW_ALIASES);
return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX); return copyPropertiesWithPrefix(req.getParams(), map, PROPERTY_PREFIX);
}), }),
DELETEREPLICA_OP(DELETEREPLICA, (req, rsp, h) -> { DELETEREPLICA_OP(DELETEREPLICA, (req, rsp, h) -> {
Map<String, Object> map = copy(req.getParams().required(), null, Map<String, Object> map = copy(req.getParams().required(), null,
@ -917,7 +917,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
CREATE_NODE_SET, CREATE_NODE_SET,
FOLLOW_ALIASES, FOLLOW_ALIASES,
SKIP_NODE_ASSIGNMENT); SKIP_NODE_ASSIGNMENT);
return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX); return copyPropertiesWithPrefix(req.getParams(), props, PROPERTY_PREFIX);
}), }),
OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> new LinkedHashMap<>()), OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> new LinkedHashMap<>()),
@ -958,8 +958,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
PROPERTY_VALUE_PROP); PROPERTY_VALUE_PROP);
copy(req.getParams(), map, SHARD_UNIQUE); copy(req.getParams(), map, SHARD_UNIQUE);
String property = (String) map.get(PROPERTY_PROP); String property = (String) map.get(PROPERTY_PROP);
if (!property.startsWith(COLL_PROP_PREFIX)) { if (!property.startsWith(PROPERTY_PREFIX)) {
property = COLL_PROP_PREFIX + property; property = PROPERTY_PREFIX + property;
} }
boolean uniquePerSlice = Boolean.parseBoolean((String) map.get(SHARD_UNIQUE)); boolean uniquePerSlice = Boolean.parseBoolean((String) map.get(SHARD_UNIQUE));
@ -992,8 +992,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
PROPERTY_PROP); PROPERTY_PROP);
Boolean shardUnique = Boolean.parseBoolean(req.getParams().get(SHARD_UNIQUE)); Boolean shardUnique = Boolean.parseBoolean(req.getParams().get(SHARD_UNIQUE));
String prop = req.getParams().get(PROPERTY_PROP).toLowerCase(Locale.ROOT); String prop = req.getParams().get(PROPERTY_PROP).toLowerCase(Locale.ROOT);
if (!StringUtils.startsWith(prop, COLL_PROP_PREFIX)) { if (!StringUtils.startsWith(prop, PROPERTY_PREFIX)) {
prop = COLL_PROP_PREFIX + prop; prop = PROPERTY_PREFIX + prop;
} }
if (!shardUnique && !SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) { if (!shardUnique && !SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) {
@ -1011,7 +1011,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
// XXX should this command support followAliases? // XXX should this command support followAliases?
MODIFYCOLLECTION_OP(MODIFYCOLLECTION, (req, rsp, h) -> { MODIFYCOLLECTION_OP(MODIFYCOLLECTION, (req, rsp, h) -> {
Map<String, Object> m = copy(req.getParams(), null, CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES); Map<String, Object> m = copy(req.getParams(), null, CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES);
copyPropertiesWithPrefix(req.getParams(), m, COLL_PROP_PREFIX); copyPropertiesWithPrefix(req.getParams(), m, PROPERTY_PREFIX);
if (m.isEmpty()) { if (m.isEmpty()) {
throw new SolrException(ErrorCode.BAD_REQUEST, throw new SolrException(ErrorCode.BAD_REQUEST,
formatString("no supported values provided {0}", CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES.toString())); formatString("no supported values provided {0}", CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES.toString()));
@ -1139,7 +1139,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
// from CREATE_OP: // from CREATE_OP:
copy(req.getParams(), params, COLL_CONF, REPLICATION_FACTOR, NRT_REPLICAS, TLOG_REPLICAS, copy(req.getParams(), params, COLL_CONF, REPLICATION_FACTOR, NRT_REPLICAS, TLOG_REPLICAS,
PULL_REPLICAS, CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE); PULL_REPLICAS, CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE);
copyPropertiesWithPrefix(req.getParams(), params, COLL_PROP_PREFIX); copyPropertiesWithPrefix(req.getParams(), params, PROPERTY_PREFIX);
return params; return params;
}), }),
CREATESNAPSHOT_OP(CREATESNAPSHOT, (req, rsp, h) -> { CREATESNAPSHOT_OP(CREATESNAPSHOT, (req, rsp, h) -> {

View File

@ -22,6 +22,7 @@ import org.apache.solr.cluster.placement.impl.AttributeFetcherImpl;
import org.apache.solr.cluster.placement.impl.AttributeValuesImpl; import org.apache.solr.cluster.placement.impl.AttributeValuesImpl;
import org.apache.solr.cluster.placement.impl.CollectionMetricsBuilder; import org.apache.solr.cluster.placement.impl.CollectionMetricsBuilder;
import org.apache.solr.cluster.placement.impl.NodeMetricImpl; import org.apache.solr.cluster.placement.impl.NodeMetricImpl;
import org.apache.solr.cluster.placement.impl.PlacementPlanFactoryImpl;
import org.apache.solr.cluster.placement.impl.ReplicaMetricImpl; import org.apache.solr.cluster.placement.impl.ReplicaMetricImpl;
import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Pair;
import org.junit.Assert; import org.junit.Assert;
@ -92,6 +93,29 @@ public class Builders {
return clusterCollections; return clusterCollections;
} }
private static final PlacementPlanFactory PLACEMENT_PLAN_FACTORY = new PlacementPlanFactoryImpl();
public PlacementContext buildPlacementContext() {
Cluster cluster = build();
AttributeFetcher attributeFetcher = buildAttributeFetcher();
return new PlacementContext() {
@Override
public Cluster getCluster() {
return cluster;
}
@Override
public AttributeFetcher getAttributeFetcher() {
return attributeFetcher;
}
@Override
public PlacementPlanFactory getPlacementPlanFactory() {
return PLACEMENT_PLAN_FACTORY;
}
};
}
public AttributeFetcher buildAttributeFetcher() { public AttributeFetcher buildAttributeFetcher() {
Map<String, Map<Node, String>> sysprops = new HashMap<>(); Map<String, Map<Node, String>> sysprops = new HashMap<>();
Map<NodeMetric<?>, Map<Node, Object>> metrics = new HashMap<>(); Map<NodeMetric<?>, Map<Node, Object>> metrics = new HashMap<>();

View File

@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -71,7 +72,7 @@ import static java.util.Collections.singletonMap;
public class PlacementPluginIntegrationTest extends SolrCloudTestCase { public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION = PlacementPluginIntegrationTest.class.getName() + "_collection"; private static final String COLLECTION = PlacementPluginIntegrationTest.class.getSimpleName() + "_collection";
private static SolrCloudManager cloudManager; private static SolrCloudManager cloudManager;
private static CoreContainer cc; private static CoreContainer cc;
@ -231,6 +232,88 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
assertNull("no factory should be present", factory); assertNull("no factory should be present", factory);
} }
@Test
public void testWithCollectionIntegration() throws Exception {
PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory();
assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory);
DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory;
int version = wrapper.getVersion();
log.debug("--initial version={}", version);
Set<String> nodeSet = new HashSet<>();
for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
if (nodeSet.size() > 1) {
break;
}
nodeSet.add(node);
}
String SECONDARY_COLLECTION = COLLECTION + "_secondary";
PluginMeta plugin = new PluginMeta();
plugin.name = PlacementPluginFactory.PLUGIN_NAME;
plugin.klass = AffinityPlacementFactory.class.getName();
plugin.config = new AffinityPlacementConfig(1, 2, Map.of(COLLECTION, SECONDARY_COLLECTION));
V2Request req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.POST()
.withPayload(singletonMap("add", plugin))
.build();
req.process(cluster.getSolrClient());
version = waitForVersionChange(version, wrapper, 10);
CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(SECONDARY_COLLECTION, "conf", 1, 3)
.process(cluster.getSolrClient());
assertTrue(rsp.isSuccess());
cluster.waitForActiveCollection(SECONDARY_COLLECTION, 1, 3);
DocCollection secondary = cloudManager.getClusterStateProvider().getClusterState().getCollection(SECONDARY_COLLECTION);
Set<String> secondaryNodes = new HashSet<>();
secondary.forEachReplica((shard, replica) -> secondaryNodes.add(replica.getNodeName()));
rsp = CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)
.setCreateNodeSet(String.join(",", nodeSet))
.process(cluster.getSolrClient());
assertTrue(rsp.isSuccess());
cluster.waitForActiveCollection(COLLECTION, 2, 4);
// make sure the primary replicas were placed on the nodeset
DocCollection primary = cloudManager.getClusterStateProvider().getClusterState().getCollection(COLLECTION);
primary.forEachReplica((shard, replica) ->
assertTrue("primary replica not on secondary node!", nodeSet.contains(replica.getNodeName())));
// try deleting secondary replica from node without the primary replica
Optional<String> onlySecondaryReplica = secondary.getReplicas().stream()
.filter(replica -> !nodeSet.contains(replica.getNodeName()))
.map(replica -> replica.getName()).findFirst();
assertTrue("no secondary node without primary replica", onlySecondaryReplica.isPresent());
rsp = CollectionAdminRequest.deleteReplica(SECONDARY_COLLECTION, "shard1", onlySecondaryReplica.get())
.process(cluster.getSolrClient());
assertTrue("delete of a lone secondary replica should succeed", rsp.isSuccess());
// try deleting secondary replica from node WITH the primary replica - should fail
Optional<String> secondaryWithPrimaryReplica = secondary.getReplicas().stream()
.filter(replica -> nodeSet.contains(replica.getNodeName()))
.map(replica -> replica.getName()).findFirst();
assertTrue("no secondary node with primary replica", secondaryWithPrimaryReplica.isPresent());
try {
rsp = CollectionAdminRequest.deleteReplica(SECONDARY_COLLECTION, "shard1", secondaryWithPrimaryReplica.get())
.process(cluster.getSolrClient());
fail("should have failed: " + rsp);
} catch (Exception e) {
assertTrue(e.toString(), e.toString().contains("co-located with replicas"));
}
// try deleting secondary collection
try {
rsp = CollectionAdminRequest.deleteCollection(SECONDARY_COLLECTION)
.process(cluster.getSolrClient());
fail("should have failed: " + rsp);
} catch (Exception e) {
assertTrue(e.toString(), e.toString().contains("colocated collection"));
}
}
@Test @Test
public void testAttributeFetcherImpl() throws Exception { public void testAttributeFetcherImpl() throws Exception {
CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2) CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)

View File

@ -25,7 +25,7 @@ import org.apache.solr.cluster.Shard;
import org.apache.solr.cluster.SolrCollection; import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.*; import org.apache.solr.cluster.placement.*;
import org.apache.solr.cluster.placement.Builders; import org.apache.solr.cluster.placement.Builders;
import org.apache.solr.cluster.placement.impl.PlacementPlanFactoryImpl; import org.apache.solr.cluster.placement.impl.ModificationRequestImpl;
import org.apache.solr.cluster.placement.impl.PlacementRequestImpl; import org.apache.solr.cluster.placement.impl.PlacementRequestImpl;
import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Pair;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -50,10 +50,15 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
private final static long MINIMAL_FREE_DISK_GB = 10L; private final static long MINIMAL_FREE_DISK_GB = 10L;
private final static long PRIORITIZED_FREE_DISK_GB = 50L; private final static long PRIORITIZED_FREE_DISK_GB = 50L;
private final static String secondaryCollectionName = "withCollection_secondary";
private final static String primaryCollectionName = "withCollection_primary";
@BeforeClass @BeforeClass
public static void setupPlugin() { public static void setupPlugin() {
AffinityPlacementConfig config = new AffinityPlacementConfig(MINIMAL_FREE_DISK_GB, PRIORITIZED_FREE_DISK_GB); AffinityPlacementConfig config = new AffinityPlacementConfig(
MINIMAL_FREE_DISK_GB,
PRIORITIZED_FREE_DISK_GB,
Map.of(primaryCollectionName, secondaryCollectionName));
AffinityPlacementFactory factory = new AffinityPlacementFactory(); AffinityPlacementFactory factory = new AffinityPlacementFactory();
factory.configure(config); factory.configure(config);
plugin = factory.createPluginInstance(); plugin = factory.createPluginInstance();
@ -93,8 +98,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
collectionBuilder.initializeShardsReplicas(1, 0, 0, 0, List.of()); collectionBuilder.initializeShardsReplicas(1, 0, 0, 0, List.of());
} }
Cluster cluster = clusterBuilder.build(); PlacementContext placementContext = clusterBuilder.buildPlacementContext();
AttributeFetcher attributeFetcher = clusterBuilder.buildAttributeFetcher();
SolrCollection solrCollection = collectionBuilder.build(); SolrCollection solrCollection = collectionBuilder.build();
List<Node> liveNodes = clusterBuilder.buildLiveNodes(); List<Node> liveNodes = clusterBuilder.buildLiveNodes();
@ -104,7 +108,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
Set.of(solrCollection.shards().iterator().next().getShardName()), new HashSet<>(liveNodes), Set.of(solrCollection.shards().iterator().next().getShardName()), new HashSet<>(liveNodes),
1, 0, 0); 1, 0, 0);
PlacementPlan pp = plugin.computePlacement(cluster, placementRequest, attributeFetcher, new PlacementPlanFactoryImpl()); PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
assertEquals(1, pp.getReplicaPlacements().size()); assertEquals(1, pp.getReplicaPlacements().size());
ReplicaPlacement rp = pp.getReplicaPlacements().iterator().next(); ReplicaPlacement rp = pp.getReplicaPlacements().iterator().next();
@ -144,7 +148,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes), PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
2, 2, 2); 2, 2, 2);
PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl()); PlacementPlan pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
assertEquals(18, pp.getReplicaPlacements().size()); // 3 shards, 6 replicas total each assertEquals(18, pp.getReplicaPlacements().size()); // 3 shards, 6 replicas total each
Set<Pair<String, Node>> placements = new HashSet<>(); Set<Pair<String, Node>> placements = new HashSet<>();
@ -157,7 +161,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
// Verify that if we ask for 7 replicas, the placement will use the low free space node // Verify that if we ask for 7 replicas, the placement will use the low free space node
placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes), placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
7, 0, 0); 7, 0, 0);
pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl()); pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
assertEquals(21, pp.getReplicaPlacements().size()); // 3 shards, 7 replicas each assertEquals(21, pp.getReplicaPlacements().size()); // 3 shards, 7 replicas each
placements = new HashSet<>(); placements = new HashSet<>();
for (ReplicaPlacement rp : pp.getReplicaPlacements()) { for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
@ -170,7 +174,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
try { try {
placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes), placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
8, 0, 0); 8, 0, 0);
plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl()); plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
fail("Placing 8 replicas should not be possible given only 7 nodes have enough space"); fail("Placing 8 replicas should not be possible given only 7 nodes have enough space");
} catch (PlacementException e) { } catch (PlacementException e) {
// expected // expected
@ -214,7 +218,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
// The replicas must be placed on the most appropriate nodes, i.e. those that do not already have a replica for the // The replicas must be placed on the most appropriate nodes, i.e. those that do not already have a replica for the
// shard and then on the node with the lowest number of cores. // shard and then on the node with the lowest number of cores.
// NRT are placed first and given the cluster state here the placement is deterministic (easier to test, only one good placement). // NRT are placed first and given the cluster state here the placement is deterministic (easier to test, only one good placement).
PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl()); PlacementPlan pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
// Each expected placement is represented as a string "shard replica-type node" // Each expected placement is represented as a string "shard replica-type node"
Set<String> expectedPlacements = Set.of("1 NRT 1", "1 TLOG 2", "2 NRT 0", "2 TLOG 4"); Set<String> expectedPlacements = Set.of("1 NRT 1", "1 TLOG 2", "2 NRT 0", "2 TLOG 4");
@ -312,7 +316,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
// Add 2 NRT and one TLOG to each shard. // Add 2 NRT and one TLOG to each shard.
PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes), PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
2, 1, 0); 2, 1, 0);
PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl()); PlacementPlan pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
// Shard 1: The NRT's should go to the med cores node on AZ2 and low core on az3 (even though // Shard 1: The NRT's should go to the med cores node on AZ2 and low core on az3 (even though
// a low core node can take the replica in az1, there's already an NRT replica there and we want spreading across AZ's), // a low core node can take the replica in az1, there's already an NRT replica there and we want spreading across AZ's),
// the TLOG to the TLOG node on AZ2 (because the tlog node on AZ1 has low free disk) // the TLOG to the TLOG node on AZ2 (because the tlog node on AZ1 has low free disk)
@ -326,7 +330,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
// If we add instead 2 PULL replicas to each shard // If we add instead 2 PULL replicas to each shard
placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes), placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
0, 0, 2); 0, 0, 2);
pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl()); pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
// Shard 1: Given node AZ3_TLOGPULL is taken by the TLOG replica, the PULL should go to AZ1_TLOGPULL_LOWFREEDISK and AZ2_TLOGPULL // Shard 1: Given node AZ3_TLOGPULL is taken by the TLOG replica, the PULL should go to AZ1_TLOGPULL_LOWFREEDISK and AZ2_TLOGPULL
// Shard 2: Similarly AZ2_TLOGPULL is taken. Replicas should go to AZ1_TLOGPULL_LOWFREEDISK and AZ3_TLOGPULL // Shard 2: Similarly AZ2_TLOGPULL is taken. Replicas should go to AZ1_TLOGPULL_LOWFREEDISK and AZ3_TLOGPULL
expectedPlacements = Set.of("1 PULL " + AZ1_TLOGPULL_LOWFREEDISK, "1 PULL " + AZ2_TLOGPULL, expectedPlacements = Set.of("1 PULL " + AZ1_TLOGPULL_LOWFREEDISK, "1 PULL " + AZ2_TLOGPULL,
@ -373,7 +377,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
for (int countNrtToPlace = 1; countNrtToPlace <= 9; countNrtToPlace++) { for (int countNrtToPlace = 1; countNrtToPlace <= 9; countNrtToPlace++) {
PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes), PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
countNrtToPlace, 0, 0); countNrtToPlace, 0, 0);
PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl()); PlacementPlan pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
verifyPlacements(placements.get(countNrtToPlace - 1), pp, collectionBuilder.getShardBuilders(), liveNodes); verifyPlacements(placements.get(countNrtToPlace - 1), pp, collectionBuilder.getShardBuilders(), liveNodes);
} }
} }
@ -409,7 +413,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, Set.of(solrCollection.iterator().next().getShardName()), new HashSet<>(liveNodes), PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, Set.of(solrCollection.iterator().next().getShardName()), new HashSet<>(liveNodes),
0, 0, 1); 0, 0, 1);
PlacementPlan pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl()); PlacementPlan pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
// Each expected placement is represented as a string "shard replica-type node" // Each expected placement is represented as a string "shard replica-type node"
// Node 0 has less cores than node 1 (0 vs 1) so the placement should go there. // Node 0 has less cores than node 1 (0 vs 1) so the placement should go there.
@ -422,7 +426,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
it.next(); // skip first shard to do placement for the second one... it.next(); // skip first shard to do placement for the second one...
placementRequest = new PlacementRequestImpl(solrCollection, Set.of(it.next().getShardName()), new HashSet<>(liveNodes), placementRequest = new PlacementRequestImpl(solrCollection, Set.of(it.next().getShardName()), new HashSet<>(liveNodes),
0, 0, 1); 0, 0, 1);
pp = plugin.computePlacement(clusterBuilder.build(), placementRequest, clusterBuilder.buildAttributeFetcher(), new PlacementPlanFactoryImpl()); pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
expectedPlacements = Set.of("2 PULL 0"); expectedPlacements = Set.of("2 PULL 0");
verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes); verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
} }
@ -505,7 +509,8 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders()); collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
clusterBuilder.addCollection(collectionBuilder); clusterBuilder.addCollection(collectionBuilder);
Cluster cluster = clusterBuilder.build(); PlacementContext placementContext = clusterBuilder.buildPlacementContext();
Cluster cluster = placementContext.getCluster();
SolrCollection solrCollection = cluster.getCollection(collectionName); SolrCollection solrCollection = cluster.getCollection(collectionName);
@ -514,14 +519,12 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
.map(Shard::getShardName).collect(Collectors.toSet()), .map(Shard::getShardName).collect(Collectors.toSet()),
cluster.getLiveNodes(), 2, 2, 2); cluster.getLiveNodes(), 2, 2, 2);
PlacementPlanFactory placementPlanFactory = new PlacementPlanFactoryImpl(); PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
AttributeFetcher attributeFetcher = clusterBuilder.buildAttributeFetcher();
PlacementPlan pp = plugin.computePlacement(cluster, placementRequest, attributeFetcher, placementPlanFactory);
// 2 shards, 6 replicas // 2 shards, 6 replicas
assertEquals(12, pp.getReplicaPlacements().size()); assertEquals(12, pp.getReplicaPlacements().size());
// shard -> AZ -> replica count // shard -> AZ -> replica count
Map<Replica.ReplicaType, Map<String, Map<String, AtomicInteger>>> replicas = new HashMap<>(); Map<Replica.ReplicaType, Map<String, Map<String, AtomicInteger>>> replicas = new HashMap<>();
AttributeValues attributeValues = attributeFetcher.fetchAttributes(); AttributeValues attributeValues = placementContext.getAttributeFetcher().fetchAttributes();
for (ReplicaPlacement rp : pp.getReplicaPlacements()) { for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
Optional<String> azOptional = attributeValues.getSystemProperty(rp.getNode(), AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP); Optional<String> azOptional = attributeValues.getSystemProperty(rp.getNode(), AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP);
if (!azOptional.isPresent()) { if (!azOptional.isPresent()) {
@ -565,7 +568,8 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders()); collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
clusterBuilder.addCollection(collectionBuilder); clusterBuilder.addCollection(collectionBuilder);
Cluster cluster = clusterBuilder.build(); PlacementContext placementContext = clusterBuilder.buildPlacementContext();
Cluster cluster = placementContext.getCluster();
SolrCollection solrCollection = cluster.getCollection(collectionName); SolrCollection solrCollection = cluster.getCollection(collectionName);
@ -574,14 +578,12 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
.map(Shard::getShardName).collect(Collectors.toSet()), .map(Shard::getShardName).collect(Collectors.toSet()),
cluster.getLiveNodes(), 2, 2, 2); cluster.getLiveNodes(), 2, 2, 2);
PlacementPlanFactory placementPlanFactory = new PlacementPlanFactoryImpl(); PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
AttributeFetcher attributeFetcher = clusterBuilder.buildAttributeFetcher();
PlacementPlan pp = plugin.computePlacement(cluster, placementRequest, attributeFetcher, placementPlanFactory);
// 2 shards, 6 replicas // 2 shards, 6 replicas
assertEquals(12, pp.getReplicaPlacements().size()); assertEquals(12, pp.getReplicaPlacements().size());
// shard -> group -> replica count // shard -> group -> replica count
Map<Replica.ReplicaType, Map<String, Map<String, AtomicInteger>>> replicas = new HashMap<>(); Map<Replica.ReplicaType, Map<String, Map<String, AtomicInteger>>> replicas = new HashMap<>();
AttributeValues attributeValues = attributeFetcher.fetchAttributes(); AttributeValues attributeValues = placementContext.getAttributeFetcher().fetchAttributes();
for (ReplicaPlacement rp : pp.getReplicaPlacements()) { for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
Optional<String> groupOptional = attributeValues.getSystemProperty(rp.getNode(), "group"); Optional<String> groupOptional = attributeValues.getSystemProperty(rp.getNode(), "group");
if (!groupOptional.isPresent()) { if (!groupOptional.isPresent()) {
@ -632,7 +634,8 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders()); collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
clusterBuilder.addCollection(collectionBuilder); clusterBuilder.addCollection(collectionBuilder);
Cluster cluster = clusterBuilder.build(); PlacementContext placementContext = clusterBuilder.buildPlacementContext();
Cluster cluster = placementContext.getCluster();
SolrCollection solrCollection = cluster.getCollection(collectionName); SolrCollection solrCollection = cluster.getCollection(collectionName);
@ -641,15 +644,104 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
.map(Shard::getShardName).collect(Collectors.toSet()), .map(Shard::getShardName).collect(Collectors.toSet()),
cluster.getLiveNodes(), 1, 0, 1); cluster.getLiveNodes(), 1, 0, 1);
PlacementPlanFactory placementPlanFactory = new PlacementPlanFactoryImpl(); PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
AttributeFetcher attributeFetcher = clusterBuilder.buildAttributeFetcher();
PlacementPlan pp = plugin.computePlacement(cluster, placementRequest, attributeFetcher, placementPlanFactory);
assertEquals(4, pp.getReplicaPlacements().size()); assertEquals(4, pp.getReplicaPlacements().size());
for (ReplicaPlacement rp : pp.getReplicaPlacements()) { for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
assertFalse("should not put any replicas on " + smallNode, rp.getNode().equals(smallNode)); assertFalse("should not put any replicas on " + smallNode, rp.getNode().equals(smallNode));
} }
} }
@Test
public void testWithCollectionPlacement() throws Exception {
int NUM_NODES = 3;
Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(NUM_NODES);
Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(secondaryCollectionName);
collectionBuilder.initializeShardsReplicas(1, 2, 0, 0, clusterBuilder.getLiveNodeBuilders());
clusterBuilder.addCollection(collectionBuilder);
collectionBuilder = Builders.newCollectionBuilder(primaryCollectionName);
collectionBuilder.initializeShardsReplicas(0, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
clusterBuilder.addCollection(collectionBuilder);
PlacementContext placementContext = clusterBuilder.buildPlacementContext();
Cluster cluster = placementContext.getCluster();
SolrCollection secondaryCollection = cluster.getCollection(secondaryCollectionName);
SolrCollection primaryCollection = cluster.getCollection(primaryCollectionName);
Set<Node> secondaryNodes = new HashSet<>();
secondaryCollection.shards().forEach(s -> s.replicas().forEach(r -> secondaryNodes.add(r.getNode())));
PlacementRequestImpl placementRequest = new PlacementRequestImpl(primaryCollection,
Set.of("shard1", "shard2"), cluster.getLiveNodes(), 1, 0, 0);
PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
assertEquals(2, pp.getReplicaPlacements().size());
// verify that all placements are on nodes with the secondary replica
pp.getReplicaPlacements().forEach(placement ->
assertTrue("placement node " + placement.getNode() + " not in secondary=" + secondaryNodes,
secondaryNodes.contains(placement.getNode())));
placementRequest = new PlacementRequestImpl(primaryCollection,
Set.of("shard1"), cluster.getLiveNodes(), 3, 0, 0);
try {
pp = plugin.computePlacement(placementRequest, placementContext);
fail("should generate 'Not enough eligible nodes' failure here");
} catch (PlacementException pe) {
assertTrue(pe.toString().contains("Not enough eligible nodes"));
}
}
@Test
public void testWithCollectionModificationRejected() throws Exception {
int NUM_NODES = 2;
Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(NUM_NODES);
Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(secondaryCollectionName);
collectionBuilder.initializeShardsReplicas(1, 4, 0, 0, clusterBuilder.getLiveNodeBuilders());
clusterBuilder.addCollection(collectionBuilder);
collectionBuilder = Builders.newCollectionBuilder(primaryCollectionName);
collectionBuilder.initializeShardsReplicas(2, 2, 0, 0, clusterBuilder.getLiveNodeBuilders());
clusterBuilder.addCollection(collectionBuilder);
PlacementContext placementContext = clusterBuilder.buildPlacementContext();
Cluster cluster = placementContext.getCluster();
SolrCollection secondaryCollection = cluster.getCollection(secondaryCollectionName);
SolrCollection primaryCollection = cluster.getCollection(primaryCollectionName);
Node node = cluster.getLiveNodes().iterator().next();
Set<Replica> secondaryReplicas = new HashSet<>();
secondaryCollection.shards().forEach(shard ->
shard.replicas().forEach(replica -> {
if (secondaryReplicas.size() < 1 && replica.getNode().equals(node)) {
secondaryReplicas.add(replica);
}
}));
DeleteReplicasRequest deleteReplicasRequest = ModificationRequestImpl.createDeleteReplicasRequest(secondaryCollection, secondaryReplicas);
try {
plugin.verifyAllowedModification(deleteReplicasRequest, placementContext);
} catch (PlacementException pe) {
fail("should have succeeded: " + pe.toString());
}
secondaryCollection.shards().forEach(shard ->
shard.replicas().forEach(replica -> {
if (secondaryReplicas.size() < 2 && replica.getNode().equals(node)) {
secondaryReplicas.add(replica);
}
}));
deleteReplicasRequest = ModificationRequestImpl.createDeleteReplicasRequest(secondaryCollection, secondaryReplicas);
try {
plugin.verifyAllowedModification(deleteReplicasRequest, placementContext);
fail("should have failed: " + deleteReplicasRequest);
} catch (PlacementException pe) {
}
}
@Test @Slow @Test @Slow
public void testScalability() throws Exception { public void testScalability() throws Exception {
log.info("==== numNodes ===="); log.info("==== numNodes ====");
@ -684,9 +776,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName); Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
collectionBuilder.initializeShardsReplicas(numShards, 0, 0, 0, List.of()); collectionBuilder.initializeShardsReplicas(numShards, 0, 0, 0, List.of());
Cluster cluster = clusterBuilder.build(); PlacementContext placementContext = clusterBuilder.buildPlacementContext();
AttributeFetcher attributeFetcher = clusterBuilder.buildAttributeFetcher();
SolrCollection solrCollection = collectionBuilder.build(); SolrCollection solrCollection = collectionBuilder.build();
List<Node> liveNodes = clusterBuilder.buildLiveNodes(); List<Node> liveNodes = clusterBuilder.buildLiveNodes();
@ -695,7 +785,7 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
new HashSet<>(liveNodes), nrtReplicas, tlogReplicas, pullReplicas); new HashSet<>(liveNodes), nrtReplicas, tlogReplicas, pullReplicas);
long start = System.nanoTime(); long start = System.nanoTime();
PlacementPlan pp = plugin.computePlacement(cluster, placementRequest, attributeFetcher, new PlacementPlanFactoryImpl()); PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
long end = System.nanoTime(); long end = System.nanoTime();
final int REPLICAS_PER_SHARD = nrtReplicas + tlogReplicas + pullReplicas; final int REPLICAS_PER_SHARD = nrtReplicas + tlogReplicas + pullReplicas;

View File

@ -115,4 +115,9 @@ public interface CollectionAdminParams {
* for the add replica API. If set to true, a valid "node" should be specified. * for the add replica API. If set to true, a valid "node" should be specified.
*/ */
String SKIP_NODE_ASSIGNMENT = "skipNodeAssignment"; String SKIP_NODE_ASSIGNMENT = "skipNodeAssignment";
/**
* Prefix for arbitrary collection or replica properties.
*/
String PROPERTY_PREFIX = "property.";
} }