mirror of https://github.com/apache/lucene.git
SOLR-11833: Allow searchRate trigger to delete replicas.
parent 1409ab8f84
commit 0d969ab85d
@@ -182,9 +182,11 @@ Bug Fixes

* SOLR-12250: NegativeArraySizeException on TransactionLog if previous document more than 1.9GB (Cao Manh Dat)

* SOLR-12253: Remove optimize button from the core admin page (Erick Erickson)

+* SOLR-11833: Allow searchRate trigger to delete replicas. Improve configurability of the trigger by specifying
+  upper / lower thresholds and respective actions (ab)

Optimizations
----------------------
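(For illustration only: a minimal trigger configuration exercising the new upper/lower thresholds, modeled on the test payloads later in this commit. The collection name and rates are invented; 'addreplica' and 'deletereplica' are also the defaults for aboveOp/belowOp.)

{
  'set-trigger' : {
    'name' : 'search_rate_trigger',
    'event' : 'searchRate',
    'waitFor' : '60s',
    'enabled' : true,
    'collections' : 'myColl',
    'aboveRate' : 1.0,
    'aboveOp' : 'addreplica',
    'belowRate' : 0.1,
    'belowOp' : 'deletereplica',
    'actions' : [
      {'name':'compute','class':'org.apache.solr.cloud.autoscaling.ComputePlanAction'},
      {'name':'execute','class':'org.apache.solr.cloud.autoscaling.ExecutePlanAction'}
    ]
  }
}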
@@ -143,11 +143,11 @@ public class IndexSizeTrigger extends TriggerBase {
    String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.MERGESHARDS.toLower()));
    aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
    if (aboveOp == null) {
-     throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value of " + ABOVE_OP_PROP + ": '" + aboveOpStr + "'");
+     throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value of: '" + aboveOpStr + "'");
    }
    belowOp = CollectionParams.CollectionAction.get(belowOpStr);
    if (belowOp == null) {
-     throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of " + BELOW_OP_PROP + ": '" + belowOpStr + "'");
+     throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of: '" + belowOpStr + "'");
    }
  }
@@ -16,17 +16,21 @@
 */
package org.apache.solr.cloud.autoscaling;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;

@@ -34,9 +38,13 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.metrics.SolrCoreMetricManager;
@@ -49,11 +57,43 @@ import org.slf4j.LoggerFactory;
public class SearchRateTrigger extends TriggerBase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private String handler;
- private String collection;
+ public static final String COLLECTIONS_PROP = "collections";
+ public static final String METRIC_PROP = "metric";
+ public static final String MAX_OPS_PROP = "maxOps";
+ public static final String MIN_REPLICAS_PROP = "minReplicas";
+ public static final String ABOVE_RATE_PROP = "aboveRate";
+ public static final String BELOW_RATE_PROP = "belowRate";
+ public static final String ABOVE_OP_PROP = "aboveOp";
+ public static final String BELOW_OP_PROP = "belowOp";
+ public static final String ABOVE_NODE_OP_PROP = "aboveNodeOp";
+ public static final String BELOW_NODE_OP_PROP = "belowNodeOp";

+ // back-compat
+ public static final String BC_COLLECTION_PROP = "collection";
+ public static final String BC_RATE_PROP = "rate";

+ public static final String HOT_NODES = "hotNodes";
+ public static final String HOT_COLLECTIONS = "hotCollections";
+ public static final String HOT_SHARDS = "hotShards";
+ public static final String HOT_REPLICAS = "hotReplicas";
+ public static final String COLD_NODES = "coldNodes";
+ public static final String COLD_COLLECTIONS = "coldCollections";
+ public static final String COLD_SHARDS = "coldShards";
+ public static final String COLD_REPLICAS = "coldReplicas";

+ public static final int DEFAULT_MAX_OPS = 3;
+ public static final String DEFAULT_METRIC = "QUERY./select.requestTimes:1minRate";

+ private String metric;
+ private int maxOps;
+ private Integer minReplicas = null;
+ private final Set<String> collections = new HashSet<>();
  private String shard;
  private String node;
- private double rate;
+ private double aboveRate;
+ private double belowRate;
+ private CollectionParams.CollectionAction aboveOp, belowOp, aboveNodeOp, belowNodeOp;
  private final Map<String, Long> lastCollectionEvent = new ConcurrentHashMap<>();
  private final Map<String, Long> lastNodeEvent = new ConcurrentHashMap<>();
  private final Map<String, Long> lastShardEvent = new ConcurrentHashMap<>();
@@ -66,27 +106,136 @@ public class SearchRateTrigger extends TriggerBase {
    this.state.put("lastNodeEvent", lastNodeEvent);
    this.state.put("lastShardEvent", lastShardEvent);
    this.state.put("lastReplicaEvent", lastReplicaEvent);
-   TriggerUtils.requiredProperties(requiredProperties, validProperties, "rate");
+   TriggerUtils.validProperties(validProperties,
+       COLLECTIONS_PROP, AutoScalingParams.SHARD, AutoScalingParams.NODE,
+       METRIC_PROP,
+       MAX_OPS_PROP,
+       MIN_REPLICAS_PROP,
+       ABOVE_OP_PROP,
+       BELOW_OP_PROP,
+       ABOVE_NODE_OP_PROP,
+       BELOW_NODE_OP_PROP,
+       ABOVE_RATE_PROP,
+       BELOW_RATE_PROP,
+       // back-compat props
+       BC_COLLECTION_PROP,
+       BC_RATE_PROP);
  }
  @Override
  public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
    super.configure(loader, cloudManager, properties);
    // parse config options
-   collection = (String)properties.getOrDefault(AutoScalingParams.COLLECTION, Policy.ANY);
+   String collectionsStr = (String)properties.get(COLLECTIONS_PROP);
+   if (collectionsStr != null) {
+     collections.addAll(StrUtils.splitSmart(collectionsStr, ','));
+   }
+   // check back-compat collection prop
+   collectionsStr = (String)properties.get(BC_COLLECTION_PROP);
+   if (collectionsStr != null) {
+     if (!collectionsStr.equals(Policy.ANY)) {
+       collections.add(collectionsStr);
+     }
+   }
    shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY);
-   if (collection.equals(Policy.ANY) && !shard.equals(Policy.ANY)) {
-     throw new TriggerValidationException("shard", "When 'shard' is other than #ANY then collection name must be also other than #ANY");
+   if (!shard.equals(Policy.ANY) && (collections.isEmpty() || collections.size() > 1)) {
+     throw new TriggerValidationException(name, AutoScalingParams.SHARD, "When 'shard' is other than #ANY then exactly one collection name must be set");
    }
    node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
    handler = (String)properties.getOrDefault(AutoScalingParams.HANDLER, "/select");
+   metric = (String)properties.getOrDefault(METRIC_PROP, DEFAULT_METRIC);

-   String rateString = String.valueOf(properties.get("rate"));
+   String maxOpsStr = String.valueOf(properties.getOrDefault(MAX_OPS_PROP, DEFAULT_MAX_OPS));
    try {
-     rate = Double.parseDouble(rateString);
+     maxOps = Integer.parseInt(maxOpsStr);
    } catch (Exception e) {
-     throw new TriggerValidationException(name, "rate", "Invalid 'rate' configuration value: '" + rateString + "': " + e.toString());
+     throw new TriggerValidationException(name, MAX_OPS_PROP, "invalid value '" + maxOpsStr + "': " + e.toString());
    }

+   Object o = properties.get(MIN_REPLICAS_PROP);
+   if (o != null) {
+     try {
+       minReplicas = Integer.parseInt(o.toString());
+       if (minReplicas < 1) {
+         throw new Exception("must be at least 1, or not set to use 'replicationFactor'");
+       }
+     } catch (Exception e) {
+       throw new TriggerValidationException(name, MIN_REPLICAS_PROP, "invalid value '" + o + "': " + e.toString());
+     }
+   }

+   Object above = properties.get(ABOVE_RATE_PROP);
+   Object below = properties.get(BELOW_RATE_PROP);
+   // back-compat rate prop
+   if (properties.containsKey(BC_RATE_PROP)) {
+     above = properties.get(BC_RATE_PROP);
+   }
+   if (above == null && below == null) {
+     throw new TriggerValidationException(name, ABOVE_RATE_PROP, "at least one of '" +
+         ABOVE_RATE_PROP + "' or '" + BELOW_RATE_PROP + "' must be set");
+   }
+   if (above != null) {
+     try {
+       aboveRate = Double.parseDouble(String.valueOf(above));
+     } catch (Exception e) {
+       throw new TriggerValidationException(name, ABOVE_RATE_PROP, "Invalid configuration value: '" + above + "': " + e.toString());
+     }
+   } else {
+     aboveRate = Double.MAX_VALUE;
+   }
+   if (below != null) {
+     try {
+       belowRate = Double.parseDouble(String.valueOf(below));
+     } catch (Exception e) {
+       throw new TriggerValidationException(name, BELOW_RATE_PROP, "Invalid configuration value: '" + below + "': " + e.toString());
+     }
+   } else {
+     belowRate = -1;
+   }

+   String aboveOpStr = String.valueOf(properties.getOrDefault(ABOVE_OP_PROP, CollectionParams.CollectionAction.ADDREPLICA.toLower()));
+   String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.DELETEREPLICA.toLower()));
+   aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
+   if (aboveOp == null) {
+     throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value: '" + aboveOpStr + "'");
+   }
+   belowOp = CollectionParams.CollectionAction.get(belowOpStr);
+   if (belowOp == null) {
+     throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value: '" + belowOpStr + "'");
+   }
+   Object aboveNodeObj = properties.getOrDefault(ABOVE_NODE_OP_PROP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
+   // do NOT set the default to DELETENODE
+   Object belowNodeObj = properties.get(BELOW_NODE_OP_PROP);
+   try {
+     aboveNodeOp = CollectionParams.CollectionAction.get(String.valueOf(aboveNodeObj));
+   } catch (Exception e) {
+     throw new TriggerValidationException(getName(), ABOVE_NODE_OP_PROP, "unrecognized value: '" + aboveNodeObj + "'");
+   }
+   if (belowNodeObj != null) {
+     try {
+       belowNodeOp = CollectionParams.CollectionAction.get(String.valueOf(belowNodeObj));
+     } catch (Exception e) {
+       throw new TriggerValidationException(getName(), BELOW_NODE_OP_PROP, "unrecognized value: '" + belowNodeObj + "'");
+     }
+   }
  }
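(To make the back-compat path above concrete, a legacy payload that still uses the old single-valued properties, with hypothetical values:

{
  'set-trigger' : {
    'name' : 'search_rate_trigger',
    'event' : 'searchRate',
    'collection' : 'myColl',
    'rate' : 1.0
  }
}

is absorbed into the new model: 'collection' is added to the 'collections' set unless it is #ANY, and 'rate' is treated as 'aboveRate'; 'belowRate' then defaults to -1 so the cold path never fires.)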
  @VisibleForTesting
  Map<String, Object> getConfig() {
    Map<String, Object> config = new HashMap<>();
    config.put("name", name);
    config.put(COLLECTIONS_PROP, collections);
    config.put(AutoScalingParams.SHARD, shard);
    config.put(AutoScalingParams.NODE, node);
    config.put(METRIC_PROP, metric);
    config.put(MAX_OPS_PROP, maxOps);
    config.put(MIN_REPLICAS_PROP, minReplicas);
    config.put(ABOVE_RATE_PROP, aboveRate);
    config.put(BELOW_RATE_PROP, belowRate);
    config.put(ABOVE_OP_PROP, aboveOp);
    config.put(ABOVE_NODE_OP_PROP, aboveNodeOp);
    config.put(BELOW_OP_PROP, belowOp);
    config.put(BELOW_NODE_OP_PROP, belowNodeOp);
    return config;
  }

  @Override
@@ -146,26 +295,42 @@ public class SearchRateTrigger extends TriggerBase {
      return;
    }

    // collection, shard, list(replica + rate)
    Map<String, Map<String, List<ReplicaInfo>>> collectionRates = new HashMap<>();
    // node, rate
    Map<String, AtomicDouble> nodeRates = new HashMap<>();
-   Map<String, Integer> replicationFactors = new HashMap<>();
+   // this replication factor only considers replica types that are searchable
+   // collection, shard, RF
+   Map<String, Map<String, AtomicInteger>> searchableReplicationFactors = new HashMap<>();
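(An illustrative shape for the nested map above, with invented names: searchableReplicationFactors maps collection -> shard -> count of active searchable replicas, e.g. { "myColl" : { "shard1" : 2, "shard2" : 3 } }, where each count is an AtomicInteger incremented per active replica below.)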
    ClusterState clusterState = null;
    try {
      clusterState = cloudManager.getClusterStateProvider().getClusterState();
    } catch (IOException e) {
      log.warn("Error getting ClusterState", e);
      return;
    }
    for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
      Map<String, ReplicaInfo> metricTags = new HashMap<>();
      // coll, shard, replica
      Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
      infos.forEach((coll, shards) -> {
-       replicationFactors.computeIfAbsent(coll, c -> shards.size());
+       Map<String, AtomicInteger> replPerShard = searchableReplicationFactors.computeIfAbsent(coll, c -> new HashMap<>());
        shards.forEach((sh, replicas) -> {
+         AtomicInteger repl = replPerShard.computeIfAbsent(sh, s -> new AtomicInteger());
          replicas.forEach(replica -> {
+           // skip non-active replicas
+           if (replica.getState() != Replica.State.ACTIVE) {
+             return;
+           }
+           repl.incrementAndGet();
            // we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
            String replicaName = Utils.parseMetricsReplicaName(coll, replica.getCore());
            if (replicaName == null) { // should never happen???
              replicaName = replica.getName(); // which is actually coreNode name...
            }
            String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null);
-           String tag = "metrics:" + registry
-               + ":QUERY." + handler + ".requestTimes:1minRate";
+           String tag = "metrics:" + registry + ":" + metric;
            metricTags.put(tag, replica);
          });
        });
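(For a sense of what the lookup key looks like: with the default metric and a core whose registry name resolves to something like solr.core.myColl.shard1.replica_n1, an invented example, the tag would be roughly

  metrics:solr.core.myColl.shard1.replica_n1:QUERY./select.requestTimes:1minRate

i.e. the per-core metrics registry name plus the configured metric key.)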
@@ -191,48 +356,100 @@ public class SearchRateTrigger extends TriggerBase {
    }

    long now = cloudManager.getTimeSource().getTimeNs();
+   Map<String, Double> hotNodes = new HashMap<>();
+   Map<String, Double> coldNodes = new HashMap<>();
    // check for exceeded rates and filter out those with less than waitFor from previous events
-   Map<String, Double> hotNodes = nodeRates.entrySet().stream()
+   nodeRates.entrySet().stream()
        .filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey()))
-       .filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
-       .filter(entry -> entry.getValue().get() > rate)
-       .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get()));
+       .forEach(entry -> {
+         if (entry.getValue().get() > aboveRate) {
+           if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) {
+             hotNodes.put(entry.getKey(), entry.getValue().get());
+           }
+         } else if (entry.getValue().get() < belowRate) {
+           if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) {
+             coldNodes.put(entry.getKey(), entry.getValue().get());
+           }
+         } else {
+           // no violation - clear waitForElapsed
+           // (violation is only valid if it persists throughout waitFor)
+           lastNodeEvent.remove(entry.getKey());
+         }
+       });
    Map<String, Map<String, Double>> hotShards = new HashMap<>();
+   Map<String, Map<String, Double>> coldShards = new HashMap<>();
    List<ReplicaInfo> hotReplicas = new ArrayList<>();
+   List<ReplicaInfo> coldReplicas = new ArrayList<>();
    collectionRates.forEach((coll, shardRates) -> {
      shardRates.forEach((sh, replicaRates) -> {
        double shardRate = replicaRates.stream()
            .map(r -> {
-             if (waitForElapsed(r.getCollection() + "." + r.getCore(), now, lastReplicaEvent) &&
-                 ((Double)r.getVariable(AutoScalingParams.RATE) > rate)) {
-               hotReplicas.add(r);
+             String elapsedKey = r.getCollection() + "." + r.getCore();
+             if ((Double)r.getVariable(AutoScalingParams.RATE) > aboveRate) {
+               if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) {
+                 hotReplicas.add(r);
+               }
+             } else if ((Double)r.getVariable(AutoScalingParams.RATE) < belowRate) {
+               if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) {
+                 coldReplicas.add(r);
+               }
+             } else {
+               // no violation - clear waitForElapsed
+               lastReplicaEvent.remove(elapsedKey);
              }
              return r;
            })
            .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum();
-       if (waitForElapsed(coll + "." + sh, now, lastShardEvent) &&
-           (shardRate > rate) &&
-           (collection.equals(Policy.ANY) || collection.equals(coll)) &&
+       String elapsedKey = coll + "." + sh;
+       if ((collections.isEmpty() || collections.contains(coll)) &&
            (shard.equals(Policy.ANY) || shard.equals(sh))) {
-         hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+         if (shardRate > aboveRate) {
+           if (waitForElapsed(elapsedKey, now, lastShardEvent)) {
+             hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+           }
+         } else if (shardRate < belowRate) {
+           if (waitForElapsed(elapsedKey, now, lastShardEvent)) {
+             coldShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+           }
+         } else {
+           // no violation - clear waitForElapsed
+           lastShardEvent.remove(elapsedKey);
+         }
        }
      });
    });

    Map<String, Double> hotCollections = new HashMap<>();
+   Map<String, Double> coldCollections = new HashMap<>();
    collectionRates.forEach((coll, shardRates) -> {
      double total = shardRates.entrySet().stream()
          .mapToDouble(e -> e.getValue().stream()
              .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum();
-     if (waitForElapsed(coll, now, lastCollectionEvent) &&
-         (total > rate) &&
-         (collection.equals(Policy.ANY) || collection.equals(coll))) {
-       hotCollections.put(coll, total);
+     if (collections.isEmpty() || collections.contains(coll)) {
+       if (total > aboveRate) {
+         if (waitForElapsed(coll, now, lastCollectionEvent)) {
+           hotCollections.put(coll, total);
+         }
+       } else if (total < belowRate) {
+         if (waitForElapsed(coll, now, lastCollectionEvent)) {
+           coldCollections.put(coll, total);
+         }
+       } else {
+         // no violation - clear waitForElapsed
+         lastCollectionEvent.remove(coll);
+       }
      }
    });

-   if (hotCollections.isEmpty() && hotShards.isEmpty() && hotReplicas.isEmpty() && hotNodes.isEmpty()) {
+   if (hotCollections.isEmpty() &&
+       hotShards.isEmpty() &&
+       hotReplicas.isEmpty() &&
+       hotNodes.isEmpty() &&
+       coldCollections.isEmpty() &&
+       coldShards.isEmpty() &&
+       coldReplicas.isEmpty() &&
+       coldNodes.isEmpty()) {
      return;
    }
@@ -246,6 +463,12 @@ public class SearchRateTrigger extends TriggerBase {
        eventTime.set(time);
      }
    });
+   coldCollections.forEach((c, r) -> {
+     long time = lastCollectionEvent.get(c);
+     if (eventTime.get() > time) {
+       eventTime.set(time);
+     }
+   });
    hotShards.forEach((c, shards) -> {
      shards.forEach((s, r) -> {
        long time = lastShardEvent.get(c + "." + s);
@@ -254,27 +477,83 @@ public class SearchRateTrigger extends TriggerBase {
        }
      });
    });
+   coldShards.forEach((c, shards) -> {
+     shards.forEach((s, r) -> {
+       long time = lastShardEvent.get(c + "." + s);
+       if (eventTime.get() > time) {
+         eventTime.set(time);
+       }
+     });
+   });
    hotReplicas.forEach(r -> {
      long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
      if (eventTime.get() > time) {
        eventTime.set(time);
      }
    });
+   coldReplicas.forEach(r -> {
+     long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
+     if (eventTime.get() > time) {
+       eventTime.set(time);
+     }
+   });
    hotNodes.forEach((n, r) -> {
      long time = lastNodeEvent.get(n);
      if (eventTime.get() > time) {
        eventTime.set(time);
      }
    });
+   coldNodes.forEach((n, r) -> {
+     long time = lastNodeEvent.get(n);
+     if (eventTime.get() > time) {
+       eventTime.set(time);
+     }
+   });

+   final List<TriggerEvent.Op> ops = new ArrayList<>();

+   calculateHotOps(ops, searchableReplicationFactors, hotNodes, hotCollections, hotShards, hotReplicas);
+   calculateColdOps(ops, clusterState, searchableReplicationFactors, coldNodes, coldCollections, coldShards, coldReplicas);

+   if (ops.isEmpty()) {
+     return;
+   }

+   if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops,
+       hotNodes, hotCollections, hotShards, hotReplicas,
+       coldNodes, coldCollections, coldShards, coldReplicas))) {
+     // update lastEvent times
+     hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
+     coldNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
+     hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
+     coldCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
+     hotShards.entrySet().forEach(e -> e.getValue()
+         .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
+     coldShards.entrySet().forEach(e -> e.getValue()
+         .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
+     hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
+     coldReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
+   }
+ }
+ private void calculateHotOps(List<TriggerEvent.Op> ops,
+                              Map<String, Map<String, AtomicInteger>> searchableReplicationFactors,
+                              Map<String, Double> hotNodes,
+                              Map<String, Double> hotCollections,
+                              Map<String, Map<String, Double>> hotShards,
+                              List<ReplicaInfo> hotReplicas) {
    // calculate the number of replicas to add to each hot shard, based on how much the rate was
    // exceeded - but within limits.
-   final List<TriggerEvent.Op> ops = new ArrayList<>();
-   if (hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {

+   // first resolve a situation when only a node is hot but no collection / shard / replica is hot
+   // TODO: eventually we may want to commission a new node
+   if (!hotNodes.isEmpty() && hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
      // move replicas around
-     hotNodes.forEach((n, r) -> {
-       ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.MOVEREPLICA, Suggester.Hint.SRC_NODE, n));
-     });
+     if (aboveNodeOp != null) {
+       hotNodes.forEach((n, r) -> {
+         ops.add(new TriggerEvent.Op(aboveNodeOp, Suggester.Hint.SRC_NODE, n));
+       });
+     }
    } else {
      // add replicas
      Map<String, Map<String, List<Pair<String, String>>>> hints = new HashMap<>();
@@ -283,7 +562,7 @@ public class SearchRateTrigger extends TriggerBase {
          List<Pair<String, String>> perShard = hints
              .computeIfAbsent(coll, c -> new HashMap<>())
              .computeIfAbsent(s, sh -> new ArrayList<>());
-         addHints(coll, s, r, replicationFactors.get(coll), perShard);
+         addReplicaHints(coll, s, r, searchableReplicationFactors.get(coll).get(s).get(), perShard);
        }));
      hotReplicas.forEach(ri -> {
        double r = (Double)ri.getVariable(AutoScalingParams.RATE);
@@ -292,38 +571,120 @@ public class SearchRateTrigger extends TriggerBase {
          .computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
          .computeIfAbsent(ri.getShard(), sh -> new ArrayList<>());
        if (perShard.isEmpty()) {
-         addHints(ri.getCollection(), ri.getShard(), r, replicationFactors.get(ri.getCollection()), perShard);
+         addReplicaHints(ri.getCollection(), ri.getShard(), r, searchableReplicationFactors.get(ri.getCollection()).get(ri.getShard()).get(), perShard);
        }
      });

      hints.values().forEach(m -> m.values().forEach(lst -> lst.forEach(p -> {
-       ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.ADDREPLICA, Suggester.Hint.COLL_SHARD, p));
+       ops.add(new TriggerEvent.Op(aboveOp, Suggester.Hint.COLL_SHARD, p));
      })));
    }

-   if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops, hotNodes, hotCollections, hotShards, hotReplicas))) {
-     // update lastEvent times
-     hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
-     hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
-     hotShards.entrySet().forEach(e -> e.getValue()
-         .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
-     hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
-   }
  }

- private void addHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
-   int numReplicas = (int)Math.round((r - rate) / (double) replicationFactor);
+ /**
+  * This method implements a primitive form of proportional controller with a limiter.
+  */
+ private void addReplicaHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
+   int numReplicas = (int)Math.round((r - aboveRate) / (double) replicationFactor);
    // in one event add at least 1 replica
    if (numReplicas < 1) {
      numReplicas = 1;
    }
-   if (numReplicas > 3) {
-     numReplicas = 3;
+   // ... and at most maxOps replicas
+   if (numReplicas > maxOps) {
+     numReplicas = maxOps;
    }
    for (int i = 0; i < numReplicas; i++) {
      hints.add(new Pair(collection, shard));
    }
  }
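(A quick worked example of the proportional step above, with invented numbers: aboveRate = 2.0, an aggregate shard rate r = 9.0 and replicationFactor = 2 give round((9.0 - 2.0) / 2) = 4 requested replicas, which the limiter then clamps to maxOps, 3 by default; conversely, a rate only slightly above the threshold rounds to 0 and is bumped to the minimum of 1.)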
  private void calculateColdOps(List<TriggerEvent.Op> ops,
                                ClusterState clusterState,
                                Map<String, Map<String, AtomicInteger>> searchableReplicationFactors,
                                Map<String, Double> coldNodes,
                                Map<String, Double> coldCollections,
                                Map<String, Map<String, Double>> coldShards,
                                List<ReplicaInfo> coldReplicas) {
    // COLD COLLECTIONS:
    // Probably can't do anything reasonable about whole cold collections
    // because they may be needed even if not used.

    // COLD SHARDS:
    // Cold shards mean that there are too many replicas per shard - but it also
    // means that all replicas in these shards are cold too, so we can simply
    // address this by deleting cold replicas.

    // COLD REPLICAS:
    // Remove cold replicas but only when there's at least a minimum number of searchable
    // replicas still available (additional non-searchable replicas may exist, too).
    // NOTE: do this before adding ops for DELETENODE because we don't want to attempt
    // deleting replicas that have already been moved elsewhere.
    Map<String, Map<String, List<ReplicaInfo>>> byCollectionByShard = new HashMap<>();
    coldReplicas.forEach(ri -> {
      byCollectionByShard.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
          .computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
          .add(ri);
    });
    byCollectionByShard.forEach((coll, shards) -> {
      shards.forEach((shard, replicas) -> {
        // only delete if there's at least minRF searchable replicas left
        int rf = searchableReplicationFactors.get(coll).get(shard).get();
        // we only really need a leader and we may be allowed to remove other replicas
        int minRF = 1;
        // but check the official RF and don't go below that
        Integer RF = clusterState.getCollection(coll).getReplicationFactor();
        if (RF != null) {
          minRF = RF;
        }
        // unless minReplicas is set explicitly
        if (minReplicas != null) {
          minRF = minReplicas;
        }
        if (minRF < 1) {
          minRF = 1;
        }
        if (rf > minRF) {
          // delete at most maxOps replicas at a time
          AtomicInteger limit = new AtomicInteger(Math.min(maxOps, rf - minRF));
          replicas.forEach(ri -> {
            if (limit.get() == 0) {
              return;
            }
            // don't delete a leader
            if (ri.getBool(ZkStateReader.LEADER_PROP, false)) {
              return;
            }
            TriggerEvent.Op op = new TriggerEvent.Op(belowOp,
                Suggester.Hint.COLL_SHARD, new Pair<>(ri.getCollection(), ri.getShard()));
            op.addHint(Suggester.Hint.REPLICA, ri.getName());
            ops.add(op);
            limit.decrementAndGet();
          });
        }
      });
    });
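(A worked example of the guard above, with invented numbers: a shard with rf = 5 active searchable replicas, an official replicationFactor of 2 and minReplicas unset yields minRF = 2, so at most min(maxOps, rf - minRF) = min(3, 3) = 3 non-leader cold replicas are marked for deletion in a single event.)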
    // COLD NODES:
    // Unlike the case of hot nodes, if a node is cold then any monitored
    // collections / shards / replicas located on that node are cold, too.
    // HOWEVER, we check only non-pull replicas and only from selected collections / shards,
    // so deleting a cold node is dangerous because it may interfere with these
    // non-monitored resources - this is the reason the default belowNodeOp is null / ignored.
    //
    // Also, note that due to the way activity is measured only nodes that contain any
    // monitored resources are considered - there may be cold nodes in the cluster that don't
    // belong to the monitored collections and they will be ignored.
    if (belowNodeOp != null) {
      coldNodes.forEach((node, rate) -> {
        ops.add(new TriggerEvent.Op(belowNodeOp, Suggester.Hint.SRC_NODE, node));
      });
    }
  }

  private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
    Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
    long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
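(The map-based bookkeeping above, together with the "no violation - clear waitForElapsed" branches in run(), forms a simple hysteresis: the first violating sample only records a timestamp, an event fires once the violation has persisted for the whole waitFor window, and any non-violating sample in between resets the window. A distilled, self-contained sketch of the pattern, illustrative code rather than the Solr API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RateHysteresis {
  private final Map<String, Long> firstViolation = new ConcurrentHashMap<>();

  /** True only when 'value' has stayed above 'threshold' for at least waitForNs. */
  boolean shouldFire(String key, double value, double threshold, long nowNs, long waitForNs) {
    if (value <= threshold) {
      firstViolation.remove(key);        // no violation - reset the window
      return false;
    }
    long start = firstViolation.computeIfAbsent(key, k -> nowNs);
    return nowNs - start >= waitForNs;   // must persist throughout waitFor
  }
})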
@@ -335,15 +696,25 @@ public class SearchRateTrigger extends TriggerBase {
  }

  public static class SearchRateEvent extends TriggerEvent {
-   public SearchRateEvent(String source, long eventTime, List<Op> ops, Map<String, Double> hotNodes,
+   public SearchRateEvent(String source, long eventTime, List<Op> ops,
+                          Map<String, Double> hotNodes,
                           Map<String, Double> hotCollections,
-                          Map<String, Map<String, Double>> hotShards, List<ReplicaInfo> hotReplicas) {
+                          Map<String, Map<String, Double>> hotShards,
+                          List<ReplicaInfo> hotReplicas,
+                          Map<String, Double> coldNodes,
+                          Map<String, Double> coldCollections,
+                          Map<String, Map<String, Double>> coldShards,
+                          List<ReplicaInfo> coldReplicas) {
      super(TriggerEventType.SEARCHRATE, source, eventTime, null);
      properties.put(TriggerEvent.REQUESTED_OPS, ops);
-     properties.put(AutoScalingParams.COLLECTION, hotCollections);
-     properties.put(AutoScalingParams.SHARD, hotShards);
-     properties.put(AutoScalingParams.REPLICA, hotReplicas);
-     properties.put(AutoScalingParams.NODE, hotNodes);
+     properties.put(HOT_NODES, hotNodes);
+     properties.put(HOT_COLLECTIONS, hotCollections);
+     properties.put(HOT_SHARDS, hotShards);
+     properties.put(HOT_REPLICAS, hotReplicas);
+     properties.put(COLD_NODES, coldNodes);
+     properties.put(COLD_COLLECTIONS, coldCollections);
+     properties.put(COLD_SHARDS, coldShards);
+     properties.put(COLD_REPLICAS, coldReplicas);
    }
  }
}
@@ -88,9 +88,11 @@ public class CloudTestUtils {
                                      final CollectionStatePredicate predicate) throws InterruptedException, TimeoutException, IOException {
    TimeOut timeout = new TimeOut(wait, unit, cloudManager.getTimeSource());
    long timeWarn = timeout.timeLeft(TimeUnit.MILLISECONDS) / 4;
+   ClusterState state = null;
+   DocCollection coll = null;
    while (!timeout.hasTimedOut()) {
-     ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
-     DocCollection coll = state.getCollectionOrNull(collection);
+     state = cloudManager.getClusterStateProvider().getClusterState();
+     coll = state.getCollectionOrNull(collection);
      // due to the way we manage collections in SimClusterStateProvider a null here
      // can mean that a collection is still being created but has no replicas
      if (coll == null) { // does not yet exist?

@@ -106,7 +108,7 @@ public class CloudTestUtils {
        log.trace("-- still not matching predicate: {}", state);
      }
    }
-   throw new TimeoutException();
+   throw new TimeoutException("last state: " + coll);
  }

  /**
@@ -475,7 +475,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
        "'event' : 'searchRate'," +
        "'waitFor' : '10m'," +
        "'enabled' : true," +
-       "'rate': 'foo'," +
+       "'aboveRate': 'foo'," +
        "'actions' : [" +
        "{" +
        "'name' : 'compute_plan'," +

@@ -489,7 +489,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
    } catch (HttpSolrClient.RemoteSolrException e) {
      // expected
      assertTrue(String.valueOf(getObjectByPath(((HttpSolrClient.RemoteExecutionException) e).getMetaData(),
-         false, "error/details[0]/errorMessages[0]")).contains("rate=Invalid 'rate' configuration value: 'foo'"));
+         false, "error/details[0]/errorMessages[0]")).contains("aboveRate=Invalid configuration value: 'foo'"));
    }

    // unknown trigger action properties
@@ -22,10 +22,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.AtomicDouble;
import org.apache.lucene.util.LuceneTestCase;

@@ -37,12 +36,20 @@ import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.LogLevel;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -51,20 +58,23 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;

/**
 * Integration test for {@link SearchRateTrigger}
 */
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
@LuceneTestCase.Slow
public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

- private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
- private static CountDownLatch listenerCreated = new CountDownLatch(1);
- private static int waitForSeconds = 1;
- private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
  private static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
+ private static CountDownLatch finished = new CountDownLatch(1);
+ private static CountDownLatch started = new CountDownLatch(1);
+ private static SolrCloudManager cloudManager;

+ private int waitForSeconds;

  @BeforeClass
  public static void setupCluster() throws Exception {
@@ -79,74 +89,156 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
    SolrClient solrClient = cluster.getSolrClient();
    NamedList<Object> response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");
+   cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
  }

+ @Before
+ public void beforeTest() throws Exception {
+   cluster.deleteAllCollections();
+   // clear any persisted auto scaling configuration
+   Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
+   log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
+   timeSource.sleep(5000);
+   deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+   deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+   deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+   deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);

+   finished = new CountDownLatch(1);
+   started = new CountDownLatch(1);
+   listenerEvents = new HashMap<>();
+   waitForSeconds = 3 + random().nextInt(5);
+ }

+ private void deleteChildrenRecursively(String path) throws Exception {
+   cloudManager.getDistribStateManager().removeRecursively(path, true, false);
+ }

  @Test
- public void testSearchRate() throws Exception {
+ public void testAboveSearchRate() throws Exception {
    CloudSolrClient solrClient = cluster.getSolrClient();
-   String COLL1 = "collection1";
+   String COLL1 = "aboveRate_collection";
    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
        "conf", 1, 2);
    create.process(solrClient);

    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
        CloudTestUtils.clusterShape(1, 2));

+   // the trigger is initially disabled so that we have the time to set up listeners
+   // and generate the traffic
    String setTriggerCommand = "{" +
        "'set-trigger' : {" +
-       "'name' : 'search_rate_trigger'," +
+       "'name' : 'search_rate_trigger1'," +
        "'event' : 'searchRate'," +
        "'waitFor' : '" + waitForSeconds + "s'," +
-       "'enabled' : true," +
-       "'rate' : 1.0," +
+       "'enabled' : false," +
+       "'collections' : '" + COLL1 + "'," +
+       "'aboveRate' : 1.0," +
+       "'belowRate' : 0.1," +
        "'actions' : [" +
        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
-       "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
-       "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
+       "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
        "]" +
        "}}";
    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
    NamedList<Object> response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

-   String setListenerCommand1 = "{" +
+   String setListenerCommand = "{" +
        "'set-listener' : " +
        "{" +
+       "'name' : 'started'," +
+       "'trigger' : 'search_rate_trigger1'," +
+       "'stage' : ['STARTED']," +
+       "'class' : '" + StartedProcessingListener.class.getName() + "'" +
+       "}" +
+       "}";
+   req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+   response = solrClient.request(req);
+   assertEquals(response.get("result").toString(), "success");

+   setListenerCommand = "{" +
+       "'set-listener' : " +
+       "{" +
        "'name' : 'srt'," +
-       "'trigger' : 'search_rate_trigger'," +
+       "'trigger' : 'search_rate_trigger1'," +
        "'stage' : ['FAILED','SUCCEEDED']," +
-       "'afterAction': ['compute', 'execute', 'test']," +
-       "'class' : '" + TestTriggerListener.class.getName() + "'" +
+       "'afterAction': ['compute', 'execute']," +
+       "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
        "}" +
        "}";
-   req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+   req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

+   setListenerCommand = "{" +
+       "'set-listener' : " +
+       "{" +
+       "'name' : 'finished'," +
+       "'trigger' : 'search_rate_trigger1'," +
+       "'stage' : ['SUCCEEDED']," +
+       "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
+       "}" +
+       "}";
+   req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+   response = solrClient.request(req);
+   assertEquals(response.get("result").toString(), "success");

    SolrParams query = params(CommonParams.Q, "*:*");
    for (int i = 0; i < 500; i++) {
      solrClient.query(COLL1, query);
    }
-   boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);

+   // enable the trigger
+   String resumeTriggerCommand = "{" +
+       "'resume-trigger' : {" +
+       "'name' : 'search_rate_trigger1'" +
+       "}" +
+       "}";
+   req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+   response = solrClient.request(req);
+   assertEquals(response.get("result").toString(), "success");

+   timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));

+   boolean await = started.await(20, TimeUnit.SECONDS);
    assertTrue("The trigger did not fire at all", await);
-   // wait for listener to capture the SUCCEEDED stage
-   Thread.sleep(5000);

+   await = finished.await(60, TimeUnit.SECONDS);
+   assertTrue("The trigger did not finish processing", await);

+   // suspend the trigger
+   String suspendTriggerCommand = "{" +
+       "'suspend-trigger' : {" +
+       "'name' : 'search_rate_trigger1'" +
+       "}" +
+       "}";
+   req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+   response = solrClient.request(req);
+   assertEquals(response.get("result").toString(), "success");

+   timeSource.sleep(5000);

    List<CapturedEvent> events = listenerEvents.get("srt");
-   assertEquals(listenerEvents.toString(), 4, events.size());
+   assertEquals(listenerEvents.toString(), 3, events.size());
    assertEquals("AFTER_ACTION", events.get(0).stage.toString());
    assertEquals("compute", events.get(0).actionName);
    assertEquals("AFTER_ACTION", events.get(1).stage.toString());
    assertEquals("execute", events.get(1).actionName);
-   assertEquals("AFTER_ACTION", events.get(2).stage.toString());
-   assertEquals("test", events.get(2).actionName);
-   assertEquals("SUCCEEDED", events.get(3).stage.toString());
-   assertNull(events.get(3).actionName);
+   assertEquals("SUCCEEDED", events.get(2).stage.toString());
+   assertNull(events.get(2).actionName);

    CapturedEvent ev = events.get(0);
    long now = timeSource.getTimeNs();
    // verify waitFor
    assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
-   Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get("node");
+   Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get(SearchRateTrigger.HOT_NODES);
    assertNotNull("nodeRates", nodeRates);
    assertTrue(nodeRates.toString(), nodeRates.size() > 0);
    AtomicDouble totalNodeRate = new AtomicDouble();
    nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
-   List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get("replica");
+   List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get(SearchRateTrigger.HOT_REPLICAS);
    assertNotNull("replicaRates", replicaRates);
    assertTrue(replicaRates.toString(), replicaRates.size() > 0);
    AtomicDouble totalReplicaRate = new AtomicDouble();

@@ -154,7 +246,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
      assertTrue(r.toString(), r.getVariable("rate") != null);
      totalReplicaRate.addAndGet((Double) r.getVariable("rate"));
    });
-   Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get("shard");
+   Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get(SearchRateTrigger.HOT_SHARDS);
    assertNotNull("shardRates", shardRates);
    assertEquals(shardRates.toString(), 1, shardRates.size());
    shardRates = (Map<String, Object>) shardRates.get(COLL1);

@@ -162,7 +254,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
    assertEquals(shardRates.toString(), 1, shardRates.size());
    AtomicDouble totalShardRate = new AtomicDouble();
    shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double) r));
-   Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get("collection");
+   Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get(SearchRateTrigger.HOT_COLLECTIONS);
    assertNotNull("collectionRates", collectionRates);
    assertEquals(collectionRates.toString(), 1, collectionRates.size());
    Double collectionRate = collectionRates.get(COLL1);
@@ -181,27 +273,414 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
    }
  }

- public static class TestSearchRateAction extends TriggerActionBase {
+ @Test
+ public void testBelowSearchRate() throws Exception {
+   CloudSolrClient solrClient = cluster.getSolrClient();
+   String COLL1 = "belowRate_collection";
+   // replicationFactor == 2
+   CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+       "conf", 1, 2);
+   create.process(solrClient);
+   CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+       CloudTestUtils.clusterShape(1, 2));

-   @Override
-   public void process(TriggerEvent event, ActionContext context) throws Exception {
-     try {
-       events.add(event);
-       long currentTimeNanos = timeSource.getTimeNs();
-       long eventTimeNanos = event.getEventTime();
-       long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
-       if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
-         fail(event.source + " was fired before the configured waitFor period");
-       }
-       triggerFiredLatch.countDown();
-     } catch (Throwable t) {
-       log.debug("--throwable", t);
-       throw t;
+   // add a couple of spare replicas above RF. Use different types.
+   // these additional replicas will be placed on other nodes in the cluster
+   solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.NRT));
+   solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.TLOG));
+   solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.PULL));

+   CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+       CloudTestUtils.clusterShape(1, 5));

    String setTriggerCommand = "{" +
        "'set-trigger' : {" +
        "'name' : 'search_rate_trigger2'," +
        "'event' : 'searchRate'," +
        "'waitFor' : '" + waitForSeconds + "s'," +
        "'enabled' : false," +
        "'collections' : '" + COLL1 + "'," +
        "'aboveRate' : 1.0," +
        "'belowRate' : 0.1," +
        // do nothing but generate an op
        "'belowNodeOp' : 'none'," +
        "'actions' : [" +
        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
        "]" +
        "}}";
    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
    NamedList<Object> response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    String setListenerCommand = "{" +
        "'set-listener' : " +
        "{" +
        "'name' : 'started'," +
        "'trigger' : 'search_rate_trigger2'," +
        "'stage' : ['STARTED']," +
        "'class' : '" + StartedProcessingListener.class.getName() + "'" +
        "}" +
        "}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    setListenerCommand = "{" +
        "'set-listener' : " +
        "{" +
        "'name' : 'srt'," +
        "'trigger' : 'search_rate_trigger2'," +
        "'stage' : ['FAILED','SUCCEEDED']," +
        "'afterAction': ['compute', 'execute']," +
        "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
        "}" +
        "}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    setListenerCommand = "{" +
        "'set-listener' : " +
        "{" +
        "'name' : 'finished'," +
        "'trigger' : 'search_rate_trigger2'," +
        "'stage' : ['SUCCEEDED']," +
        "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
        "}" +
        "}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));

    // enable the trigger
    String resumeTriggerCommand = "{" +
        "'resume-trigger' : {" +
        "'name' : 'search_rate_trigger2'" +
        "}" +
        "}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));

    boolean await = started.await(20, TimeUnit.SECONDS);
    assertTrue("The trigger did not fire at all", await);
    await = finished.await(60, TimeUnit.SECONDS);
    assertTrue("The trigger did not finish processing", await);

    // suspend the trigger
    String suspendTriggerCommand = "{" +
        "'suspend-trigger' : {" +
        "'name' : 'search_rate_trigger2'" +
        "}" +
        "}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    timeSource.sleep(5000);

    List<CapturedEvent> events = listenerEvents.get("srt");
    assertEquals(events.toString(), 3, events.size());
    CapturedEvent ev = events.get(0);
    assertEquals(ev.toString(), "compute", ev.actionName);
    List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
    assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
    // 4 cold nodes, 3 cold replicas
    assertEquals(ops.toString(), 7, ops.size());
    AtomicInteger coldNodes = new AtomicInteger();
    AtomicInteger coldReplicas = new AtomicInteger();
    ops.forEach(op -> {
      if (op.getAction().equals(CollectionParams.CollectionAction.NONE)) {
        coldNodes.incrementAndGet();
      } else if (op.getAction().equals(CollectionParams.CollectionAction.DELETEREPLICA)) {
        coldReplicas.incrementAndGet();
      } else {
        fail("unexpected op: " + op);
      }
    });
    assertEquals("cold nodes", 4, coldNodes.get());
    assertEquals("cold replicas", 3, coldReplicas.get());
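(For the op arithmetic here: the shard ended up with 5 replicas while the collection's replicationFactor is 2, so the cold path may delete min(maxOps, 5 - 2) = 3 replicas; the remaining 4 ops are the 'none' placeholders produced by belowNodeOp, presumably one per live node hosting monitored cold replicas.)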
// now the collection should be down to RF = 2
|
||||
CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
|
||||
CloudTestUtils.clusterShape(1, 2));
|
||||
|
||||
listenerEvents.clear();
|
||||
finished = new CountDownLatch(1);
|
||||
started = new CountDownLatch(1);
|
||||
|
||||
// resume trigger
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
// there should be only coldNode ops now, and no coldReplica ops since searchable RF == collection RF
|
||||
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
|
||||
|
||||
await = started.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
await = finished.await(60, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not finish processing", await);
|
||||
|
||||
// suspend trigger
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
timeSource.sleep(5000);
|
||||
|
||||
events = listenerEvents.get("srt");
|
||||
assertEquals(events.toString(), 3, events.size());
|
||||
|
||||
ev = events.get(0);
|
||||
assertEquals(ev.toString(), "compute", ev.actionName);
|
||||
ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
|
||||
assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
|
||||
assertEquals(ops.toString(), 1, ops.size());
|
||||
assertEquals(ops.toString(), CollectionParams.CollectionAction.NONE, ops.get(0).getAction());
|
||||
|
||||
listenerEvents.clear();
|
||||
finished = new CountDownLatch(1);
|
||||
started = new CountDownLatch(1);
|
||||
|
||||
// now allow single replicas
|
||||
setTriggerCommand = "{" +
|
||||
"'set-trigger' : {" +
|
||||
"'name' : 'search_rate_trigger2'," +
|
||||
"'event' : 'searchRate'," +
|
||||
"'waitFor' : '" + waitForSeconds + "s'," +
|
||||
"'enabled' : true," +
|
||||
"'collections' : '" + COLL1 + "'," +
|
||||
"'aboveRate' : 1.0," +
|
||||
"'belowRate' : 0.1," +
|
||||
"'minReplicas' : 1," +
|
||||
"'belowNodeOp' : 'none'," +
|
||||
"'actions' : [" +
|
||||
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
|
||||
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
|
||||
"]" +
|
||||
"}}";
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
|
||||
|
||||
await = started.await(20, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not fire at all", await);
|
||||
await = finished.await(60, TimeUnit.SECONDS);
|
||||
assertTrue("The trigger did not finish processing", await);
|
||||
|
||||
// suspend trigger
|
||||
req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
|
||||
response = solrClient.request(req);
|
||||
assertEquals(response.get("result").toString(), "success");
|
||||
|
||||
timeSource.sleep(5000);
|
||||
|
||||
events = listenerEvents.get("srt");
|
||||
assertEquals(events.toString(), 3, events.size());
|
||||
|
||||
ev = events.get(0);
|
||||
assertEquals(ev.toString(), "compute", ev.actionName);
|
||||
ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
|
||||
assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
|
||||
assertEquals(ops.toString(), 2, ops.size());
|
||||
AtomicInteger coldNodes2 = new AtomicInteger();
|
||||
AtomicInteger coldReplicas2 = new AtomicInteger();
|
||||
ops.forEach(op -> {
|
||||
if (op.getAction().equals(CollectionParams.CollectionAction.NONE)) {
|
||||
coldNodes2.incrementAndGet();
|
||||
} else if (op.getAction().equals(CollectionParams.CollectionAction.DELETEREPLICA)) {
|
||||
coldReplicas2.incrementAndGet();
|
||||
} else {
|
||||
fail("unexpected op: " + op);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals("coldNodes", 1, coldNodes2.get());
|
||||
assertEquals("colReplicas", 1, coldReplicas2.get());
|
||||
|
||||
// now the collection should be at RF == 1, with one additional PULL replica
|
||||
CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
|
||||
CloudTestUtils.clusterShape(1, 1));
|
||||
}
|
||||
|
||||
  public static class TestTriggerListener extends TriggerListenerBase {

  @Test
  public void testDeleteNode() throws Exception {
    CloudSolrClient solrClient = cluster.getSolrClient();
    String COLL1 = "deleteNode_collection";
    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
        "conf", 1, 2);

    create.process(solrClient);
    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
        CloudTestUtils.clusterShape(1, 2));

    // add a couple of spare replicas above RF. Use different types to verify that only
    // searchable replicas are considered
    // these additional replicas will be placed on other nodes in the cluster
    solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.NRT));
    solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.TLOG));
    solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.PULL));

    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
        CloudTestUtils.clusterShape(1, 5));

    String setTriggerCommand = "{" +
        "'set-trigger' : {" +
        "'name' : 'search_rate_trigger3'," +
        "'event' : 'searchRate'," +
        "'waitFor' : '" + waitForSeconds + "s'," +
        "'enabled' : false," +
        "'collections' : '" + COLL1 + "'," +
        "'aboveRate' : 1.0," +
        "'belowRate' : 0.1," +
        // allow deleting all spare replicas
        "'minReplicas' : 1," +
        // allow requesting all deletions in one event
        "'maxOps' : 10," +
        // delete underutilised nodes
        "'belowNodeOp' : 'DELETENODE'," +
        "'actions' : [" +
        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
        "]" +
        "}}";
    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
    NamedList<Object> response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    String setListenerCommand = "{" +
        "'set-listener' : " +
        "{" +
        "'name' : 'started'," +
        "'trigger' : 'search_rate_trigger3'," +
        "'stage' : ['STARTED']," +
        "'class' : '" + StartedProcessingListener.class.getName() + "'" +
        "}" +
        "}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    setListenerCommand = "{" +
        "'set-listener' : " +
        "{" +
        "'name' : 'srt'," +
        "'trigger' : 'search_rate_trigger3'," +
        "'stage' : ['FAILED','SUCCEEDED']," +
        "'afterAction': ['compute', 'execute']," +
        "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
        "}" +
        "}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    setListenerCommand = "{" +
        "'set-listener' : " +
        "{" +
        "'name' : 'finished'," +
        "'trigger' : 'search_rate_trigger3'," +
        "'stage' : ['SUCCEEDED']," +
        "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
        "}" +
        "}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));

    // enable the trigger
    String resumeTriggerCommand = "{" +
        "'resume-trigger' : {" +
        "'name' : 'search_rate_trigger3'" +
        "}" +
        "}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));

    boolean await = started.await(20, TimeUnit.SECONDS);
    assertTrue("The trigger did not fire at all", await);
    await = finished.await(90, TimeUnit.SECONDS);
    assertTrue("The trigger did not finish processing", await);

    // suspend the trigger
    String suspendTriggerCommand = "{" +
        "'suspend-trigger' : {" +
        "'name' : 'search_rate_trigger3'" +
        "}" +
        "}";
    req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
    response = solrClient.request(req);
    assertEquals(response.get("result").toString(), "success");

    timeSource.sleep(5000);

    List<CapturedEvent> events = listenerEvents.get("srt");
    assertEquals(events.toString(), 3, events.size());

    CapturedEvent ev = events.get(0);
    assertEquals(ev.toString(), "compute", ev.actionName);
    List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
    assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
    // 4 DELETEREPLICA, 4 DELETENODE
    assertEquals(ops.toString(), 8, ops.size());
    AtomicInteger replicas = new AtomicInteger();
    AtomicInteger nodes = new AtomicInteger();
    ops.forEach(op -> {
      if (op.getAction().equals(CollectionParams.CollectionAction.DELETEREPLICA)) {
        replicas.incrementAndGet();
      } else if (op.getAction().equals(CollectionParams.CollectionAction.DELETENODE)) {
        nodes.incrementAndGet();
      } else {
        fail("unexpected op: " + op);
      }
    });
    assertEquals(ops.toString(), 4, replicas.get());
    assertEquals(ops.toString(), 4, nodes.get());
    // check status
    ev = events.get(1);
    assertEquals(ev.toString(), "execute", ev.actionName);
    List<NamedList<Object>> responses = (List<NamedList<Object>>)ev.context.get("properties.responses");
    assertNotNull(ev.toString(), responses);
    assertEquals(responses.toString(), 8, responses.size());
    replicas.set(0);
    nodes.set(0);
    responses.forEach(m -> {
      if (m.get("success") != null) {
        replicas.incrementAndGet();
      } else if (m.get("status") != null) {
        NamedList<Object> status = (NamedList<Object>)m.get("status");
        if ("completed".equals(status.get("state"))) {
          nodes.incrementAndGet();
        } else {
          fail("unexpected DELETENODE status: " + m);
        }
      } else {
        fail("unexpected status: " + m);
      }
    });

    assertEquals(responses.toString(), 4, replicas.get());
    assertEquals(responses.toString(), 4, nodes.get());

    // we are left with one searchable replica
    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
        CloudTestUtils.clusterShape(1, 1));
  }

  public static class CapturingTriggerListener extends TriggerListenerBase {
    @Override
    public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException {
      super.configure(loader, cloudManager, config);

@ -212,7 +691,26 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
    public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
        ActionContext context, Throwable error, String message) {
      List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
      lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
      CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
      log.info("=======> " + ev);
      lst.add(ev);
    }
  }

  public static class StartedProcessingListener extends TriggerListenerBase {

    @Override
    public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
      started.countDown();
    }
  }

  public static class FinishedProcessingListener extends TriggerListenerBase {

    @Override
    public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
      finished.countDown();
    }
  }

}

@ -18,25 +18,37 @@ package org.apache.solr.cloud.autoscaling;

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.AtomicDouble;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.ZkDistributedQueueFactory;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.TimeOut;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

@ -59,6 +71,23 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
    configureCluster(4)
        .addConfig("conf", configset("cloud-minimal"))
        .configure();
  }

  @Before
  public void removeCollections() throws Exception {
    cluster.deleteAllCollections();
    if (cluster.getJettySolrRunners().size() < 4) {
      cluster.startJettySolrRunner();
    }
  }

  @Test
  public void testTrigger() throws Exception {
    SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
    SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
    CoreContainer container = cluster.getJettySolrRunner(0).getCoreContainer();
    SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());

    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
        "conf", 2, 2);
    CloudSolrClient solrClient = cluster.getSolrClient();

@ -68,20 +97,15 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
        "conf", 2, 2);
    create.setMaxShardsPerNode(1);
    create.process(solrClient);
  }

  @Test
  public void testTrigger() throws Exception {
    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS, clusterShape(2, 2));
    CloudTestUtils.waitForState(cloudManager, COLL2, 60, TimeUnit.SECONDS, clusterShape(2, 2));

    double rate = 1.0;
    SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
    SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
    CoreContainer container = cluster.getJettySolrRunner(0).getCoreContainer();
    SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
    URL baseUrl = cluster.getJettySolrRunners().get(1).getBaseUrl();
    long waitForSeconds = 5 + random().nextInt(5);
    Map<String, Object> props = createTriggerProps(waitForSeconds, rate);
    Map<String, Object> props = createTriggerProps(Arrays.asList(COLL1, COLL2), waitForSeconds, rate, -1);
    final List<TriggerEvent> events = new ArrayList<>();
    CloudSolrClient solrClient = cluster.getSolrClient();

    try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger")) {
      trigger.configure(loader, cloudManager, props);

@ -95,19 +119,21 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
      String url = baseUrl.toString() + "/" + coreName;
      try (HttpSolrClient simpleClient = new HttpSolrClient.Builder(url).build()) {
        SolrParams query = params(CommonParams.Q, "*:*", CommonParams.DISTRIB, "false");
        for (int i = 0; i < 200; i++) {
        for (int i = 0; i < 500; i++) {
          simpleClient.query(query);
        }
        trigger.run();
        // waitFor delay
        assertEquals(0, events.size());
        Thread.sleep(waitForSeconds * 1000 + 2000);
        Thread.sleep(waitForSeconds * 1000);
        trigger.run();
        Thread.sleep(waitForSeconds * 1000);
        // should generate replica event
        trigger.run();
        assertEquals(1, events.size());
        TriggerEvent event = events.get(0);
        assertEquals(TriggerEventType.SEARCHRATE, event.eventType);
        List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(AutoScalingParams.REPLICA);
        List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(SearchRateTrigger.HOT_REPLICAS);
        assertEquals(1, infos.size());
        ReplicaInfo info = infos.get(0);
        assertEquals(coreName, info.getCore());

@ -120,12 +146,12 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
        for (int i = 0; i < 500; i++) {
          solrClient.query(COLL1, query);
        }
        Thread.sleep(waitForSeconds * 1000 + 2000);
        Thread.sleep(waitForSeconds * 1000);
        trigger.run();
        // should generate collection event
        assertEquals(1, events.size());
        TriggerEvent event = events.get(0);
        Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
        Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
        assertEquals(1, hotCollections.size());
        Double Rate = hotCollections.get(COLL1);
        assertNotNull(Rate);

@ -134,21 +160,20 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {

        for (int i = 0; i < 1000; i++) {
          solrClient.query(COLL2, query);
          solrClient.query(COLL1, query);
        }
        Thread.sleep(waitForSeconds * 1000 + 2000);
        Thread.sleep(waitForSeconds * 1000);
        trigger.run();
        // should generate node and collection event but not for COLL2 because of waitFor
        assertEquals(1, events.size());
        event = events.get(0);
        Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
        Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_NODES);
        assertEquals(3, hotNodes.size());
        hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
        hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
        assertEquals(2, hotCollections.size());
        hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
        assertEquals(1, hotCollections.size());
        Rate = hotCollections.get(COLL1);
        assertNotNull(Rate);
        Rate = hotCollections.get(COLL2);
        assertNotNull(Rate);

        events.clear();
        // assert that waitFor prevents new events from being generated

@ -156,28 +181,154 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
        // should not generate any events
        assertEquals(0, events.size());

        Thread.sleep(waitForSeconds * 1000 + 2000);
        Thread.sleep(waitForSeconds * 1000 * 2);
        trigger.run();
        // should generate node and collection event
        assertEquals(1, events.size());
        hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
        event = events.get(0);
        hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
        assertEquals(2, hotCollections.size());
        Rate = hotCollections.get(COLL1);
        assertNotNull(Rate);
        Rate = hotCollections.get(COLL2);
        assertNotNull(Rate);
        hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
        hotNodes = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_NODES);
        assertEquals(3, hotNodes.size());
        hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
      }
    }

  private Map<String, Object> createTriggerProps(long waitForSeconds, double rate) {
  private static final AtomicDouble mockRate = new AtomicDouble();

  @Test
  public void testWaitForElapsed() throws Exception {
    SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
    CloudSolrClient solrClient = cluster.getSolrClient();
    SolrZkClient zkClient = solrClient.getZkStateReader().getZkClient();
    SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), solrClient) {
      @Override
      public NodeStateProvider getNodeStateProvider() {
        return new SolrClientNodeStateProvider(solrClient) {
          @Override
          public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
            Map<String, Object> values = super.getNodeValues(node, tags);
            values.keySet().forEach(k -> {
              values.replace(k, mockRate.get());
            });
            return values;
          }
        };
      }
    };
    TimeSource timeSource = cloudManager.getTimeSource();
    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
        "conf", 2, 2);
    create.setMaxShardsPerNode(1);
    create.process(solrClient);
    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS, clusterShape(2, 2));

    long waitForSeconds = 5 + random().nextInt(5);
    Map<String, Object> props = createTriggerProps(Arrays.asList(COLL1, COLL2), waitForSeconds, 1.0, 0.1);
    final List<TriggerEvent> events = new ArrayList<>();

    try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger1")) {
      trigger.configure(loader, cloudManager, props);
      trigger.init();
      trigger.setProcessor(noFirstRunProcessor);
      trigger.run();
      trigger.setProcessor(event -> events.add(event));

      // set mock rates
      mockRate.set(2.0);
      TimeOut timeOut = new TimeOut(waitForSeconds + 2, TimeUnit.SECONDS, timeSource);
      // simulate ScheduledTriggers
      while (!timeOut.hasTimedOut()) {
        trigger.run();
        timeSource.sleep(1000);
      }
      // violation persisted longer than waitFor - there should be events
      assertTrue(events.toString(), events.size() > 0);
      TriggerEvent event = events.get(0);
      assertEquals(event.toString(), TriggerEventType.SEARCHRATE, event.eventType);
      Map<String, Object> hotNodes, hotCollections, hotShards;
      List<ReplicaInfo> hotReplicas;
      hotNodes = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_NODES);
      hotCollections = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_COLLECTIONS);
      hotShards = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_SHARDS);
      hotReplicas = (List<ReplicaInfo>)event.properties.get(SearchRateTrigger.HOT_REPLICAS);
      assertFalse("no hot nodes?", hotNodes.isEmpty());
      assertFalse("no hot collections?", hotCollections.isEmpty());
      assertFalse("no hot shards?", hotShards.isEmpty());
      assertFalse("no hot replicas?", hotReplicas.isEmpty());
    }

    mockRate.set(0.0);
    events.clear();

    try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger2")) {
      trigger.configure(loader, cloudManager, props);
      trigger.init();
      trigger.setProcessor(noFirstRunProcessor);
      trigger.run();
      trigger.setProcessor(event -> events.add(event));

      mockRate.set(2.0);
      trigger.run();
      // waitFor not elapsed
      assertTrue(events.toString(), events.isEmpty());
      Thread.sleep(1000);
      trigger.run();
      assertTrue(events.toString(), events.isEmpty());
      Thread.sleep(1000);
      mockRate.set(0.0);
      trigger.run();
      Thread.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds - 2, TimeUnit.SECONDS));
      trigger.run();

      // violations persisted shorter than waitFor - there should be no events
      assertTrue(events.toString(), events.isEmpty());

    }
  }

  @Test
  public void testDefaultsAndBackcompat() throws Exception {
    Map<String, Object> props = new HashMap<>();
    props.put("rate", rate);
    props.put("rate", 1.0);
    props.put("collection", "test");
    SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
    SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
    SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
    try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger2")) {
      trigger.configure(loader, cloudManager, props);
      Map<String, Object> config = trigger.getConfig();
      Set<String> collections = (Set<String>)config.get(SearchRateTrigger.COLLECTIONS_PROP);
      assertEquals(collections.toString(), 1, collections.size());
      assertEquals("test", collections.iterator().next());
      assertEquals("#ANY", config.get(AutoScalingParams.SHARD));
      assertEquals("#ANY", config.get(AutoScalingParams.NODE));
      assertEquals(1.0, config.get(SearchRateTrigger.ABOVE_RATE_PROP));
      assertEquals(-1.0, config.get(SearchRateTrigger.BELOW_RATE_PROP));
      assertEquals(SearchRateTrigger.DEFAULT_METRIC, config.get(SearchRateTrigger.METRIC_PROP));
      assertEquals(SearchRateTrigger.DEFAULT_MAX_OPS, config.get(SearchRateTrigger.MAX_OPS_PROP));
      assertNull(config.get(SearchRateTrigger.MIN_REPLICAS_PROP));
      assertEquals(CollectionParams.CollectionAction.ADDREPLICA, config.get(SearchRateTrigger.ABOVE_OP_PROP));
      assertEquals(CollectionParams.CollectionAction.MOVEREPLICA, config.get(SearchRateTrigger.ABOVE_NODE_OP_PROP));
      assertEquals(CollectionParams.CollectionAction.DELETEREPLICA, config.get(SearchRateTrigger.BELOW_OP_PROP));
      assertNull(config.get(SearchRateTrigger.BELOW_NODE_OP_PROP));
    }
  }

  private Map<String, Object> createTriggerProps(List<String> collections, long waitForSeconds, double aboveRate, double belowRate) {
    Map<String, Object> props = new HashMap<>();
    props.put("aboveRate", aboveRate);
    props.put("belowRate", belowRate);
    props.put("event", "searchRate");
    props.put("waitFor", waitForSeconds);
    props.put("enabled", true);
    if (collections != null && !collections.isEmpty()) {
      props.put("collections", String.join(",", collections));
    }
    List<Map<String, String>> actions = new ArrayList<>(3);
    Map<String, String> map = new HashMap<>(2);
    map.put("name", "compute_plan");

@ -46,6 +46,7 @@ import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.autoscaling.ActionContext;
import org.apache.solr.cloud.autoscaling.ComputePlanAction;
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
import org.apache.solr.cloud.autoscaling.SearchRateTrigger;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;

@ -549,7 +550,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
        "'name' : 'search_rate_trigger'," +
        "'event' : 'searchRate'," +
        "'waitFor' : '" + waitForSeconds + "s'," +
        "'rate' : 1.0," +
        "'aboveRate' : 1.0," +
        "'enabled' : true," +
        "'actions' : [" +
        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +

@ -581,7 +582,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
    assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
    CapturedEvent ev = listenerEvents.get("srt").get(0);
    assertEquals(TriggerEventType.SEARCHRATE, ev.event.getEventType());
    Map<String, Number> m = (Map<String, Number>)ev.event.getProperty("node");
    Map<String, Number> m = (Map<String, Number>)ev.event.getProperty(SearchRateTrigger.HOT_NODES);
    assertNotNull(m);
    assertEquals(nodes.size(), m.size());
    assertEquals(nodes, m.keySet());

@ -46,6 +46,7 @@ import org.apache.solr.cloud.autoscaling.ComputePlanAction;
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
import org.apache.solr.cloud.autoscaling.SearchRateTrigger;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerEventQueue;

@ -1164,7 +1165,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
        "'event' : 'searchRate'," +
        "'waitFor' : '" + waitForSeconds + "s'," +
        "'enabled' : true," +
        "'rate' : 1.0," +
        "'aboveRate' : 1.0," +
        "'actions' : [" +
        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}" +
        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +

@ -1216,12 +1217,12 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
    long now = cluster.getTimeSource().getTimeNs();
    // verify waitFor
    assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
    Map<String, Double> nodeRates = (Map<String, Double>)ev.event.getProperties().get("node");
    Map<String, Double> nodeRates = (Map<String, Double>)ev.event.getProperties().get(SearchRateTrigger.HOT_NODES);
    assertNotNull("nodeRates", nodeRates);
    assertTrue(nodeRates.toString(), nodeRates.size() > 0);
    AtomicDouble totalNodeRate = new AtomicDouble();
    nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
    List<ReplicaInfo> replicaRates = (List<ReplicaInfo>)ev.event.getProperties().get("replica");
    List<ReplicaInfo> replicaRates = (List<ReplicaInfo>)ev.event.getProperties().get(SearchRateTrigger.HOT_REPLICAS);
    assertNotNull("replicaRates", replicaRates);
    assertTrue(replicaRates.toString(), replicaRates.size() > 0);
    AtomicDouble totalReplicaRate = new AtomicDouble();

@ -1229,7 +1230,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
      assertTrue(r.toString(), r.getVariable("rate") != null);
      totalReplicaRate.addAndGet((Double)r.getVariable("rate"));
    });
    Map<String, Object> shardRates = (Map<String, Object>)ev.event.getProperties().get("shard");
    Map<String, Object> shardRates = (Map<String, Object>)ev.event.getProperties().get(SearchRateTrigger.HOT_SHARDS);
    assertNotNull("shardRates", shardRates);
    assertEquals(shardRates.toString(), 1, shardRates.size());
    shardRates = (Map<String, Object>)shardRates.get(COLL1);

@ -1237,7 +1238,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
    assertEquals(shardRates.toString(), 1, shardRates.size());
    AtomicDouble totalShardRate = new AtomicDouble();
    shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double)r));
    Map<String, Double> collectionRates = (Map<String, Double>)ev.event.getProperties().get("collection");
    Map<String, Double> collectionRates = (Map<String, Double>)ev.event.getProperties().get(SearchRateTrigger.HOT_COLLECTIONS);
    assertNotNull("collectionRates", collectionRates);
    assertEquals(collectionRates.toString(), 1, collectionRates.size());
    Double collectionRate = collectionRates.get(COLL1);

@ -184,43 +184,92 @@ operation `NONE` (which still can be monitored and acted upon by an appropriate

== Search Rate Trigger

The search rate trigger can be used for monitoring 1-minute average search rates in a selected
collection, and request that either replicas be moved to different nodes or new replicas be added
to reduce the per-replica search rate for a collection or shard with search rate hot spots.
(Future versions of Solr will also be able to automatically remove some replicas
when search rate falls below the configured lower threshold).
The search rate trigger can be used for monitoring search rates in a selected
collection (1-min average rate by default), and request that either replicas be moved from
"hot nodes" to different nodes, or new replicas be added to "hot shards" to reduce the
per-replica search rate for a collection or shard with hot spots.

Similarly, if the search rate falls below a threshold then the trigger may request that some
replicas are deleted from "cold" shards. It can also optionally issue node-level action requests
when a cumulative node-level rate falls below a threshold.

Note: this trigger calculates node-level cumulative rates using per-replica rates reported by
replicas that are part of monitored collections / shards. This means that it may report
some nodes as "cold" (underutilized) because it ignores other, perhaps more active, replicas
belonging to other collections. Also, nodes that don't host any of the monitored replicas or
those that are explicitly excluded by the `node` config property won't be reported at all.

This trigger supports the following configuration:

`collection`:: (string, optional) collection name to monitor, or any collection if empty.
`collections`:: (string, optional) comma-separated list of collection names to monitor, or any collection if empty / not set.

`shard`:: (string, optional) shard name within the collection (requires `collection` to be set), or any shard if empty.
`shard`:: (string, optional) shard name within the collection (requires `collections` to be set to exactly one name), or any shard if empty.

`node`:: (string, optional) node name to monitor, or any if empty.

`handler`:: (string, optional) handler name whose request rate represents the search rate
(default is `/select`). This name is used for creating the full metric key, in
this case `solr.core.<coreName>:QUERY./select.requestTimes:1minRate`.
`metric`:: (string, optional) metric name that represents the search rate
(default is `QUERY./select.requestTimes:1minRate`). This name has to identify a single numeric
metric value, and it may use the colon syntax for selecting one property of a complex metric.

`rate`:: (double, required) the upper bound for the request rate metric value.
`maxOps`:: (integer, optional) maximum number of add replica / delete replica operations
requested in a single autoscaling event. The default value is 3 and it helps to smooth out
the changes to the number of replicas during periods of large search rate fluctuations.

If a rate is exceeded for a node (but not for individual replicas placed on this node) then
the action requested by this event is to move one replica (with the highest rate) to another
node. If a rate is exceeded for a collection or shard then the action requested is to add some
replicas - currently at least 1 and at most 3, depending on how much the rate is exceeded, proportional to
the threshold rate and the current request rate.
`minReplicas`:: (integer, optional) minimum acceptable number of searchable replicas (i.e., replicas other
than `PULL` type). The trigger will not generate any DELETEREPLICA requests when the number of
searchable replicas in a shard reaches this threshold. When this value is not set (the default)
the `replicationFactor` property of the collection is used, and if that property is not set then
the value is set to 1. Note also that shard leaders are never deleted.

.Example: a search rate trigger that monitors collection "test" and adds new replicas if 1-minute average request rate of "/select" handler exceeds 100 requests/sec:
`aboveRate`:: (float) the upper bound for the request rate metric value. At least one of
`aboveRate` or `belowRate` must be set.

`belowRate`:: (float) the lower bound for the request rate metric value. At least one of
`aboveRate` or `belowRate` must be set.

`aboveOp`:: collection action to request when the upper threshold for a shard or replica is
exceeded. Default action is `ADDREPLICA` and the trigger will request 1 to `maxOps` operations
per shard per event, depending on how much the rate is exceeded. This property can be set to 'NONE'
to effectively disable the action (but still report it to the listeners).

`aboveNodeOp`:: collection action to request when the upper threshold for a node is exceeded.
Default action is `MOVEREPLICA`, and the trigger will request 1 replica operation per hot node per event.
If both `aboveOp` and `aboveNodeOp` operations are requested then `aboveNodeOp` operations are
always requested first. This property can be set to 'NONE' to effectively disable the action (but still report it to the listeners).

`belowOp`:: collection action to request when the lower threshold for a shard or replica is
exceeded. Default action is `DELETEREPLICA`, and the trigger will request at most `maxOps` replicas
to be deleted from eligible cold shards. This property can be set to 'NONE'
to effectively disable the action (but still report it to the listeners).

`belowNodeOp`:: action to request when the lower threshold for a node is exceeded.
Default action is null (not set) and the condition is ignored, because in many cases the
trigger will monitor only some selected resources (non-pull replica types from selected
collections / shards), so setting this by default to e.g. `DELETENODE` could interfere with
these non-monitored resources. The trigger will request 1 operation per cold node per event.
If both `belowOp` and `belowNodeOp` operations are requested then `belowOp` operations are
always requested first.

.Example:
A search rate trigger that monitors collection "test" and adds new replicas if the 5-minute
average request rate of the "/select" handler exceeds 100 requests/sec, and the condition persists
for over 20 minutes. If the rate falls below 0.01 and persists for 20 min the trigger will
request not only replica deletions (leaving at most 1 replica per shard) but also it may
request node deletion.
[source,json]
----
{
 "set-trigger": {
  "name" : "search_rate_trigger",
  "event" : "searchRate",
  "collection" : "test",
  "handler" : "/select",
  "rate" : 100.0,
  "waitFor" : "1m",
  "collections" : "test",
  "metric" : "QUERY./select.requestTimes:5minRate",
  "aboveRate" : 100.0,
  "belowRate" : 0.01,
  "belowNodeOp" : "DELETENODE",
  "minReplicas" : 1,
  "waitFor" : "20m",
  "enabled" : true,
  "actions" : [
   {

@ -0,0 +1,46 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.client.solrj.cloud.autoscaling;

import java.util.Set;

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.params.CollectionParams;

/**
 * This suggester produces a DELETENODE request using provided {@link org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint#SRC_NODE}.
 */
class DeleteNodeSuggester extends Suggester {

  @Override
  public CollectionParams.CollectionAction getAction() {
    return CollectionParams.CollectionAction.DELETENODE;
  }

  @Override
  SolrRequest init() {
    Set<String> srcNodes = (Set<String>) hints.get(Hint.SRC_NODE);
    if (srcNodes.isEmpty()) {
      throw new RuntimeException("delete-node requires 'src_node' hint");
    }
    if (srcNodes.size() > 1) {
      throw new RuntimeException("delete-node requires exactly one 'src_node' hint");
    }
    return CollectionAdminRequest.deleteNode(srcNodes.iterator().next());
  }
}
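
For context, suggesters like this one are normally obtained from a policy session rather than constructed directly. The following is a minimal sketch, not part of this patch, of how a plan-computing action might request a DELETENODE suggestion; the wrapper class, method name, and node name are hypothetical, and a pre-existing Policy.Session is assumed.

package org.apache.solr.example; // hypothetical package, for illustration only

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.common.params.CollectionParams;

class DeleteNodeSuggestionExample {
  /**
   * Asks the policy session for a DELETENODE operation targeting the given node.
   * Mirrors the hint contract enforced by DeleteNodeSuggester.init(): exactly one SRC_NODE hint.
   * May return null when the policy cannot produce a suggestion.
   */
  static SolrRequest suggestDeleteNode(Policy.Session session, String nodeName) {
    Suggester suggester = session.getSuggester(CollectionParams.CollectionAction.DELETENODE)
        .hint(Suggester.Hint.SRC_NODE, nodeName); // single node, as required above
    return suggester.getSuggestion();
  }
}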
@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.client.solrj.cloud.autoscaling;

import java.util.Collections;
import java.util.Set;

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;

/**
 * This suggester produces a DELETEREPLICA request using provided {@link org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint#COLL_SHARD} and
 * {@link org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint#NUMBER} hints to specify the collection, shard and number of replicas to delete.
 */
class DeleteReplicaSuggester extends Suggester {

  @Override
  public CollectionParams.CollectionAction getAction() {
    return CollectionParams.CollectionAction.DELETEREPLICA;
  }

  @Override
  SolrRequest init() {
    Set<Pair<String, String>> shards = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
    if (shards.isEmpty()) {
      throw new RuntimeException("delete-replica requires 'collection' and 'shard'");
    }
    if (shards.size() > 1) {
      throw new RuntimeException("delete-replica requires exactly one pair of 'collection' and 'shard'");
    }
    Pair<String, String> collShard = shards.iterator().next();
    Set<Number> counts = (Set<Number>) hints.getOrDefault(Hint.NUMBER, Collections.emptySet());
    Integer count = null;
    if (!counts.isEmpty()) {
      if (counts.size() > 1) {
        throw new RuntimeException("delete-replica allows at most one number hint specifying the number of replicas to delete");
      }
      Number n = counts.iterator().next();
      count = n.intValue();
    }
    Set<String> replicas = (Set<String>) hints.getOrDefault(Hint.REPLICA, Collections.emptySet());
    String replica = null;
    if (!replicas.isEmpty()) {
      if (replicas.size() > 1) {
        throw new RuntimeException("delete-replica allows at most one 'replica' hint");
      }
      replica = replicas.iterator().next();
    }
    if (replica == null && count == null) {
      throw new RuntimeException("delete-replica requires either 'replica' or 'number' hint");
    }
    if (replica != null) {
      return CollectionAdminRequest.deleteReplica(collShard.first(), collShard.second(), replica);
    } else {
      return CollectionAdminRequest.deleteReplica(collShard.first(), collShard.second(), count);
    }
  }
}
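
As with DELETENODE, here is a hedged sketch of driving this suggester through a policy session; the wrapper class and parameter values are hypothetical, and only the hint contract shown in init() above is assumed.

package org.apache.solr.example; // hypothetical package, for illustration only

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;

class DeleteReplicaSuggestionExample {
  /**
   * Asks the policy session to delete 'count' replicas from one shard.
   * DeleteReplicaSuggester.init() requires exactly one COLL_SHARD pair
   * plus either a NUMBER or a REPLICA hint; this sketch uses NUMBER.
   */
  static SolrRequest suggestDeleteReplicas(Policy.Session session, String collection, String shard, int count) {
    Suggester suggester = session.getSuggester(CollectionParams.CollectionAction.DELETEREPLICA)
        .hint(Suggester.Hint.COLL_SHARD, new Pair<>(collection, shard))
        .hint(Suggester.Hint.NUMBER, count);
    return suggester.getSuggestion(); // may be null if the policy finds nothing to do
  }
}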
@ -468,10 +468,12 @@ public class Policy implements MapWriter {

  static {
    ops.put(CollectionAction.ADDREPLICA, AddReplicaSuggester::new);
    ops.put(CollectionAction.DELETEREPLICA, () -> new UnsupportedSuggester(CollectionAction.DELETEREPLICA));
    ops.put(CollectionAction.DELETEREPLICA, DeleteReplicaSuggester::new);
    ops.put(CollectionAction.DELETENODE, DeleteNodeSuggester::new);
    ops.put(CollectionAction.MOVEREPLICA, MoveReplicaSuggester::new);
    ops.put(CollectionAction.SPLITSHARD, SplitShardSuggester::new);
    ops.put(CollectionAction.MERGESHARDS, () -> new UnsupportedSuggester(CollectionAction.MERGESHARDS));
    ops.put(CollectionAction.NONE, () -> new UnsupportedSuggester(CollectionAction.NONE));
  }

  public Map<String, List<Clause>> getPolicies() {

@ -138,6 +138,24 @@ public class ReplicaInfo implements MapWriter {
    return variables.get(name);
  }

  public Object getVariable(String name, Object defValue) {
    Object o = variables.get(name);
    if (o != null) {
      return o;
    } else {
      return defValue;
    }
  }

  public boolean getBool(String name, boolean defValue) {
    Object o = getVariable(name, defValue);
    if (o instanceof Boolean) {
      return (Boolean)o;
    } else {
      return Boolean.parseBoolean(String.valueOf(o));
    }
  }

  @Override
  public String toString() {
    return Utils.toJSONString(this);

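
A short illustration of the semantics of the accessors added above, with assumed values shown as comments only; the variable names and the replicaInfo instance are made up for this sketch.

// Assuming a ReplicaInfo whose variables map contains {"rate": 1.5, "leader": "true"}:
// replicaInfo.getVariable("rate", 0.0)    -> 1.5   (value present, returned as-is)
// replicaInfo.getVariable("missing", 0.0) -> 0.0   (falls back to the supplied default)
// replicaInfo.getBool("leader", false)    -> true  (non-Boolean coerced via Boolean.parseBoolean)
// replicaInfo.getBool("missing", true)    -> true  (default passed through getVariable, then coerced)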
@ -21,6 +21,7 @@ import java.util.Set;

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;

/**

@ -28,6 +29,11 @@ import org.apache.solr.common.util.Pair;
 */
class SplitShardSuggester extends Suggester {

  @Override
  public CollectionParams.CollectionAction getAction() {
    return CollectionParams.CollectionAction.SPLITSHARD;
  }

  @Override
  SolrRequest init() {
    Set<Pair<String, String>> shards = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());

@ -255,7 +255,7 @@ public abstract class Suggester implements MapWriter {
        Collection c = v instanceof Collection ? (Collection) v : Collections.singleton(v);
        for (Object o : c) {
          if (!(o instanceof Pair)) {
            throw new RuntimeException("SHARD hint must use a Pair");
            throw new RuntimeException("COLL_SHARD hint must use a Pair");
          }
          Pair p = (Pair) o;
          if (p.first() == null || p.second() == null) {

@ -288,7 +288,11 @@ public abstract class Suggester implements MapWriter {
      Double actualFreediskInGb = (Double) FREEDISK.validate(null, hintValVsActual.second(), false);
      if (actualFreediskInGb == null) return false;
      return actualFreediskInGb > hintFreediskInGb;
    });
    }),
    NUMBER(true, o -> {
      if (!(o instanceof Number)) throw new RuntimeException("NUMBER hint must be a number");
    }),
    REPLICA(true);

    public final boolean multiValued;
    public final Consumer<Object> validator;

@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory;
public class SolrClientCloudManager implements SolrCloudManager {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private final CloudSolrClient solrClient;
  protected final CloudSolrClient solrClient;
  private final ZkDistribStateManager stateManager;
  private final DistributedQueueFactory queueFactory;
  private final ZkStateReader zkStateReader;

@ -553,6 +553,13 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
    }
  }

  /**
   * Returns a SolrRequest to delete a node.
   */
  public static DeleteNode deleteNode(String node) {
    return new DeleteNode(node);
  }

  public static class DeleteNode extends AsyncCollectionAdminRequest {
    String node;

@ -1667,6 +1674,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
        checkNotNull(CoreAdminParams.REPLICA, replica));
  }

  public static DeleteReplica deleteReplica(String collection, String shard, int count) {
    return new DeleteReplica(collection, checkNotNull(CoreAdminParams.SHARD, shard), count);
  }

  /**
   * Returns a SolrRequest to remove a number of replicas from a specific shard
   */
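
For orientation, a small hypothetical usage sketch of the two factory methods added above; the collection and node names are placeholders, not values taken from this patch.

package org.apache.solr.example; // hypothetical package, for illustration only

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

class DeleteAdminRequestExample {
  // Deletes two replicas from shard1 of collection "test", then decommissions a node.
  static void shrink(SolrClient client) throws Exception {
    CollectionAdminRequest.deleteReplica("test", "shard1", 2).process(client);
    CollectionAdminRequest.deleteNode("127.0.0.1:8983_solr").process(client);
  }
}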
@ -135,6 +135,7 @@ public class Replica extends ZkNodeProps {
    return name.equals(replica.name);
  }

  /** Also known as coreNodeName. */
  public String getName() {
    return name;
  }

@ -146,6 +147,7 @@ public class Replica extends ZkNodeProps {
    return getStr(ZkStateReader.BASE_URL_PROP);
  }

  /** SolrCore name. */
  public String getCoreName() {
    return getStr(ZkStateReader.CORE_NAME_PROP);
  }