mirror of https://github.com/apache/lucene.git
SOLR-11072: Implement trigger for searchRate event type.
This commit is contained in:
parent
e001f35289
commit
d3e949c07b
|
@ -49,6 +49,8 @@ New Features
|
|||
----------------------
|
||||
* SOLR-11448: Implement an option in collection commands to wait for final results. (ab)
|
||||
|
||||
* SOLR-11072: Implement trigger for searchRate event type. (ab)
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -20,12 +20,17 @@ package org.apache.solr.cloud.autoscaling;
|
|||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||
import org.apache.solr.common.params.AutoScalingParams;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
|
@ -89,8 +94,37 @@ public class ComputePlanAction extends TriggerActionBase {
|
|||
.hint(Suggester.Hint.SRC_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
|
||||
log.debug("NODELOST Created suggester with srcNode: {}", event.getProperty(TriggerEvent.NODE_NAMES));
|
||||
break;
|
||||
case SEARCHRATE:
|
||||
Map<String, Map<String, Double>> hotShards = (Map<String, Map<String, Double>>)event.getProperty(AutoScalingParams.SHARD);
|
||||
Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
|
||||
List<ReplicaInfo> hotReplicas = (List<ReplicaInfo>)event.getProperty(AutoScalingParams.REPLICA);
|
||||
Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
|
||||
|
||||
if (hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
|
||||
// node -> MOVEREPLICA
|
||||
if (hotNodes.isEmpty()) {
|
||||
log.warn("Neither hot replicas / collection nor nodes are reported in event: " + event);
|
||||
return NoneSuggester.INSTANCE;
|
||||
}
|
||||
suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA);
|
||||
for (String node : hotNodes.keySet()) {
|
||||
suggester = suggester.hint(Suggester.Hint.SRC_NODE, node);
|
||||
}
|
||||
} else {
|
||||
// collection || shard || replica -> ADDREPLICA
|
||||
suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA);
|
||||
Set<String> collections = new HashSet<>();
|
||||
// XXX improve this when AddReplicaSuggester supports coll_shard hint
|
||||
hotReplicas.forEach(r -> collections.add(r.getCollection()));
|
||||
hotShards.forEach((coll, shards) -> collections.add(coll));
|
||||
hotCollections.forEach((coll, rate) -> collections.add(coll));
|
||||
for (String coll : collections) {
|
||||
suggester = suggester.hint(Suggester.Hint.COLL, coll);
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("No support for events other than nodeAdded and nodeLost, received: " + event.getEventType());
|
||||
throw new UnsupportedOperationException("No support for events other than nodeAdded, nodeLost and searchRate, received: " + event.getEventType());
|
||||
}
|
||||
return suggester;
|
||||
}
|
||||
|
|
|
@ -53,23 +53,17 @@ public class NodeAddedTrigger extends TriggerBase {
|
|||
|
||||
public NodeAddedTrigger(String name, Map<String, Object> properties,
|
||||
SolrResourceLoader loader,
|
||||
SolrCloudManager dataProvider) {
|
||||
super(TriggerEventType.NODEADDED, name, properties, loader, dataProvider);
|
||||
SolrCloudManager cloudManager) {
|
||||
super(TriggerEventType.NODEADDED, name, properties, loader, cloudManager);
|
||||
this.timeSource = TimeSource.CURRENT_TIME;
|
||||
lastLiveNodes = new HashSet<>(dataProvider.getClusterStateProvider().getLiveNodes());
|
||||
lastLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
|
||||
log.debug("Initial livenodes: {}", lastLiveNodes);
|
||||
log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
|
||||
if (o != null && !o.isEmpty()) {
|
||||
for (int i = 0; i < o.size(); i++) {
|
||||
Map<String, String> map = o.get(i);
|
||||
actions.get(i).init(map);
|
||||
}
|
||||
}
|
||||
super.init();
|
||||
// pick up added nodes for which marker paths were created
|
||||
try {
|
||||
List<String> added = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
|
||||
|
@ -136,7 +130,7 @@ public class NodeAddedTrigger extends TriggerBase {
|
|||
}
|
||||
log.debug("Running NodeAddedTrigger {}", name);
|
||||
|
||||
Set<String> newLiveNodes = new HashSet<>(dataProvider.getClusterStateProvider().getLiveNodes());
|
||||
Set<String> newLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
|
||||
log.debug("Found livenodes: {}", newLiveNodes);
|
||||
|
||||
// have any nodes that we were tracking been removed from the cluster?
|
||||
|
|
|
@ -126,7 +126,7 @@ public class NodeLostTrigger extends TriggerBase {
|
|||
}
|
||||
}
|
||||
|
||||
Set<String> newLiveNodes = new HashSet<>(dataProvider.getClusterStateProvider().getLiveNodes());
|
||||
Set<String> newLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
|
||||
log.debug("Running NodeLostTrigger: {} with currently live nodes: {}", name, newLiveNodes);
|
||||
|
||||
// have any nodes that we were tracking been added to the cluster?
|
||||
|
|
|
@ -0,0 +1,265 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.google.common.util.concurrent.AtomicDouble;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.params.AutoScalingParams;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.metrics.SolrCoreMetricManager;
|
||||
import org.apache.solr.util.TimeSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Trigger for the {@link org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType#SEARCHRATE} event.
|
||||
*/
|
||||
public class SearchRateTrigger extends TriggerBase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final TimeSource timeSource;
|
||||
private final String handler;
|
||||
private final String collection;
|
||||
private final String shard;
|
||||
private final String node;
|
||||
private final double rate;
|
||||
private final Map<String, Long> lastCollectionEvent = new ConcurrentHashMap<>();
|
||||
private final Map<String, Long> lastNodeEvent = new ConcurrentHashMap<>();
|
||||
private final Map<String, Long> lastShardEvent = new ConcurrentHashMap<>();
|
||||
private final Map<String, Long> lastReplicaEvent = new ConcurrentHashMap<>();
|
||||
private final Map<String, Object> state = new HashMap<>();
|
||||
|
||||
public SearchRateTrigger(String name, Map<String, Object> properties,
|
||||
SolrResourceLoader loader,
|
||||
SolrCloudManager cloudManager) {
|
||||
super(TriggerEventType.SEARCHRATE, name, properties, loader, cloudManager);
|
||||
this.timeSource = TimeSource.CURRENT_TIME;
|
||||
this.state.put("lastCollectionEvent", lastCollectionEvent);
|
||||
this.state.put("lastNodeEvent", lastNodeEvent);
|
||||
this.state.put("lastShardEvent", lastShardEvent);
|
||||
this.state.put("lastReplicaEvent", lastReplicaEvent);
|
||||
|
||||
// parse config options
|
||||
collection = (String)properties.getOrDefault(AutoScalingParams.COLLECTION, Policy.ANY);
|
||||
shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY);
|
||||
if (collection.equals(Policy.ANY) && !shard.equals(Policy.ANY)) {
|
||||
throw new IllegalArgumentException("When 'shard' is other than #ANY collection name must be also other than #ANY");
|
||||
}
|
||||
node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
|
||||
handler = (String)properties.getOrDefault(AutoScalingParams.HANDLER, "/select");
|
||||
|
||||
if (properties.get("rate") == null) {
|
||||
throw new IllegalArgumentException("No 'rate' specified in configuration");
|
||||
}
|
||||
String rateString = String.valueOf(properties.get("rate"));
|
||||
try {
|
||||
rate = Double.parseDouble(rateString);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("Invalid 'rate' configuration value: '" + rateString + "'", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, Object> getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setState(Map<String, Object> state) {
|
||||
lastCollectionEvent.clear();
|
||||
lastNodeEvent.clear();
|
||||
lastShardEvent.clear();
|
||||
lastReplicaEvent.clear();
|
||||
Map<String, Long> collTimes = (Map<String, Long>)state.get("lastCollectionEvent");
|
||||
if (collTimes != null) {
|
||||
lastCollectionEvent.putAll(collTimes);
|
||||
}
|
||||
Map<String, Long> nodeTimes = (Map<String, Long>)state.get("lastNodeEvent");
|
||||
if (nodeTimes != null) {
|
||||
lastNodeEvent.putAll(nodeTimes);
|
||||
}
|
||||
Map<String, Long> shardTimes = (Map<String, Long>)state.get("lastShardEvent");
|
||||
if (shardTimes != null) {
|
||||
lastShardEvent.putAll(shardTimes);
|
||||
}
|
||||
Map<String, Long> replicaTimes = (Map<String, Long>)state.get("lastReplicaEvent");
|
||||
if (replicaTimes != null) {
|
||||
lastReplicaEvent.putAll(replicaTimes);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreState(AutoScaling.Trigger old) {
|
||||
assert old.isClosed();
|
||||
if (old instanceof SearchRateTrigger) {
|
||||
SearchRateTrigger that = (SearchRateTrigger)old;
|
||||
assert this.name.equals(that.name);
|
||||
this.lastCollectionEvent.clear();
|
||||
this.lastNodeEvent.clear();
|
||||
this.lastShardEvent.clear();
|
||||
this.lastReplicaEvent.clear();
|
||||
this.lastCollectionEvent.putAll(that.lastCollectionEvent);
|
||||
this.lastNodeEvent.putAll(that.lastNodeEvent);
|
||||
this.lastShardEvent.putAll(that.lastShardEvent);
|
||||
this.lastReplicaEvent.putAll(that.lastReplicaEvent);
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
|
||||
"Unable to restore state from an unknown type of trigger");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
AutoScaling.TriggerEventProcessor processor = processorRef.get();
|
||||
if (processor == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, Map<String, List<ReplicaInfo>>> collectionRates = new HashMap<>();
|
||||
Map<String, AtomicDouble> nodeRates = new HashMap<>();
|
||||
|
||||
for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
|
||||
Map<String, ReplicaInfo> metricTags = new HashMap<>();
|
||||
// coll, shard, replica
|
||||
Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
|
||||
infos.forEach((coll, shards) -> {
|
||||
shards.forEach((sh, replicas) -> {
|
||||
replicas.forEach(replica -> {
|
||||
// we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
|
||||
String replicaName = Utils.parseMetricsReplicaName(coll, replica.getCore());
|
||||
if (replicaName == null) { // should never happen???
|
||||
replicaName = replica.getName(); // which is actually coreNode name...
|
||||
}
|
||||
String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null);
|
||||
String tag = "metrics:" + registry
|
||||
+ ":QUERY." + handler + ".requestTimes:1minRate";
|
||||
metricTags.put(tag, replica);
|
||||
});
|
||||
});
|
||||
});
|
||||
Map<String, Object> rates = cloudManager.getNodeStateProvider().getNodeValues(node, metricTags.keySet());
|
||||
rates.forEach((tag, rate) -> {
|
||||
ReplicaInfo info = metricTags.get(tag);
|
||||
if (info == null) {
|
||||
log.warn("Missing replica info for response tag " + tag);
|
||||
} else {
|
||||
Map<String, List<ReplicaInfo>> perCollection = collectionRates.computeIfAbsent(info.getCollection(), s -> new HashMap<>());
|
||||
List<ReplicaInfo> perShard = perCollection.computeIfAbsent(info.getShard(), s -> new ArrayList<>());
|
||||
info.getVariables().put(AutoScalingParams.RATE, rate);
|
||||
perShard.add(info);
|
||||
AtomicDouble perNode = nodeRates.computeIfAbsent(node, s -> new AtomicDouble());
|
||||
perNode.addAndGet((Double)rate);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
long now = timeSource.getTime();
|
||||
// check for exceeded rates and filter out those with less than waitFor from previous events
|
||||
Map<String, Double> hotNodes = nodeRates.entrySet().stream()
|
||||
.filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey()))
|
||||
.filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
|
||||
.filter(entry -> entry.getValue().get() > rate)
|
||||
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get()));
|
||||
|
||||
Map<String, Map<String, Double>> hotShards = new HashMap<>();
|
||||
List<ReplicaInfo> hotReplicas = new ArrayList<>();
|
||||
collectionRates.forEach((coll, shardRates) -> {
|
||||
shardRates.forEach((sh, replicaRates) -> {
|
||||
double shardRate = replicaRates.stream()
|
||||
.map(r -> {
|
||||
if (waitForElapsed(r.getCollection() + "." + r.getCore(), now, lastReplicaEvent) &&
|
||||
((Double)r.getVariable(AutoScalingParams.RATE) > rate)) {
|
||||
hotReplicas.add(r);
|
||||
}
|
||||
return r;
|
||||
})
|
||||
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum();
|
||||
if (waitForElapsed(coll + "." + sh, now, lastShardEvent) &&
|
||||
(shardRate > rate) &&
|
||||
(collection.equals(Policy.ANY) || collection.equals(coll)) &&
|
||||
(shard.equals(Policy.ANY) || shard.equals(sh))) {
|
||||
hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
Map<String, Double> hotCollections = new HashMap<>();
|
||||
collectionRates.forEach((coll, shardRates) -> {
|
||||
double total = shardRates.entrySet().stream()
|
||||
.mapToDouble(e -> e.getValue().stream()
|
||||
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum();
|
||||
if (waitForElapsed(coll, now, lastCollectionEvent) &&
|
||||
(total > rate) &&
|
||||
(collection.equals(Policy.ANY) || collection.equals(coll))) {
|
||||
hotCollections.put(coll, total);
|
||||
}
|
||||
});
|
||||
|
||||
if (hotCollections.isEmpty() && hotShards.isEmpty() && hotReplicas.isEmpty() && hotNodes.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// generate event
|
||||
|
||||
if (processor.process(new SearchRateEvent(getName(), now, hotNodes, hotCollections, hotShards, hotReplicas))) {
|
||||
// update lastEvent times
|
||||
hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
|
||||
hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
|
||||
hotShards.entrySet().forEach(e -> e.getValue()
|
||||
.forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
|
||||
hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
|
||||
}
|
||||
}
|
||||
|
||||
private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
|
||||
Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
|
||||
long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
|
||||
log.info("name=" + name + ", lastTime=" + lastTime + ", elapsed=" + elapsed);
|
||||
if (TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS) < getWaitForSecond()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public static class SearchRateEvent extends TriggerEvent {
|
||||
public SearchRateEvent(String source, long eventTime, Map<String, Double> hotNodes,
|
||||
Map<String, Double> hotCollections,
|
||||
Map<String, Map<String, Double>> hotShards, List<ReplicaInfo> hotReplicas) {
|
||||
super(TriggerEventType.SEARCHRATE, source, eventTime, null);
|
||||
properties.put(AutoScalingParams.COLLECTION, hotCollections);
|
||||
properties.put(AutoScalingParams.SHARD, hotShards);
|
||||
properties.put(AutoScalingParams.REPLICA, hotReplicas);
|
||||
properties.put(AutoScalingParams.NODE, hotNodes);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -50,7 +50,7 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
protected final String name;
|
||||
protected final SolrCloudManager dataProvider;
|
||||
protected final SolrCloudManager cloudManager;
|
||||
protected final DistribStateManager stateManager;
|
||||
protected final Map<String, Object> properties = new HashMap<>();
|
||||
protected final TriggerEventType eventType;
|
||||
|
@ -62,11 +62,11 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
|
|||
protected boolean isClosed;
|
||||
|
||||
|
||||
protected TriggerBase(TriggerEventType eventType, String name, Map<String, Object> properties, SolrResourceLoader loader, SolrCloudManager dataProvider) {
|
||||
protected TriggerBase(TriggerEventType eventType, String name, Map<String, Object> properties, SolrResourceLoader loader, SolrCloudManager cloudManager) {
|
||||
this.eventType = eventType;
|
||||
this.name = name;
|
||||
this.dataProvider = dataProvider;
|
||||
this.stateManager = dataProvider.getDistribStateManager();
|
||||
this.cloudManager = cloudManager;
|
||||
this.stateManager = cloudManager.getDistribStateManager();
|
||||
if (properties != null) {
|
||||
this.properties.putAll(properties);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.Collections;
|
|||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -139,14 +140,18 @@ public class MetricsHandler extends RequestHandlerBase implements PermissionName
|
|||
continue;
|
||||
}
|
||||
MetricUtils.PropertyFilter propertyFilter = MetricUtils.PropertyFilter.ALL;
|
||||
boolean simple = false;
|
||||
if (propertyName != null) {
|
||||
propertyFilter = (name) -> name.equals(propertyName);
|
||||
simple = true;
|
||||
// use escaped versions
|
||||
key = parts[0] + ":" + parts[1];
|
||||
}
|
||||
MetricUtils.convertMetric(key, m, propertyFilter, false, true, true, simple, ":", (k, v) -> result.add(k, v));
|
||||
MetricUtils.convertMetric(key, m, propertyFilter, false, true, true, false, ":", (k, v) -> {
|
||||
if ((v instanceof Map) && propertyName != null) {
|
||||
((Map)v).forEach((k1, v1) -> result.add(k + ":" + k1, v1));
|
||||
} else {
|
||||
result.add(k, v);
|
||||
}
|
||||
});
|
||||
}
|
||||
rsp.getValues().add("metrics", result);
|
||||
if (errors.size() > 0) {
|
||||
|
|
|
@ -0,0 +1,191 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.cloud.ZkDistributedQueueFactory;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.params.AutoScalingParams;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class SearchRateTriggerTest extends SolrCloudTestCase {
|
||||
|
||||
private static final String PREFIX = SearchRateTriggerTest.class.getSimpleName() + "-";
|
||||
private static final String COLL1 = PREFIX + "collection1";
|
||||
private static final String COLL2 = PREFIX + "collection2";
|
||||
|
||||
private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
|
||||
fail("Did not expect the listener to fire on first run!");
|
||||
return true;
|
||||
};
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(4)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
|
||||
"conf", 2, 2);
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
create.setMaxShardsPerNode(1);
|
||||
create.process(solrClient);
|
||||
create = CollectionAdminRequest.createCollection(COLL2,
|
||||
"conf", 2, 2);
|
||||
create.setMaxShardsPerNode(1);
|
||||
create.process(solrClient);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTrigger() throws Exception {
|
||||
double rate = 1.0;
|
||||
SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
|
||||
SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
|
||||
CoreContainer container = cluster.getJettySolrRunner(0).getCoreContainer();
|
||||
SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
|
||||
URL baseUrl = cluster.getJettySolrRunners().get(1).getBaseUrl();
|
||||
long waitForSeconds = 5 + random().nextInt(5);
|
||||
Map<String, Object> props = createTriggerProps(waitForSeconds, rate);
|
||||
final List<TriggerEvent> events = new ArrayList<>();
|
||||
CloudSolrClient solrClient = cluster.getSolrClient();
|
||||
|
||||
try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger", props, loader, cloudManager)) {
|
||||
trigger.setProcessor(noFirstRunProcessor);
|
||||
trigger.run();
|
||||
trigger.setProcessor(event -> events.add(event));
|
||||
|
||||
// generate replica traffic
|
||||
String coreName = container.getLoadedCoreNames().iterator().next();
|
||||
String url = baseUrl.toString() + "/" + coreName;
|
||||
try (HttpSolrClient simpleClient = new HttpSolrClient.Builder(url).build()) {
|
||||
SolrParams query = params(CommonParams.Q, "*:*", CommonParams.DISTRIB, "false");
|
||||
for (int i = 0; i < 200; i++) {
|
||||
simpleClient.query(query);
|
||||
}
|
||||
trigger.run();
|
||||
// waitFor delay
|
||||
assertEquals(0, events.size());
|
||||
Thread.sleep(waitForSeconds * 1000 + 2000);
|
||||
// should generate replica event
|
||||
trigger.run();
|
||||
assertEquals(1, events.size());
|
||||
TriggerEvent event = events.get(0);
|
||||
assertEquals(TriggerEventType.SEARCHRATE, event.eventType);
|
||||
List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(AutoScalingParams.REPLICA);
|
||||
assertEquals(1, infos.size());
|
||||
ReplicaInfo info = infos.get(0);
|
||||
assertEquals(coreName, info.getCore());
|
||||
assertTrue((Double)info.getVariable(AutoScalingParams.RATE) > rate);
|
||||
}
|
||||
// close that jetty to remove the violation - alternatively wait for 1 min...
|
||||
cluster.stopJettySolrRunner(1);
|
||||
events.clear();
|
||||
SolrParams query = params(CommonParams.Q, "*:*");
|
||||
for (int i = 0; i < 500; i++) {
|
||||
solrClient.query(COLL1, query);
|
||||
}
|
||||
Thread.sleep(waitForSeconds * 1000 + 2000);
|
||||
trigger.run();
|
||||
// should generate collection event
|
||||
assertEquals(1, events.size());
|
||||
TriggerEvent event = events.get(0);
|
||||
Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
|
||||
assertEquals(1, hotCollections.size());
|
||||
Double Rate = hotCollections.get(COLL1);
|
||||
assertNotNull(Rate);
|
||||
assertTrue(Rate > rate);
|
||||
events.clear();
|
||||
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
solrClient.query(COLL2, query);
|
||||
}
|
||||
Thread.sleep(waitForSeconds * 1000 + 2000);
|
||||
trigger.run();
|
||||
// should generate node and collection event but not for COLL2 because of waitFor
|
||||
assertEquals(1, events.size());
|
||||
event = events.get(0);
|
||||
Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
|
||||
assertEquals(3, hotNodes.size());
|
||||
hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
|
||||
hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
|
||||
assertEquals(2, hotCollections.size());
|
||||
Rate = hotCollections.get(COLL1);
|
||||
assertNotNull(Rate);
|
||||
Rate = hotCollections.get(COLL2);
|
||||
assertNotNull(Rate);
|
||||
|
||||
events.clear();
|
||||
// assert that waitFor prevents new events from being generated
|
||||
trigger.run();
|
||||
// should not generate any events
|
||||
assertEquals(0, events.size());
|
||||
|
||||
Thread.sleep(waitForSeconds * 1000 + 2000);
|
||||
trigger.run();
|
||||
// should generate node and collection event
|
||||
assertEquals(1, events.size());
|
||||
hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
|
||||
assertEquals(2, hotCollections.size());
|
||||
Rate = hotCollections.get(COLL1);
|
||||
assertNotNull(Rate);
|
||||
Rate = hotCollections.get(COLL2);
|
||||
assertNotNull(Rate);
|
||||
hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
|
||||
assertEquals(3, hotNodes.size());
|
||||
hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Object> createTriggerProps(long waitForSeconds, double rate) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put("rate", rate);
|
||||
props.put("event", "searchRate");
|
||||
props.put("waitFor", waitForSeconds);
|
||||
props.put("enabled", true);
|
||||
List<Map<String, String>> actions = new ArrayList<>(3);
|
||||
Map<String, String> map = new HashMap<>(2);
|
||||
map.put("name", "compute_plan");
|
||||
map.put("class", "solr.ComputePlanAction");
|
||||
actions.add(map);
|
||||
map = new HashMap<>(2);
|
||||
map.put("name", "execute_plan");
|
||||
map.put("class", "solr.ExecutePlanAction");
|
||||
actions.add(map);
|
||||
props.put("actions", actions);
|
||||
return props;
|
||||
}
|
||||
}
|
|
@ -277,6 +277,15 @@ public class MetricsHandlerTest extends SolrTestCaseJ4 {
|
|||
val = values.findRecursive("metrics", key3);
|
||||
assertNotNull(val);
|
||||
|
||||
String key4 = "solr.core.collection1:QUERY./select.requestTimes:1minRate";
|
||||
resp = new SolrQueryResponse();
|
||||
handler.handleRequestBody(req(CommonParams.QT, "/admin/metrics", CommonParams.WT, "json",
|
||||
MetricsHandler.KEY_PARAM, key4), resp);
|
||||
values = resp.getValues();
|
||||
val = values.findRecursive("metrics", key4);
|
||||
assertNotNull(val);
|
||||
assertTrue(val instanceof Number);
|
||||
|
||||
// test errors
|
||||
|
||||
// invalid keys
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.solr.client.solrj.cloud.autoscaling;
|
|||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
|
||||
public class NoneSuggester extends Suggester {
|
||||
public static final NoneSuggester INSTANCE = new NoneSuggester();
|
||||
|
||||
@Override
|
||||
SolrRequest init() {
|
||||
return null;
|
||||
|
|
|
@ -37,6 +37,12 @@ public interface AutoScalingParams {
|
|||
String BEFORE_ACTION = "beforeAction";
|
||||
String AFTER_ACTION = "afterAction";
|
||||
String TIMEOUT = "timeout";
|
||||
String COLLECTION = "collection";
|
||||
String SHARD = "shard";
|
||||
String REPLICA = "replica";
|
||||
String NODE = "node";
|
||||
String HANDLER = "handler";
|
||||
String RATE = "rate";
|
||||
String REMOVE_LISTENERS = "removeListeners";
|
||||
String ZK_VERSION = "zkVersion";
|
||||
|
||||
|
|
Loading…
Reference in New Issue