mirror of https://github.com/apache/lucene.git
SOLR-15016: Replica placement plugins should use container plugins API / configs.
This commit is contained in:
parent
21b8890f58
commit
cced5078ea
|
@ -178,6 +178,8 @@ Other Changes
|
|||
* SOLR-14789: Docker: Migrate docker image creation from docker-solr repo to solr/docker.
|
||||
(Houston Putman, Martijn Koster, Tim Potter, David Smiley, janhoy, Mike Drob)
|
||||
|
||||
* SOLR-15016: Replica placement plugins should use container plugins API / configs. (ab, ilan)
|
||||
|
||||
Bug Fixes
|
||||
---------------------
|
||||
* SOLR-14546: Fix for a relatively hard to hit issue in OverseerTaskProcessor that could lead to out of order execution
|
||||
|
|
|
@ -25,8 +25,8 @@ import org.apache.solr.common.MapWriter;
|
|||
*/
|
||||
public interface ConfigurablePlugin<T extends MapWriter> {
|
||||
|
||||
/**This is invoked soon after the Object is initialized
|
||||
*
|
||||
/**
|
||||
* This is invoked soon after the Object is initialized.
|
||||
* @param cfg value deserialized from JSON
|
||||
*/
|
||||
void configure(T cfg);
|
||||
|
|
|
@ -143,7 +143,7 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW
|
|||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
public synchronized void refresh() {
|
||||
Map<String, Object> pluginInfos = null;
|
||||
Map<String, Object> pluginInfos;
|
||||
try {
|
||||
pluginInfos = ContainerPluginsApi.plugins(coreContainer.zkClientSupplier);
|
||||
} catch (IOException e) {
|
||||
|
@ -181,9 +181,8 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW
|
|||
} else {
|
||||
//ADDED or UPDATED
|
||||
PluginMetaHolder info = newState.get(e.getKey());
|
||||
ApiInfo apiInfo = null;
|
||||
List<String> errs = new ArrayList<>();
|
||||
apiInfo = new ApiInfo(info,errs);
|
||||
ApiInfo apiInfo = new ApiInfo(info,errs);
|
||||
if (!errs.isEmpty()) {
|
||||
log.error(StrUtils.join(errs, ','));
|
||||
continue;
|
||||
|
@ -239,8 +238,7 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW
|
|||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private static Map<String, String> getTemplateVars(PluginMeta pluginMeta) {
|
||||
Map result = makeMap("plugin-name", pluginMeta.name, "path-prefix", pluginMeta.pathPrefix);
|
||||
return result;
|
||||
return (Map) makeMap("plugin-name", pluginMeta.name, "path-prefix", pluginMeta.pathPrefix);
|
||||
}
|
||||
|
||||
private static class ApiHolder extends Api {
|
||||
|
@ -273,7 +271,7 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW
|
|||
private final PluginMetaHolder holder;
|
||||
|
||||
@JsonProperty
|
||||
private PluginMeta info;
|
||||
private final PluginMeta info;
|
||||
|
||||
@JsonProperty(value = "package")
|
||||
public final String pkg;
|
||||
|
@ -392,8 +390,8 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW
|
|||
}
|
||||
if (instance instanceof ConfigurablePlugin) {
|
||||
Class<? extends MapWriter> c = getConfigClass((ConfigurablePlugin<? extends MapWriter>) instance);
|
||||
if (c != null) {
|
||||
MapWriter initVal = mapper.readValue(Utils.toJSON(holder.original), c);
|
||||
if (c != null && holder.meta.config != null) {
|
||||
MapWriter initVal = mapper.readValue(Utils.toJSON(holder.meta.config), c);
|
||||
((ConfigurablePlugin) instance).configure(initVal);
|
||||
}
|
||||
}
|
||||
|
@ -412,7 +410,8 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW
|
|||
|
||||
}
|
||||
|
||||
/**Get the generic type of a {@link ConfigurablePlugin}
|
||||
/**
|
||||
* Get the generic type of a {@link ConfigurablePlugin}
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public static Class getConfigClass(ConfigurablePlugin<?> o) {
|
||||
|
@ -422,7 +421,10 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW
|
|||
for (Type type : interfaces) {
|
||||
if (type instanceof ParameterizedType) {
|
||||
ParameterizedType parameterizedType = (ParameterizedType) type;
|
||||
if (parameterizedType.getRawType() == ConfigurablePlugin.class) {
|
||||
Type rawType = parameterizedType.getRawType();
|
||||
if (rawType == ConfigurablePlugin.class ||
|
||||
// or if a super interface is a ConfigurablePlugin
|
||||
((rawType instanceof Class) && ConfigurablePlugin.class.isAssignableFrom((Class) rawType))) {
|
||||
return (Class) parameterizedType.getActualTypeArguments()[0];
|
||||
}
|
||||
}
|
||||
|
@ -442,10 +444,10 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW
|
|||
}
|
||||
|
||||
public enum Diff {
|
||||
ADDED, REMOVED, UNCHANGED, UPDATED;
|
||||
ADDED, REMOVED, UNCHANGED, UPDATED
|
||||
}
|
||||
|
||||
public static Map<String, Diff> compareMaps(Map<String,? extends Object> a, Map<String,? extends Object> b) {
|
||||
public static Map<String, Diff> compareMaps(Map<String, ?> a, Map<String, ?> b) {
|
||||
if(a.isEmpty() && b.isEmpty()) return null;
|
||||
Map<String, Diff> result = new HashMap<>(Math.max(a.size(), b.size()));
|
||||
a.forEach((k, v) -> {
|
||||
|
|
|
@ -20,12 +20,7 @@ package org.apache.solr.cloud.api.collections;
|
|||
|
||||
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.*;
|
||||
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
|
||||
import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
|
@ -50,6 +45,7 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
|||
import org.apache.solr.cloud.ActiveReplicaWatcher;
|
||||
import org.apache.solr.cloud.Overseer;
|
||||
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
|
||||
import org.apache.solr.cluster.placement.PlacementPlugin;
|
||||
import org.apache.solr.common.SolrCloseableLatch;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
|
@ -144,7 +140,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
}
|
||||
}
|
||||
|
||||
List<CreateReplica> createReplicas = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, replicaTypesVsCount)
|
||||
List<CreateReplica> createReplicas = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, replicaTypesVsCount,
|
||||
ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance())
|
||||
.stream()
|
||||
.map(replicaPosition -> assignReplicaDetails(ocmh.cloudManager, clusterState, message, replicaPosition))
|
||||
.collect(Collectors.toList());
|
||||
|
@ -304,7 +301,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
|
||||
public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
|
||||
String collectionName, ZkNodeProps message,
|
||||
EnumMap<Replica.Type, Integer> replicaTypeVsCount) throws IOException, InterruptedException {
|
||||
EnumMap<Replica.Type, Integer> replicaTypeVsCount,
|
||||
PlacementPlugin placementPlugin) throws IOException, InterruptedException {
|
||||
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
|
||||
boolean skipNodeAssignment = message.getBool(CollectionAdminParams.SKIP_NODE_ASSIGNMENT, false);
|
||||
String sliceName = message.getStr(SHARD_ID_PROP);
|
||||
|
@ -328,7 +326,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
if (!skipCreateReplicaInClusterState && !skipNodeAssignment) {
|
||||
|
||||
positions = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, numNrtReplicas,
|
||||
numTlogReplicas, numPullReplicas, createNodeSetStr, cloudManager);
|
||||
numTlogReplicas, numPullReplicas, createNodeSetStr, cloudManager, placementPlugin);
|
||||
}
|
||||
|
||||
if (positions == null) {
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.solr.client.solrj.cloud.BadVersionException;
|
|||
import org.apache.solr.client.solrj.cloud.VersionedData;
|
||||
import org.apache.solr.cluster.placement.PlacementPlugin;
|
||||
import org.apache.solr.cluster.placement.impl.PlacementPluginAssignStrategy;
|
||||
import org.apache.solr.cluster.placement.impl.PlacementPluginConfigImpl;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
|
@ -270,7 +269,8 @@ public class Assign {
|
|||
@SuppressWarnings({"unchecked"})
|
||||
public static List<ReplicaPosition> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
|
||||
String shard, int nrtReplicas, int tlogReplicas, int pullReplicas,
|
||||
Object createNodeSet, SolrCloudManager cloudManager) throws IOException, InterruptedException, AssignmentException {
|
||||
Object createNodeSet, SolrCloudManager cloudManager,
|
||||
PlacementPlugin placementPlugin) throws IOException, InterruptedException, AssignmentException {
|
||||
log.debug("getNodesForNewReplicas() shard: {} , nrtReplicas : {} , tlogReplicas: {} , pullReplicas: {} , createNodeSet {}"
|
||||
, shard, nrtReplicas, tlogReplicas, pullReplicas, createNodeSet);
|
||||
DocCollection coll = clusterState.getCollection(collectionName);
|
||||
|
@ -296,7 +296,7 @@ public class Assign {
|
|||
.assignPullReplicas(pullReplicas)
|
||||
.onNodes(createNodeList)
|
||||
.build();
|
||||
AssignStrategy assignStrategy = createAssignStrategy(cloudManager, clusterState, coll);
|
||||
AssignStrategy assignStrategy = createAssignStrategy(placementPlugin, clusterState, coll);
|
||||
return assignStrategy.assign(cloudManager, assignRequest);
|
||||
}
|
||||
|
||||
|
@ -492,13 +492,13 @@ public class Assign {
|
|||
/**
|
||||
* Creates the appropriate instance of {@link AssignStrategy} based on how the cluster and/or individual collections are
|
||||
* configured.
|
||||
* <p>If {@link PlacementPlugin} instance is null this call will return {@link LegacyAssignStrategy}, otherwise
|
||||
* {@link PlacementPluginAssignStrategy} will be used.</p>
|
||||
*/
|
||||
public static AssignStrategy createAssignStrategy(SolrCloudManager solrCloudManager, ClusterState clusterState, DocCollection collection) {
|
||||
PlacementPlugin plugin = PlacementPluginConfigImpl.getPlacementPlugin(solrCloudManager);
|
||||
|
||||
if (plugin != null) {
|
||||
public static AssignStrategy createAssignStrategy(PlacementPlugin placementPlugin, ClusterState clusterState, DocCollection collection) {
|
||||
if (placementPlugin != null) {
|
||||
// If a cluster wide placement plugin is configured (and that's the only way to define a placement plugin)
|
||||
return new PlacementPluginAssignStrategy(collection, plugin);
|
||||
return new PlacementPluginAssignStrategy(collection, placementPlugin);
|
||||
} else {
|
||||
return new LegacyAssignStrategy();
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.solr.cloud.Overseer;
|
|||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
|
||||
import org.apache.solr.cloud.overseer.ClusterStateMutator;
|
||||
import org.apache.solr.cluster.placement.PlacementPlugin;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
|
@ -168,7 +169,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
|
|||
|
||||
List<ReplicaPosition> replicaPositions = null;
|
||||
try {
|
||||
replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName), message, shardNames);
|
||||
replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName),
|
||||
message, shardNames, ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance());
|
||||
} catch (Assign.AssignmentException e) {
|
||||
ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
|
||||
new DeleteCollectionCmd(ocmh).call(clusterState, deleteMessage, results);
|
||||
|
@ -286,10 +288,10 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
|
|||
}
|
||||
}
|
||||
|
||||
public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
|
||||
DocCollection docCollection,
|
||||
ZkNodeProps message,
|
||||
List<String> shardNames) throws IOException, InterruptedException, Assign.AssignmentException {
|
||||
private static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
|
||||
DocCollection docCollection,
|
||||
ZkNodeProps message,
|
||||
List<String> shardNames, PlacementPlugin placementPlugin) throws IOException, InterruptedException, Assign.AssignmentException {
|
||||
final String collectionName = message.getStr(NAME);
|
||||
// look at the replication factor and see if it matches reality
|
||||
// if it does not, find best nodes to create more cores
|
||||
|
@ -328,7 +330,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
|
|||
.assignPullReplicas(numPullReplicas)
|
||||
.onNodes(nodeList)
|
||||
.build();
|
||||
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(cloudManager, clusterState, docCollection);
|
||||
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(placementPlugin, clusterState, docCollection);
|
||||
replicaPositions = assignStrategy.assign(cloudManager, assignRequest);
|
||||
}
|
||||
return replicaPositions;
|
||||
|
|
|
@ -120,7 +120,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
.assignPullReplicas(numPullReplicas)
|
||||
.onNodes(new ArrayList<>(ocmh.cloudManager.getClusterStateProvider().getLiveNodes()))
|
||||
.build();
|
||||
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ocmh.cloudManager, clusterState, clusterState.getCollection(sourceCollection));
|
||||
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
|
||||
ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance(),
|
||||
clusterState, clusterState.getCollection(sourceCollection));
|
||||
targetNode = assignStrategy.assign(ocmh.cloudManager, assignRequest).get(0).node;
|
||||
}
|
||||
ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, targetNode);
|
||||
|
|
|
@ -229,7 +229,9 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
.assignPullReplicas(numPullReplicas)
|
||||
.onNodes(nodeList)
|
||||
.build();
|
||||
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ocmh.cloudManager, clusterState, restoreCollection);
|
||||
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
|
||||
ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance(),
|
||||
clusterState, restoreCollection);
|
||||
List<ReplicaPosition> replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest);
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(restoreCollection.getSlices().size());
|
||||
|
|
|
@ -434,7 +434,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
.assignPullReplicas(numPull.get())
|
||||
.onNodes(new ArrayList<>(clusterState.getLiveNodes()))
|
||||
.build();
|
||||
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ocmh.cloudManager, clusterState, collection);
|
||||
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
|
||||
ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance(),
|
||||
clusterState, collection);
|
||||
List<ReplicaPosition> replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest);
|
||||
t.stop();
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ import java.io.Closeable;
|
|||
public interface ClusterEventProducer extends ClusterSingleton, Closeable {
|
||||
|
||||
/** Unique name for the registration of a plugin-based implementation. */
|
||||
String PLUGIN_NAME = "cluster-event-producer";
|
||||
String PLUGIN_NAME = ".cluster-event-producer";
|
||||
|
||||
@Override
|
||||
default String getName() {
|
||||
|
|
|
@ -128,13 +128,17 @@ public class ClusterEventProducerFactory extends ClusterEventProducerBase {
|
|||
ClusterEventListener listener = (ClusterEventListener) instance;
|
||||
clusterEventProducer.registerListener(listener);
|
||||
} else if (instance instanceof ClusterEventProducer) {
|
||||
// replace the existing impl
|
||||
if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
|
||||
((DelegatingClusterEventProducer) cc.getClusterEventProducer())
|
||||
.setDelegate((ClusterEventProducer) instance);
|
||||
if (ClusterEventProducer.PLUGIN_NAME.equals(plugin.getInfo().name)) {
|
||||
// replace the existing impl
|
||||
if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
|
||||
((DelegatingClusterEventProducer) cc.getClusterEventProducer())
|
||||
.setDelegate((ClusterEventProducer) instance);
|
||||
} else {
|
||||
log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
|
||||
" using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
|
||||
}
|
||||
} else {
|
||||
log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
|
||||
" using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
|
||||
log.warn("Ignoring ClusterEventProducer config with non-standard name: {}", plugin.getInfo());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -149,21 +153,25 @@ public class ClusterEventProducerFactory extends ClusterEventProducerBase {
|
|||
ClusterEventListener listener = (ClusterEventListener) instance;
|
||||
clusterEventProducer.unregisterListener(listener);
|
||||
} else if (instance instanceof ClusterEventProducer) {
|
||||
// replace the existing impl with NoOp
|
||||
if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
|
||||
((DelegatingClusterEventProducer) cc.getClusterEventProducer())
|
||||
.setDelegate(new NoOpProducer(cc));
|
||||
if (ClusterEventProducer.PLUGIN_NAME.equals(plugin.getInfo().name)) {
|
||||
// replace the existing impl with NoOp
|
||||
if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
|
||||
((DelegatingClusterEventProducer) cc.getClusterEventProducer())
|
||||
.setDelegate(new NoOpProducer(cc));
|
||||
} else {
|
||||
log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
|
||||
" using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
|
||||
}
|
||||
} else {
|
||||
log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
|
||||
" using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
|
||||
log.warn("Ignoring ClusterEventProducer config with non-standard name: {}", plugin.getInfo());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void modified(ContainerPluginsRegistry.ApiInfo old, ContainerPluginsRegistry.ApiInfo replacement) {
|
||||
added(replacement);
|
||||
deleted(old);
|
||||
added(replacement);
|
||||
}
|
||||
};
|
||||
plugins.registerListener(pluginListener);
|
||||
|
|
|
@ -41,6 +41,8 @@ import org.apache.solr.cloud.api.collections.Assign;
|
|||
import org.apache.solr.cluster.events.ClusterEvent;
|
||||
import org.apache.solr.cluster.events.ClusterEventListener;
|
||||
import org.apache.solr.cluster.events.NodesDownEvent;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginConfig;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginFactory;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ReplicaPosition;
|
||||
|
@ -76,10 +78,12 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
|
|||
private int waitForSecond = DEFAULT_WAIT_FOR_SEC;
|
||||
|
||||
private ScheduledThreadPoolExecutor waitForExecutor;
|
||||
private final PlacementPluginFactory<? extends PlacementPluginConfig> placementPluginFactory;
|
||||
|
||||
public CollectionsRepairEventListener(CoreContainer cc) {
|
||||
this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
|
||||
this.solrCloudManager = cc.getZkController().getSolrCloudManager();
|
||||
this.placementPluginFactory = cc.getPlacementPluginFactory();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -110,7 +114,7 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
|
|||
}
|
||||
}
|
||||
|
||||
private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
|
||||
private final Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
|
||||
|
||||
private void handleNodesDown(NodesDownEvent event) {
|
||||
|
||||
|
@ -121,9 +125,7 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
|
|||
Set<String> trackingKeySet = nodeNameVsTimeRemoved.keySet();
|
||||
trackingKeySet.removeAll(solrCloudManager.getClusterStateProvider().getLiveNodes());
|
||||
// add any new lost nodes (old lost nodes are skipped)
|
||||
event.getNodeNames().forEachRemaining(lostNode -> {
|
||||
nodeNameVsTimeRemoved.computeIfAbsent(lostNode, n -> solrCloudManager.getTimeSource().getTimeNs());
|
||||
});
|
||||
event.getNodeNames().forEachRemaining(lostNode -> nodeNameVsTimeRemoved.computeIfAbsent(lostNode, n -> solrCloudManager.getTimeSource().getTimeNs()));
|
||||
}
|
||||
|
||||
private void runRepair() {
|
||||
|
@ -167,7 +169,7 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
|
|||
.incrementAndGet();
|
||||
}
|
||||
});
|
||||
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(solrCloudManager, clusterState, coll);
|
||||
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(placementPluginFactory.createPluginInstance(), clusterState, coll);
|
||||
lostReplicas.forEach((shard, types) -> {
|
||||
Assign.AssignRequestBuilder assignRequestBuilder = new Assign.AssignRequestBuilder()
|
||||
.forCollection(coll.getName())
|
||||
|
@ -191,7 +193,6 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
|
|||
newPositions.put(coll.getName(), positions);
|
||||
} catch (Exception e) {
|
||||
log.warn("Exception computing positions for {}/{}: {}", coll.getName(), shard, e);
|
||||
return;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
@ -206,15 +207,13 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
|
|||
// send ADDREPLICA admin requests for each lost replica
|
||||
// XXX should we use 'async' for that, to avoid blocking here?
|
||||
List<CollectionAdminRequest.AddReplica> addReplicas = new ArrayList<>();
|
||||
newPositions.forEach((collection, positions) -> {
|
||||
positions.forEach(position -> {
|
||||
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest
|
||||
.addReplicaToShard(collection, position.shard, position.type);
|
||||
addReplica.setNode(position.node);
|
||||
addReplica.setAsyncId(ASYNC_ID_PREFIX + counter.incrementAndGet());
|
||||
addReplicas.add(addReplica);
|
||||
});
|
||||
});
|
||||
newPositions.forEach((collection, positions) -> positions.forEach(position -> {
|
||||
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest
|
||||
.addReplicaToShard(collection, position.shard, position.type);
|
||||
addReplica.setNode(position.node);
|
||||
addReplica.setAsyncId(ASYNC_ID_PREFIX + counter.incrementAndGet());
|
||||
addReplicas.add(addReplica);
|
||||
}));
|
||||
addReplicas.forEach(addReplica -> {
|
||||
try {
|
||||
solrClient.request(addReplica);
|
||||
|
@ -231,7 +230,7 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
|
|||
new SolrNamedThreadFactory("collectionsRepair_waitFor"));
|
||||
waitForExecutor.setRemoveOnCancelPolicy(true);
|
||||
waitForExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
||||
waitForExecutor.scheduleAtFixedRate(() -> runRepair(), 0, waitForSecond, TimeUnit.SECONDS);
|
||||
waitForExecutor.scheduleAtFixedRate(this::runRepair, 0, waitForSecond, TimeUnit.SECONDS);
|
||||
state = State.RUNNING;
|
||||
}
|
||||
|
||||
|
|
|
@ -14,112 +14,15 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cluster.placement;
|
||||
|
||||
import org.apache.solr.common.util.ReflectMapWriter;
|
||||
|
||||
/**
|
||||
* <p>Configuration passed by Solr to {@link PlacementPluginFactory#createPluginInstance(PlacementPluginConfig)} so that plugin instances
|
||||
* ({@link PlacementPlugin}) created by the factory can easily retrieve their configuration.</p>
|
||||
*
|
||||
* <p>A plugin writer decides the names and the types of the configurable parameters it needs. Available types are
|
||||
* {@link String}, {@link Long}, {@link Boolean}, {@link Double}. This configuration currently lives in the {@code /clusterprops.json}
|
||||
* file in Zookeeper (this could change in the future, the plugin code will not change but the way to store its configuration
|
||||
* in the cluster might). {@code clusterprops.json} also contains the name of the plugin factory class implementing
|
||||
* {@link org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory}.</p>
|
||||
*
|
||||
* <p>In order to configure a plugin to be used for placement decisions, the following {@code curl} command (or something
|
||||
* equivalent) has to be executed once the cluster is already running to set the configuration.
|
||||
* Replace {@code localhost:8983} by one of your servers' IP address and port.</p>
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* curl -X POST -H 'Content-type:application/json' -d '{
|
||||
* "set-placement-plugin": {
|
||||
* "class": "factory.class.name$inner",
|
||||
* "myfirstString": "a text value",
|
||||
* "aLong": 50,
|
||||
* "aDoubleConfig": 3.1415928,
|
||||
* "shouldIStay": true
|
||||
* }
|
||||
* }' http://localhost:8983/api/cluster
|
||||
* </pre>
|
||||
*
|
||||
* <p>The consequence will be the creation (or replacement if it exists) of an element in the Zookeeper file
|
||||
* {@code /clusterprops.json} as follows:</p>
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* "placement-plugin":{
|
||||
* "class":"factory.class.name$inner",
|
||||
* "myfirstString": "a text value",
|
||||
* "aLong": 50,
|
||||
* "aDoubleConfig": 3.1415928,
|
||||
* "shouldIStay": true}
|
||||
* </pre>
|
||||
*
|
||||
* <p>In order to delete the placement-plugin section from {@code /clusterprops.json} (and to fallback to either Legacy
|
||||
* or rule based placement if so configured for a collection), execute:</p>
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* curl -X POST -H 'Content-type:application/json' -d '{
|
||||
* "set-placement-plugin" : null
|
||||
* }' http://localhost:8983/api/cluster
|
||||
* </pre>
|
||||
* Configuration beans should use this interface to define public
|
||||
* (mutable) configuration properties. Implementations must have a
|
||||
* public zero-args constructor. Class fields may be optionally
|
||||
* annotated with {@link org.apache.solr.common.annotation.JsonProperty} if needed.
|
||||
*/
|
||||
public interface PlacementPluginConfig {
|
||||
|
||||
/**
|
||||
* The key in {@code clusterprops.json} under which the plugin factory and the plugin configuration are defined.
|
||||
*/
|
||||
String PLACEMENT_PLUGIN_CONFIG_KEY = "placement-plugin";
|
||||
/**
|
||||
* Name of the property containing the factory class
|
||||
*/
|
||||
String FACTORY_CLASS = "class";
|
||||
|
||||
/**
|
||||
* @return the configured {@link String} value corresponding to {@code configName} if one exists (could be the empty
|
||||
* string) and {@code null} otherwise.
|
||||
*/
|
||||
String getStringConfig(String configName);
|
||||
|
||||
/**
|
||||
* @return the configured {@link String} value corresponding to {@code configName} if one exists (could be the empty
|
||||
* string) and {@code defaultValue} otherwise.
|
||||
*/
|
||||
String getStringConfig(String configName, String defaultValue);
|
||||
|
||||
/**
|
||||
* @return the configured {@link Boolean} value corresponding to {@code configName} if one exists, {@code null} otherwise.
|
||||
*/
|
||||
Boolean getBooleanConfig(String configName);
|
||||
|
||||
/**
|
||||
* @return the configured {@link Boolean} value corresponding to {@code configName} if one exists, a boxed {@code defaultValue}
|
||||
* otherwise (this method never returns {@code null}.
|
||||
*/
|
||||
Boolean getBooleanConfig(String configName, boolean defaultValue);
|
||||
|
||||
/**
|
||||
* @return the configured {@link Long} value corresponding to {@code configName} if one exists, {@code null} otherwise.
|
||||
*/
|
||||
Long getLongConfig(String configName);
|
||||
|
||||
/**
|
||||
* @return the configured {@link Long} value corresponding to {@code configName} if one exists, a boxed {@code defaultValue}
|
||||
* otherwise (this method never returns {@code null}.
|
||||
*/
|
||||
Long getLongConfig(String configName, long defaultValue);
|
||||
|
||||
/**
|
||||
* @return the configured {@link Double} value corresponding to {@code configName} if one exists, {@code null} otherwise.
|
||||
*/
|
||||
Double getDoubleConfig(String configName);
|
||||
|
||||
/**
|
||||
* @return the configured {@link Double} value corresponding to {@code configName} if one exists, a boxed {@code defaultValue}
|
||||
* otherwise (this method never returns {@code null}.
|
||||
*/
|
||||
Double getDoubleConfig(String configName, double defaultValue);
|
||||
public interface PlacementPluginConfig extends ReflectMapWriter {
|
||||
}
|
||||
|
|
|
@ -17,15 +17,53 @@
|
|||
|
||||
package org.apache.solr.cluster.placement;
|
||||
|
||||
import org.apache.solr.api.ConfigurablePlugin;
|
||||
|
||||
/**
|
||||
* Factory implemented by client code and configured in {@code solr.xml} allowing the creation of instances of
|
||||
* Factory implemented by client code and configured in container plugins
|
||||
* (see {@link org.apache.solr.handler.admin.ContainerPluginsApi#editAPI})
|
||||
* allowing the creation of instances of
|
||||
* {@link PlacementPlugin} to be used for replica placement computation.
|
||||
* <p>Note: configurable factory implementations should also implement
|
||||
* {@link org.apache.solr.api.ConfigurablePlugin} with the appropriate configuration
|
||||
* bean type.</p>
|
||||
*/
|
||||
public interface PlacementPluginFactory {
|
||||
public interface PlacementPluginFactory<T extends PlacementPluginConfig> extends ConfigurablePlugin<T> {
|
||||
/**
|
||||
* Returns an instance of the plugin that will be repeatedly (and concurrently) be called to compute placement. Multiple
|
||||
* The key in the plugins registry under which this plugin and its configuration are defined.
|
||||
*/
|
||||
String PLUGIN_NAME = ".placement-plugin";
|
||||
|
||||
/**
|
||||
* Returns an instance of the plugin that will be repeatedly (and concurrently) called to compute placement. Multiple
|
||||
* instances of a plugin can be used in parallel (for example if configuration has to change, but plugin instances with
|
||||
* the previous configuration are still being used).
|
||||
* <p>If this method returns null then a simple legacy assignment strategy will be used
|
||||
* (see {@link org.apache.solr.cloud.api.collections.Assign.LegacyAssignStrategy}).</p>
|
||||
*/
|
||||
PlacementPlugin createPluginInstance(PlacementPluginConfig config);
|
||||
PlacementPlugin createPluginInstance();
|
||||
|
||||
/**
|
||||
* Default implementation is a no-op. Override to provide meaningful
|
||||
* behavior if needed.
|
||||
* @param cfg value deserialized from JSON, not null.
|
||||
*/
|
||||
@Override
|
||||
default void configure(T cfg) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the configuration of the plugin.
|
||||
* Default implementation returns null.
|
||||
*/
|
||||
default T getConfig() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Useful type for plugins that don't use any configuration.
|
||||
*/
|
||||
class NoConfig implements PlacementPluginConfig {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cluster.placement.impl;
|
||||
|
||||
import org.apache.solr.cluster.placement.PlacementPlugin;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginConfig;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginFactory;
|
||||
|
||||
/**
|
||||
* Helper class to support dynamic reloading of plugin implementations.
|
||||
*/
|
||||
public final class DelegatingPlacementPluginFactory implements PlacementPluginFactory<PlacementPluginFactory.NoConfig> {
|
||||
|
||||
private volatile PlacementPluginFactory<? extends PlacementPluginConfig> delegate;
|
||||
// support for tests to make sure the update is completed
|
||||
private volatile int version;
|
||||
|
||||
@Override
|
||||
public PlacementPlugin createPluginInstance() {
|
||||
if (delegate != null) {
|
||||
return delegate.createPluginInstance();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setDelegate(PlacementPluginFactory<? extends PlacementPluginConfig> delegate) {
|
||||
this.delegate = delegate;
|
||||
this.version++;
|
||||
}
|
||||
|
||||
public PlacementPluginFactory<? extends PlacementPluginConfig> getDelegate() {
|
||||
return delegate;
|
||||
}
|
||||
|
||||
public int getVersion() {
|
||||
return version;
|
||||
}
|
||||
}
|
|
@ -24,6 +24,9 @@ import org.apache.solr.cluster.placement.*;
|
|||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Simple implementation of {@link PlacementPlanFactory}.
|
||||
*/
|
||||
public class PlacementPlanFactoryImpl implements PlacementPlanFactory {
|
||||
@Override
|
||||
public PlacementPlan createPlacementPlan(PlacementRequest request, Set<ReplicaPlacement> replicaPlacements) {
|
||||
|
|
|
@ -1,198 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cluster.placement.impl;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.cluster.placement.PlacementPlugin;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginConfig;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginFactory;
|
||||
import org.apache.solr.cluster.placement.plugins.AffinityPlacementFactory;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
|
||||
/**
|
||||
* <p>This concrete class is implementing the config as visible by the placement plugins and contains the code transforming the
|
||||
* plugin configuration (currently stored in {@code clusterprops.json} into a strongly typed abstraction (that will not
|
||||
* change if internally plugin configuration is moved to some other place).</p>
|
||||
*
|
||||
* <p>This class also contains the (static) code dealing with instantiating the plugin factory config (it is config, even though
|
||||
* of a slightly different type). This code is not accessed by the plugin code but used from the
|
||||
* {@link org.apache.solr.cloud.api.collections.Assign} class.</p>
|
||||
*/
|
||||
public class PlacementPluginConfigImpl implements PlacementPluginConfig {
|
||||
|
||||
// Separating configs into typed maps based on the element names in solr.xml
|
||||
private final Map<String, String> stringConfigs;
|
||||
private final Map<String, Long> longConfigs;
|
||||
private final Map<String, Boolean> boolConfigs;
|
||||
private final Map<String, Double> doubleConfigs;
|
||||
|
||||
|
||||
private PlacementPluginConfigImpl(Map<String, String> stringConfigs,
|
||||
Map<String, Long> longConfigs,
|
||||
Map<String, Boolean> boolConfigs,
|
||||
Map<String, Double> doubleConfigs) {
|
||||
this.stringConfigs = stringConfigs;
|
||||
this.longConfigs = longConfigs;
|
||||
this.boolConfigs = boolConfigs;
|
||||
this.doubleConfigs = doubleConfigs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getStringConfig(String configName) {
|
||||
return stringConfigs.get(configName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getStringConfig(String configName, String defaultValue) {
|
||||
String retval = stringConfigs.get(configName);
|
||||
return retval != null ? retval : defaultValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean getBooleanConfig(String configName) {
|
||||
return boolConfigs.get(configName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean getBooleanConfig(String configName, boolean defaultValue) {
|
||||
Boolean retval = boolConfigs.get(configName);
|
||||
return retval != null ? retval : defaultValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getLongConfig(String configName) {
|
||||
return longConfigs.get(configName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getLongConfig(String configName, long defaultValue) {
|
||||
Long retval = longConfigs.get(configName);
|
||||
return retval != null ? retval : defaultValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double getDoubleConfig(String configName) {
|
||||
return doubleConfigs.get(configName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double getDoubleConfig(String configName, double defaultValue) {
|
||||
Double retval = doubleConfigs.get(configName);
|
||||
return retval != null ? retval : defaultValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Parses the {@link Map} obtained as the value for key {@link #PLACEMENT_PLUGIN_CONFIG_KEY} from
|
||||
* the {@code clusterprops.json} configuration {@link Map} (obtained by calling
|
||||
* {@link org.apache.solr.client.solrj.impl.ClusterStateProvider#getClusterProperties()}) and translates it into a
|
||||
* configuration consumable by the plugin (and that will not change as Solr changes internally how and where it stores
|
||||
* configuration).</p>
|
||||
*
|
||||
* <p>Configuration properties {@code class} and {@code name} are reserved: for defining the plugin factory class and
|
||||
* a human readable plugin name. All other properties are plugin specific.</p>
|
||||
*
|
||||
* <p>See configuration example and how-to in {@link AffinityPlacementFactory}.</p>
|
||||
*/
|
||||
public static PlacementPluginConfig createConfigFromProperties(Map<String, Object> pluginConfig) {
|
||||
final Map<String, String> stringConfigs = new HashMap<>();
|
||||
final Map<String, Long> longConfigs = new HashMap<>();
|
||||
final Map<String, Boolean> boolConfigs = new HashMap<>();
|
||||
final Map<String, Double> doubleConfigs = new HashMap<>();
|
||||
|
||||
for (Map.Entry<String, Object> e : pluginConfig.entrySet()) {
|
||||
String key = e.getKey();
|
||||
if (PlacementPluginConfig.FACTORY_CLASS.equals(key)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (key == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Missing config name attribute in parameter of " + PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY);
|
||||
}
|
||||
|
||||
Object value = e.getValue();
|
||||
|
||||
if (value == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Missing config value for parameter " + key + " of " + PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY);
|
||||
}
|
||||
|
||||
if (value instanceof String) {
|
||||
stringConfigs.put(key, (String) value);
|
||||
} else if (value instanceof Long) {
|
||||
longConfigs.put(key, (Long) value);
|
||||
} else if (value instanceof Boolean) {
|
||||
boolConfigs.put(key, (Boolean) value);
|
||||
} else if (value instanceof Double) {
|
||||
doubleConfigs.put(key, (Double) value);
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unsupported config type " + value.getClass().getName() +
|
||||
" for parameter " + key + " of " + PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY);
|
||||
}
|
||||
}
|
||||
|
||||
return new PlacementPluginConfigImpl(stringConfigs, longConfigs, boolConfigs, doubleConfigs);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>This is where the plugin configuration is being read (from wherever in Solr it lives, and this will likely change with time),
|
||||
* a {@link org.apache.solr.cluster.placement.PlacementPluginFactory} (as configured) instantiated and a plugin instance
|
||||
* created from this factory.</p>
|
||||
*
|
||||
* <p>The initial implementation you see here is crude! the configuration is read anew each time and the factory class
|
||||
* as well as the plugin class instantiated each time.
|
||||
* This has to be changed once the code is accepted overall, to register a listener that is notified when the configuration
|
||||
* changes (see {@link org.apache.solr.common.cloud.ZkStateReader#registerClusterPropertiesListener})
|
||||
* and that will either create a new instance of the plugin with new configuration using the existing factory (if the factory
|
||||
* class has not changed - we need to keep track of this one) of create a new factory altogether (then a new plugin instance).</p>
|
||||
*/
|
||||
@SuppressWarnings({"unchecked"})
|
||||
public static PlacementPlugin getPlacementPlugin(SolrCloudManager solrCloudManager) {
|
||||
Map<String, Object> props = solrCloudManager.getClusterStateProvider().getClusterProperties();
|
||||
Map<String, Object> pluginConfigMap = (Map<String, Object>) props.get(PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY);
|
||||
|
||||
if (pluginConfigMap == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String pluginFactoryClassName = (String) pluginConfigMap.get(PlacementPluginConfig.FACTORY_CLASS);
|
||||
|
||||
// Get the configured plugin factory class. Is there a way to load a resource in Solr without being in the context of
|
||||
// CoreContainer? Here the placement code is unrelated to the presence of cores (and one can imagine it running on
|
||||
// specialized nodes not having a CoreContainer). I guess the loading code below is not totally satisfying (although
|
||||
// it's not the only place in Solr doing it that way), but I didn't find more satisfying alternatives. Open to suggestions.
|
||||
PlacementPluginFactory placementPluginFactory;
|
||||
try {
|
||||
Class<? extends PlacementPluginFactory> factoryClazz =
|
||||
Class.forName(pluginFactoryClassName, true, PlacementPluginConfigImpl.class.getClassLoader())
|
||||
.asSubclass(PlacementPluginFactory.class);
|
||||
|
||||
placementPluginFactory = factoryClazz.getConstructor().newInstance(); // no args constructor - that's why we introduced a factory...
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to instantiate placement-plugin factory: " +
|
||||
Utils.toJSONString(pluginConfigMap) + " please review /clusterprops.json config for " + PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY, e);
|
||||
}
|
||||
|
||||
// Translate the config from the properties where they are defined into the abstraction seen by the plugin
|
||||
PlacementPluginConfig pluginConfig = createConfigFromProperties(pluginConfigMap);
|
||||
|
||||
return placementPluginFactory.createPluginInstance(pluginConfig);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cluster.placement.impl;
|
||||
|
||||
import org.apache.solr.api.ContainerPluginsRegistry;
|
||||
import org.apache.solr.client.solrj.request.beans.PluginMeta;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginConfig;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
||||
/**
|
||||
* Utility class to load the configured {@link PlacementPluginFactory} plugin and
|
||||
* then keep it up to date as the plugin configuration changes.
|
||||
*/
|
||||
public class PlacementPluginFactoryLoader {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
public static void load(DelegatingPlacementPluginFactory pluginFactory, ContainerPluginsRegistry plugins) {
|
||||
ContainerPluginsRegistry.ApiInfo pluginFactoryInfo = plugins.getPlugin(PlacementPluginFactory.PLUGIN_NAME);
|
||||
if (pluginFactoryInfo != null && (pluginFactoryInfo.getInstance() instanceof PlacementPluginFactory)) {
|
||||
pluginFactory.setDelegate((PlacementPluginFactory<? extends PlacementPluginConfig>) pluginFactoryInfo.getInstance());
|
||||
}
|
||||
ContainerPluginsRegistry.PluginRegistryListener pluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
|
||||
@Override
|
||||
public void added(ContainerPluginsRegistry.ApiInfo plugin) {
|
||||
if (plugin == null || plugin.getInstance() == null) {
|
||||
return;
|
||||
}
|
||||
Object instance = plugin.getInstance();
|
||||
if (instance instanceof PlacementPluginFactory) {
|
||||
setDelegate(plugin.getInfo(), (PlacementPluginFactory<? extends PlacementPluginConfig>) instance);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleted(ContainerPluginsRegistry.ApiInfo plugin) {
|
||||
if (plugin == null || plugin.getInstance() == null) {
|
||||
return;
|
||||
}
|
||||
Object instance = plugin.getInstance();
|
||||
if (instance instanceof PlacementPluginFactory) {
|
||||
setDelegate(plugin.getInfo(), null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void modified(ContainerPluginsRegistry.ApiInfo old, ContainerPluginsRegistry.ApiInfo replacement) {
|
||||
added(replacement);
|
||||
}
|
||||
|
||||
private void setDelegate(PluginMeta pluginMeta, PlacementPluginFactory<? extends PlacementPluginConfig> factory) {
|
||||
if (PlacementPluginFactory.PLUGIN_NAME.equals(pluginMeta.name)) {
|
||||
pluginFactory.setDelegate(factory);
|
||||
} else {
|
||||
log.warn("Ignoring PlacementPluginFactory plugin with non-standard name: {}", pluginMeta);
|
||||
}
|
||||
}
|
||||
};
|
||||
plugins.registerListener(pluginListener);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cluster.placement.plugins;
|
||||
|
||||
import org.apache.solr.cluster.placement.PlacementPluginConfig;
|
||||
import org.apache.solr.common.annotation.JsonProperty;
|
||||
|
||||
/**
|
||||
* Configuration bean for {@link AffinityPlacementFactory}.
|
||||
*/
|
||||
public class AffinityPlacementConfig implements PlacementPluginConfig {
|
||||
|
||||
public static final AffinityPlacementConfig DEFAULT = new AffinityPlacementConfig();
|
||||
|
||||
/**
|
||||
* If a node has strictly less GB of free disk than this value, the node is excluded from assignment decisions.
|
||||
* Set to 0 or less to disable.
|
||||
*/
|
||||
@JsonProperty
|
||||
public long minimalFreeDiskGB;
|
||||
|
||||
/**
|
||||
* Replica allocation will assign replicas to nodes with at least this number of GB of free disk space regardless
|
||||
* of the number of cores on these nodes rather than assigning replicas to nodes with less than this amount of free
|
||||
* disk space if that's an option (if that's not an option, replicas can still be assigned to nodes with less than this
|
||||
* amount of free space).
|
||||
*/
|
||||
@JsonProperty
|
||||
public long prioritizedFreeDiskGB;
|
||||
|
||||
// no-arg public constructor required for deserialization
|
||||
public AffinityPlacementConfig() {
|
||||
minimalFreeDiskGB = 20L;
|
||||
prioritizedFreeDiskGB = 100L;
|
||||
}
|
||||
|
||||
public AffinityPlacementConfig(long minimalFreeDiskGB, long prioritizedFreeDiskGB) {
|
||||
this.minimalFreeDiskGB = minimalFreeDiskGB;
|
||||
this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
|
||||
}
|
||||
}
|
|
@ -115,7 +115,7 @@ import java.util.stream.Collectors;
|
|||
* make it relatively easy to adapt it to (somewhat) different assumptions. Configuration options could be introduced
|
||||
* to allow configuration base option selection as well...</p>
|
||||
*/
|
||||
public class AffinityPlacementFactory implements PlacementPluginFactory {
|
||||
public class AffinityPlacementFactory implements PlacementPluginFactory<AffinityPlacementConfig> {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
/**
|
||||
|
@ -140,19 +140,7 @@ public class AffinityPlacementFactory implements PlacementPluginFactory {
|
|||
*/
|
||||
public static final String UNDEFINED_AVAILABILITY_ZONE = "uNd3f1NeD";
|
||||
|
||||
/**
|
||||
* If a node has strictly less GB of free disk than this value, the node is excluded from assignment decisions.
|
||||
* Set to 0 or less to disable.
|
||||
*/
|
||||
public static final String MINIMAL_FREE_DISK_GB = "minimalFreeDiskGB";
|
||||
|
||||
/**
|
||||
* Replica allocation will assign replicas to nodes with at least this number of GB of free disk space regardless
|
||||
* of the number of cores on these nodes rather than assigning replicas to nodes with less than this amount of free
|
||||
* disk space if that's an option (if that's not an option, replicas can still be assigned to nodes with less than this
|
||||
* amount of free space).
|
||||
*/
|
||||
public static final String PRIORITIZED_FREE_DISK_GB = "prioritizedFreeDiskGB";
|
||||
private AffinityPlacementConfig config = AffinityPlacementConfig.DEFAULT;
|
||||
|
||||
/**
|
||||
* Empty public constructor is used to instantiate this factory. Using a factory pattern to allow the factory to do one
|
||||
|
@ -164,10 +152,19 @@ public class AffinityPlacementFactory implements PlacementPluginFactory {
|
|||
}
|
||||
|
||||
@Override
|
||||
public PlacementPlugin createPluginInstance(PlacementPluginConfig config) {
|
||||
final long minimalFreeDiskGB = config.getLongConfig(MINIMAL_FREE_DISK_GB, 20L);
|
||||
final long prioritizedFreeDiskGB = config.getLongConfig(PRIORITIZED_FREE_DISK_GB, 100L);
|
||||
return new AffinityPlacementPlugin(minimalFreeDiskGB, prioritizedFreeDiskGB);
|
||||
public PlacementPlugin createPluginInstance() {
|
||||
return new AffinityPlacementPlugin(config.minimalFreeDiskGB, config.prioritizedFreeDiskGB);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(AffinityPlacementConfig cfg) {
|
||||
Objects.requireNonNull(cfg, "configuration must never be null");
|
||||
this.config = cfg;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AffinityPlacementConfig getConfig() {
|
||||
return config;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -40,10 +40,10 @@ import org.apache.solr.common.util.SuppressForbidden;
|
|||
*
|
||||
* <p>See {@link AffinityPlacementFactory} for a more realistic example and documentation.</p>
|
||||
*/
|
||||
public class MinimizeCoresPlacementFactory implements PlacementPluginFactory {
|
||||
public class MinimizeCoresPlacementFactory implements PlacementPluginFactory<PlacementPluginFactory.NoConfig> {
|
||||
|
||||
@Override
|
||||
public PlacementPlugin createPluginInstance(PlacementPluginConfig config) {
|
||||
public PlacementPlugin createPluginInstance() {
|
||||
return new MinimizeCoresPlacementPlugin();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cluster.placement.plugins;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.cluster.Cluster;
|
||||
import org.apache.solr.cluster.Node;
|
||||
import org.apache.solr.cluster.Replica;
|
||||
import org.apache.solr.cluster.SolrCollection;
|
||||
import org.apache.solr.cluster.placement.*;
|
||||
|
||||
/**
|
||||
* <p>Factory for creating {@link RandomPlacementPlugin}, a placement plugin implementing random placement for new
|
||||
* collection creation while preventing two replicas of same shard from being placed on same node..</p>
|
||||
*
|
||||
* <p>See {@link AffinityPlacementFactory} for a more realistic example and documentation.</p>
|
||||
*/
|
||||
public class RandomPlacementFactory implements PlacementPluginFactory<PlacementPluginFactory.NoConfig> {
|
||||
|
||||
@Override
|
||||
public PlacementPlugin createPluginInstance() {
|
||||
return new RandomPlacementPlugin();
|
||||
}
|
||||
|
||||
public static class RandomPlacementPlugin implements PlacementPlugin {
|
||||
private final Random replicaPlacementRandom = new Random(); // ok even if random sequence is predictable.
|
||||
|
||||
private RandomPlacementPlugin() {
|
||||
// We make things reproducible in tests by using test seed if any
|
||||
String seed = System.getProperty("tests.seed");
|
||||
if (seed != null) {
|
||||
replicaPlacementRandom.setSeed(seed.hashCode());
|
||||
}
|
||||
}
|
||||
|
||||
public PlacementPlan computePlacement(Cluster cluster, PlacementRequest request, AttributeFetcher attributeFetcher,
|
||||
PlacementPlanFactory placementPlanFactory) throws PlacementException {
|
||||
int totalReplicasPerShard = 0;
|
||||
for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
|
||||
totalReplicasPerShard += request.getCountReplicasToCreate(rt);
|
||||
}
|
||||
|
||||
if (cluster.getLiveNodes().size() < totalReplicasPerShard) {
|
||||
throw new PlacementException("Cluster size too small for number of replicas per shard");
|
||||
}
|
||||
|
||||
Set<ReplicaPlacement> replicaPlacements = new HashSet<>(totalReplicasPerShard * request.getShardNames().size());
|
||||
|
||||
// Now place randomly all replicas of all shards on available nodes
|
||||
for (String shardName : request.getShardNames()) {
|
||||
// Shuffle the nodes for each shard so that replicas for a shard are placed on distinct yet random nodes
|
||||
ArrayList<Node> nodesToAssign = new ArrayList<>(cluster.getLiveNodes());
|
||||
Collections.shuffle(nodesToAssign, replicaPlacementRandom);
|
||||
|
||||
for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
|
||||
placeForReplicaType(request.getCollection(), nodesToAssign, placementPlanFactory, replicaPlacements, shardName, request, rt);
|
||||
}
|
||||
}
|
||||
|
||||
return placementPlanFactory.createPlacementPlan(request, replicaPlacements);
|
||||
}
|
||||
|
||||
private void placeForReplicaType(SolrCollection solrCollection, ArrayList<Node> nodesToAssign, PlacementPlanFactory placementPlanFactory,
|
||||
Set<ReplicaPlacement> replicaPlacements,
|
||||
String shardName, PlacementRequest request, Replica.ReplicaType replicaType) {
|
||||
for (int replica = 0; replica < request.getCountReplicasToCreate(replicaType); replica++) {
|
||||
Node node = nodesToAssign.remove(0);
|
||||
|
||||
replicaPlacements.add(placementPlanFactory.createReplicaPlacement(solrCollection, shardName, node, replicaType));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -74,6 +74,10 @@ import org.apache.solr.cloud.OverseerTaskQueue;
|
|||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.cluster.events.ClusterEventProducer;
|
||||
import org.apache.solr.cluster.events.impl.ClusterEventProducerFactory;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginConfig;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginFactory;
|
||||
import org.apache.solr.cluster.placement.impl.DelegatingPlacementPluginFactory;
|
||||
import org.apache.solr.cluster.placement.impl.PlacementPluginFactoryLoader;
|
||||
import org.apache.solr.common.AlreadyClosedException;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
|
@ -255,8 +259,8 @@ public class CoreContainer {
|
|||
!getZkController().getOverseer().isClosed(),
|
||||
(r) -> this.runAsync(r));
|
||||
|
||||
// initially these are the same to collect the plugin-based listeners during init
|
||||
private ClusterEventProducer clusterEventProducer;
|
||||
private volatile ClusterEventProducer clusterEventProducer;
|
||||
private final DelegatingPlacementPluginFactory placementPluginFactory = new DelegatingPlacementPluginFactory();
|
||||
|
||||
private PackageStoreAPI packageStoreAPI;
|
||||
private PackageLoader packageLoader;
|
||||
|
@ -896,6 +900,10 @@ public class CoreContainer {
|
|||
containerHandlers.getApiBag().registerObject(containerPluginsApi.readAPI);
|
||||
containerHandlers.getApiBag().registerObject(containerPluginsApi.editAPI);
|
||||
|
||||
// initialize the placement plugin factory wrapper
|
||||
// with the plugin configuration from the registry
|
||||
PlacementPluginFactoryLoader.load(placementPluginFactory, containerPluginsRegistry);
|
||||
|
||||
// create target ClusterEventProducer (possibly from plugins)
|
||||
clusterEventProducer = clusterEventProducerFactory.create(containerPluginsRegistry);
|
||||
|
||||
|
@ -2180,6 +2188,10 @@ public class CoreContainer {
|
|||
return clusterEventProducer;
|
||||
}
|
||||
|
||||
public PlacementPluginFactory<? extends PlacementPluginConfig> getPlacementPluginFactory() {
|
||||
return placementPluginFactory;
|
||||
}
|
||||
|
||||
static {
|
||||
ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
|
||||
}
|
||||
|
|
|
@ -27,8 +27,6 @@ import org.apache.solr.client.solrj.request.beans.ClusterPropInfo;
|
|||
import org.apache.solr.client.solrj.request.beans.CreateConfigInfo;
|
||||
import org.apache.solr.client.solrj.request.beans.RateLimiterMeta;
|
||||
import org.apache.solr.cloud.OverseerConfigSetMessageHandler;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginConfig;
|
||||
import org.apache.solr.common.MapWriterMap;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.annotation.JsonProperty;
|
||||
import org.apache.solr.common.cloud.ClusterProperties;
|
||||
|
@ -243,26 +241,6 @@ public class ClusterAPI {
|
|||
collectionsHandler.handleRequestBody(wrapParams(obj.getRequest(), m), obj.getResponse());
|
||||
}
|
||||
|
||||
@Command(name = "set-placement-plugin")
|
||||
public void setPlacementPlugin(PayloadObj<Map<String, Object>> obj) {
|
||||
Map<String, Object> placementPluginConfig = obj.getDataMap();
|
||||
if(placementPluginConfig.isEmpty()) placementPluginConfig = null;
|
||||
ClusterProperties clusterProperties = new ClusterProperties(getCoreContainer().getZkController().getZkClient());
|
||||
// When the json contains { "set-placement-plugin" : null }, the map is empty, not null.
|
||||
// Very basic sanity check. Real validation will be done when the config is used...
|
||||
if (!(placementPluginConfig == null) && !placementPluginConfig.containsKey(PlacementPluginConfig.FACTORY_CLASS)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Must contain " + PlacementPluginConfig.FACTORY_CLASS + " attribute (or be null)");
|
||||
}
|
||||
try {
|
||||
clusterProperties.update(placementPluginConfig == null?
|
||||
null:
|
||||
new MapWriterMap(placementPluginConfig),
|
||||
PlacementPluginConfig.PLACEMENT_PLUGIN_CONFIG_KEY);
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error in API", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Command(name = "set-ratelimiter")
|
||||
public void setRateLimiters(PayloadObj<RateLimiterMeta> payLoad) {
|
||||
RateLimiterMeta rateLimiterConfig = payLoad.get();
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.solr.client.solrj.impl.ClusterStateProvider;
|
|||
import org.apache.solr.cloud.Overseer.LeaderStatus;
|
||||
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
|
||||
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginFactory;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
|
@ -122,8 +123,10 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
|
|||
private static CoreContainer coreContainerMock;
|
||||
private static UpdateShardHandler updateShardHandlerMock;
|
||||
private static HttpClient httpClientMock;
|
||||
@SuppressWarnings("rawtypes")
|
||||
private static PlacementPluginFactory placementPluginFactoryMock;
|
||||
private static SolrMetricsContext solrMetricsContextMock;
|
||||
|
||||
|
||||
private static ObjectCache objectCache;
|
||||
private Map<String, byte[]> zkClientData = new HashMap<>();
|
||||
private final Map<String, ClusterState.CollectionRef> collectionsSet = new HashMap<>();
|
||||
|
@ -183,6 +186,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
|
|||
coreContainerMock = mock(CoreContainer.class);
|
||||
updateShardHandlerMock = mock(UpdateShardHandler.class);
|
||||
httpClientMock = mock(HttpClient.class);
|
||||
placementPluginFactoryMock = mock(PlacementPluginFactory.class);
|
||||
solrMetricsContextMock = mock(SolrMetricsContext.class);
|
||||
}
|
||||
|
||||
|
@ -208,6 +212,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
|
|||
coreContainerMock = null;
|
||||
updateShardHandlerMock = null;
|
||||
httpClientMock = null;
|
||||
placementPluginFactoryMock = null;
|
||||
solrMetricsContextMock = null;
|
||||
}
|
||||
|
||||
|
@ -238,6 +243,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
|
|||
reset(coreContainerMock);
|
||||
reset(updateShardHandlerMock);
|
||||
reset(httpClientMock);
|
||||
reset(placementPluginFactoryMock);
|
||||
reset(solrMetricsContextMock);
|
||||
|
||||
zkClientData.clear();
|
||||
|
@ -250,7 +256,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
|
|||
stopComponentUnderTest();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Set<String> commonMocks(int liveNodesCount) throws Exception {
|
||||
when(shardHandlerFactoryMock.getShardHandler()).thenReturn(shardHandlerMock);
|
||||
when(workQueueMock.peekTopN(anyInt(), any(), anyLong())).thenAnswer(invocation -> {
|
||||
|
@ -367,6 +374,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
|
|||
when(overseerMock.getSolrCloudManager()).thenReturn(cloudDataProviderMock);
|
||||
when(overseerMock.getCoreContainer()).thenReturn(coreContainerMock);
|
||||
when(coreContainerMock.getUpdateShardHandler()).thenReturn(updateShardHandlerMock);
|
||||
when(coreContainerMock.getPlacementPluginFactory()).thenReturn(placementPluginFactoryMock);
|
||||
when(updateShardHandlerMock.getDefaultHttpClient()).thenReturn(httpClientMock);
|
||||
|
||||
when(zkControllerMock.getSolrCloudManager()).thenReturn(cloudDataProviderMock);
|
||||
|
|
|
@ -20,20 +20,35 @@ package org.apache.solr.cluster.placement.impl;
|
|||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.V2Request;
|
||||
import org.apache.solr.client.solrj.request.beans.PluginMeta;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
||||
import org.apache.solr.client.solrj.response.V2Response;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginConfig;
|
||||
import org.apache.solr.cluster.placement.PlacementPluginFactory;
|
||||
import org.apache.solr.cluster.placement.plugins.AffinityPlacementConfig;
|
||||
import org.apache.solr.cluster.placement.plugins.AffinityPlacementFactory;
|
||||
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
||||
import org.apache.solr.cluster.placement.plugins.MinimizeCoresPlacementFactory;
|
||||
import org.apache.solr.common.cloud.ClusterProperties;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.util.TimeSource;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.util.LogLevel;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
|
@ -41,12 +56,14 @@ import static java.util.Collections.singletonMap;
|
|||
/**
|
||||
* Test for {@link MinimizeCoresPlacementFactory} using a {@link MiniSolrCloudCluster}.
|
||||
*/
|
||||
@LogLevel("org.apache.solr.cluster.placement.impl=DEBUG")
|
||||
public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final String COLLECTION = PlacementPluginIntegrationTest.class.getName() + "_collection";
|
||||
|
||||
private static ClusterProperties clusterProperties;
|
||||
private static SolrCloudManager cloudManager;
|
||||
private static CoreContainer cc;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
|
@ -55,29 +72,37 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
|
|||
configureCluster(3)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
|
||||
clusterProperties = new ClusterProperties(cluster.getZkClient());
|
||||
cc = cluster.getJettySolrRunner(0).getCoreContainer();
|
||||
cloudManager = cc.getZkController().getSolrCloudManager();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws Exception {
|
||||
cluster.deleteAllCollections();
|
||||
V2Request req = new V2Request.Builder("/cluster")
|
||||
V2Request req = new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
.POST()
|
||||
.withPayload(singletonMap("set-placement-plugin", Map.of()))
|
||||
.GET()
|
||||
.build();
|
||||
req.process(cluster.getSolrClient());
|
||||
|
||||
V2Response rsp = req.process(cluster.getSolrClient());
|
||||
if (rsp._get(Arrays.asList("plugin", PlacementPluginFactory.PLUGIN_NAME), null) != null) {
|
||||
req = new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
.POST()
|
||||
.withPayload("{remove: '" + PlacementPluginFactory.PLUGIN_NAME + "'}")
|
||||
.build();
|
||||
req.process(cluster.getSolrClient());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinimizeCores() throws Exception {
|
||||
Map<String, Object> config = Map.of(PlacementPluginConfig.FACTORY_CLASS, MinimizeCoresPlacementFactory.class.getName());
|
||||
V2Request req = new V2Request.Builder("/cluster")
|
||||
PluginMeta plugin = new PluginMeta();
|
||||
plugin.name = PlacementPluginFactory.PLUGIN_NAME;
|
||||
plugin.klass = MinimizeCoresPlacementFactory.class.getName();
|
||||
V2Request req = new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
.POST()
|
||||
.withPayload(singletonMap("set-placement-plugin", config))
|
||||
.withPayload(singletonMap("add", plugin))
|
||||
.build();
|
||||
req.process(cluster.getSolrClient());
|
||||
|
||||
|
@ -90,9 +115,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
|
|||
DocCollection collection = clusterState.getCollectionOrNull(COLLECTION);
|
||||
assertNotNull(collection);
|
||||
Map<String, AtomicInteger> coresByNode = new HashMap<>();
|
||||
collection.forEachReplica((shard, replica) -> {
|
||||
coresByNode.computeIfAbsent(replica.getNodeName(), n -> new AtomicInteger()).incrementAndGet();
|
||||
});
|
||||
collection.forEachReplica((shard, replica) -> coresByNode.computeIfAbsent(replica.getNodeName(), n -> new AtomicInteger()).incrementAndGet());
|
||||
int maxCores = 0;
|
||||
int minCores = Integer.MAX_VALUE;
|
||||
for (Map.Entry<String, AtomicInteger> entry : coresByNode.entrySet()) {
|
||||
|
@ -109,4 +132,108 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
|
|||
assertEquals("min cores too low", 1, minCores);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDynamicReconfiguration() throws Exception {
|
||||
PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory();
|
||||
assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory);
|
||||
DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory;
|
||||
|
||||
int version = wrapper.getVersion();
|
||||
log.debug("--initial version={}", version);
|
||||
|
||||
PluginMeta plugin = new PluginMeta();
|
||||
plugin.name = PlacementPluginFactory.PLUGIN_NAME;
|
||||
plugin.klass = MinimizeCoresPlacementFactory.class.getName();
|
||||
V2Request req = new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
.POST()
|
||||
.withPayload(singletonMap("add", plugin))
|
||||
.build();
|
||||
req.process(cluster.getSolrClient());
|
||||
|
||||
version = waitForVersionChange(version, wrapper, 10);
|
||||
|
||||
assertTrue("wrong version " + version, version > 0);
|
||||
PlacementPluginFactory<? extends PlacementPluginConfig> factory = wrapper.getDelegate();
|
||||
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof MinimizeCoresPlacementFactory);
|
||||
|
||||
// reconfigure
|
||||
plugin.klass = AffinityPlacementFactory.class.getName();
|
||||
plugin.config = new AffinityPlacementConfig(1, 2);
|
||||
req = new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
.POST()
|
||||
.withPayload(singletonMap("update", plugin))
|
||||
.build();
|
||||
req.process(cluster.getSolrClient());
|
||||
|
||||
version = waitForVersionChange(version, wrapper, 10);
|
||||
|
||||
factory = wrapper.getDelegate();
|
||||
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory);
|
||||
AffinityPlacementConfig config = ((AffinityPlacementFactory) factory).getConfig();
|
||||
assertEquals("minimalFreeDiskGB", 1, config.minimalFreeDiskGB);
|
||||
assertEquals("prioritizedFreeDiskGB", 2, config.prioritizedFreeDiskGB);
|
||||
|
||||
// change plugin config
|
||||
plugin.config = new AffinityPlacementConfig(3, 4);
|
||||
req = new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
.POST()
|
||||
.withPayload(singletonMap("update", plugin))
|
||||
.build();
|
||||
req.process(cluster.getSolrClient());
|
||||
|
||||
version = waitForVersionChange(version, wrapper, 10);
|
||||
factory = wrapper.getDelegate();
|
||||
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory);
|
||||
config = ((AffinityPlacementFactory) factory).getConfig();
|
||||
assertEquals("minimalFreeDiskGB", 3, config.minimalFreeDiskGB);
|
||||
assertEquals("prioritizedFreeDiskGB", 4, config.prioritizedFreeDiskGB);
|
||||
|
||||
// add plugin of the right type but with the wrong name
|
||||
plugin.name = "myPlugin";
|
||||
req = new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
.POST()
|
||||
.withPayload(singletonMap("add", plugin))
|
||||
.build();
|
||||
req.process(cluster.getSolrClient());
|
||||
try {
|
||||
int newVersion = waitForVersionChange(version, wrapper, 5);
|
||||
if (newVersion != version) {
|
||||
fail("factory configuration updated but plugin name was wrong: " + plugin);
|
||||
}
|
||||
} catch (TimeoutException te) {
|
||||
// expected
|
||||
}
|
||||
// remove plugin
|
||||
req = new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
.POST()
|
||||
.withPayload("{remove: '" + PlacementPluginFactory.PLUGIN_NAME + "'}")
|
||||
.build();
|
||||
req.process(cluster.getSolrClient());
|
||||
waitForVersionChange(version, wrapper, 10);
|
||||
factory = wrapper.getDelegate();
|
||||
assertNull("no factory should be present", factory);
|
||||
}
|
||||
|
||||
private int waitForVersionChange(int currentVersion, DelegatingPlacementPluginFactory wrapper, int timeoutSec) throws Exception {
|
||||
TimeOut timeout = new TimeOut(timeoutSec, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||
|
||||
while (!timeout.hasTimedOut()) {
|
||||
int newVersion = wrapper.getVersion();
|
||||
if (newVersion < currentVersion) {
|
||||
throw new Exception("Invalid version - went back! currentVersion=" + currentVersion +
|
||||
" newVersion=" + newVersion);
|
||||
} else if (currentVersion < newVersion) {
|
||||
log.debug("--current version was {}, new version is {}", currentVersion, newVersion);
|
||||
return newVersion;
|
||||
}
|
||||
timeout.sleep(200);
|
||||
}
|
||||
throw new TimeoutException("version didn't change in time, currentVersion=" + currentVersion);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.solr.cluster.SolrCollection;
|
|||
import org.apache.solr.cluster.placement.*;
|
||||
import org.apache.solr.cluster.placement.Builders;
|
||||
import org.apache.solr.cluster.placement.impl.PlacementPlanFactoryImpl;
|
||||
import org.apache.solr.cluster.placement.impl.PlacementPluginConfigImpl;
|
||||
import org.apache.solr.cluster.placement.impl.PlacementRequestImpl;
|
||||
import org.apache.solr.common.util.Pair;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -54,9 +53,10 @@ public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
|
|||
|
||||
@BeforeClass
|
||||
public static void setupPlugin() {
|
||||
PlacementPluginConfig config = PlacementPluginConfigImpl.createConfigFromProperties(
|
||||
Map.of("minimalFreeDiskGB", MINIMAL_FREE_DISK_GB, "prioritizedFreeDiskGB", PRIORITIZED_FREE_DISK_GB));
|
||||
plugin = new AffinityPlacementFactory().createPluginInstance(config);
|
||||
AffinityPlacementConfig config = new AffinityPlacementConfig(MINIMAL_FREE_DISK_GB, PRIORITIZED_FREE_DISK_GB);
|
||||
AffinityPlacementFactory factory = new AffinityPlacementFactory();
|
||||
factory.configure(config);
|
||||
plugin = factory.createPluginInstance();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -193,12 +193,14 @@ public class TestContainerPlugin extends SolrCloudTestCase {
|
|||
assertEquals( CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC1()));
|
||||
assertEquals( CConfig.class, ContainerPluginsRegistry.getConfigClass(new CC2()));
|
||||
|
||||
CConfig p = new CConfig();
|
||||
p.boolVal = Boolean.TRUE;
|
||||
p.strVal = "Something";
|
||||
p.longVal = 1234L;
|
||||
CConfig cfg = new CConfig();
|
||||
cfg.boolVal = Boolean.TRUE;
|
||||
cfg.strVal = "Something";
|
||||
cfg.longVal = 1234L;
|
||||
PluginMeta p = new PluginMeta();
|
||||
p.name = "hello";
|
||||
p.klass = CC.class.getName();
|
||||
p.config = cfg;
|
||||
|
||||
new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
|
@ -213,7 +215,7 @@ public class TestContainerPlugin extends SolrCloudTestCase {
|
|||
.build().process(cluster.getSolrClient()),
|
||||
ImmutableMap.of("/config/boolVal", "true", "/config/strVal", "Something","/config/longVal", "1234" ));
|
||||
|
||||
p.strVal = "Something else";
|
||||
cfg.strVal = "Something else";
|
||||
new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
.POST()
|
||||
|
@ -226,7 +228,7 @@ public class TestContainerPlugin extends SolrCloudTestCase {
|
|||
.forceV2(true)
|
||||
.GET()
|
||||
.build().process(cluster.getSolrClient()),
|
||||
ImmutableMap.of("/config/boolVal", "true", "/config/strVal", p.strVal,"/config/longVal", "1234" ));
|
||||
ImmutableMap.of("/config/boolVal", "true", "/config/strVal", cfg.strVal,"/config/longVal", "1234" ));
|
||||
|
||||
// kill the Overseer leader
|
||||
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
|
||||
|
@ -391,12 +393,6 @@ public class TestContainerPlugin extends SolrCloudTestCase {
|
|||
|
||||
@JsonProperty
|
||||
public Boolean boolVal;
|
||||
|
||||
@JsonProperty
|
||||
public String name;
|
||||
|
||||
@JsonProperty(value = "class", required = true)
|
||||
public String klass;
|
||||
}
|
||||
|
||||
public static class C6 implements ClusterSingleton {
|
||||
|
|
|
@ -26,24 +26,33 @@ import org.apache.solr.common.util.ReflectMapWriter;
|
|||
* POJO for a plugin metadata used in container plugins
|
||||
*/
|
||||
public class PluginMeta implements ReflectMapWriter {
|
||||
/** Unique plugin name, required. */
|
||||
@JsonProperty(required = true)
|
||||
public String name;
|
||||
|
||||
/** Plugin implementation class, required. */
|
||||
@JsonProperty(value = "class", required = true)
|
||||
public String klass;
|
||||
|
||||
/** Plugin version. */
|
||||
@JsonProperty
|
||||
public String version;
|
||||
|
||||
/** Plugin API path prefix, optional. */
|
||||
@JsonProperty("path-prefix")
|
||||
public String pathPrefix;
|
||||
|
||||
/** Plugin configuration object, optional. */
|
||||
@JsonProperty
|
||||
public Object config;
|
||||
|
||||
|
||||
public PluginMeta copy() {
|
||||
PluginMeta result = new PluginMeta();
|
||||
result.name = name;
|
||||
result.klass = klass;
|
||||
result.version = version;
|
||||
result.config = config;
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -53,7 +62,8 @@ public class PluginMeta implements ReflectMapWriter {
|
|||
PluginMeta that = (PluginMeta) obj;
|
||||
return Objects.equals(this.name, that.name) &&
|
||||
Objects.equals(this.klass, that.klass) &&
|
||||
Objects.equals(this.version, that.version);
|
||||
Objects.equals(this.version, that.version) &&
|
||||
Objects.equals(this.config, that.config);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -61,4 +71,9 @@ public class PluginMeta implements ReflectMapWriter {
|
|||
public int hashCode() {
|
||||
return Objects.hash(name, version, klass);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return jsonStr();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue